2019-10-15 16:29:47 +01:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gracefulexit
import (
2019-10-25 18:16:20 +01:00
"bytes"
2019-10-15 16:29:47 +01:00
"context"
2019-10-22 21:42:21 +01:00
"io"
"os"
2019-10-15 16:29:47 +01:00
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
2020-01-08 02:33:41 +00:00
"storj.io/common/errs2"
2019-12-27 11:48:47 +00:00
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/rpc"
2020-01-08 02:33:41 +00:00
"storj.io/common/rpc/rpcstatus"
2019-12-27 11:48:47 +00:00
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/sync2"
2019-10-22 21:42:21 +01:00
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/piecestore"
2019-10-15 16:29:47 +01:00
"storj.io/storj/storagenode/satellites"
2020-05-28 15:35:49 +01:00
"storj.io/storj/storagenode/trust"
2020-02-21 14:07:29 +00:00
"storj.io/uplink/private/ecclient"
2019-10-15 16:29:47 +01:00
)
// Worker is responsible for completing the graceful exit for a given satellite.
type Worker struct {
2019-10-30 17:40:57 +00:00
log * zap . Logger
store * pieces . Store
2020-05-28 15:35:49 +01:00
trust * trust . Pool
2019-10-30 17:40:57 +00:00
satelliteDB satellites . DB
dialer rpc . Dialer
2019-11-05 15:33:44 +00:00
limiter * sync2 . Limiter
2020-05-19 17:11:30 +01:00
satelliteURL storj . NodeURL
2019-10-30 17:40:57 +00:00
ecclient ecclient . Client
minBytesPerSecond memory . Size
minDownloadTimeout time . Duration
2019-10-15 16:29:47 +01:00
}
// NewWorker instantiates Worker.
2020-05-28 15:35:49 +01:00
func NewWorker ( log * zap . Logger , store * pieces . Store , trust * trust . Pool , satelliteDB satellites . DB , dialer rpc . Dialer , satelliteURL storj . NodeURL , config Config ) * Worker {
2019-10-15 16:29:47 +01:00
return & Worker {
2019-10-30 17:40:57 +00:00
log : log ,
store : store ,
2020-05-28 15:35:49 +01:00
trust : trust ,
2019-10-30 17:40:57 +00:00
satelliteDB : satelliteDB ,
dialer : dialer ,
2019-11-05 15:33:44 +00:00
limiter : sync2 . NewLimiter ( config . NumConcurrentTransfers ) ,
2020-05-19 17:11:30 +01:00
satelliteURL : satelliteURL ,
2019-10-30 17:40:57 +00:00
ecclient : ecclient . NewClient ( log , dialer , 0 ) ,
2019-11-05 15:33:44 +00:00
minBytesPerSecond : config . MinBytesPerSecond ,
minDownloadTimeout : config . MinDownloadTimeout ,
2019-10-15 16:29:47 +01:00
}
}
// Run calls the satellite endpoint, transfers pieces, validates, and responds with success or failure.
2020-07-16 15:18:02 +01:00
// It also marks the satellite finished once all the pieces have been transferred.
2019-10-22 21:42:21 +01:00
func ( worker * Worker ) Run ( ctx context . Context , done func ( ) ) ( err error ) {
2019-10-15 16:29:47 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
defer done ( )
2019-10-22 21:42:21 +01:00
2019-10-15 16:29:47 +01:00
worker . log . Debug ( "running worker" )
2020-05-19 17:11:30 +01:00
conn , err := worker . dialer . DialNodeURL ( ctx , worker . satelliteURL )
2019-10-22 21:42:21 +01:00
if err != nil {
return errs . Wrap ( err )
}
defer func ( ) {
err = errs . Combine ( err , conn . Close ( ) )
} ( )
2020-03-25 12:15:27 +00:00
client := pb . NewDRPCSatelliteGracefulExitClient ( conn )
2019-10-22 21:42:21 +01:00
c , err := client . Process ( ctx )
if err != nil {
return errs . Wrap ( err )
}
for {
response , err := c . Recv ( )
if errs . Is ( err , io . EOF ) {
// Done
2019-11-14 08:31:30 +00:00
return nil
2019-10-22 21:42:21 +01:00
}
2020-01-08 02:33:41 +00:00
if errs2 . IsRPC ( err , rpcstatus . FailedPrecondition ) {
// delete the entry from satellite table and inform graceful exit has failed to start
2020-05-19 17:11:30 +01:00
deleteErr := worker . satelliteDB . CancelGracefulExit ( ctx , worker . satelliteURL . ID )
2020-01-08 02:33:41 +00:00
if deleteErr != nil {
// TODO: what to do now?
return errs . Combine ( deleteErr , err )
}
return errs . Wrap ( err )
}
2019-10-22 21:42:21 +01:00
if err != nil {
// TODO what happened
return errs . Wrap ( err )
}
switch msg := response . GetMessage ( ) . ( type ) {
case * pb . SatelliteMessage_NotReady :
2019-11-14 08:31:30 +00:00
return nil
2019-10-22 21:42:21 +01:00
case * pb . SatelliteMessage_TransferPiece :
2019-11-05 15:33:44 +00:00
transferPieceMsg := msg . TransferPiece
worker . limiter . Go ( ctx , func ( ) {
err = worker . transferPiece ( ctx , transferPieceMsg , c )
if err != nil {
2019-11-22 02:10:02 +00:00
worker . log . Error ( "failed to transfer piece." ,
2020-05-19 17:11:30 +01:00
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2019-11-22 02:10:02 +00:00
zap . Error ( errs . Wrap ( err ) ) )
2019-11-05 15:33:44 +00:00
}
} )
2019-11-14 08:31:30 +00:00
2019-10-22 21:42:21 +01:00
case * pb . SatelliteMessage_DeletePiece :
2019-11-05 15:33:44 +00:00
deletePieceMsg := msg . DeletePiece
worker . limiter . Go ( ctx , func ( ) {
pieceID := deletePieceMsg . OriginalPieceId
2020-01-09 20:46:49 +00:00
err := worker . deleteOnePiece ( ctx , pieceID )
2019-11-05 15:33:44 +00:00
if err != nil {
2019-11-05 21:04:07 +00:00
worker . log . Error ( "failed to delete piece." ,
2020-05-19 17:11:30 +01:00
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2019-11-05 21:04:07 +00:00
zap . Stringer ( "Piece ID" , pieceID ) ,
zap . Error ( errs . Wrap ( err ) ) )
2019-11-05 15:33:44 +00:00
}
} )
2019-10-30 14:46:56 +00:00
2019-10-22 21:42:21 +01:00
case * pb . SatelliteMessage_ExitFailed :
2019-11-05 21:04:07 +00:00
worker . log . Error ( "graceful exit failed." ,
2020-05-19 17:11:30 +01:00
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2019-11-05 21:04:07 +00:00
zap . Stringer ( "reason" , msg . ExitFailed . Reason ) )
2019-10-22 21:42:21 +01:00
2020-04-08 13:08:57 +01:00
exitFailedBytes , err := pb . Marshal ( msg . ExitFailed )
2019-12-03 22:09:39 +00:00
if err != nil {
worker . log . Error ( "failed to marshal exit failed message." )
}
2020-05-19 17:11:30 +01:00
err = worker . satelliteDB . CompleteGracefulExit ( ctx , worker . satelliteURL . ID , time . Now ( ) , satellites . ExitFailed , exitFailedBytes )
2019-11-14 08:31:30 +00:00
return errs . Wrap ( err )
2019-10-22 21:42:21 +01:00
case * pb . SatelliteMessage_ExitCompleted :
2020-05-19 17:11:30 +01:00
worker . log . Info ( "graceful exit completed." , zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) )
2019-10-22 21:42:21 +01:00
2020-04-08 13:08:57 +01:00
exitCompletedBytes , err := pb . Marshal ( msg . ExitCompleted )
2019-12-03 22:09:39 +00:00
if err != nil {
worker . log . Error ( "failed to marshal exit completed message." )
}
2020-05-19 17:11:30 +01:00
err = worker . satelliteDB . CompleteGracefulExit ( ctx , worker . satelliteURL . ID , time . Now ( ) , satellites . ExitSucceeded , exitCompletedBytes )
2019-10-22 21:42:21 +01:00
if err != nil {
return errs . Wrap ( err )
}
2020-08-18 22:08:53 +01:00
// wait for deletes to complete
worker . limiter . Wait ( )
2019-10-30 14:46:56 +00:00
// delete all remaining pieces
2020-01-09 20:46:49 +00:00
err = worker . deleteAllPieces ( ctx )
2020-07-08 11:50:40 +01:00
if err != nil {
return errs . Wrap ( err )
}
// delete everything left in blobs folder of specific satellites
err = worker . store . DeleteSatelliteBlobs ( ctx , worker . satelliteURL . ID )
2019-11-14 08:31:30 +00:00
return errs . Wrap ( err )
2019-10-22 21:42:21 +01:00
default :
// TODO handle err
2020-05-19 17:11:30 +01:00
worker . log . Error ( "unknown graceful exit message." , zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) )
2019-10-22 21:42:21 +01:00
}
}
2019-10-15 16:29:47 +01:00
}
2019-10-26 14:53:35 +01:00
// gracefulExitStream is the subset of the graceful exit stream that the
// worker relies on: receiving satellite messages, sending storage node
// replies and exposing the stream's context.
type gracefulExitStream interface {
	// Recv returns the next message sent by the satellite.
	Recv() (*pb.SatelliteMessage, error)
	// Send delivers a storage node message to the satellite.
	Send(*pb.StorageNodeMessage) error
	// Context returns the context associated with the stream.
	Context() context.Context
}
func ( worker * Worker ) transferPiece ( ctx context . Context , transferPiece * pb . TransferPiece , c gracefulExitStream ) error {
2019-10-25 18:16:20 +01:00
pieceID := transferPiece . OriginalPieceId
2020-05-19 17:11:30 +01:00
reader , err := worker . store . Reader ( ctx , worker . satelliteURL . ID , pieceID )
2019-10-25 18:16:20 +01:00
if err != nil {
transferErr := pb . TransferFailed_UNKNOWN
if errs . Is ( err , os . ErrNotExist ) {
transferErr = pb . TransferFailed_NOT_FOUND
}
2019-11-05 21:04:07 +00:00
worker . log . Error ( "failed to get piece reader." ,
2020-05-19 17:11:30 +01:00
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2019-11-05 21:04:07 +00:00
zap . Stringer ( "Piece ID" , pieceID ) ,
zap . Error ( errs . Wrap ( err ) ) )
2019-10-25 18:16:20 +01:00
worker . handleFailure ( ctx , transferErr , pieceID , c . Send )
return err
}
addrLimit := transferPiece . GetAddressedOrderLimit ( )
pk := transferPiece . PrivateKey
2020-05-19 17:11:30 +01:00
originalHash , originalOrderLimit , err := worker . store . GetHashAndLimit ( ctx , worker . satelliteURL . ID , pieceID , reader )
2019-10-25 18:16:20 +01:00
if err != nil {
2019-11-05 21:04:07 +00:00
worker . log . Error ( "failed to get piece hash and order limit." ,
2020-05-19 17:11:30 +01:00
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2019-11-05 21:04:07 +00:00
zap . Stringer ( "Piece ID" , pieceID ) ,
zap . Error ( errs . Wrap ( err ) ) )
2019-10-25 18:16:20 +01:00
worker . handleFailure ( ctx , pb . TransferFailed_UNKNOWN , pieceID , c . Send )
return err
}
2020-05-28 15:35:49 +01:00
satelliteSigner , err := worker . trust . GetSignee ( ctx , worker . satelliteURL . ID )
if err != nil {
worker . log . Error ( "failed to get satellite signer identity from trust store!" ,
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
zap . Error ( errs . Wrap ( err ) ) )
worker . handleFailure ( ctx , pb . TransferFailed_UNKNOWN , pieceID , c . Send )
return err
}
// verify the satellite signature on the original order limit; if we hand in something
// with an invalid signature, the satellite will assume we're cheating and disqualify
// immediately.
err = signing . VerifyOrderLimitSignature ( ctx , satelliteSigner , & originalOrderLimit )
if err != nil {
worker . log . Error ( "The order limit stored for this piece does not have a valid signature from the owning satellite! It was verified before storing, so something went wrong in storage. We have to report this to the satellite as a missing piece." ,
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
zap . Stringer ( "Piece ID" , pieceID ) ,
zap . Error ( errs . Wrap ( err ) ) )
worker . handleFailure ( ctx , pb . TransferFailed_NOT_FOUND , pieceID , c . Send )
return err
}
// verify that the public key on the order limit signed the original piece hash; if we
// hand in something with an invalid signature, the satellite will assume we're cheating
// and disqualify immediately.
err = signing . VerifyUplinkPieceHashSignature ( ctx , originalOrderLimit . UplinkPublicKey , & originalHash )
if err != nil {
worker . log . Error ( "The piece hash stored for this piece does not have a valid signature from the public key stored in the order limit! It was verified before storing, so something went wrong in storage. We have to report this to the satellite as a missing piece." ,
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
zap . Stringer ( "Piece ID" , pieceID ) ,
zap . Error ( errs . Wrap ( err ) ) )
worker . handleFailure ( ctx , pb . TransferFailed_NOT_FOUND , pieceID , c . Send )
}
2019-10-30 17:40:57 +00:00
if worker . minBytesPerSecond == 0 {
2020-06-23 21:10:13 +01:00
// set minBytesPerSecond to default 5KiB if set to 0
worker . minBytesPerSecond = 5 * memory . KiB
2019-10-30 17:40:57 +00:00
}
maxTransferTime := time . Duration ( int64 ( time . Second ) * originalHash . PieceSize / worker . minBytesPerSecond . Int64 ( ) )
if maxTransferTime < worker . minDownloadTimeout {
maxTransferTime = worker . minDownloadTimeout
}
putCtx , cancel := context . WithTimeout ( ctx , maxTransferTime )
2019-10-25 18:16:20 +01:00
defer cancel ( )
2019-10-30 15:35:00 +00:00
pieceHash , peerID , err := worker . ecclient . PutPiece ( putCtx , ctx , addrLimit , pk , reader )
2019-10-25 18:16:20 +01:00
if err != nil {
if piecestore . ErrVerifyUntrusted . Has ( err ) {
2019-11-05 21:04:07 +00:00
worker . log . Error ( "failed hash verification." ,
2020-05-19 17:11:30 +01:00
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2019-11-05 21:04:07 +00:00
zap . Stringer ( "Piece ID" , pieceID ) ,
zap . Error ( errs . Wrap ( err ) ) )
2019-10-25 18:16:20 +01:00
worker . handleFailure ( ctx , pb . TransferFailed_HASH_VERIFICATION , pieceID , c . Send )
} else {
2019-11-05 21:04:07 +00:00
worker . log . Error ( "failed to put piece." ,
2020-05-19 17:11:30 +01:00
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2019-11-05 21:04:07 +00:00
zap . Stringer ( "Piece ID" , pieceID ) ,
zap . Error ( errs . Wrap ( err ) ) )
2019-10-25 18:16:20 +01:00
// TODO look at error type to decide on the transfer error
worker . handleFailure ( ctx , pb . TransferFailed_STORAGE_NODE_UNAVAILABLE , pieceID , c . Send )
}
return err
}
if ! bytes . Equal ( originalHash . Hash , pieceHash . Hash ) {
2019-11-05 21:04:07 +00:00
worker . log . Error ( "piece hash from new storagenode does not match" ,
zap . Stringer ( "Storagenode ID" , addrLimit . Limit . StorageNodeId ) ,
2020-05-19 17:11:30 +01:00
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2019-11-05 21:04:07 +00:00
zap . Stringer ( "Piece ID" , pieceID ) )
2019-10-25 18:16:20 +01:00
worker . handleFailure ( ctx , pb . TransferFailed_HASH_VERIFICATION , pieceID , c . Send )
return Error . New ( "piece hash from new storagenode does not match" )
}
if pieceHash . PieceId != addrLimit . Limit . PieceId {
2019-11-05 21:04:07 +00:00
worker . log . Error ( "piece id from new storagenode does not match order limit" ,
zap . Stringer ( "Storagenode ID" , addrLimit . Limit . StorageNodeId ) ,
2020-05-19 17:11:30 +01:00
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2019-11-05 21:04:07 +00:00
zap . Stringer ( "Piece ID" , pieceID ) )
2019-10-25 18:16:20 +01:00
worker . handleFailure ( ctx , pb . TransferFailed_HASH_VERIFICATION , pieceID , c . Send )
return Error . New ( "piece id from new storagenode does not match order limit" )
}
signee := signing . SigneeFromPeerIdentity ( peerID )
err = signing . VerifyPieceHashSignature ( ctx , signee , pieceHash )
if err != nil {
2019-11-05 21:04:07 +00:00
worker . log . Error ( "invalid piece hash signature from new storagenode" ,
zap . Stringer ( "Storagenode ID" , addrLimit . Limit . StorageNodeId ) ,
2020-05-19 17:11:30 +01:00
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2019-11-05 21:04:07 +00:00
zap . Stringer ( "Piece ID" , pieceID ) ,
zap . Error ( errs . Wrap ( err ) ) )
2019-10-25 18:16:20 +01:00
worker . handleFailure ( ctx , pb . TransferFailed_HASH_VERIFICATION , pieceID , c . Send )
return err
}
success := & pb . StorageNodeMessage {
Message : & pb . StorageNodeMessage_Succeeded {
Succeeded : & pb . TransferSucceeded {
OriginalPieceId : transferPiece . OriginalPieceId ,
2019-11-13 19:15:31 +00:00
OriginalPieceHash : & originalHash ,
OriginalOrderLimit : & originalOrderLimit ,
2019-10-25 18:16:20 +01:00
ReplacementPieceHash : pieceHash ,
} ,
} ,
}
2019-11-22 02:10:02 +00:00
worker . log . Info ( "piece transferred to new storagenode" ,
zap . Stringer ( "Storagenode ID" , addrLimit . Limit . StorageNodeId ) ,
2020-05-19 17:11:30 +01:00
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2019-11-22 02:10:02 +00:00
zap . Stringer ( "Piece ID" , pieceID ) )
2019-10-25 18:16:20 +01:00
return c . Send ( success )
}
2020-07-16 15:18:02 +01:00
// deleteOnePiece deletes one piece stored for a satellite.
2020-01-09 20:46:49 +00:00
func ( worker * Worker ) deleteOnePiece ( ctx context . Context , pieceID storj . PieceID ) error {
2020-05-19 17:11:30 +01:00
piece , err := worker . store . Reader ( ctx , worker . satelliteURL . ID , pieceID )
2020-01-09 20:46:49 +00:00
if err != nil {
if ! errs2 . IsCanceled ( err ) {
2020-05-19 17:11:30 +01:00
worker . log . Debug ( "failed to retrieve piece info" , zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2020-01-09 20:46:49 +00:00
zap . Stringer ( "Piece ID" , pieceID ) , zap . Error ( err ) )
2019-10-30 14:46:56 +00:00
}
2020-01-09 20:46:49 +00:00
return err
}
err = worker . deletePiece ( ctx , pieceID )
if err != nil {
2020-05-19 17:11:30 +01:00
worker . log . Debug ( "failed to retrieve piece info" , zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) , zap . Error ( err ) )
2020-01-09 20:46:49 +00:00
return err
2019-10-30 14:46:56 +00:00
}
2020-01-09 20:46:49 +00:00
// update graceful exit progress
size := piece . Size ( )
2020-05-19 17:11:30 +01:00
return worker . satelliteDB . UpdateGracefulExit ( ctx , worker . satelliteURL . ID , size )
2020-01-09 20:46:49 +00:00
}
2019-10-30 14:46:56 +00:00
2020-07-16 15:18:02 +01:00
// deletePiece deletes one piece stored for a satellite, without updating satellite Graceful Exit status.
2020-01-09 20:46:49 +00:00
func ( worker * Worker ) deletePiece ( ctx context . Context , pieceID storj . PieceID ) error {
2020-05-19 17:11:30 +01:00
err := worker . store . Delete ( ctx , worker . satelliteURL . ID , pieceID )
2020-01-09 20:46:49 +00:00
if err != nil {
worker . log . Debug ( "failed to delete a piece" ,
2020-05-19 17:11:30 +01:00
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2020-01-09 20:46:49 +00:00
zap . Stringer ( "Piece ID" , pieceID ) ,
zap . Error ( err ) )
delErr := worker . store . DeleteFailed ( ctx , pieces . ExpiredInfo {
2020-05-19 17:11:30 +01:00
SatelliteID : worker . satelliteURL . ID ,
2020-01-09 20:46:49 +00:00
PieceID : pieceID ,
InPieceInfo : true ,
} , time . Now ( ) . UTC ( ) )
if delErr != nil {
worker . log . Debug ( "failed to mark a deletion failure for a piece" ,
2020-05-19 17:11:30 +01:00
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2020-01-09 20:46:49 +00:00
zap . Stringer ( "Piece ID" , pieceID ) , zap . Error ( err ) )
}
return errs . Combine ( err , delErr )
}
worker . log . Debug ( "delete piece" ,
2020-05-19 17:11:30 +01:00
zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2020-01-09 20:46:49 +00:00
zap . Stringer ( "Piece ID" , pieceID ) )
return err
}
2020-07-16 15:18:02 +01:00
// deleteAllPieces deletes pieces stored for a satellite.
2020-01-09 20:46:49 +00:00
func ( worker * Worker ) deleteAllPieces ( ctx context . Context ) error {
var totalDeleted int64
2020-05-19 17:11:30 +01:00
err := worker . store . WalkSatellitePieces ( ctx , worker . satelliteURL . ID , func ( piece pieces . StoredPieceAccess ) error {
2020-01-09 20:46:49 +00:00
err := worker . deletePiece ( ctx , piece . PieceID ( ) )
if err == nil {
_ , size , err := piece . Size ( ctx )
2019-10-30 14:46:56 +00:00
if err != nil {
2020-05-19 17:11:30 +01:00
worker . log . Debug ( "failed to retrieve piece info" , zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) ,
2020-01-09 20:46:49 +00:00
zap . Stringer ( "Piece ID" , piece . PieceID ( ) ) , zap . Error ( err ) )
2019-10-30 14:46:56 +00:00
}
2020-01-09 20:46:49 +00:00
totalDeleted += size
2019-10-30 14:46:56 +00:00
}
2020-01-09 20:46:49 +00:00
return err
} )
if err != nil && ! errs2 . IsCanceled ( err ) {
2020-05-19 17:11:30 +01:00
worker . log . Debug ( "failed to retrieve piece info" , zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) , zap . Error ( err ) )
2019-10-30 14:46:56 +00:00
}
2020-01-09 20:46:49 +00:00
// update graceful exit progress
2020-05-19 17:11:30 +01:00
return worker . satelliteDB . UpdateGracefulExit ( ctx , worker . satelliteURL . ID , totalDeleted )
2019-10-30 14:46:56 +00:00
}
2019-10-22 21:42:21 +01:00
func ( worker * Worker ) handleFailure ( ctx context . Context , transferError pb . TransferFailed_Error , pieceID pb . PieceID , send func ( * pb . StorageNodeMessage ) error ) {
failure := & pb . StorageNodeMessage {
Message : & pb . StorageNodeMessage_Failed {
Failed : & pb . TransferFailed {
OriginalPieceId : pieceID ,
Error : transferError ,
} ,
} ,
}
sendErr := send ( failure )
if sendErr != nil {
2020-05-19 17:11:30 +01:00
worker . log . Error ( "unable to send failure." , zap . Stringer ( "Satellite ID" , worker . satelliteURL . ID ) )
2019-10-22 21:42:21 +01:00
}
}
2019-10-15 16:29:47 +01:00
// Close halts the worker.
func ( worker * Worker ) Close ( ) error {
2019-12-17 15:06:47 +00:00
worker . limiter . Wait ( )
2019-10-15 16:29:47 +01:00
return nil
}