diff --git a/satellite/metainfo/endpoint_test.go b/satellite/metainfo/endpoint_test.go
index 6cc9de048..f80d6e33c 100644
--- a/satellite/metainfo/endpoint_test.go
+++ b/satellite/metainfo/endpoint_test.go
@@ -89,7 +89,7 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 		// calculate the SNs used space after delete the pieces
 		var totalUsedSpaceAfterDelete int64
 		for _, sn := range planet.StorageNodes {
-			sn.Peer.Storage2.PieceDeleter.Wait()
+			sn.Peer.Storage2.PieceDeleter.Wait(ctx)
 			piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
 			require.NoError(t, err)
 			totalUsedSpaceAfterDelete += piecesTotal
@@ -166,7 +166,7 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 		require.NoError(t, err)
 
 		for _, sn := range planet.StorageNodes {
-			sn.Peer.Storage2.PieceDeleter.Wait()
+			sn.Peer.Storage2.PieceDeleter.Wait(ctx)
 		}
 
 		// Check that storage nodes that were offline when deleting the pieces
diff --git a/satellite/metainfo/piecedeletion/service_test.go b/satellite/metainfo/piecedeletion/service_test.go
index 9d5112332..ed816c7d1 100644
--- a/satellite/metainfo/piecedeletion/service_test.go
+++ b/satellite/metainfo/piecedeletion/service_test.go
@@ -144,7 +144,7 @@ func TestService_DeletePieces_AllNodesUp(t *testing.T) {
 		// calculate the SNs used space after delete the pieces
 		var totalUsedSpaceAfterDelete int64
 		for _, sn := range planet.StorageNodes {
-			sn.Peer.Storage2.PieceDeleter.Wait()
+			sn.Peer.Storage2.PieceDeleter.Wait(ctx)
 			piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
 			require.NoError(t, err)
 			totalUsedSpaceAfterDelete += piecesTotal
@@ -219,7 +219,7 @@ func TestService_DeletePieces_SomeNodesDown(t *testing.T) {
 		require.NoError(t, err)
 
 		for _, sn := range planet.StorageNodes {
-			sn.Peer.Storage2.PieceDeleter.Wait()
+			sn.Peer.Storage2.PieceDeleter.Wait(ctx)
 		}
 
 		// Check that storage nodes which are online when deleting pieces don't
@@ -298,7 +298,7 @@ func TestService_DeletePieces_AllNodesDown(t *testing.T) {
 
 		var totalUsedSpace int64
 		for _, sn := range planet.StorageNodes {
-			sn.Peer.Storage2.PieceDeleter.Wait()
+			sn.Peer.Storage2.PieceDeleter.Wait(ctx)
 			// calculate the SNs total used space after data upload
 			piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
 			require.NoError(t, err)
diff --git a/storagenode/pieces/deleter.go b/storagenode/pieces/deleter.go
index 665e95f70..fcc662cba 100644
--- a/storagenode/pieces/deleter.go
+++ b/storagenode/pieces/deleter.go
@@ -37,7 +37,8 @@ type Deleter struct {
 
 	// The test variables are only used when testing.
 	testToDelete int
-	testCond     *sync.Cond
+	testOnce     sync.Once
+	testDone     chan struct{}
 }
 
 // NewDeleter creates a new Deleter.
@@ -88,25 +89,43 @@ func (d *Deleter) Run(ctx context.Context) error {
 // true if all pieceIDs were successfully placed on the queue, false if some
 // pieceIDs were dropped.
 func (d *Deleter) Enqueue(ctx context.Context, satelliteID storj.NodeID, pieceIDs []storj.PieceID) (unhandled int) {
+	if len(pieceIDs) == 0 {
+		return 0
+	}
+
+	// If we are in testMode add the number of pieceIDs waiting to be processed.
+	if d.testDone != nil {
+		d.checkDone(len(pieceIDs))
+	}
+
 	for i, pieceID := range pieceIDs {
 		select {
 		case d.ch <- DeleteRequest{satelliteID, pieceID, time.Now()}:
 		default:
+			unhandled := len(pieceIDs) - i
 			mon.Counter("piecedeleter-queue-full").Inc(1)
-			return len(pieceIDs) - i
+			if d.testDone != nil {
+				d.checkDone(-unhandled)
+			}
+			return unhandled
 		}
 	}
 
-	// If we are in testMode add the number of pieceIDs waiting to be processed.
-	if d.testCond != nil {
-		d.mu.Lock()
-		d.testToDelete += len(pieceIDs)
-		d.mu.Unlock()
-	}
-
 	return 0
 }
 
+func (d *Deleter) checkDone(delta int) {
+	d.mu.Lock()
+	d.testToDelete += delta
+	if d.testToDelete < 0 {
+		d.testToDelete = 0
+	}
+	if d.testToDelete == 0 {
+		d.testOnce.Do(func() { close(d.testDone) })
+	}
+	d.mu.Unlock()
+}
+
 func (d *Deleter) work(ctx context.Context) error {
 	for {
 		select {
@@ -130,17 +149,9 @@ func (d *Deleter) work(ctx context.Context) error {
 			)
 		}
 
-		// If we are in test mode, check whether we have processed all known
-		// deletes, and if so broadcast on the cond.
-		if d.testCond != nil {
-			d.mu.Lock()
-			if d.testToDelete > 0 {
-				d.testToDelete--
-			}
-			if d.testToDelete == 0 {
-				d.testCond.Broadcast()
-			}
-			d.mu.Unlock()
+		// If we are in test mode, check if we are done processing deletes
+		if d.testDone != nil {
+			d.checkDone(-1)
 		}
 	}
 }
@@ -163,16 +174,21 @@ func (d *Deleter) Close() error {
 	return nil
 }
 
-// Wait blocks until the queue is empty. This can only be called after SetupTest.
-func (d *Deleter) Wait() {
+// Wait blocks until the queue is empty and each enqueued delete has been
+// successfully processed.
+func (d *Deleter) Wait(ctx context.Context) {
 	d.mu.Lock()
-	for d.testToDelete > 0 {
-		d.testCond.Wait()
-	}
+	toDelete := d.testToDelete
 	d.mu.Unlock()
+	if toDelete > 0 {
+		select {
+		case <-ctx.Done():
+		case <-d.testDone:
+		}
+	}
 }
 
 // SetupTest puts the deleter in test mode. This should only be called in tests.
 func (d *Deleter) SetupTest() {
-	d.testCond = sync.NewCond(&d.mu)
+	d.testDone = make(chan struct{})
 }
diff --git a/storagenode/pieces/deleter_test.go b/storagenode/pieces/deleter_test.go
index 3443745b3..0c37ba20f 100644
--- a/storagenode/pieces/deleter_test.go
+++ b/storagenode/pieces/deleter_test.go
@@ -68,7 +68,7 @@ func TestDeleter(t *testing.T) {
 	require.Equal(t, 0, unhandled)
 
 	// wait for test hook to fire twice
-	deleter.Wait()
+	deleter.Wait(ctx)
 
 	_, err = store.Reader(ctx, satelliteID, pieceID)
 	require.Condition(t, func() bool {
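
A minimal standalone sketch (not part of the patch) of the signaling pattern the deleter moves to: a mutex-guarded pending counter, a done channel closed exactly once via sync.Once when the counter reaches zero, and a context-aware wait. The names here (tracker, newTracker, Add, Wait) are illustrative only and do not exist in the Storj codebase.

// tracker_sketch.go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type tracker struct {
	mu      sync.Mutex
	pending int
	once    sync.Once
	done    chan struct{}
}

func newTracker() *tracker {
	return &tracker{done: make(chan struct{})}
}

// Add adjusts the pending count by delta, clamps it at zero, and closes
// done exactly once when the count reaches zero (mirrors checkDone above).
func (t *tracker) Add(delta int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.pending += delta
	if t.pending < 0 {
		t.pending = 0
	}
	if t.pending == 0 {
		t.once.Do(func() { close(t.done) })
	}
}

// Wait returns immediately if nothing is pending; otherwise it blocks until
// either the done channel is closed or the context is canceled, so a hung
// worker can no longer deadlock the caller the way sync.Cond.Wait could.
func (t *tracker) Wait(ctx context.Context) {
	t.mu.Lock()
	pending := t.pending
	t.mu.Unlock()
	if pending == 0 {
		return
	}
	select {
	case <-ctx.Done():
	case <-t.done:
	}
}

func main() {
	ctx := context.Background()
	tr := newTracker()
	tr.Add(3) // three items enqueued
	for i := 0; i < 3; i++ {
		go func() {
			time.Sleep(10 * time.Millisecond) // simulate processing
			tr.Add(-1)                        // one item finished
		}()
	}
	tr.Wait(ctx)
	fmt.Println("all items processed")
}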