storagenode/pieces: Deleter can handle multiple tests
Before the deleter would close its done channel once, so if additional tests shared a storagenode, even if not in parallel, the later waits would not work properly. This fixes that problem. Change-Id: I7dcacf6699cef7c2c2948ba0f4369ef520601bf5
This commit is contained in:
parent
131654b080
commit
237d9da477
@ -37,3 +37,10 @@ func (planet *Planet) storageNodeLiveRequestCount() int {
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
// WaitForStorageNodeDeleters calls the Wait method on each storagenode's PieceDeleter.
|
||||
func (planet *Planet) WaitForStorageNodeDeleters(ctx context.Context) {
|
||||
for _, sn := range planet.StorageNodes {
|
||||
sn.Peer.Storage2.PieceDeleter.Wait(ctx)
|
||||
}
|
||||
}
|
||||
|
@ -82,10 +82,11 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
planet.WaitForStorageNodeDeleters(ctx)
|
||||
|
||||
// calculate the SNs used space after delete the pieces
|
||||
var totalUsedSpaceAfterDelete int64
|
||||
for _, sn := range planet.StorageNodes {
|
||||
sn.Peer.Storage2.PieceDeleter.Wait(ctx)
|
||||
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
|
||||
require.NoError(t, err)
|
||||
totalUsedSpaceAfterDelete += piecesTotal
|
||||
@ -157,9 +158,7 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, sn := range planet.StorageNodes {
|
||||
sn.Peer.Storage2.PieceDeleter.Wait(ctx)
|
||||
}
|
||||
planet.WaitForStorageNodeDeleters(ctx)
|
||||
|
||||
// Check that storage nodes that were offline when deleting the pieces
|
||||
// they are still holding data
|
||||
@ -324,10 +323,11 @@ func TestEndpoint_DeleteObjectPieces_ObjectWithoutLastSegment(t *testing.T) {
|
||||
require.Error(t, err)
|
||||
require.Equal(t, rpcstatus.Code(err), rpcstatus.NotFound)
|
||||
|
||||
planet.WaitForStorageNodeDeleters(ctx)
|
||||
|
||||
// calculate the SNs used space after delete the pieces
|
||||
var totalUsedSpaceAfterDelete int64
|
||||
for _, sn := range planet.StorageNodes {
|
||||
sn.Peer.Storage2.PieceDeleter.Wait(ctx)
|
||||
usedSpace, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
|
||||
require.NoError(t, err)
|
||||
totalUsedSpaceAfterDelete += usedSpace
|
||||
@ -438,10 +438,11 @@ func TestEndpoint_DeleteObjectPieces_ObjectWithoutLastSegment(t *testing.T) {
|
||||
require.Error(t, err)
|
||||
require.Equal(t, rpcstatus.Code(err), rpcstatus.NotFound)
|
||||
|
||||
planet.WaitForStorageNodeDeleters(ctx)
|
||||
|
||||
// calculate the SNs used space after delete the pieces
|
||||
var totalUsedSpaceAfterDelete int64
|
||||
for _, sn := range planet.StorageNodes {
|
||||
sn.Peer.Storage2.PieceDeleter.Wait(ctx)
|
||||
usedSpace, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
|
||||
require.NoError(t, err)
|
||||
totalUsedSpaceAfterDelete += usedSpace
|
||||
|
@ -137,10 +137,11 @@ func TestService_DeletePieces_AllNodesUp(t *testing.T) {
|
||||
err = satelliteSys.API.Metainfo.PieceDeletion.Delete(ctx, requests, percentExp)
|
||||
require.NoError(t, err)
|
||||
|
||||
planet.WaitForStorageNodeDeleters(ctx)
|
||||
|
||||
// calculate the SNs used space after delete the pieces
|
||||
var totalUsedSpaceAfterDelete int64
|
||||
for _, sn := range planet.StorageNodes {
|
||||
sn.Peer.Storage2.PieceDeleter.Wait(ctx)
|
||||
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
|
||||
require.NoError(t, err)
|
||||
totalUsedSpaceAfterDelete += piecesTotal
|
||||
@ -210,9 +211,7 @@ func TestService_DeletePieces_SomeNodesDown(t *testing.T) {
|
||||
err := satelliteSys.API.Metainfo.PieceDeletion.Delete(ctx, requests, 0.9999)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, sn := range planet.StorageNodes {
|
||||
sn.Peer.Storage2.PieceDeleter.Wait(ctx)
|
||||
}
|
||||
planet.WaitForStorageNodeDeleters(ctx)
|
||||
|
||||
// Check that storage nodes which are online when deleting pieces don't
|
||||
// hold any piece
|
||||
@ -284,9 +283,10 @@ func TestService_DeletePieces_AllNodesDown(t *testing.T) {
|
||||
err := satelliteSys.API.Metainfo.PieceDeletion.Delete(ctx, requests, 0.9999)
|
||||
require.NoError(t, err)
|
||||
|
||||
planet.WaitForStorageNodeDeleters(ctx)
|
||||
|
||||
var totalUsedSpace int64
|
||||
for _, sn := range planet.StorageNodes {
|
||||
sn.Peer.Storage2.PieceDeleter.Wait(ctx)
|
||||
// calculate the SNs total used space after data upload
|
||||
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
|
||||
require.NoError(t, err)
|
||||
|
@ -36,8 +36,8 @@ type Deleter struct {
|
||||
closed bool
|
||||
|
||||
// The test variables are only used when testing.
|
||||
testMode bool
|
||||
testToDelete int
|
||||
testOnce sync.Once
|
||||
testDone chan struct{}
|
||||
}
|
||||
|
||||
@ -94,7 +94,7 @@ func (d *Deleter) Enqueue(ctx context.Context, satelliteID storj.NodeID, pieceID
|
||||
}
|
||||
|
||||
// If we are in testMode add the number of pieceIDs waiting to be processed.
|
||||
if d.testDone != nil {
|
||||
if d.testMode {
|
||||
d.checkDone(len(pieceIDs))
|
||||
}
|
||||
|
||||
@ -104,7 +104,7 @@ func (d *Deleter) Enqueue(ctx context.Context, satelliteID storj.NodeID, pieceID
|
||||
default:
|
||||
unhandled := len(pieceIDs) - i
|
||||
mon.Counter("piecedeleter-queue-full").Inc(1)
|
||||
if d.testDone != nil {
|
||||
if d.testMode {
|
||||
d.checkDone(-unhandled)
|
||||
}
|
||||
return unhandled
|
||||
@ -119,9 +119,15 @@ func (d *Deleter) checkDone(delta int) {
|
||||
d.testToDelete += delta
|
||||
if d.testToDelete < 0 {
|
||||
d.testToDelete = 0
|
||||
}
|
||||
if d.testToDelete == 0 {
|
||||
d.testOnce.Do(func() { close(d.testDone) })
|
||||
} else if d.testToDelete == 0 {
|
||||
if d.testDone != nil {
|
||||
close(d.testDone)
|
||||
d.testDone = nil
|
||||
}
|
||||
} else if d.testToDelete > 0 {
|
||||
if d.testDone == nil {
|
||||
d.testDone = make(chan struct{})
|
||||
}
|
||||
}
|
||||
d.mu.Unlock()
|
||||
}
|
||||
@ -150,7 +156,7 @@ func (d *Deleter) work(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// If we are in test mode, check if we are done processing deletes
|
||||
if d.testDone != nil {
|
||||
if d.testMode {
|
||||
d.checkDone(-1)
|
||||
}
|
||||
}
|
||||
@ -178,17 +184,17 @@ func (d *Deleter) Close() error {
|
||||
// successfully processed.
|
||||
func (d *Deleter) Wait(ctx context.Context) {
|
||||
d.mu.Lock()
|
||||
toDelete := d.testToDelete
|
||||
testDone := d.testDone
|
||||
d.mu.Unlock()
|
||||
if toDelete > 0 {
|
||||
if testDone != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-d.testDone:
|
||||
case <-testDone:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SetupTest puts the deleter in test mode. This should only be called in tests.
|
||||
func (d *Deleter) SetupTest() {
|
||||
d.testDone = make(chan struct{})
|
||||
d.testMode = true
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user