satellite/gracefulexit: use graceful_exit_segment_transfer_queue
To be able to use the segment metainfo loop, graceful exit transfers have to reference the segment stream_id/position instead of the path. For this, we created a new table, graceful_exit_segment_transfer_queue, that will replace graceful_exit_transfer_queue. The table was created in a previous migration and made accessible through the graceful exit db in a later one. This change makes graceful exit enqueue transfer items for newly exiting nodes in the new table.

Change-Id: I7bd00de13e749be521d63ef3b80c168df66b9433
parent 449c873681
commit c4202b9451
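The diff below applies one compatibility pattern throughout the satellite: a node's graceful_exit_progress row records whether it uses the new segment-based queue, and every queue call passes that flag along, so exits that started before this change keep draining graceful_exit_transfer_queue while new exits use graceful_exit_segment_transfer_queue. The following minimal Go sketch is not part of this commit; the helper name usesSegmentQueue is hypothetical, while GetProgress, Progress.UsesSegmentTransferQueue, and the gracefulexit DB interface are the ones touched by the diff.

// Hypothetical sketch: deciding which transfer queue to read for a node.
// Defaults to the new segment queue when the node has no progress row yet,
// i.e. it starts exiting after this change.
package gracefulexit

import (
	"context"
	"database/sql"

	"github.com/zeebo/errs"

	"storj.io/common/storj"
)

// usesSegmentQueue is an assumed helper name; DB and Progress are this
// package's own types as used in the diff below.
func usesSegmentQueue(ctx context.Context, db DB, nodeID storj.NodeID) (bool, error) {
	progress, err := db.GetProgress(ctx, nodeID)
	if err != nil {
		if errs.Is(err, sql.ErrNoRows) {
			// no progress recorded yet: a newly exiting node, so use
			// graceful_exit_segment_transfer_queue
			return true, nil
		}
		return false, err
	}
	// existing nodes keep whatever queue their exit started with
	return progress.UsesSegmentTransferQueue, nil
}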
@@ -92,7 +92,7 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
 			}
 
 			// remove all items from the transfer queue
-			err := chore.db.DeleteTransferQueueItems(ctx, node.NodeID, false)
+			err := chore.db.DeleteTransferQueueItems(ctx, node.NodeID, progress.UsesSegmentTransferQueue)
 			if err != nil {
 				chore.log.Error("error deleting node from transfer queue", zap.Error(err))
 			}

@@ -84,7 +84,7 @@ func TestChore(t *testing.T) {
 
 		satellite.GracefulExit.Chore.Loop.TriggerWait()
 
-		incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, false)
+		incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, true)
 		require.NoError(t, err)
 		require.Len(t, incompleteTransfers, 3)
 		for _, incomplete := range incompleteTransfers {

@@ -97,7 +97,7 @@ func TestChore(t *testing.T) {
 			if node.ID() == exitingNode.ID() {
 				continue
 			}
-			incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, node.ID(), 20, 0, false)
+			incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, node.ID(), 20, 0, true)
 			require.NoError(t, err)
 			require.Len(t, incompleteTransfers, 0)
 		}

@@ -116,7 +116,7 @@ func TestChore(t *testing.T) {
 		err = satellite.DB.GracefulExit().IncrementProgress(ctx, exitingNode.ID(), 0, 0, 0)
 		require.NoError(t, err)
 
-		incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, false)
+		incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, true)
 		require.NoError(t, err)
 		require.Len(t, incompleteTransfers, 3)
 

@@ -129,7 +129,7 @@ func TestChore(t *testing.T) {
 		require.False(t, exitStatus.ExitSuccess)
 		require.NotNil(t, exitStatus.ExitFinishedAt)
 
-		incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, false)
+		incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, true)
 		require.NoError(t, err)
 		require.Len(t, incompleteTransfers, 0)
 

@@ -223,7 +223,7 @@ func TestDurabilityRatio(t *testing.T) {
 
 		satellite.GracefulExit.Chore.Loop.TriggerWait()
 
-		incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, false)
+		incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, true)
 		require.NoError(t, err)
 		require.Len(t, incompleteTransfers, 2)
 		for _, incomplete := range incompleteTransfers {

@@ -270,7 +270,7 @@ func batch(ctx context.Context, b *testing.B, db gracefulexit.DB, size int) {
 			transferQueueItems = append(transferQueueItems, item)
 		}
 		batchSize := 1000
-		err := db.Enqueue(ctx, transferQueueItems, batchSize, false)
+		err := db.Enqueue(ctx, transferQueueItems, batchSize, true)
 		require.NoError(b, err)
 	}
 }

@@ -5,6 +5,7 @@ package gracefulexit
 
 import (
 	"context"
+	"database/sql"
 	"io"
 	"sort"
 	"sync"

@@ -131,7 +132,17 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr
 		endpoint.connections.delete(nodeID)
 	}()
 
-	isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID)
+	//TODO: remove this call when we remove graceful_exit_transfer_queue is removed
+	usesSegmentTransferQueue := true
+	progress, err := endpoint.db.GetProgress(ctx, nodeID)
+	if err != nil && !errs.Is(err, sql.ErrNoRows) {
+		return rpcstatus.Error(rpcstatus.Internal, err.Error())
+	}
+	if progress != nil {
+		usesSegmentTransferQueue = progress.UsesSegmentTransferQueue
+	}
+
+	isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID, usesSegmentTransferQueue)
 	if err != nil {
 		return rpcstatus.Error(rpcstatus.Internal, err.Error())
 	}

@@ -177,14 +188,14 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr
 
 	loopErr := incompleteLoop.Run(ctx, func(ctx context.Context) error {
 		if pending.Length() == 0 {
-			incomplete, err := endpoint.db.GetIncompleteNotFailed(ctx, nodeID, endpoint.config.EndpointBatchSize, 0, false)
+			incomplete, err := endpoint.db.GetIncompleteNotFailed(ctx, nodeID, endpoint.config.EndpointBatchSize, 0, usesSegmentTransferQueue)
 			if err != nil {
 				cancel()
 				return pending.DoneSending(err)
 			}
 
 			if len(incomplete) == 0 {
-				incomplete, err = endpoint.db.GetIncompleteFailed(ctx, nodeID, endpoint.config.MaxFailuresPerPiece, endpoint.config.EndpointBatchSize, 0, false)
+				incomplete, err = endpoint.db.GetIncompleteFailed(ctx, nodeID, endpoint.config.MaxFailuresPerPiece, endpoint.config.EndpointBatchSize, 0, usesSegmentTransferQueue)
 				if err != nil {
 					cancel()
 					return pending.DoneSending(err)

@@ -220,7 +231,7 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr
 
 			// if there is no more work to receive send complete
 			if finished {
-				isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID)
+				isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID, usesSegmentTransferQueue)
 				if err != nil {
 					return rpcstatus.Error(rpcstatus.Internal, err.Error())
 				}

@@ -234,7 +245,7 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr
 					return rpcstatus.Error(rpcstatus.Internal, err.Error())
 				}
 
-				err = endpoint.handleFinished(ctx, stream, exitStatusRequest, exitFailedReason)
+				err = endpoint.handleFinished(ctx, stream, exitStatusRequest, exitFailedReason, usesSegmentTransferQueue)
 				if err != nil {
 					return rpcstatus.Error(rpcstatus.Internal, err.Error())
 				}

@@ -297,7 +308,7 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr
 				ExitSuccess: false,
 			}
 
-			err := endpoint.handleFinished(ctx, stream, exitStatusRequest, pb.ExitFailed_VERIFICATION_FAILED)
+			err := endpoint.handleFinished(ctx, stream, exitStatusRequest, pb.ExitFailed_VERIFICATION_FAILED, usesSegmentTransferQueue)
 			if err != nil {
 				return rpcstatus.Error(rpcstatus.Internal, err.Error())
 			}

@@ -326,7 +337,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
 		if err != nil {
 			return Error.Wrap(err)
 		}
-		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, uuid.UUID{}, metabase.SegmentPosition{}, incomplete.PieceNum)
+		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
 		if err != nil {
 			return Error.Wrap(err)
 		}

@@ -334,10 +345,10 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
 		return nil
 	}
 
-	segment, err := endpoint.getValidSegment(ctx, incomplete.Key, incomplete.RootPieceID)
+	segment, err := endpoint.getValidSegment(ctx, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.RootPieceID)
 	if err != nil {
 		endpoint.log.Warn("invalid segment", zap.Error(err))
-		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, uuid.UUID{}, metabase.SegmentPosition{}, incomplete.PieceNum)
+		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
 		if err != nil {
 			return Error.Wrap(err)
 		}

@@ -347,7 +358,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
 
 	nodePiece, err := endpoint.getNodePiece(ctx, segment, incomplete)
 	if err != nil {
-		deleteErr := endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, uuid.UUID{}, metabase.SegmentPosition{}, incomplete.PieceNum)
+		deleteErr := endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
 		if deleteErr != nil {
 			return Error.Wrap(deleteErr)
 		}

@@ -361,7 +372,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
 			return Error.Wrap(err)
 		}
 
-		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, uuid.UUID{}, metabase.SegmentPosition{}, incomplete.PieceNum)
+		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
 		if err != nil {
 			return Error.Wrap(err)
 		}

@@ -395,16 +406,13 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
 
 	newNode := newNodes[0]
 	endpoint.log.Debug("found new node for piece transfer", zap.Stringer("original node ID", nodeID), zap.Stringer("replacement node ID", newNode.ID),
-		zap.ByteString("key", incomplete.Key), zap.Int32("piece num", incomplete.PieceNum))
+		zap.ByteString("key", incomplete.Key),
+		zap.ByteString("streamID", incomplete.StreamID[:]), zap.Uint32("Part", incomplete.Position.Part), zap.Uint32("Index", incomplete.Position.Index),
+		zap.Int32("piece num", incomplete.PieceNum))
 
 	pieceID := segment.RootPieceID.Derive(nodeID, incomplete.PieceNum)
 
-	segmentLocation, err := metabase.ParseSegmentKey(incomplete.Key)
-	if err != nil {
-		return Error.New("invalid key for node ID %v, piece ID %v: %w", incomplete.NodeID, pieceID, err)
-	}
-
-	limit, privateKey, err := endpoint.orders.CreateGracefulExitPutOrderLimit(ctx, segmentLocation.Bucket(), newNode.ID, incomplete.PieceNum, segment.RootPieceID, int32(pieceSize))
+	limit, privateKey, err := endpoint.orders.CreateGracefulExitPutOrderLimit(ctx, metabase.BucketLocation{}, newNode.ID, incomplete.PieceNum, segment.RootPieceID, int32(pieceSize))
 	if err != nil {
 		return Error.Wrap(err)
 	}

@@ -423,7 +431,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
 		return Error.Wrap(err)
 	}
 
-	err = endpoint.db.IncrementOrderLimitSendCount(ctx, nodeID, incomplete.Key, uuid.UUID{}, metabase.SegmentPosition{}, incomplete.PieceNum)
+	err = endpoint.db.IncrementOrderLimitSendCount(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
 	if err != nil {
 		return Error.Wrap(err)
 	}

@@ -431,6 +439,8 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
 	// update pending queue with the transfer item
 	err = pending.Put(pieceID, &PendingTransfer{
 		Key:                 incomplete.Key,
+		StreamID:            incomplete.StreamID,
+		Position:            incomplete.Position,
 		PieceSize:           pieceSize,
 		SatelliteMessage:    transferMsg,
 		OriginalRootPieceID: segment.RootPieceID,

@@ -467,12 +477,12 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream pb.DRPCSat
 	if err != nil {
 		return Error.Wrap(err)
 	}
-	transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.Key, uuid.UUID{}, metabase.SegmentPosition{}, int32(transfer.PieceNum))
+	transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
 	if err != nil {
 		return Error.Wrap(err)
 	}
 
-	err = endpoint.updateSegment(ctx, exitingNodeID, receivingNodeID, transfer.Key, transfer.PieceNum, transferQueueItem.RootPieceID)
+	err = endpoint.updateSegment(ctx, exitingNodeID, receivingNodeID, transfer.Key, transfer.StreamID, transfer.Position, transfer.PieceNum, transferQueueItem.RootPieceID)
 	if err != nil {
 		// remove the piece from the pending queue so it gets retried
 		deleteErr := pending.Delete(originalPieceID)

@@ -490,7 +500,7 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream pb.DRPCSat
 		return Error.Wrap(err)
 	}
 
-	err = endpoint.db.DeleteTransferQueueItem(ctx, exitingNodeID, transfer.Key, uuid.UUID{}, metabase.SegmentPosition{}, int32(transfer.PieceNum))
+	err = endpoint.db.DeleteTransferQueueItem(ctx, exitingNodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
 	if err != nil {
 		return Error.Wrap(err)
 	}

@@ -536,7 +546,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
 		return nil
 	}
 
-	transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.Key, uuid.UUID{}, metabase.SegmentPosition{}, int32(transfer.PieceNum))
+	transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
 	if err != nil {
 		return Error.Wrap(err)
 	}

@@ -552,9 +562,11 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
 	// Remove the queue item and remove the node from the pointer.
 	// If the pointer is not piece hash verified, do not count this as a failure.
 	if pb.TransferFailed_Error(errorCode) == pb.TransferFailed_NOT_FOUND {
-		endpoint.log.Debug("piece not found on node", zap.Stringer("node ID", nodeID), zap.ByteString("key", transfer.Key), zap.Uint16("piece num", transfer.PieceNum))
+		endpoint.log.Debug("piece not found on node", zap.Stringer("node ID", nodeID), zap.ByteString("key", transfer.Key),
+			zap.ByteString("streamID", transfer.StreamID[:]), zap.Uint32("Part", transfer.Position.Part), zap.Uint32("Index", transfer.Position.Index),
+			zap.Uint16("piece num", transfer.PieceNum))
 
-		segment, err := endpoint.getValidSegment(ctx, transfer.Key, storj.PieceID{})
+		segment, err := endpoint.getValidSegment(ctx, transfer.Key, transfer.StreamID, transfer.Position, storj.PieceID{})
 		if err != nil {
 			return Error.Wrap(err)
 		}

@@ -567,7 +579,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
 			}
 		}
 		if nodePiece == (metabase.Piece{}) {
-			err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, uuid.UUID{}, metabase.SegmentPosition{}, int32(transfer.PieceNum))
+			err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
 			if err != nil {
 				return Error.Wrap(err)
 			}

@@ -584,7 +596,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
 			return Error.Wrap(err)
 		}
 
-		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, uuid.UUID{}, metabase.SegmentPosition{}, int32(transfer.PieceNum))
+		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
 		if err != nil {
 			return Error.Wrap(err)
 		}

||||
@ -594,7 +606,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
|
||||
transferQueueItem.LastFailedAt = &now
|
||||
transferQueueItem.FailedCount = &failedCount
|
||||
transferQueueItem.LastFailedCode = &errorCode
|
||||
err = endpoint.db.UpdateTransferQueueItem(ctx, *transferQueueItem, false)
|
||||
err = endpoint.db.UpdateTransferQueueItem(ctx, *transferQueueItem, transfer.Key == nil)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@ -610,7 +622,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
|
||||
return pending.Delete(pieceID)
|
||||
}
|
||||
|
||||
func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID storj.NodeID) (isDisqualified bool, err error) {
|
||||
func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) (isDisqualified bool, err error) {
|
||||
// check if node is disqualified
|
||||
nodeInfo, err := endpoint.overlay.Get(ctx, nodeID)
|
||||
if err != nil {
|
||||
@ -631,7 +643,7 @@ func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID sto
|
||||
}
|
||||
|
||||
// remove remaining items from the queue
|
||||
err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID, false)
|
||||
err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID, usesSegmentTransferQueue)
|
||||
if err != nil {
|
||||
return true, Error.Wrap(err)
|
||||
}
|
||||
@ -642,7 +654,7 @@ func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID sto
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (endpoint *Endpoint) handleFinished(ctx context.Context, stream pb.DRPCSatelliteGracefulExit_ProcessStream, exitStatusRequest *overlay.ExitStatusRequest, failedReason pb.ExitFailed_Reason) error {
|
||||
func (endpoint *Endpoint) handleFinished(ctx context.Context, stream pb.DRPCSatelliteGracefulExit_ProcessStream, exitStatusRequest *overlay.ExitStatusRequest, failedReason pb.ExitFailed_Reason, usesSegmentTransferQueue bool) error {
|
||||
finishedMsg, err := endpoint.getFinishedMessage(ctx, exitStatusRequest.NodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, failedReason)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
@ -659,7 +671,7 @@ func (endpoint *Endpoint) handleFinished(ctx context.Context, stream pb.DRPCSate
|
||||
}
|
||||
|
||||
// remove remaining items from the queue after notifying nodes about their exit status
|
||||
err = endpoint.db.DeleteTransferQueueItems(ctx, exitStatusRequest.NodeID, false)
|
||||
err = endpoint.db.DeleteTransferQueueItems(ctx, exitStatusRequest.NodeID, usesSegmentTransferQueue)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@ -706,11 +718,11 @@ func (endpoint *Endpoint) getFinishedMessage(ctx context.Context, nodeID storj.N
|
||||
return message, nil
|
||||
}
|
||||
|
||||
func (endpoint *Endpoint) updateSegment(ctx context.Context, exitingNodeID storj.NodeID, receivingNodeID storj.NodeID, key metabase.SegmentKey, pieceNumber uint16, originalRootPieceID storj.PieceID) (err error) {
|
||||
func (endpoint *Endpoint) updateSegment(ctx context.Context, exitingNodeID storj.NodeID, receivingNodeID storj.NodeID, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, pieceNumber uint16, originalRootPieceID storj.PieceID) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// remove the node from the segment
|
||||
segment, err := endpoint.getValidSegment(ctx, key, originalRootPieceID)
|
||||
segment, err := endpoint.getValidSegment(ctx, key, streamID, position, originalRootPieceID)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@@ -856,17 +868,30 @@ func (endpoint *Endpoint) calculatePieceSize(ctx context.Context, segment metaba
 	return eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy), nil
 }
 
-func (endpoint *Endpoint) getValidSegment(ctx context.Context, key metabase.SegmentKey, originalRootPieceID storj.PieceID) (metabase.Segment, error) {
-	location, err := metabase.ParseSegmentKey(key)
-	if err != nil {
-		return metabase.Segment{}, Error.Wrap(err)
+func (endpoint *Endpoint) getValidSegment(ctx context.Context, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, originalRootPieceID storj.PieceID) (metabase.Segment, error) {
+	// TODO: remove when table graceful_exit_transfer_queue is dropped
+	if key != nil {
+		location, err := metabase.ParseSegmentKey(key)
+		if err != nil {
+			return metabase.Segment{}, Error.Wrap(err)
+		}
+
+		segment, err := endpoint.metabase.GetSegmentByLocation(ctx, metabase.GetSegmentByLocation{
+			SegmentLocation: location,
+		})
+		if err != nil {
+			return metabase.Segment{}, Error.Wrap(err)
+		}
+
+		if !originalRootPieceID.IsZero() && originalRootPieceID != segment.RootPieceID {
+			return metabase.Segment{}, Error.New("segment has changed")
+		}
+		return segment, nil
 	}
 
-	segment, err := endpoint.metabase.GetSegmentByLocation(ctx, metabase.GetSegmentByLocation{
-		SegmentLocation: location,
+	// TODO refactor PendingTransfer and TransferQueueItem to provide StreamID/Position to be able
+	// to get segment from object with specific version, this will work only until we won't have
+	// multiple object versions
+	segment, err := endpoint.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
+		StreamID: streamID,
+		Position: position,
 	})
 	if err != nil {
 		return metabase.Segment{}, Error.Wrap(err)

@@ -304,7 +304,7 @@ func TestRecvTimeout(t *testing.T) {
 		require.Len(t, exitingNodes, 1)
 		require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
 
-		queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, false)
+		queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true)
 		require.NoError(t, err)
 		require.Len(t, queueItems, 1)
 

@@ -1031,7 +1031,7 @@ func TestSegmentChangedOrDeleted(t *testing.T) {
 		satellite.GracefulExit.Chore.Loop.TriggerWait()
 
 		// make sure all the pieces are in the transfer queue
-		incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, false)
+		incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true)
 		require.NoError(t, err)
 		require.Len(t, incomplete, 2)
 

@@ -1071,7 +1071,7 @@ func TestSegmentChangedOrDeleted(t *testing.T) {
 			require.FailNow(t, "should not reach this case: %#v", m)
 		}
 
-		queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0, false)
+		queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0, true)
 		require.NoError(t, err)
 		require.Len(t, queueItems, 0)
 	})

@@ -1142,7 +1142,7 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) {
 		satellite.GracefulExit.Chore.Loop.TriggerWait()
 
 		// make sure all the pieces are in the transfer queue
-		incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, false)
+		incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true)
 		require.NoError(t, err)
 		require.Len(t, incomplete, 1)
 		// TODO: change to this when an object part can be overwritten

@@ -1185,7 +1185,7 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) {
 			require.FailNow(t, "should not reach this case: %#v", m)
 		}
 
-		queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0, false)
+		queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0, true)
 		require.NoError(t, err)
 		require.Len(t, queueItems, 0)
 	})

@@ -1321,7 +1321,7 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
 		satellite.GracefulExit.Chore.Loop.TriggerWait()
 
 		// make sure all the pieces are in the transfer queue
-		_, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 1, 0, false)
+		_, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 1, 0, true)
 		require.NoError(t, err)
 
 		var messageCount int

@@ -1365,7 +1365,7 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
 		require.Equal(t, messageCount, maxOrderLimitSendCount)
 
 		// make sure not responding piece not in queue
-		incompletes, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, false)
+		incompletes, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true)
 		require.NoError(t, err)
 		require.Len(t, incompletes, 0)
 

@@ -1525,7 +1525,7 @@ func testTransfers(t *testing.T, objects int, multipartObjects int, verifier fun
 		satellite.GracefulExit.Chore.Loop.TriggerWait()
 
 		// make sure all the pieces are in the transfer queue
-		incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), objects+multipartObjects, 0, false)
+		incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), objects+multipartObjects, 0, true)
 		require.NoError(t, err)
 
 		// connect to satellite again to start receiving transfers

@@ -15,6 +15,7 @@ import (
 	"storj.io/common/pb"
 	"storj.io/common/storj"
 	"storj.io/common/testcontext"
+	"storj.io/common/testrand"
 	"storj.io/storj/private/testplanet"
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/gracefulexit"

@@ -172,7 +173,7 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) {
 		gracefulExitDB := planet.Satellites[0].DB.GracefulExit()
 		batchSize := 1000
 
-		err = gracefulExitDB.Enqueue(ctx, queueItems, batchSize, false)
+		err = gracefulExitDB.Enqueue(ctx, queueItems, batchSize, true)
 		require.NoError(t, err)
 
 		asOfSystemTime := -1 * time.Microsecond

@@ -259,7 +260,7 @@ func TestGracefulExit_Enqueue_And_DeleteAllFinishedTransferQueueItems_batchsize(
 		queueItems := generateNTransferQueueItemsPerNode(t, numItems, exitedNodeIDs...)
 
 		// Add some items to the transfer queue for the exited nodes.
-		err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize, false)
+		err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize, true)
 		require.NoError(t, err)
 
 		disableAsOfSystemTime := time.Second * 0

@@ -404,7 +405,7 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems_batch(t *testing.T) {
 
 		queueItems := generateNTransferQueueItemsPerNode(t, 25, exitedNodeIDs...)
 		// Add some items to the transfer queue for the exited nodes.
-		err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize, false)
+		err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize, true)
 		require.NoError(t, err)
 
 		disableAsOfSystemTime := time.Second * 0

@@ -456,7 +457,8 @@ func generateNTransferQueueItemsPerNode(t *testing.T, n int, nodeIDs ...storj.No
 		for i := 0; i < n; i++ {
 			items = append(items, gracefulexit.TransferQueueItem{
 				NodeID:   nodeID,
-				Key:      metabase.SegmentKey{byte(rand.Int31n(256))},
+				StreamID: testrand.UUID(),
+				Position: metabase.SegmentPositionFromEncoded(rand.Uint64()),
 				PieceNum: rand.Int31(),
 			})
 		}

@@ -86,7 +86,8 @@ func (collector *PathCollector) RemoteSegment(ctx context.Context, segment *meta
 
 		item := TransferQueueItem{
 			NodeID:          piece.StorageNode,
-			Key:             key,
+			StreamID:        segment.StreamID,
+			Position:        segment.Position,
 			PieceNum:        int32(piece.Number),
 			RootPieceID:     segment.RootPieceID,
 			DurabilityRatio: float64(numPieces) / float64(segment.Redundancy.TotalShares),

@@ -120,7 +121,7 @@ func (collector *PathCollector) flush(ctx context.Context, limit int) (err error
 	defer mon.Task()(&ctx)(&err)
 
 	if len(collector.buffer) >= limit {
-		err = collector.db.Enqueue(ctx, collector.buffer, collector.batchSize, false)
+		err = collector.db.Enqueue(ctx, collector.buffer, collector.batchSize, true)
 		collector.buffer = collector.buffer[:0]
 
 		return errs.Wrap(err)

@@ -9,6 +9,7 @@ import (
 
 	"storj.io/common/pb"
 	"storj.io/common/storj"
+	"storj.io/common/uuid"
 	"storj.io/storj/satellite/metabase"
 )
 

@@ -51,6 +52,8 @@ func (promise *PendingFinishedPromise) finishCalled(err error) {
 // It contains information about a transfer request that has been sent to a storagenode by the satellite.
 type PendingTransfer struct {
 	Key                 metabase.SegmentKey
+	StreamID            uuid.UUID
+	Position            metabase.SegmentPosition
 	PieceSize           int64
 	SatelliteMessage    *pb.SatelliteMessage
 	OriginalRootPieceID storj.PieceID

@@ -25,8 +25,8 @@ func (endpoint *Endpoint) validatePendingTransfer(ctx context.Context, transfer
 	if transfer.SatelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit() == nil {
 		return Error.New("Addressed order limit on transfer piece cannot be nil")
 	}
-	if transfer.Key == nil {
-		return Error.New("Transfer key cannot be nil")
+	if transfer.Key == nil && transfer.StreamID.IsZero() {
+		return Error.New("Transfer key and StreamID cannot be both empty")
 	}
 	if transfer.OriginalRootPieceID.IsZero() {
 		return Error.New("could not get original root piece ID from transfer item")

@@ -36,7 +36,7 @@ func (db *gracefulexitDB) IncrementProgress(ctx context.Context, nodeID storj.No
 	defer mon.Task()(&ctx)(&err)
 
 	statement := db.db.Rebind(
-		`INSERT INTO graceful_exit_progress (node_id, bytes_transferred, pieces_transferred, pieces_failed, updated_at) VALUES (?, ?, ?, ?, ?)
+		`INSERT INTO graceful_exit_progress (node_id, bytes_transferred, pieces_transferred, pieces_failed, updated_at, uses_segment_transfer_queue) VALUES (?, ?, ?, ?, ?, TRUE)
 		 ON CONFLICT(node_id)
 		 DO UPDATE SET bytes_transferred = graceful_exit_progress.bytes_transferred + excluded.bytes_transferred,
 		 pieces_transferred = graceful_exit_progress.pieces_transferred + excluded.pieces_transferred,

@@ -100,7 +100,7 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet,
 	require.Len(t, exitingNodes, 1)
 	require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
 
-	queueItems, err := satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0, false)
+	queueItems, err := satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0, true)
 	require.NoError(t, err)
 	require.Len(t, queueItems, 1)
 

@@ -110,7 +110,7 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet,
 	exitingNode.GracefulExit.Chore.TestWaitForWorkers()
 
 	// check that there are no more items to process
-	queueItems, err = satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0, false)
+	queueItems, err = satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0, true)
 	require.NoError(t, err)
 	require.Len(t, queueItems, 0)
 

@@ -69,7 +69,7 @@ func TestWorkerSuccess(t *testing.T) {
 		require.Len(t, exitingNodes, 1)
 		require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
 
-		queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, false)
+		queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true)
 		require.NoError(t, err)
 		require.Len(t, queueItems, 1)
 

@@ -84,6 +84,7 @@ func TestWorkerSuccess(t *testing.T) {
 		require.NoError(t, err)
 		require.EqualValues(t, progress.PiecesFailed, 0)
 		require.EqualValues(t, progress.PiecesTransferred, 1)
+		require.True(t, progress.UsesSegmentTransferQueue)
 
 		exitStatus, err := satellite.DB.OverlayCache().GetExitStatus(ctx, exitingNode.ID())
 		require.NoError(t, err)

@@ -142,7 +143,7 @@ func TestWorkerTimeout(t *testing.T) {
 		require.Len(t, exitingNodes, 1)
 		require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
 
-		queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, false)
+		queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true)
 		require.NoError(t, err)
 		require.Len(t, queueItems, 1)