satellite/gc/sender: avoid sending BF to disqualified and exited nodes

Sending bloom filters to disqualified and exited nodes is wasted work, so skip them in the sender.

Change-Id: I11709350ad291c24f3b46670dd6a418c0ddbb44f
commit 75b77d53ff
parent ef4b564b82
@@ -164,6 +164,11 @@ func (service *Service) sendRetainRequest(ctx context.Context, retainInfo *inter
 		return Error.Wrap(err)
 	}
 
+	// avoid sending bloom filters to disqualified and exited nodes
+	if dossier.Disqualified != nil || dossier.ExitStatus.ExitSuccess {
+		return nil
+	}
+
 	if service.Config.RetainSendTimeout > 0 {
 		var cancel func()
 		ctx, cancel = context.WithTimeout(ctx, service.Config.RetainSendTimeout)
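The hunk opens after the node dossier has already been fetched, so the origin of dossier is not visible above. A minimal sketch of how the guard likely sits inside sendRetainRequest, assuming the dossier comes from an overlay lookup; the service.overlay.Get call and the retainInfo field name are assumptions for illustration, not part of this diff:

	// assumed context (not shown in the hunk): fetch the node's
	// dossier from the overlay service before deciding to send
	dossier, err := service.overlay.Get(ctx, retainInfo.StorageNodeId)
	if err != nil {
		return Error.Wrap(err)
	}

	// the new guard: a disqualified or successfully exited node can no
	// longer hold data, so sending it a bloom filter is wasted work
	if dossier.Disqualified != nil || dossier.ExitStatus.ExitSuccess {
		return nil
	}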
@@ -17,6 +17,7 @@ import (
 	"storj.io/common/testrand"
 	"storj.io/storj/private/testplanet"
 	"storj.io/storj/satellite/gc/bloomfilter"
+	"storj.io/storj/satellite/overlay"
 	"storj.io/storj/storagenode"
 	"storj.io/uplink"
 )
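The single added import is storj.io/storj/satellite/overlay, which the new test below needs for overlay.DisqualificationReasonAuditFailure and overlay.ExitStatusRequest.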
@@ -93,6 +94,67 @@ func TestSendRetainFilters(t *testing.T) {
 	})
 }
 
+func TestSendRetainFiltersDisqualifiedNode(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 2,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.ReconfigureRS(2, 2, 2, 2),
+			StorageNode: func(index int, config *storagenode.Config) {
+				// stop processing at storagenode side so it can be inspected
+				config.Retain.Concurrency = 0
+			},
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		// set up an access grant for the bucket where bloom filters are stored
+		access := planet.Uplinks[0].Access[planet.Satellites[0].NodeURL().ID]
+		accessString, err := access.Serialize()
+		require.NoError(t, err)
+
+		// configure sender
+		gcsender := planet.Satellites[0].GarbageCollection.Sender
+		gcsender.Config.AccessGrant = accessString
+
+		// upload 1 piece
+		upl := planet.Uplinks[0]
+		testData := testrand.Bytes(8 * memory.KiB)
+		err = upl.Upload(ctx, planet.Satellites[0], "testbucket", "test/path/1", testData)
+		require.NoError(t, err)
+
+		// configure filter uploader
+		config := planet.Satellites[0].Config.GarbageCollectionBF
+		config.AccessGrant = accessString
+		config.ZipBatchSize = 2
+
+		bloomFilterService := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop)
+		err = bloomFilterService.RunOnce(ctx)
+		require.NoError(t, err)
+
+		storageNode0 := planet.StorageNodes[0]
+		err = planet.Satellites[0].Overlay.Service.DisqualifyNode(ctx, storageNode0.ID(), overlay.DisqualificationReasonAuditFailure)
+		require.NoError(t, err)
+
+		storageNode1 := planet.StorageNodes[1]
+		_, err = planet.Satellites[0].DB.OverlayCache().UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
+			NodeID:      storageNode1.ID(),
+			ExitSuccess: true,
+		})
+		require.NoError(t, err)
+
+		for _, node := range planet.StorageNodes {
+			require.Zero(t, node.Peer.Storage2.RetainService.HowManyQueued())
+		}
+
+		// send to storagenodes
+		require.NoError(t, gcsender.RunOnce(ctx))
+
+		for _, node := range planet.StorageNodes {
+			require.Zero(t, node.Peer.Storage2.RetainService.HowManyQueued())
+		}
+	})
+}
+
 func TestSendInvalidZip(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 2,
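Why the assertions in the new test work: with config.Retain.Concurrency set to 0, the storagenodes never drain their retain queues, so RetainService.HowManyQueued() reflects exactly what the satellite delivered: zero before gcsender.RunOnce, and still zero afterwards because the only two nodes are disqualified or exited. For contrast, a healthy node would be expected to end up with a filter queued; a hypothetical sketch, assuming the config were changed to StorageNodeCount: 3 with the third node left untouched:

	// hypothetical: with StorageNodeCount: 3 and node 2 healthy, the
	// sender should have queued a retain request on it after RunOnce.
	healthy := planet.StorageNodes[2]
	require.NotZero(t, healthy.Peer.Storage2.RetainService.HowManyQueued())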