storagenode/gracefulexit: logic moved from worker to service
Change-Id: I8b12606a96b712050bf40d587664fb1b2c578fbc
This commit is contained in:
parent
9abdcc05e5
commit
76d4977b6a
@ -34,7 +34,6 @@ import (
|
|||||||
"storj.io/storj/storage"
|
"storj.io/storj/storage"
|
||||||
"storj.io/storj/storagenode"
|
"storj.io/storj/storagenode"
|
||||||
"storj.io/storj/storagenode/gracefulexit"
|
"storj.io/storj/storagenode/gracefulexit"
|
||||||
"storj.io/storj/storagenode/pieces"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const numObjects = 6
|
const numObjects = 6
|
||||||
@ -254,6 +253,15 @@ func TestRecvTimeout(t *testing.T) {
|
|||||||
config.Metainfo.RS.SuccessThreshold = successThreshold
|
config.Metainfo.RS.SuccessThreshold = successThreshold
|
||||||
config.Metainfo.RS.TotalThreshold = successThreshold
|
config.Metainfo.RS.TotalThreshold = successThreshold
|
||||||
},
|
},
|
||||||
|
StorageNode: func(index int, config *storagenode.Config) {
|
||||||
|
config.GracefulExit = gracefulexit.Config{
|
||||||
|
ChoreInterval: 2 * time.Minute,
|
||||||
|
NumWorkers: 2,
|
||||||
|
NumConcurrentTransfers: 2,
|
||||||
|
MinBytesPerSecond: 128,
|
||||||
|
MinDownloadTimeout: 2 * time.Minute,
|
||||||
|
}
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
satellite := planet.Satellites[0]
|
satellite := planet.Satellites[0]
|
||||||
@ -293,17 +301,9 @@ func TestRecvTimeout(t *testing.T) {
|
|||||||
// make uploads on storage node slower than the timeout for transferring bytes to another node
|
// make uploads on storage node slower than the timeout for transferring bytes to another node
|
||||||
delay := 200 * time.Millisecond
|
delay := 200 * time.Millisecond
|
||||||
storageNodeDB.SetLatency(delay)
|
storageNodeDB.SetLatency(delay)
|
||||||
store := pieces.NewStore(zaptest.NewLogger(t), storageNodeDB.Pieces(), nil, nil, storageNodeDB.PieceSpaceUsedDB(), pieces.DefaultConfig)
|
|
||||||
|
|
||||||
// run the SN chore again to start processing transfers.
|
// run the SN chore again to start processing transfers.
|
||||||
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), store, exitingNode.Storage2.Trust, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.NodeURL(),
|
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.GracefulExit.Service, exitingNode.PieceTransfer.Service, exitingNode.Dialer, satellite.NodeURL(), exitingNode.Config.GracefulExit)
|
||||||
gracefulexit.Config{
|
|
||||||
ChoreInterval: 0,
|
|
||||||
NumWorkers: 2,
|
|
||||||
NumConcurrentTransfers: 2,
|
|
||||||
MinBytesPerSecond: 128,
|
|
||||||
MinDownloadTimeout: 2 * time.Minute,
|
|
||||||
})
|
|
||||||
defer ctx.Check(worker.Close)
|
defer ctx.Check(worker.Close)
|
||||||
|
|
||||||
err = worker.Run(ctx, func() {})
|
err = worker.Run(ctx, func() {})
|
||||||
@ -1486,11 +1486,11 @@ func testTransfers(t *testing.T, objects int, verifier func(t *testing.T, ctx *t
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer ctx.Check(c.CloseSend)
|
defer ctx.Check(c.CloseSend)
|
||||||
|
|
||||||
verifier(t, ctx, nodeFullIDs, satellite, c, exitingNode, len(incompleteTransfers))
|
verifier(t, ctx, nodeFullIDs, satellite, c, exitingNode.Peer, len(incompleteTransfers))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func findNodeToExit(ctx context.Context, planet *testplanet.Planet, objects int) (*storagenode.Peer, error) {
|
func findNodeToExit(ctx context.Context, planet *testplanet.Planet, objects int) (*testplanet.StorageNode, error) {
|
||||||
satellite := planet.Satellites[0]
|
satellite := planet.Satellites[0]
|
||||||
keys, err := satellite.Metainfo.Database.List(ctx, nil, objects)
|
keys, err := satellite.Metainfo.Database.List(ctx, nil, objects)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1527,6 +1527,5 @@ func findNodeToExit(ctx context.Context, planet *testplanet.Planet, objects int)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
node := planet.FindNode(exitingNodeID)
|
return planet.FindNode(exitingNodeID), nil
|
||||||
return node.Peer, nil
|
|
||||||
}
|
}
|
||||||
|
@ -11,9 +11,7 @@ import (
|
|||||||
|
|
||||||
"storj.io/common/rpc"
|
"storj.io/common/rpc"
|
||||||
"storj.io/common/sync2"
|
"storj.io/common/sync2"
|
||||||
"storj.io/storj/storagenode/pieces"
|
"storj.io/storj/storagenode/piecetransfer"
|
||||||
"storj.io/storj/storagenode/satellites"
|
|
||||||
"storj.io/storj/storagenode/trust"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Chore checks for satellites that the node is exiting and creates a worker per satellite to complete the process.
|
// Chore checks for satellites that the node is exiting and creates a worker per satellite to complete the process.
|
||||||
@ -21,26 +19,24 @@ import (
|
|||||||
// architecture: Chore
|
// architecture: Chore
|
||||||
type Chore struct {
|
type Chore struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
store *pieces.Store
|
|
||||||
satelliteDB satellites.DB
|
|
||||||
trust *trust.Pool
|
|
||||||
dialer rpc.Dialer
|
dialer rpc.Dialer
|
||||||
|
|
||||||
config Config
|
config Config
|
||||||
|
|
||||||
|
service Service
|
||||||
|
transferService piecetransfer.Service
|
||||||
|
|
||||||
exitingMap sync.Map
|
exitingMap sync.Map
|
||||||
Loop *sync2.Cycle
|
Loop *sync2.Cycle
|
||||||
limiter *sync2.Limiter
|
limiter *sync2.Limiter
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewChore instantiates Chore.
|
// NewChore instantiates Chore.
|
||||||
func NewChore(log *zap.Logger, config Config, store *pieces.Store, trust *trust.Pool, dialer rpc.Dialer, satelliteDB satellites.DB) *Chore {
|
func NewChore(log *zap.Logger, service Service, transferService piecetransfer.Service, dialer rpc.Dialer, config Config) *Chore {
|
||||||
return &Chore{
|
return &Chore{
|
||||||
log: log,
|
log: log,
|
||||||
store: store,
|
|
||||||
satelliteDB: satelliteDB,
|
|
||||||
trust: trust,
|
|
||||||
dialer: dialer,
|
dialer: dialer,
|
||||||
|
service: service,
|
||||||
|
transferService: transferService,
|
||||||
config: config,
|
config: config,
|
||||||
Loop: sync2.NewCycle(config.ChoreInterval),
|
Loop: sync2.NewCycle(config.ChoreInterval),
|
||||||
limiter: sync2.NewLimiter(config.NumWorkers),
|
limiter: sync2.NewLimiter(config.NumWorkers),
|
||||||
@ -54,40 +50,35 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
|
|||||||
err = chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
|
err = chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
satellites, err := chore.satelliteDB.ListGracefulExits(ctx)
|
geSatellites, err := chore.service.ListPendingExits(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
chore.log.Error("error retrieving satellites.", zap.Error(err))
|
chore.log.Error("error retrieving satellites.", zap.Error(err))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(satellites) == 0 {
|
if len(geSatellites) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
chore.log.Debug("exiting", zap.Int("satellites", len(satellites)))
|
chore.log.Debug("exiting", zap.Int("satellites", len(geSatellites)))
|
||||||
|
|
||||||
for _, satellite := range satellites {
|
for _, satellite := range geSatellites {
|
||||||
mon.Meter("satellite_gracefulexit_request").Mark(1) //mon:locked
|
mon.Meter("satellite_gracefulexit_request").Mark(1) //mon:locked
|
||||||
|
satellite := satellite
|
||||||
if satellite.FinishedAt != nil {
|
if satellite.FinishedAt != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeurl, err := chore.trust.GetNodeURL(ctx, satellite.SatelliteID)
|
worker := NewWorker(chore.log, chore.service, chore.transferService, chore.dialer, satellite.NodeURL, chore.config)
|
||||||
if err != nil {
|
if _, ok := chore.exitingMap.LoadOrStore(satellite.SatelliteID, worker); ok {
|
||||||
chore.log.Error("failed to get satellite address.", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
worker := NewWorker(chore.log, chore.store, chore.trust, chore.satelliteDB, chore.dialer, nodeurl, chore.config)
|
|
||||||
if _, ok := chore.exitingMap.LoadOrStore(nodeurl.ID, worker); ok {
|
|
||||||
// already running a worker for this satellite
|
// already running a worker for this satellite
|
||||||
chore.log.Debug("skipping for satellite, worker already exists.", zap.Stringer("Satellite ID", nodeurl.ID))
|
chore.log.Debug("skipping for satellite, worker already exists.", zap.Stringer("Satellite ID", satellite.SatelliteID))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
chore.limiter.Go(ctx, func() {
|
chore.limiter.Go(ctx, func() {
|
||||||
err := worker.Run(ctx, func() {
|
err := worker.Run(ctx, func() {
|
||||||
chore.log.Debug("finished for satellite.", zap.Stringer("Satellite ID", nodeurl.ID))
|
chore.log.Debug("finished for satellite.", zap.Stringer("Satellite ID", satellite.SatelliteID))
|
||||||
chore.exitingMap.Delete(nodeurl.ID)
|
chore.exitingMap.Delete(satellite.SatelliteID)
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -68,7 +68,7 @@ func (e *Endpoint) GetNonExitingSatellites(ctx context.Context, req *pb.GetNonEx
|
|||||||
// get domain name
|
// get domain name
|
||||||
nodeurl, err := e.trust.GetNodeURL(ctx, trusted)
|
nodeurl, err := e.trust.GetNodeURL(ctx, trusted)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Debug("graceful exit: get satellite domian name", zap.Stringer("Satellite ID", trusted), zap.Error(err))
|
e.log.Error("graceful exit: get satellite address", zap.Stringer("Satellite ID", trusted), zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// get space usage by satellites
|
// get space usage by satellites
|
||||||
|
188
storagenode/gracefulexit/service.go
Normal file
188
storagenode/gracefulexit/service.go
Normal file
@ -0,0 +1,188 @@
|
|||||||
|
// Copyright (C) 2020 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
package gracefulexit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/common/errs2"
|
||||||
|
"storj.io/common/pb"
|
||||||
|
"storj.io/common/rpc"
|
||||||
|
"storj.io/common/storj"
|
||||||
|
"storj.io/storj/storagenode/pieces"
|
||||||
|
"storj.io/storj/storagenode/satellites"
|
||||||
|
"storj.io/storj/storagenode/trust"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Service acts as the gateway to the `satellites` db for graceful exit
|
||||||
|
// code (querying and updating that db as necessary).
|
||||||
|
//
|
||||||
|
// architecture: Service
|
||||||
|
type Service interface {
|
||||||
|
// ListPendingExits returns a slice with one record for every satellite
|
||||||
|
// from which this node is gracefully exiting. Each record includes the
|
||||||
|
// satellite's ID/address and information about the graceful exit status
|
||||||
|
// and progress.
|
||||||
|
ListPendingExits(ctx context.Context) ([]ExitingSatellite, error)
|
||||||
|
|
||||||
|
// DeletePiece deletes one piece stored for a satellite, and updates
|
||||||
|
// the deleted byte count for the corresponding graceful exit operation.
|
||||||
|
DeletePiece(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error
|
||||||
|
|
||||||
|
// DeleteSatellitePieces deletes all pieces stored for a satellite, and updates
|
||||||
|
// the deleted byte count for the corresponding graceful exit operation.
|
||||||
|
DeleteSatellitePieces(ctx context.Context, satelliteID storj.NodeID) error
|
||||||
|
|
||||||
|
// ExitFailed updates the database when a graceful exit has failed.
|
||||||
|
ExitFailed(ctx context.Context, satelliteID storj.NodeID, reason pb.ExitFailed_Reason, exitFailedBytes []byte) error
|
||||||
|
|
||||||
|
// ExitCompleted updates the database when a graceful exit is completed. It also
|
||||||
|
// deletes all pieces and blobs for that satellite.
|
||||||
|
ExitCompleted(ctx context.Context, satelliteID storj.NodeID, completionReceipt []byte, wait func()) error
|
||||||
|
|
||||||
|
// ExitNotPossible deletes the entry for the corresponding graceful exit operation.
|
||||||
|
// This is intended to be called when a graceful exit operation was initiated but
|
||||||
|
// the satellite rejected it.
|
||||||
|
ExitNotPossible(ctx context.Context, satelliteID storj.NodeID) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensures that service implements Service.
|
||||||
|
var _ Service = (*service)(nil)
|
||||||
|
|
||||||
|
// service exposes methods to manage GE progress.
|
||||||
|
//
|
||||||
|
// architecture: Service
|
||||||
|
type service struct {
|
||||||
|
log *zap.Logger
|
||||||
|
store *pieces.Store
|
||||||
|
trust *trust.Pool
|
||||||
|
satelliteDB satellites.DB
|
||||||
|
|
||||||
|
nowFunc func() time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewService is a constructor for a GE service.
|
||||||
|
func NewService(log *zap.Logger, store *pieces.Store, trust *trust.Pool, satelliteDB satellites.DB, dialer rpc.Dialer, config Config) Service {
|
||||||
|
return &service{
|
||||||
|
log: log,
|
||||||
|
store: store,
|
||||||
|
trust: trust,
|
||||||
|
satelliteDB: satelliteDB,
|
||||||
|
nowFunc: func() time.Time { return time.Now().UTC() },
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExitingSatellite encapsulates a node address with its graceful exit progress.
|
||||||
|
type ExitingSatellite struct {
|
||||||
|
satellites.ExitProgress
|
||||||
|
NodeURL storj.NodeURL
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *service) ListPendingExits(ctx context.Context) (_ []ExitingSatellite, err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
exitProgress, err := c.satelliteDB.ListGracefulExits(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
exitingSatellites := make([]ExitingSatellite, 0, len(exitProgress))
|
||||||
|
for _, sat := range exitProgress {
|
||||||
|
nodeURL, err := c.trust.GetNodeURL(ctx, sat.SatelliteID)
|
||||||
|
if err != nil {
|
||||||
|
c.log.Error("failed to get satellite address", zap.Stringer("satellite-id", sat.SatelliteID), zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
exitingSatellites = append(exitingSatellites, ExitingSatellite{ExitProgress: sat, NodeURL: nodeURL})
|
||||||
|
}
|
||||||
|
return exitingSatellites, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeletePiece deletes one piece stored for a satellite, and updates
|
||||||
|
// the deleted byte count for the corresponding graceful exit operation.
|
||||||
|
func (c *service) DeletePiece(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
piece, err := c.store.Reader(ctx, satelliteID, pieceID)
|
||||||
|
if err != nil {
|
||||||
|
return Error.Wrap(err)
|
||||||
|
}
|
||||||
|
err = c.store.Delete(ctx, satelliteID, pieceID)
|
||||||
|
if err != nil {
|
||||||
|
return Error.Wrap(err)
|
||||||
|
}
|
||||||
|
// update graceful exit progress
|
||||||
|
size := piece.Size()
|
||||||
|
return c.satelliteDB.UpdateGracefulExit(ctx, satelliteID, size)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteSatellitePieces deletes all pieces stored for a satellite, and updates
|
||||||
|
// the deleted byte count for the corresponding graceful exit operation.
|
||||||
|
func (c *service) DeleteSatellitePieces(ctx context.Context, satelliteID storj.NodeID) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
var totalDeleted int64
|
||||||
|
logger := c.log.With(zap.Stringer("Satellite ID", satelliteID), zap.String("action", "delete all pieces"))
|
||||||
|
err = c.store.WalkSatellitePieces(ctx, satelliteID, func(piece pieces.StoredPieceAccess) error {
|
||||||
|
err := c.store.Delete(ctx, satelliteID, piece.PieceID())
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("failed to delete piece",
|
||||||
|
zap.Stringer("Piece ID", piece.PieceID()), zap.Error(err))
|
||||||
|
// but continue
|
||||||
|
}
|
||||||
|
_, size, err := piece.Size(ctx)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warn("failed to get piece size",
|
||||||
|
zap.Stringer("Piece ID", piece.PieceID()), zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
totalDeleted += size
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil && !errs2.IsCanceled(err) {
|
||||||
|
logger.Error("failed to delete all pieces", zap.Error(err))
|
||||||
|
}
|
||||||
|
// update graceful exit progress
|
||||||
|
return c.satelliteDB.UpdateGracefulExit(ctx, satelliteID, totalDeleted)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExitFailed updates the database when a graceful exit has failed.
|
||||||
|
func (c *service) ExitFailed(ctx context.Context, satelliteID storj.NodeID, reason pb.ExitFailed_Reason, exitFailedBytes []byte) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
return c.satelliteDB.CompleteGracefulExit(ctx, satelliteID, c.nowFunc(), satellites.ExitFailed, exitFailedBytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExitCompleted updates the database when a graceful exit is completed. It also
|
||||||
|
// deletes all pieces and blobs for that satellite.
|
||||||
|
func (c *service) ExitCompleted(ctx context.Context, satelliteID storj.NodeID, completionReceipt []byte, wait func()) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
err = c.satelliteDB.CompleteGracefulExit(ctx, satelliteID, c.nowFunc(), satellites.ExitSucceeded, completionReceipt)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for deletes to complete
|
||||||
|
wait()
|
||||||
|
|
||||||
|
// delete all remaining pieces
|
||||||
|
err = c.DeleteSatellitePieces(ctx, satelliteID)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete everything left in blobs folder of specific satellites
|
||||||
|
return c.store.DeleteSatelliteBlobs(ctx, satelliteID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExitNotPossible deletes the entry from satellite table and inform graceful exit
|
||||||
|
// has failed to start.
|
||||||
|
func (c *service) ExitNotPossible(ctx context.Context, satelliteID storj.NodeID) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
return c.satelliteDB.CancelGracefulExit(ctx, satelliteID)
|
||||||
|
}
|
@ -4,57 +4,42 @@
|
|||||||
package gracefulexit
|
package gracefulexit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"storj.io/common/errs2"
|
"storj.io/common/errs2"
|
||||||
"storj.io/common/memory"
|
|
||||||
"storj.io/common/pb"
|
"storj.io/common/pb"
|
||||||
"storj.io/common/rpc"
|
"storj.io/common/rpc"
|
||||||
"storj.io/common/rpc/rpcstatus"
|
"storj.io/common/rpc/rpcstatus"
|
||||||
"storj.io/common/signing"
|
|
||||||
"storj.io/common/storj"
|
"storj.io/common/storj"
|
||||||
"storj.io/common/sync2"
|
"storj.io/common/sync2"
|
||||||
"storj.io/storj/storagenode/pieces"
|
"storj.io/storj/storagenode/piecetransfer"
|
||||||
"storj.io/storj/storagenode/piecestore"
|
|
||||||
"storj.io/storj/storagenode/satellites"
|
|
||||||
"storj.io/storj/storagenode/trust"
|
|
||||||
"storj.io/uplink/private/ecclient"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Worker is responsible for completing the graceful exit for a given satellite.
|
// Worker is responsible for completing the graceful exit for a given satellite.
|
||||||
type Worker struct {
|
type Worker struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
store *pieces.Store
|
|
||||||
trust *trust.Pool
|
service Service
|
||||||
satelliteDB satellites.DB
|
transferService piecetransfer.Service
|
||||||
|
|
||||||
dialer rpc.Dialer
|
dialer rpc.Dialer
|
||||||
limiter *sync2.Limiter
|
limiter *sync2.Limiter
|
||||||
satelliteURL storj.NodeURL
|
satelliteURL storj.NodeURL
|
||||||
ecclient ecclient.Client
|
|
||||||
minBytesPerSecond memory.Size
|
|
||||||
minDownloadTimeout time.Duration
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWorker instantiates Worker.
|
// NewWorker instantiates Worker.
|
||||||
func NewWorker(log *zap.Logger, store *pieces.Store, trust *trust.Pool, satelliteDB satellites.DB, dialer rpc.Dialer, satelliteURL storj.NodeURL, config Config) *Worker {
|
func NewWorker(log *zap.Logger, service Service, transferService piecetransfer.Service, dialer rpc.Dialer, satelliteURL storj.NodeURL, config Config) *Worker {
|
||||||
return &Worker{
|
return &Worker{
|
||||||
log: log,
|
log: log,
|
||||||
store: store,
|
service: service,
|
||||||
trust: trust,
|
transferService: transferService,
|
||||||
satelliteDB: satelliteDB,
|
|
||||||
dialer: dialer,
|
dialer: dialer,
|
||||||
limiter: sync2.NewLimiter(config.NumConcurrentTransfers),
|
limiter: sync2.NewLimiter(config.NumConcurrentTransfers),
|
||||||
satelliteURL: satelliteURL,
|
satelliteURL: satelliteURL,
|
||||||
ecclient: ecclient.NewClient(log, dialer, 0),
|
|
||||||
minBytesPerSecond: config.MinBytesPerSecond,
|
|
||||||
minDownloadTimeout: config.MinDownloadTimeout,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -89,7 +74,7 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
|
|||||||
}
|
}
|
||||||
if errs2.IsRPC(err, rpcstatus.FailedPrecondition) {
|
if errs2.IsRPC(err, rpcstatus.FailedPrecondition) {
|
||||||
// delete the entry from satellite table and inform graceful exit has failed to start
|
// delete the entry from satellite table and inform graceful exit has failed to start
|
||||||
deleteErr := worker.satelliteDB.CancelGracefulExit(ctx, worker.satelliteURL.ID)
|
deleteErr := worker.service.ExitNotPossible(ctx, worker.satelliteURL.ID)
|
||||||
if deleteErr != nil {
|
if deleteErr != nil {
|
||||||
// TODO: what to do now?
|
// TODO: what to do now?
|
||||||
return errs.Combine(deleteErr, err)
|
return errs.Combine(deleteErr, err)
|
||||||
@ -108,9 +93,9 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
|
|||||||
case *pb.SatelliteMessage_TransferPiece:
|
case *pb.SatelliteMessage_TransferPiece:
|
||||||
transferPieceMsg := msg.TransferPiece
|
transferPieceMsg := msg.TransferPiece
|
||||||
worker.limiter.Go(ctx, func() {
|
worker.limiter.Go(ctx, func() {
|
||||||
err = worker.transferPiece(ctx, transferPieceMsg, c)
|
resp := worker.transferService.TransferPiece(ctx, worker.satelliteURL.ID, transferPieceMsg)
|
||||||
if err != nil {
|
if err := c.Send(resp); err != nil {
|
||||||
worker.log.Error("failed to transfer piece.",
|
worker.log.Error("failed to send notification about piece transfer.",
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
||||||
zap.Error(errs.Wrap(err)))
|
zap.Error(errs.Wrap(err)))
|
||||||
}
|
}
|
||||||
@ -120,7 +105,7 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
|
|||||||
deletePieceMsg := msg.DeletePiece
|
deletePieceMsg := msg.DeletePiece
|
||||||
worker.limiter.Go(ctx, func() {
|
worker.limiter.Go(ctx, func() {
|
||||||
pieceID := deletePieceMsg.OriginalPieceId
|
pieceID := deletePieceMsg.OriginalPieceId
|
||||||
err := worker.deleteOnePiece(ctx, pieceID)
|
err := worker.service.DeletePiece(ctx, worker.satelliteURL.ID, pieceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
worker.log.Error("failed to delete piece.",
|
worker.log.Error("failed to delete piece.",
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
||||||
@ -138,8 +123,8 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
worker.log.Error("failed to marshal exit failed message.")
|
worker.log.Error("failed to marshal exit failed message.")
|
||||||
}
|
}
|
||||||
err = worker.satelliteDB.CompleteGracefulExit(ctx, worker.satelliteURL.ID, time.Now(), satellites.ExitFailed, exitFailedBytes)
|
|
||||||
return errs.Wrap(err)
|
return errs.Wrap(worker.service.ExitFailed(ctx, worker.satelliteURL.ID, msg.ExitFailed.Reason, exitFailedBytes))
|
||||||
|
|
||||||
case *pb.SatelliteMessage_ExitCompleted:
|
case *pb.SatelliteMessage_ExitCompleted:
|
||||||
worker.log.Info("graceful exit completed.", zap.Stringer("Satellite ID", worker.satelliteURL.ID))
|
worker.log.Info("graceful exit completed.", zap.Stringer("Satellite ID", worker.satelliteURL.ID))
|
||||||
@ -149,22 +134,7 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
|
|||||||
worker.log.Error("failed to marshal exit completed message.")
|
worker.log.Error("failed to marshal exit completed message.")
|
||||||
}
|
}
|
||||||
|
|
||||||
err = worker.satelliteDB.CompleteGracefulExit(ctx, worker.satelliteURL.ID, time.Now(), satellites.ExitSucceeded, exitCompletedBytes)
|
return errs.Wrap(worker.service.ExitCompleted(ctx, worker.satelliteURL.ID, exitCompletedBytes, worker.limiter.Wait))
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for deletes to complete
|
|
||||||
worker.limiter.Wait()
|
|
||||||
|
|
||||||
// delete all remaining pieces
|
|
||||||
err = worker.deleteAllPieces(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
// delete everything left in blobs folder of specific satellites
|
|
||||||
err = worker.store.DeleteSatelliteBlobs(ctx, worker.satelliteURL.ID)
|
|
||||||
return errs.Wrap(err)
|
|
||||||
default:
|
default:
|
||||||
// TODO handle err
|
// TODO handle err
|
||||||
worker.log.Error("unknown graceful exit message.", zap.Stringer("Satellite ID", worker.satelliteURL.ID))
|
worker.log.Error("unknown graceful exit message.", zap.Stringer("Satellite ID", worker.satelliteURL.ID))
|
||||||
@ -172,236 +142,6 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type gracefulExitStream interface {
|
|
||||||
Context() context.Context
|
|
||||||
Send(*pb.StorageNodeMessage) error
|
|
||||||
Recv() (*pb.SatelliteMessage, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.TransferPiece, c gracefulExitStream) error {
|
|
||||||
pieceID := transferPiece.OriginalPieceId
|
|
||||||
reader, err := worker.store.Reader(ctx, worker.satelliteURL.ID, pieceID)
|
|
||||||
if err != nil {
|
|
||||||
transferErr := pb.TransferFailed_UNKNOWN
|
|
||||||
if errs.Is(err, os.ErrNotExist) {
|
|
||||||
transferErr = pb.TransferFailed_NOT_FOUND
|
|
||||||
}
|
|
||||||
worker.log.Error("failed to get piece reader.",
|
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", pieceID),
|
|
||||||
zap.Error(errs.Wrap(err)))
|
|
||||||
worker.handleFailure(ctx, transferErr, pieceID, c.Send)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
addrLimit := transferPiece.GetAddressedOrderLimit()
|
|
||||||
pk := transferPiece.PrivateKey
|
|
||||||
|
|
||||||
originalHash, originalOrderLimit, err := worker.store.GetHashAndLimit(ctx, worker.satelliteURL.ID, pieceID, reader)
|
|
||||||
if err != nil {
|
|
||||||
worker.log.Error("failed to get piece hash and order limit.",
|
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", pieceID),
|
|
||||||
zap.Error(errs.Wrap(err)))
|
|
||||||
worker.handleFailure(ctx, pb.TransferFailed_UNKNOWN, pieceID, c.Send)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
satelliteSigner, err := worker.trust.GetSignee(ctx, worker.satelliteURL.ID)
|
|
||||||
if err != nil {
|
|
||||||
worker.log.Error("failed to get satellite signer identity from trust store!",
|
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Error(errs.Wrap(err)))
|
|
||||||
worker.handleFailure(ctx, pb.TransferFailed_UNKNOWN, pieceID, c.Send)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// verify the satellite signature on the original order limit; if we hand in something
|
|
||||||
// with an invalid signature, the satellite will assume we're cheating and disqualify
|
|
||||||
// immediately.
|
|
||||||
err = signing.VerifyOrderLimitSignature(ctx, satelliteSigner, &originalOrderLimit)
|
|
||||||
if err != nil {
|
|
||||||
worker.log.Error("The order limit stored for this piece does not have a valid signature from the owning satellite! It was verified before storing, so something went wrong in storage. We have to report this to the satellite as a missing piece.",
|
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", pieceID),
|
|
||||||
zap.Error(errs.Wrap(err)))
|
|
||||||
worker.handleFailure(ctx, pb.TransferFailed_NOT_FOUND, pieceID, c.Send)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// verify that the public key on the order limit signed the original piece hash; if we
|
|
||||||
// hand in something with an invalid signature, the satellite will assume we're cheating
|
|
||||||
// and disqualify immediately.
|
|
||||||
err = signing.VerifyUplinkPieceHashSignature(ctx, originalOrderLimit.UplinkPublicKey, &originalHash)
|
|
||||||
if err != nil {
|
|
||||||
worker.log.Error("The piece hash stored for this piece does not have a valid signature from the public key stored in the order limit! It was verified before storing, so something went wrong in storage. We have to report this to the satellite as a missing piece.",
|
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", pieceID),
|
|
||||||
zap.Error(errs.Wrap(err)))
|
|
||||||
worker.handleFailure(ctx, pb.TransferFailed_NOT_FOUND, pieceID, c.Send)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if worker.minBytesPerSecond == 0 {
|
|
||||||
// set minBytesPerSecond to default 5KiB if set to 0
|
|
||||||
worker.minBytesPerSecond = 5 * memory.KiB
|
|
||||||
}
|
|
||||||
maxTransferTime := time.Duration(int64(time.Second) * originalHash.PieceSize / worker.minBytesPerSecond.Int64())
|
|
||||||
if maxTransferTime < worker.minDownloadTimeout {
|
|
||||||
maxTransferTime = worker.minDownloadTimeout
|
|
||||||
}
|
|
||||||
putCtx, cancel := context.WithTimeout(ctx, maxTransferTime)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
pieceHash, peerID, err := worker.ecclient.PutPiece(putCtx, ctx, addrLimit, pk, reader)
|
|
||||||
if err != nil {
|
|
||||||
if piecestore.ErrVerifyUntrusted.Has(err) {
|
|
||||||
worker.log.Error("failed hash verification.",
|
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", pieceID),
|
|
||||||
zap.Error(errs.Wrap(err)))
|
|
||||||
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
|
|
||||||
} else {
|
|
||||||
worker.log.Error("failed to put piece.",
|
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", pieceID),
|
|
||||||
zap.Error(errs.Wrap(err)))
|
|
||||||
// TODO look at error type to decide on the transfer error
|
|
||||||
worker.handleFailure(ctx, pb.TransferFailed_STORAGE_NODE_UNAVAILABLE, pieceID, c.Send)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !bytes.Equal(originalHash.Hash, pieceHash.Hash) {
|
|
||||||
worker.log.Error("piece hash from new storagenode does not match",
|
|
||||||
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
|
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", pieceID))
|
|
||||||
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
|
|
||||||
return Error.New("piece hash from new storagenode does not match")
|
|
||||||
}
|
|
||||||
if pieceHash.PieceId != addrLimit.Limit.PieceId {
|
|
||||||
worker.log.Error("piece id from new storagenode does not match order limit",
|
|
||||||
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
|
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", pieceID))
|
|
||||||
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
|
|
||||||
return Error.New("piece id from new storagenode does not match order limit")
|
|
||||||
}
|
|
||||||
|
|
||||||
signee := signing.SigneeFromPeerIdentity(peerID)
|
|
||||||
err = signing.VerifyPieceHashSignature(ctx, signee, pieceHash)
|
|
||||||
if err != nil {
|
|
||||||
worker.log.Error("invalid piece hash signature from new storagenode",
|
|
||||||
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
|
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", pieceID),
|
|
||||||
zap.Error(errs.Wrap(err)))
|
|
||||||
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
success := &pb.StorageNodeMessage{
|
|
||||||
Message: &pb.StorageNodeMessage_Succeeded{
|
|
||||||
Succeeded: &pb.TransferSucceeded{
|
|
||||||
OriginalPieceId: transferPiece.OriginalPieceId,
|
|
||||||
OriginalPieceHash: &originalHash,
|
|
||||||
OriginalOrderLimit: &originalOrderLimit,
|
|
||||||
ReplacementPieceHash: pieceHash,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
worker.log.Info("piece transferred to new storagenode",
|
|
||||||
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
|
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", pieceID))
|
|
||||||
return c.Send(success)
|
|
||||||
}
|
|
||||||
|
|
||||||
// deleteOnePiece deletes one piece stored for a satellite.
|
|
||||||
func (worker *Worker) deleteOnePiece(ctx context.Context, pieceID storj.PieceID) error {
|
|
||||||
piece, err := worker.store.Reader(ctx, worker.satelliteURL.ID, pieceID)
|
|
||||||
if err != nil {
|
|
||||||
if !errs2.IsCanceled(err) {
|
|
||||||
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", pieceID), zap.Error(err))
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = worker.deletePiece(ctx, pieceID)
|
|
||||||
if err != nil {
|
|
||||||
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteURL.ID), zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// update graceful exit progress
|
|
||||||
size := piece.Size()
|
|
||||||
return worker.satelliteDB.UpdateGracefulExit(ctx, worker.satelliteURL.ID, size)
|
|
||||||
}
|
|
||||||
|
|
||||||
// deletePiece deletes one piece stored for a satellite, without updating satellite Graceful Exit status.
|
|
||||||
func (worker *Worker) deletePiece(ctx context.Context, pieceID storj.PieceID) error {
|
|
||||||
err := worker.store.Delete(ctx, worker.satelliteURL.ID, pieceID)
|
|
||||||
if err != nil {
|
|
||||||
worker.log.Debug("failed to delete a piece",
|
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", pieceID),
|
|
||||||
zap.Error(err))
|
|
||||||
delErr := worker.store.DeleteFailed(ctx, pieces.ExpiredInfo{
|
|
||||||
SatelliteID: worker.satelliteURL.ID,
|
|
||||||
PieceID: pieceID,
|
|
||||||
InPieceInfo: true,
|
|
||||||
}, time.Now().UTC())
|
|
||||||
if delErr != nil {
|
|
||||||
worker.log.Debug("failed to mark a deletion failure for a piece",
|
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", pieceID), zap.Error(err))
|
|
||||||
}
|
|
||||||
return errs.Combine(err, delErr)
|
|
||||||
}
|
|
||||||
worker.log.Debug("delete piece",
|
|
||||||
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", pieceID))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// deleteAllPieces deletes pieces stored for a satellite.
|
|
||||||
func (worker *Worker) deleteAllPieces(ctx context.Context) error {
|
|
||||||
var totalDeleted int64
|
|
||||||
err := worker.store.WalkSatellitePieces(ctx, worker.satelliteURL.ID, func(piece pieces.StoredPieceAccess) error {
|
|
||||||
err := worker.deletePiece(ctx, piece.PieceID())
|
|
||||||
if err == nil {
|
|
||||||
_, size, err := piece.Size(ctx)
|
|
||||||
if err != nil {
|
|
||||||
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteURL.ID),
|
|
||||||
zap.Stringer("Piece ID", piece.PieceID()), zap.Error(err))
|
|
||||||
}
|
|
||||||
totalDeleted += size
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
if err != nil && !errs2.IsCanceled(err) {
|
|
||||||
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteURL.ID), zap.Error(err))
|
|
||||||
}
|
|
||||||
// update graceful exit progress
|
|
||||||
return worker.satelliteDB.UpdateGracefulExit(ctx, worker.satelliteURL.ID, totalDeleted)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (worker *Worker) handleFailure(ctx context.Context, transferError pb.TransferFailed_Error, pieceID pb.PieceID, send func(*pb.StorageNodeMessage) error) {
|
|
||||||
failure := &pb.StorageNodeMessage{
|
|
||||||
Message: &pb.StorageNodeMessage_Failed{
|
|
||||||
Failed: &pb.TransferFailed{
|
|
||||||
OriginalPieceId: pieceID,
|
|
||||||
Error: transferError,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
sendErr := send(failure)
|
|
||||||
if sendErr != nil {
|
|
||||||
worker.log.Error("unable to send failure.", zap.Stringer("Satellite ID", worker.satelliteURL.ID))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close halts the worker.
|
// Close halts the worker.
|
||||||
func (worker *Worker) Close() error {
|
func (worker *Worker) Close() error {
|
||||||
worker.limiter.Wait()
|
worker.limiter.Wait()
|
||||||
|
@ -22,7 +22,6 @@ import (
|
|||||||
"storj.io/storj/satellite/overlay"
|
"storj.io/storj/satellite/overlay"
|
||||||
"storj.io/storj/storagenode"
|
"storj.io/storj/storagenode"
|
||||||
"storj.io/storj/storagenode/gracefulexit"
|
"storj.io/storj/storagenode/gracefulexit"
|
||||||
"storj.io/storj/storagenode/pieces"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestWorkerSuccess(t *testing.T) {
|
func TestWorkerSuccess(t *testing.T) {
|
||||||
@ -33,6 +32,12 @@ func TestWorkerSuccess(t *testing.T) {
|
|||||||
UplinkCount: 1,
|
UplinkCount: 1,
|
||||||
Reconfigure: testplanet.Reconfigure{
|
Reconfigure: testplanet.Reconfigure{
|
||||||
Satellite: testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
|
Satellite: testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
|
||||||
|
StorageNode: func(index int, config *storagenode.Config) {
|
||||||
|
config.GracefulExit.NumWorkers = 2
|
||||||
|
config.GracefulExit.NumConcurrentTransfers = 2
|
||||||
|
config.GracefulExit.MinBytesPerSecond = 128
|
||||||
|
config.GracefulExit.MinDownloadTimeout = 2 * time.Minute
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
satellite := planet.Satellites[0]
|
satellite := planet.Satellites[0]
|
||||||
@ -69,14 +74,7 @@ func TestWorkerSuccess(t *testing.T) {
|
|||||||
require.Len(t, queueItems, 1)
|
require.Len(t, queueItems, 1)
|
||||||
|
|
||||||
// run the SN chore again to start processing transfers.
|
// run the SN chore again to start processing transfers.
|
||||||
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.Storage2.Store, exitingNode.Peer.Storage2.Trust, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.NodeURL(),
|
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.GracefulExit.Service, exitingNode.PieceTransfer.Service, exitingNode.Dialer, satellite.NodeURL(), exitingNode.Config.GracefulExit)
|
||||||
gracefulexit.Config{
|
|
||||||
ChoreInterval: 0,
|
|
||||||
NumWorkers: 2,
|
|
||||||
NumConcurrentTransfers: 2,
|
|
||||||
MinBytesPerSecond: 128,
|
|
||||||
MinDownloadTimeout: 2 * time.Minute,
|
|
||||||
})
|
|
||||||
defer ctx.Check(worker.Close)
|
defer ctx.Check(worker.Close)
|
||||||
|
|
||||||
err = worker.Run(ctx, func() {})
|
err = worker.Run(ctx, func() {})
|
||||||
@ -105,6 +103,14 @@ func TestWorkerTimeout(t *testing.T) {
|
|||||||
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
|
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
|
||||||
},
|
},
|
||||||
Satellite: testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
|
Satellite: testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
|
||||||
|
StorageNode: func(index int, config *storagenode.Config) {
|
||||||
|
config.GracefulExit.NumWorkers = 2
|
||||||
|
config.GracefulExit.NumConcurrentTransfers = 2
|
||||||
|
// This config value will create a very short timeframe allowed for receiving
|
||||||
|
// data from storage nodes. This will cause context to cancel with timeout.
|
||||||
|
config.GracefulExit.MinBytesPerSecond = 10 * memory.MiB
|
||||||
|
config.GracefulExit.MinDownloadTimeout = 2 * time.Millisecond
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
satellite := planet.Satellites[0]
|
satellite := planet.Satellites[0]
|
||||||
@ -144,19 +150,9 @@ func TestWorkerTimeout(t *testing.T) {
|
|||||||
// make uploads on storage node slower than the timeout for transferring bytes to another node
|
// make uploads on storage node slower than the timeout for transferring bytes to another node
|
||||||
delay := 200 * time.Millisecond
|
delay := 200 * time.Millisecond
|
||||||
storageNodeDB.SetLatency(delay)
|
storageNodeDB.SetLatency(delay)
|
||||||
store := pieces.NewStore(zaptest.NewLogger(t), storageNodeDB.Pieces(), nil, nil, storageNodeDB.PieceSpaceUsedDB(), pieces.DefaultConfig)
|
|
||||||
|
|
||||||
// run the SN chore again to start processing transfers.
|
// run the SN chore again to start processing transfers.
|
||||||
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), store, exitingNode.Peer.Storage2.Trust, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.NodeURL(),
|
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.GracefulExit.Service, exitingNode.PieceTransfer.Service, exitingNode.Dialer, satellite.NodeURL(), exitingNode.Config.GracefulExit)
|
||||||
gracefulexit.Config{
|
|
||||||
ChoreInterval: 0,
|
|
||||||
NumWorkers: 2,
|
|
||||||
NumConcurrentTransfers: 2,
|
|
||||||
// This config value will create a very short timeframe allowed for receiving
|
|
||||||
// data from storage nodes. This will cause context to cancel with timeout.
|
|
||||||
MinBytesPerSecond: 10 * memory.MiB,
|
|
||||||
MinDownloadTimeout: 2 * time.Millisecond,
|
|
||||||
})
|
|
||||||
defer ctx.Check(worker.Close)
|
defer ctx.Check(worker.Close)
|
||||||
|
|
||||||
err = worker.Run(ctx, func() {})
|
err = worker.Run(ctx, func() {})
|
||||||
@ -190,6 +186,12 @@ func TestWorkerFailure_IneligibleNodeAge(t *testing.T) {
|
|||||||
config.Metainfo.RS.SuccessThreshold = successThreshold
|
config.Metainfo.RS.SuccessThreshold = successThreshold
|
||||||
config.Metainfo.RS.TotalThreshold = successThreshold
|
config.Metainfo.RS.TotalThreshold = successThreshold
|
||||||
},
|
},
|
||||||
|
StorageNode: func(index int, config *storagenode.Config) {
|
||||||
|
config.GracefulExit.NumWorkers = 2
|
||||||
|
config.GracefulExit.NumConcurrentTransfers = 2
|
||||||
|
config.GracefulExit.MinBytesPerSecond = 128
|
||||||
|
config.GracefulExit.MinDownloadTimeout = 2 * time.Minute
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
satellite := planet.Satellites[0]
|
satellite := planet.Satellites[0]
|
||||||
@ -209,14 +211,7 @@ func TestWorkerFailure_IneligibleNodeAge(t *testing.T) {
|
|||||||
err = exitingNode.DB.Satellites().InitiateGracefulExit(ctx, satellite.ID(), time.Now(), piecesContentSize)
|
err = exitingNode.DB.Satellites().InitiateGracefulExit(ctx, satellite.ID(), time.Now(), piecesContentSize)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.Storage2.Store, exitingNode.Peer.Storage2.Trust, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.NodeURL(),
|
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.GracefulExit.Service, exitingNode.PieceTransfer.Service, exitingNode.Dialer, satellite.NodeURL(), exitingNode.Config.GracefulExit)
|
||||||
gracefulexit.Config{
|
|
||||||
ChoreInterval: 0,
|
|
||||||
NumWorkers: 2,
|
|
||||||
NumConcurrentTransfers: 2,
|
|
||||||
MinBytesPerSecond: 128,
|
|
||||||
MinDownloadTimeout: 2 * time.Minute,
|
|
||||||
})
|
|
||||||
defer ctx.Check(worker.Close)
|
defer ctx.Check(worker.Close)
|
||||||
|
|
||||||
err = worker.Run(ctx, func() {})
|
err = worker.Run(ctx, func() {})
|
||||||
|
@ -48,6 +48,7 @@ import (
|
|||||||
"storj.io/storj/storagenode/pieces"
|
"storj.io/storj/storagenode/pieces"
|
||||||
"storj.io/storj/storagenode/piecestore"
|
"storj.io/storj/storagenode/piecestore"
|
||||||
"storj.io/storj/storagenode/piecestore/usedserials"
|
"storj.io/storj/storagenode/piecestore/usedserials"
|
||||||
|
"storj.io/storj/storagenode/piecetransfer"
|
||||||
"storj.io/storj/storagenode/preflight"
|
"storj.io/storj/storagenode/preflight"
|
||||||
"storj.io/storj/storagenode/pricing"
|
"storj.io/storj/storagenode/pricing"
|
||||||
"storj.io/storj/storagenode/reputation"
|
"storj.io/storj/storagenode/reputation"
|
||||||
@ -254,7 +255,12 @@ type Peer struct {
|
|||||||
Endpoint *consoleserver.Server
|
Endpoint *consoleserver.Server
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PieceTransfer struct {
|
||||||
|
Service piecetransfer.Service
|
||||||
|
}
|
||||||
|
|
||||||
GracefulExit struct {
|
GracefulExit struct {
|
||||||
|
Service gracefulexit.Service
|
||||||
Endpoint *gracefulexit.Endpoint
|
Endpoint *gracefulexit.Endpoint
|
||||||
Chore *gracefulexit.Chore
|
Chore *gracefulexit.Chore
|
||||||
BlobsCleaner *gracefulexit.BlobsCleaner
|
BlobsCleaner *gracefulexit.BlobsCleaner
|
||||||
@ -669,7 +675,28 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{ // setup piecetransfer service
|
||||||
|
peer.PieceTransfer.Service = piecetransfer.NewService(
|
||||||
|
peer.Log.Named("piecetransfer"),
|
||||||
|
peer.Storage2.Store,
|
||||||
|
peer.Storage2.Trust,
|
||||||
|
peer.Dialer,
|
||||||
|
// using GracefulExit config here for historical reasons
|
||||||
|
config.GracefulExit.MinDownloadTimeout,
|
||||||
|
config.GracefulExit.MinBytesPerSecond,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
{ // setup graceful exit service
|
{ // setup graceful exit service
|
||||||
|
peer.GracefulExit.Service = gracefulexit.NewService(
|
||||||
|
peer.Log.Named("gracefulexit:service"),
|
||||||
|
peer.Storage2.Store,
|
||||||
|
peer.Storage2.Trust,
|
||||||
|
peer.DB.Satellites(),
|
||||||
|
peer.Dialer,
|
||||||
|
config.GracefulExit,
|
||||||
|
)
|
||||||
|
|
||||||
peer.GracefulExit.Endpoint = gracefulexit.NewEndpoint(
|
peer.GracefulExit.Endpoint = gracefulexit.NewEndpoint(
|
||||||
peer.Log.Named("gracefulexit:endpoint"),
|
peer.Log.Named("gracefulexit:endpoint"),
|
||||||
peer.Storage2.Trust,
|
peer.Storage2.Trust,
|
||||||
@ -683,14 +710,13 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
|||||||
|
|
||||||
peer.GracefulExit.Chore = gracefulexit.NewChore(
|
peer.GracefulExit.Chore = gracefulexit.NewChore(
|
||||||
peer.Log.Named("gracefulexit:chore"),
|
peer.Log.Named("gracefulexit:chore"),
|
||||||
config.GracefulExit,
|
peer.GracefulExit.Service,
|
||||||
peer.Storage2.Store,
|
peer.PieceTransfer.Service,
|
||||||
peer.Storage2.Trust,
|
|
||||||
peer.Dialer,
|
peer.Dialer,
|
||||||
peer.DB.Satellites(),
|
config.GracefulExit,
|
||||||
)
|
)
|
||||||
peer.GracefulExit.BlobsCleaner = gracefulexit.NewBlobsCleaner(
|
peer.GracefulExit.BlobsCleaner = gracefulexit.NewBlobsCleaner(
|
||||||
peer.Log.Named("gracefuexit:blobscleaner"),
|
peer.Log.Named("gracefulexit:blobscleaner"),
|
||||||
peer.Storage2.Store,
|
peer.Storage2.Store,
|
||||||
peer.Storage2.Trust,
|
peer.Storage2.Trust,
|
||||||
peer.DB.Satellites(),
|
peer.DB.Satellites(),
|
||||||
|
10
storagenode/piecetransfer/doc.go
Normal file
10
storagenode/piecetransfer/doc.go
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
// Copyright (C) 2020 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
/*
|
||||||
|
Package piecetransfer contains code meant to deal with transferring pieces
|
||||||
|
from one node to another. This does not happen under typical circumstances,
|
||||||
|
but may happen when a node wants to become unavailable in a "clean" way.
|
||||||
|
(Graceful exit, planned downtime)
|
||||||
|
*/
|
||||||
|
package piecetransfer
|
185
storagenode/piecetransfer/service.go
Normal file
185
storagenode/piecetransfer/service.go
Normal file
@ -0,0 +1,185 @@
|
|||||||
|
// Copyright (C) 2020 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
package piecetransfer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/spacemonkeygo/monkit/v3"
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/common/memory"
|
||||||
|
"storj.io/common/pb"
|
||||||
|
"storj.io/common/rpc"
|
||||||
|
"storj.io/common/signing"
|
||||||
|
"storj.io/common/storj"
|
||||||
|
"storj.io/storj/storagenode/pieces"
|
||||||
|
"storj.io/storj/storagenode/piecestore"
|
||||||
|
"storj.io/storj/storagenode/trust"
|
||||||
|
"storj.io/uplink/private/ecclient"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// Error is the default error class for graceful exit package.
|
||||||
|
Error = errs.Class("internode transfer")
|
||||||
|
|
||||||
|
mon = monkit.Package()
|
||||||
|
)
|
||||||
|
|
||||||
|
// Service allows for transfer of pieces from one storage node to
|
||||||
|
// another, as directed by the satellite that owns the piece.
|
||||||
|
type Service interface {
|
||||||
|
// TransferPiece validates a transfer order, validates the locally stored
|
||||||
|
// piece, and then (if appropriate) transfers the piece to the specified
|
||||||
|
// destination node, obtaining a signed receipt. TransferPiece returns a
|
||||||
|
// message appropriate for responding to the transfer order (whether the
|
||||||
|
// transfer succeeded or failed).
|
||||||
|
TransferPiece(ctx context.Context, satelliteID storj.NodeID, transferPiece *pb.TransferPiece) *pb.StorageNodeMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
type service struct {
|
||||||
|
log *zap.Logger
|
||||||
|
store *pieces.Store
|
||||||
|
trust *trust.Pool
|
||||||
|
ecClient ecclient.Client
|
||||||
|
|
||||||
|
minDownloadTimeout time.Duration
|
||||||
|
minBytesPerSecond memory.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewService is a constructor for Service.
|
||||||
|
func NewService(log *zap.Logger, store *pieces.Store, trust *trust.Pool, dialer rpc.Dialer, minDownloadTimeout time.Duration, minBytesPerSecond memory.Size) Service {
|
||||||
|
ecClient := ecclient.NewClient(log, dialer, 0)
|
||||||
|
return &service{
|
||||||
|
log: log,
|
||||||
|
store: store,
|
||||||
|
trust: trust,
|
||||||
|
ecClient: ecClient,
|
||||||
|
minDownloadTimeout: minDownloadTimeout,
|
||||||
|
minBytesPerSecond: minBytesPerSecond,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TransferPiece validates a transfer order, validates the locally stored
|
||||||
|
// piece, and then (if appropriate) transfers the piece to the specified
|
||||||
|
// destination node, obtaining a signed receipt. TransferPiece returns a
|
||||||
|
// message appropriate for responding to the transfer order (whether the
|
||||||
|
// transfer succeeded or failed).
|
||||||
|
func (c *service) TransferPiece(ctx context.Context, satelliteID storj.NodeID, transferPiece *pb.TransferPiece) *pb.StorageNodeMessage {
|
||||||
|
// errForMonkit doesn't get returned, but we'd still like for monkit to be able
|
||||||
|
// to differentiate between counts of failures returned and successes returned.
|
||||||
|
var errForMonkit error
|
||||||
|
defer mon.Task()(&ctx)(&errForMonkit)
|
||||||
|
|
||||||
|
pieceID := transferPiece.OriginalPieceId
|
||||||
|
logger := c.log.With(zap.Stringer("Satellite ID", satelliteID), zap.Stringer("Piece ID", pieceID))
|
||||||
|
|
||||||
|
failMessage := func(errString string, err error, transferErr pb.TransferFailed_Error) *pb.StorageNodeMessage {
|
||||||
|
logger.Error(errString, zap.Error(err))
|
||||||
|
errForMonkit = err
|
||||||
|
return &pb.StorageNodeMessage{
|
||||||
|
Message: &pb.StorageNodeMessage_Failed{
|
||||||
|
Failed: &pb.TransferFailed{
|
||||||
|
OriginalPieceId: pieceID,
|
||||||
|
Error: transferErr,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, err := c.store.Reader(ctx, satelliteID, pieceID)
|
||||||
|
if err != nil {
|
||||||
|
transferErr := pb.TransferFailed_UNKNOWN
|
||||||
|
if errs.Is(err, os.ErrNotExist) {
|
||||||
|
transferErr = pb.TransferFailed_NOT_FOUND
|
||||||
|
}
|
||||||
|
return failMessage("failed to get piece reader", err, transferErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
addrLimit := transferPiece.GetAddressedOrderLimit()
|
||||||
|
pk := transferPiece.PrivateKey
|
||||||
|
|
||||||
|
originalHash, originalOrderLimit, err := c.store.GetHashAndLimit(ctx, satelliteID, pieceID, reader)
|
||||||
|
if err != nil {
|
||||||
|
return failMessage("failed to get piece hash and order limit.", err, pb.TransferFailed_UNKNOWN)
|
||||||
|
}
|
||||||
|
|
||||||
|
satelliteSigner, err := c.trust.GetSignee(ctx, satelliteID)
|
||||||
|
if err != nil {
|
||||||
|
return failMessage("failed to get satellite signer identity from trust store!", err, pb.TransferFailed_UNKNOWN)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify the satellite signature on the original order limit; if we hand in something
|
||||||
|
// with an invalid signature, the satellite will assume we're cheating and disqualify
|
||||||
|
// immediately.
|
||||||
|
err = signing.VerifyOrderLimitSignature(ctx, satelliteSigner, &originalOrderLimit)
|
||||||
|
if err != nil {
|
||||||
|
msg := "The order limit stored for this piece does not have a valid signature from the owning satellite! It was verified before storing, so something went wrong in storage. We have to report this to the satellite as a missing piece."
|
||||||
|
return failMessage(msg, err, pb.TransferFailed_NOT_FOUND)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify that the public key on the order limit signed the original piece hash; if we
|
||||||
|
// hand in something with an invalid signature, the satellite will assume we're cheating
|
||||||
|
// and disqualify immediately.
|
||||||
|
err = signing.VerifyUplinkPieceHashSignature(ctx, originalOrderLimit.UplinkPublicKey, &originalHash)
|
||||||
|
if err != nil {
|
||||||
|
msg := "The piece hash stored for this piece does not have a valid signature from the public key stored in the order limit! It was verified before storing, so something went wrong in storage. We have to report this to the satellite as a missing piece."
|
||||||
|
return failMessage(msg, err, pb.TransferFailed_NOT_FOUND)
|
||||||
|
}
|
||||||
|
|
||||||
|
// after this point, the destination storage node ID is relevant
|
||||||
|
logger = logger.With(zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId))
|
||||||
|
|
||||||
|
if c.minBytesPerSecond == 0 {
|
||||||
|
// set minBytesPerSecond to default 5KiB if set to 0
|
||||||
|
c.minBytesPerSecond = 5 * memory.KiB
|
||||||
|
}
|
||||||
|
maxTransferTime := time.Duration(int64(time.Second) * originalHash.PieceSize / c.minBytesPerSecond.Int64())
|
||||||
|
if maxTransferTime < c.minDownloadTimeout {
|
||||||
|
maxTransferTime = c.minDownloadTimeout
|
||||||
|
}
|
||||||
|
putCtx, cancel := context.WithTimeout(ctx, maxTransferTime)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
pieceHash, peerID, err := c.ecClient.PutPiece(putCtx, ctx, addrLimit, pk, reader)
|
||||||
|
if err != nil {
|
||||||
|
if piecestore.ErrVerifyUntrusted.Has(err) {
|
||||||
|
return failMessage("failed hash verification", err, pb.TransferFailed_HASH_VERIFICATION)
|
||||||
|
}
|
||||||
|
// TODO look at error type to decide on the transfer error
|
||||||
|
return failMessage("failed to put piece", err, pb.TransferFailed_STORAGE_NODE_UNAVAILABLE)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(originalHash.Hash, pieceHash.Hash) {
|
||||||
|
msg := "piece hash from new storagenode does not match"
|
||||||
|
return failMessage(msg, Error.New(msg), pb.TransferFailed_HASH_VERIFICATION)
|
||||||
|
}
|
||||||
|
if pieceHash.PieceId != addrLimit.Limit.PieceId {
|
||||||
|
msg := "piece id from new storagenode does not match order limit"
|
||||||
|
return failMessage(msg, Error.New(msg), pb.TransferFailed_HASH_VERIFICATION)
|
||||||
|
}
|
||||||
|
|
||||||
|
signee := signing.SigneeFromPeerIdentity(peerID)
|
||||||
|
err = signing.VerifyPieceHashSignature(ctx, signee, pieceHash)
|
||||||
|
if err != nil {
|
||||||
|
return failMessage("invalid piece hash signature from new storagenode", err, pb.TransferFailed_HASH_VERIFICATION)
|
||||||
|
}
|
||||||
|
|
||||||
|
success := &pb.StorageNodeMessage{
|
||||||
|
Message: &pb.StorageNodeMessage_Succeeded{
|
||||||
|
Succeeded: &pb.TransferSucceeded{
|
||||||
|
OriginalPieceId: transferPiece.OriginalPieceId,
|
||||||
|
OriginalPieceHash: &originalHash,
|
||||||
|
OriginalOrderLimit: &originalOrderLimit,
|
||||||
|
ReplacementPieceHash: pieceHash,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
logger.Info("piece transferred to new storagenode")
|
||||||
|
return success
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user