189 lines
6.6 KiB
Go
189 lines
6.6 KiB
Go
|
// Copyright (C) 2020 Storj Labs, Inc.
|
||
|
// See LICENSE for copying information.
|
||
|
|
||
|
package gracefulexit
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"time"
|
||
|
|
||
|
"github.com/zeebo/errs"
|
||
|
"go.uber.org/zap"
|
||
|
|
||
|
"storj.io/common/errs2"
|
||
|
"storj.io/common/pb"
|
||
|
"storj.io/common/rpc"
|
||
|
"storj.io/common/storj"
|
||
|
"storj.io/storj/storagenode/pieces"
|
||
|
"storj.io/storj/storagenode/satellites"
|
||
|
"storj.io/storj/storagenode/trust"
|
||
|
)
|
||
|
|
||
|
// Service acts as the gateway to the `satellites` db for graceful exit
|
||
|
// code (querying and updating that db as necessary).
|
||
|
//
|
||
|
// architecture: Service
|
||
|
type Service interface {
|
||
|
// ListPendingExits returns a slice with one record for every satellite
|
||
|
// from which this node is gracefully exiting. Each record includes the
|
||
|
// satellite's ID/address and information about the graceful exit status
|
||
|
// and progress.
|
||
|
ListPendingExits(ctx context.Context) ([]ExitingSatellite, error)
|
||
|
|
||
|
// DeletePiece deletes one piece stored for a satellite, and updates
|
||
|
// the deleted byte count for the corresponding graceful exit operation.
|
||
|
DeletePiece(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error
|
||
|
|
||
|
// DeleteSatellitePieces deletes all pieces stored for a satellite, and updates
|
||
|
// the deleted byte count for the corresponding graceful exit operation.
|
||
|
DeleteSatellitePieces(ctx context.Context, satelliteID storj.NodeID) error
|
||
|
|
||
|
// ExitFailed updates the database when a graceful exit has failed.
|
||
|
ExitFailed(ctx context.Context, satelliteID storj.NodeID, reason pb.ExitFailed_Reason, exitFailedBytes []byte) error
|
||
|
|
||
|
// ExitCompleted updates the database when a graceful exit is completed. It also
|
||
|
// deletes all pieces and blobs for that satellite.
|
||
|
ExitCompleted(ctx context.Context, satelliteID storj.NodeID, completionReceipt []byte, wait func()) error
|
||
|
|
||
|
// ExitNotPossible deletes the entry for the corresponding graceful exit operation.
|
||
|
// This is intended to be called when a graceful exit operation was initiated but
|
||
|
// the satellite rejected it.
|
||
|
ExitNotPossible(ctx context.Context, satelliteID storj.NodeID) error
|
||
|
}
|
||
|
|
||
|
// ensures that service implements Service.
|
||
|
var _ Service = (*service)(nil)
|
||
|
|
||
|
// service exposes methods to manage GE progress.
|
||
|
//
|
||
|
// architecture: Service
|
||
|
type service struct {
|
||
|
log *zap.Logger
|
||
|
store *pieces.Store
|
||
|
trust *trust.Pool
|
||
|
satelliteDB satellites.DB
|
||
|
|
||
|
nowFunc func() time.Time
|
||
|
}
|
||
|
|
||
|
// NewService is a constructor for a GE service.
|
||
|
func NewService(log *zap.Logger, store *pieces.Store, trust *trust.Pool, satelliteDB satellites.DB, dialer rpc.Dialer, config Config) Service {
|
||
|
return &service{
|
||
|
log: log,
|
||
|
store: store,
|
||
|
trust: trust,
|
||
|
satelliteDB: satelliteDB,
|
||
|
nowFunc: func() time.Time { return time.Now().UTC() },
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// ExitingSatellite encapsulates a node address with its graceful exit progress.
|
||
|
type ExitingSatellite struct {
|
||
|
satellites.ExitProgress
|
||
|
NodeURL storj.NodeURL
|
||
|
}
|
||
|
|
||
|
func (c *service) ListPendingExits(ctx context.Context) (_ []ExitingSatellite, err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
|
||
|
exitProgress, err := c.satelliteDB.ListGracefulExits(ctx)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
exitingSatellites := make([]ExitingSatellite, 0, len(exitProgress))
|
||
|
for _, sat := range exitProgress {
|
||
|
nodeURL, err := c.trust.GetNodeURL(ctx, sat.SatelliteID)
|
||
|
if err != nil {
|
||
|
c.log.Error("failed to get satellite address", zap.Stringer("satellite-id", sat.SatelliteID), zap.Error(err))
|
||
|
continue
|
||
|
}
|
||
|
exitingSatellites = append(exitingSatellites, ExitingSatellite{ExitProgress: sat, NodeURL: nodeURL})
|
||
|
}
|
||
|
return exitingSatellites, nil
|
||
|
}
|
||
|
|
||
|
// DeletePiece deletes one piece stored for a satellite, and updates
|
||
|
// the deleted byte count for the corresponding graceful exit operation.
|
||
|
func (c *service) DeletePiece(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
|
||
|
piece, err := c.store.Reader(ctx, satelliteID, pieceID)
|
||
|
if err != nil {
|
||
|
return Error.Wrap(err)
|
||
|
}
|
||
|
err = c.store.Delete(ctx, satelliteID, pieceID)
|
||
|
if err != nil {
|
||
|
return Error.Wrap(err)
|
||
|
}
|
||
|
// update graceful exit progress
|
||
|
size := piece.Size()
|
||
|
return c.satelliteDB.UpdateGracefulExit(ctx, satelliteID, size)
|
||
|
}
|
||
|
|
||
|
// DeleteSatellitePieces deletes all pieces stored for a satellite, and updates
|
||
|
// the deleted byte count for the corresponding graceful exit operation.
|
||
|
func (c *service) DeleteSatellitePieces(ctx context.Context, satelliteID storj.NodeID) (err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
|
||
|
var totalDeleted int64
|
||
|
logger := c.log.With(zap.Stringer("Satellite ID", satelliteID), zap.String("action", "delete all pieces"))
|
||
|
err = c.store.WalkSatellitePieces(ctx, satelliteID, func(piece pieces.StoredPieceAccess) error {
|
||
|
err := c.store.Delete(ctx, satelliteID, piece.PieceID())
|
||
|
if err != nil {
|
||
|
logger.Error("failed to delete piece",
|
||
|
zap.Stringer("Piece ID", piece.PieceID()), zap.Error(err))
|
||
|
// but continue
|
||
|
}
|
||
|
_, size, err := piece.Size(ctx)
|
||
|
if err != nil {
|
||
|
logger.Warn("failed to get piece size",
|
||
|
zap.Stringer("Piece ID", piece.PieceID()), zap.Error(err))
|
||
|
return nil
|
||
|
}
|
||
|
totalDeleted += size
|
||
|
return nil
|
||
|
})
|
||
|
if err != nil && !errs2.IsCanceled(err) {
|
||
|
logger.Error("failed to delete all pieces", zap.Error(err))
|
||
|
}
|
||
|
// update graceful exit progress
|
||
|
return c.satelliteDB.UpdateGracefulExit(ctx, satelliteID, totalDeleted)
|
||
|
}
|
||
|
|
||
|
// ExitFailed updates the database when a graceful exit has failed.
|
||
|
func (c *service) ExitFailed(ctx context.Context, satelliteID storj.NodeID, reason pb.ExitFailed_Reason, exitFailedBytes []byte) (err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
return c.satelliteDB.CompleteGracefulExit(ctx, satelliteID, c.nowFunc(), satellites.ExitFailed, exitFailedBytes)
|
||
|
}
|
||
|
|
||
|
// ExitCompleted updates the database when a graceful exit is completed. It also
|
||
|
// deletes all pieces and blobs for that satellite.
|
||
|
func (c *service) ExitCompleted(ctx context.Context, satelliteID storj.NodeID, completionReceipt []byte, wait func()) (err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
|
||
|
err = c.satelliteDB.CompleteGracefulExit(ctx, satelliteID, c.nowFunc(), satellites.ExitSucceeded, completionReceipt)
|
||
|
if err != nil {
|
||
|
return errs.Wrap(err)
|
||
|
}
|
||
|
|
||
|
// wait for deletes to complete
|
||
|
wait()
|
||
|
|
||
|
// delete all remaining pieces
|
||
|
err = c.DeleteSatellitePieces(ctx, satelliteID)
|
||
|
if err != nil {
|
||
|
return errs.Wrap(err)
|
||
|
}
|
||
|
|
||
|
// delete everything left in blobs folder of specific satellites
|
||
|
return c.store.DeleteSatelliteBlobs(ctx, satelliteID)
|
||
|
}
|
||
|
|
||
|
// ExitNotPossible deletes the entry from satellite table and inform graceful exit
|
||
|
// has failed to start.
|
||
|
func (c *service) ExitNotPossible(ctx context.Context, satelliteID storj.NodeID) (err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
|
||
|
return c.satelliteDB.CancelGracefulExit(ctx, satelliteID)
|
||
|
}
|