satellite/pieces: Fix race in piece deleter
There was a race in the test code for the piece deleter, which made it possible to broadcast on the condition variable before anyone was waiting. This change fixes that and has Wait take a context so it times out with the context.

Change-Id: Ia4f77a7b7d2287d5ab1d7ba541caeb1ba036dba3
parent a3eeab2919
commit 13bf0c62ab
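For context on the race described above: a sync.Cond.Broadcast only wakes goroutines that are already blocked in Wait, so a signal delivered before the waiter arrives is silently lost. The sketch below is illustrative only (none of these names come from the repository); it shows how a waiter that relies on catching the broadcast can miss it entirely, which is the hazard the cond-based test hook had.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)

	go func() {
		// The "work" finishes immediately and signals completion.
		cond.Broadcast() // no goroutine is parked in Wait yet, so this wakes nobody
	}()

	time.Sleep(50 * time.Millisecond) // the waiter shows up after the broadcast

	woke := make(chan struct{})
	go func() {
		mu.Lock()
		cond.Wait() // without re-checking shared state, this blocks forever
		mu.Unlock()
		close(woke)
	}()

	select {
	case <-woke:
		fmt.Println("woke up")
	case <-time.After(time.Second):
		fmt.Println("broadcast was lost; still waiting") // this branch runs
	}
}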
@@ -89,7 +89,7 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 		// calculate the SNs used space after delete the pieces
 		var totalUsedSpaceAfterDelete int64
 		for _, sn := range planet.StorageNodes {
-			sn.Peer.Storage2.PieceDeleter.Wait()
+			sn.Peer.Storage2.PieceDeleter.Wait(ctx)
 			piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
 			require.NoError(t, err)
 			totalUsedSpaceAfterDelete += piecesTotal
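The same one-line change (Wait to Wait(ctx)) recurs in each of the test hunks that follow. Because Wait now takes a context, a test can also bound the wait explicitly. A minimal sketch, assuming the Deleter lives at the import path shown and that the helper name and package are hypothetical:

package deletertest // hypothetical test helper package

import (
	"context"
	"time"

	"storj.io/storj/storagenode/pieces" // assumed import path for the Deleter
)

// waitForDeletes bounds the wait so a stuck queue fails after a minute
// instead of hanging the test. It assumes d is running and SetupTest was
// called; the helper itself is illustrative, not part of this diff.
func waitForDeletes(ctx context.Context, d *pieces.Deleter) {
	waitCtx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()
	d.Wait(waitCtx) // returns when the queue drains or when waitCtx expires
}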
@@ -166,7 +166,7 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 		require.NoError(t, err)
 
 		for _, sn := range planet.StorageNodes {
-			sn.Peer.Storage2.PieceDeleter.Wait()
+			sn.Peer.Storage2.PieceDeleter.Wait(ctx)
 		}
 
 		// Check that storage nodes that were offline when deleting the pieces
@@ -144,7 +144,7 @@ func TestService_DeletePieces_AllNodesUp(t *testing.T) {
 		// calculate the SNs used space after delete the pieces
 		var totalUsedSpaceAfterDelete int64
 		for _, sn := range planet.StorageNodes {
-			sn.Peer.Storage2.PieceDeleter.Wait()
+			sn.Peer.Storage2.PieceDeleter.Wait(ctx)
 			piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
 			require.NoError(t, err)
 			totalUsedSpaceAfterDelete += piecesTotal
@@ -219,7 +219,7 @@ func TestService_DeletePieces_SomeNodesDown(t *testing.T) {
 		require.NoError(t, err)
 
 		for _, sn := range planet.StorageNodes {
-			sn.Peer.Storage2.PieceDeleter.Wait()
+			sn.Peer.Storage2.PieceDeleter.Wait(ctx)
 		}
 
 		// Check that storage nodes which are online when deleting pieces don't
@@ -298,7 +298,7 @@ func TestService_DeletePieces_AllNodesDown(t *testing.T) {
 
 		var totalUsedSpace int64
 		for _, sn := range planet.StorageNodes {
-			sn.Peer.Storage2.PieceDeleter.Wait()
+			sn.Peer.Storage2.PieceDeleter.Wait(ctx)
 			// calculate the SNs total used space after data upload
 			piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
 			require.NoError(t, err)
@@ -37,7 +37,8 @@ type Deleter struct {
 
 	// The test variables are only used when testing.
 	testToDelete int
-	testCond     *sync.Cond
+	testOnce     sync.Once
+	testDone     chan struct{}
 }
 
 // NewDeleter creates a new Deleter.
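The struct swap above is the core of the fix: a channel that is closed exactly once replaces the condition variable. Closing a channel is a broadcast that cannot be lost, because a receive on a closed channel returns immediately even if the receiver arrives later, and sync.Once makes the close idempotent. A self-contained sketch of that pattern (the type and names are illustrative, not from the repository):

package main

import (
	"fmt"
	"sync"
)

// doneSignal mimics the testOnce/testDone pair added to Deleter.
type doneSignal struct {
	once sync.Once
	done chan struct{}
}

func newDoneSignal() *doneSignal {
	return &doneSignal{done: make(chan struct{})}
}

func (s *doneSignal) signal() {
	s.once.Do(func() { close(s.done) }) // at most one close, however often called
}

func main() {
	s := newDoneSignal()
	s.signal()
	s.signal() // safe: the channel is closed only once

	// A receiver that shows up after the signal still unblocks immediately,
	// unlike a sync.Cond waiter that missed the Broadcast.
	<-s.done
	fmt.Println("observed completion")
}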
@@ -88,25 +89,43 @@ func (d *Deleter) Run(ctx context.Context) error {
 // true if all pieceIDs were successfully placed on the queue, false if some
 // pieceIDs were dropped.
 func (d *Deleter) Enqueue(ctx context.Context, satelliteID storj.NodeID, pieceIDs []storj.PieceID) (unhandled int) {
 	if len(pieceIDs) == 0 {
 		return 0
 	}
 
+	// If we are in testMode add the number of pieceIDs waiting to be processed.
+	if d.testDone != nil {
+		d.checkDone(len(pieceIDs))
+	}
+
 	for i, pieceID := range pieceIDs {
 		select {
 		case d.ch <- DeleteRequest{satelliteID, pieceID, time.Now()}:
 		default:
+			unhandled := len(pieceIDs) - i
 			mon.Counter("piecedeleter-queue-full").Inc(1)
-			return len(pieceIDs) - i
+			if d.testDone != nil {
+				d.checkDone(-unhandled)
+			}
+			return unhandled
 		}
 	}
 
-	// If we are in testMode add the number of pieceIDs waiting to be processed.
-	if d.testCond != nil {
-		d.mu.Lock()
-		d.testToDelete += len(pieceIDs)
-		d.mu.Unlock()
-	}
-
 	return 0
 }
 
+func (d *Deleter) checkDone(delta int) {
+	d.mu.Lock()
+	d.testToDelete += delta
+	if d.testToDelete < 0 {
+		d.testToDelete = 0
+	}
+	if d.testToDelete == 0 {
+		d.testOnce.Do(func() { close(d.testDone) })
+	}
+	d.mu.Unlock()
+}
+
 func (d *Deleter) work(ctx context.Context) error {
 	for {
 		select {
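The accounting in the hunk above works as follows: Enqueue adds the whole batch size up front, subtracts back any pieces it had to drop when the queue was full, the worker subtracts one per processed delete, and checkDone closes testDone exactly once when the count reaches zero (clamped so it never goes negative). A small sketch that mirrors that protocol, with illustrative names only:

package main

import (
	"fmt"
	"sync"
)

// pendingCounter mirrors the bookkeeping that checkDone performs.
type pendingCounter struct {
	mu      sync.Mutex
	pending int
	once    sync.Once
	done    chan struct{}
}

func newPendingCounter() *pendingCounter {
	return &pendingCounter{done: make(chan struct{})}
}

func (c *pendingCounter) add(delta int) {
	c.mu.Lock()
	c.pending += delta
	if c.pending < 0 {
		c.pending = 0 // clamp, matching checkDone's guard against underflow
	}
	if c.pending == 0 {
		c.once.Do(func() { close(c.done) }) // signal completion exactly once
	}
	c.mu.Unlock()
}

func main() {
	c := newPendingCounter()
	c.add(3)  // Enqueue registered a batch of three pieces
	c.add(-1) // the queue was full for one piece, so it is handed back
	c.add(-1) // worker processed the first accepted delete
	c.add(-1) // worker processed the second accepted delete
	<-c.done
	fmt.Println("all known deletes processed")
}

Note that the new Enqueue registers the batch before any piece reaches the channel, so the worker can never drive the counter to zero (and close testDone) before the producer has accounted for its batch.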
@@ -130,17 +149,9 @@ func (d *Deleter) work(ctx context.Context) error {
 				)
 			}
 
-			// If we are in test mode, check whether we have processed all known
-			// deletes, and if so broadcast on the cond.
-			if d.testCond != nil {
-				d.mu.Lock()
-				if d.testToDelete > 0 {
-					d.testToDelete--
-				}
-				if d.testToDelete == 0 {
-					d.testCond.Broadcast()
-				}
-				d.mu.Unlock()
+			// If we are in test mode, check if we are done processing deletes
+			if d.testDone != nil {
+				d.checkDone(-1)
 			}
 		}
 	}
@@ -163,16 +174,21 @@ func (d *Deleter) Close() error {
 	return nil
 }
 
-// Wait blocks until the queue is empty. This can only be called after SetupTest.
-func (d *Deleter) Wait() {
+// Wait blocks until the queue is empty and each enqueued delete has been
+// successfully processed.
+func (d *Deleter) Wait(ctx context.Context) {
 	d.mu.Lock()
-	for d.testToDelete > 0 {
-		d.testCond.Wait()
-	}
+	toDelete := d.testToDelete
+	d.mu.Unlock()
+	if toDelete > 0 {
+		select {
+		case <-ctx.Done():
+		case <-d.testDone:
+		}
+	}
 }
 
 // SetupTest puts the deleter in test mode. This should only be called in tests.
 func (d *Deleter) SetupTest() {
-	d.testCond = sync.NewCond(&d.mu)
+	d.testDone = make(chan struct{})
 }
 
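With the new Wait above, the only required ordering is that SetupTest runs before any deletes are enqueued, so the counter tracks them; Wait then returns when the tracked deletes are processed or when the caller's context is done. A hedged call-order sketch follows; the package name, helper name, and import paths are assumptions for illustration, not code from this change.

package deletertest // hypothetical test helper package

import (
	"context"

	"storj.io/common/storj"             // assumed import path for storj.NodeID / storj.PieceID
	"storj.io/storj/storagenode/pieces" // assumed import path for the Deleter
)

// enqueueAndWait sketches the intended call order. It assumes d's Run loop is
// already started elsewhere and that d.SetupTest() was called before any
// deletes were enqueued.
func enqueueAndWait(ctx context.Context, d *pieces.Deleter, satelliteID storj.NodeID, pieceIDs []storj.PieceID) {
	// Enqueue reports how many pieces could not be queued; the new code also
	// subtracts those from the test counter so Wait does not block on them.
	_ = d.Enqueue(ctx, satelliteID, pieceIDs)

	// Wait returns once every tracked delete has been processed, or earlier
	// if ctx is canceled or times out.
	d.Wait(ctx)
}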
@@ -68,7 +68,7 @@ func TestDeleter(t *testing.T) {
	require.Equal(t, 0, unhandled)
 
	// wait for test hook to fire twice
-	deleter.Wait()
+	deleter.Wait(ctx)
 
	_, err = store.Reader(ctx, satelliteID, pieceID)
	require.Condition(t, func() bool {