satellitePeer -> satellite rename consistency in repair test (#3032)
commit a085b05ec5
parent ab1147afb6
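Every hunk below makes the same change: each repair test takes planet.Satellites[0] once, names the handle satellite (instead of satellitePeer or a repeated planet.Satellites[0]), and drives the checker and repairer loops through that single handle. A minimal sketch of that idiom follows; the test name is hypothetical, the node counts are arbitrary, and the testplanet/testcontext import paths are an assumption for the release this commit targets.

package repair_test

import (
	"testing"

	// Assumed import paths for this release; adjust to your checkout.
	"storj.io/storj/internal/testcontext"
	"storj.io/storj/internal/testplanet"
)

// TestRepairLoopPattern is a hypothetical name; it only illustrates the
// satellite handle and loop-control idiom used by the tests in this diff.
func TestRepairLoopPattern(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		// One handle, one name: planet.Satellites[0] is always "satellite".
		satellite := planet.Satellites[0]

		// Pause the background loops so the test decides when repair runs.
		satellite.Repair.Checker.Loop.Pause()
		satellite.Repair.Repairer.Loop.Pause()

		// ... upload data, then kill or disqualify nodes ...

		// Drive exactly one checker pass and one repairer pass, then wait
		// for queued repair jobs to drain.
		satellite.Repair.Checker.Loop.Restart()
		satellite.Repair.Checker.Loop.TriggerWait()
		satellite.Repair.Checker.Loop.Pause()
		satellite.Repair.Repairer.Loop.Restart()
		satellite.Repair.Repairer.Loop.TriggerWait()
		satellite.Repair.Repairer.Loop.Pause()
		satellite.Repair.Repairer.Limiter.Wait()
	})
}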
@@ -50,22 +50,22 @@ func TestDataRepair(t *testing.T) {
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
 		// first, upload some remote data
 		uplinkPeer := planet.Uplinks[0]
-		satellitePeer := planet.Satellites[0]
+		satellite := planet.Satellites[0]
 		// stop discovery service so that we do not get a race condition when we delete nodes from overlay
-		satellitePeer.Discovery.Service.Discovery.Stop()
-		satellitePeer.Discovery.Service.Refresh.Stop()
+		satellite.Discovery.Service.Discovery.Stop()
+		satellite.Discovery.Service.Refresh.Stop()
 		// stop audit to prevent possible interactions i.e. repair timeout problems
-		planet.Satellites[0].Audit.Worker.Loop.Pause()
+		satellite.Audit.Worker.Loop.Pause()
 
-		satellitePeer.Repair.Checker.Loop.Pause()
-		satellitePeer.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Pause()
 
 		var (
 			testData         = testrand.Bytes(8 * memory.KiB)
 			minThreshold     = 3
 			successThreshold = 7
 		)
-		err := uplinkPeer.UploadWithConfig(ctx, satellitePeer, &uplink.RSConfig{
+		err := uplinkPeer.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
 			MinThreshold:     minThreshold,
 			RepairThreshold:  5,
 			SuccessThreshold: successThreshold,
@@ -73,7 +73,7 @@ func TestDataRepair(t *testing.T) {
 		}, "testbucket", "test/path", testData)
 		require.NoError(t, err)
 
-		pointer, path := getRemoteSegment(t, ctx, satellitePeer)
+		pointer, path := getRemoteSegment(t, ctx, satellite)
 
 		// calculate how many storagenodes to kill
 		redundancy := pointer.GetRemote().GetRedundancy()
@@ -117,27 +117,27 @@ func TestDataRepair(t *testing.T) {
 
 		for _, node := range planet.StorageNodes {
 			if nodesToDisqualify[node.ID()] {
-				disqualifyNode(t, ctx, satellitePeer, node.ID())
+				disqualifyNode(t, ctx, satellite, node.ID())
 				continue
 			}
 			if nodesToKill[node.ID()] {
 				err = planet.StopPeer(node)
 				require.NoError(t, err)
-				_, err = satellitePeer.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
+				_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
 				require.NoError(t, err)
 			}
 		}
 
-		satellitePeer.Repair.Checker.Loop.Restart()
-		satellitePeer.Repair.Checker.Loop.TriggerWait()
-		satellitePeer.Repair.Checker.Loop.Pause()
-		satellitePeer.Repair.Repairer.Loop.Restart()
-		satellitePeer.Repair.Repairer.Loop.TriggerWait()
-		satellitePeer.Repair.Repairer.Loop.Pause()
-		satellitePeer.Repair.Repairer.Limiter.Wait()
+		satellite.Repair.Checker.Loop.Restart()
+		satellite.Repair.Checker.Loop.TriggerWait()
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Restart()
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Repairer.Limiter.Wait()
 
 		// repaired segment should not contain any piece in the killed and DQ nodes
-		metainfoService := satellitePeer.Metainfo.Service
+		metainfoService := satellite.Metainfo.Service
 		pointer, err = metainfoService.Get(ctx, path)
 		require.NoError(t, err)
 
@@ -156,7 +156,7 @@ func TestDataRepair(t *testing.T) {
 		}
 
 		// we should be able to download data without any of the original nodes
-		newData, err := uplinkPeer.Download(ctx, satellitePeer, "testbucket", "test/path")
+		newData, err := uplinkPeer.Download(ctx, satellite, "testbucket", "test/path")
 		require.NoError(t, err)
 		require.Equal(t, newData, testData)
 	})
@@ -184,19 +184,19 @@ func TestCorruptDataRepair(t *testing.T) {
 		},
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
 		uplinkPeer := planet.Uplinks[0]
-		satellitePeer := planet.Satellites[0]
+		satellite := planet.Satellites[0]
 		// stop discovery service so that we do not get a race condition when we delete nodes from overlay
-		satellitePeer.Discovery.Service.Discovery.Stop()
-		satellitePeer.Discovery.Service.Refresh.Stop()
+		satellite.Discovery.Service.Discovery.Stop()
+		satellite.Discovery.Service.Refresh.Stop()
 		// stop audit to prevent possible interactions i.e. repair timeout problems
-		satellitePeer.Audit.Worker.Loop.Pause()
+		satellite.Audit.Worker.Loop.Pause()
 
-		satellitePeer.Repair.Checker.Loop.Pause()
-		satellitePeer.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Pause()
 
 		var testData = testrand.Bytes(8 * memory.KiB)
 		// first, upload some remote data
-		err := uplinkPeer.UploadWithConfig(ctx, satellitePeer, &uplink.RSConfig{
+		err := uplinkPeer.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
 			MinThreshold:     3,
 			RepairThreshold:  5,
 			SuccessThreshold: 7,
@@ -204,7 +204,7 @@ func TestCorruptDataRepair(t *testing.T) {
 		}, "testbucket", "test/path", testData)
 		require.NoError(t, err)
 
-		pointer, path := getRemoteSegment(t, ctx, satellitePeer)
+		pointer, path := getRemoteSegment(t, ctx, satellite)
 
 		// calculate how many storagenodes to kill
 		redundancy := pointer.GetRemote().GetRedundancy()
@@ -245,14 +245,14 @@ func TestCorruptDataRepair(t *testing.T) {
 			if nodesToKill[node.ID()] {
 				err = planet.StopPeer(node)
 				require.NoError(t, err)
-				_, err = satellitePeer.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
+				_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
 				require.NoError(t, err)
 			}
 		}
 		require.NotNil(t, corruptedNode)
 
 		blobRef := storage.BlobRef{
-			Namespace: satellitePeer.ID().Bytes(),
+			Namespace: satellite.ID().Bytes(),
 			Key:       corruptedPiece.Bytes(),
 		}
 
@@ -283,16 +283,16 @@ func TestCorruptDataRepair(t *testing.T) {
 		err = writer.Commit(ctx)
 		require.NoError(t, err)
 
-		satellitePeer.Repair.Checker.Loop.Restart()
-		satellitePeer.Repair.Checker.Loop.TriggerWait()
-		satellitePeer.Repair.Checker.Loop.Pause()
-		satellitePeer.Repair.Repairer.Loop.Restart()
-		satellitePeer.Repair.Repairer.Loop.TriggerWait()
-		satellitePeer.Repair.Repairer.Loop.Pause()
-		satellitePeer.Repair.Repairer.Limiter.Wait()
+		satellite.Repair.Checker.Loop.Restart()
+		satellite.Repair.Checker.Loop.TriggerWait()
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Restart()
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Repairer.Limiter.Wait()
 
 		// repair should fail, so segment should contain all the original nodes
-		metainfoService := satellitePeer.Metainfo.Service
+		metainfoService := satellite.Metainfo.Service
 		pointer, err = metainfoService.Get(ctx, path)
 		require.NoError(t, err)
 
@@ -318,19 +318,19 @@ func TestRemoveIrreparableSegmentFromQueue(t *testing.T) {
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
 		// first, upload some remote data
 		uplinkPeer := planet.Uplinks[0]
-		satellitePeer := planet.Satellites[0]
+		satellite := planet.Satellites[0]
 		// stop discovery service so that we do not get a race condition when we delete nodes from overlay
-		satellitePeer.Discovery.Service.Discovery.Stop()
-		satellitePeer.Discovery.Service.Refresh.Stop()
+		satellite.Discovery.Service.Discovery.Stop()
+		satellite.Discovery.Service.Refresh.Stop()
 		// stop audit to prevent possible interactions i.e. repair timeout problems
-		planet.Satellites[0].Audit.Worker.Loop.Stop()
+		satellite.Audit.Worker.Loop.Stop()
 
-		satellitePeer.Repair.Checker.Loop.Pause()
-		satellitePeer.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Pause()
 
 		testData := testrand.Bytes(8 * memory.KiB)
 
-		err := uplinkPeer.UploadWithConfig(ctx, satellitePeer, &uplink.RSConfig{
+		err := uplinkPeer.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
 			MinThreshold:     3,
 			RepairThreshold:  5,
 			SuccessThreshold: 7,
@@ -338,7 +338,7 @@ func TestRemoveIrreparableSegmentFromQueue(t *testing.T) {
 		}, "testbucket", "test/path", testData)
 		require.NoError(t, err)
 
-		pointer, _ := getRemoteSegment(t, ctx, satellitePeer)
+		pointer, _ := getRemoteSegment(t, ctx, satellite)
 
 		// kill nodes and track lost pieces
 		nodesToDQ := make(map[storj.NodeID]bool)
@@ -356,33 +356,33 @@ func TestRemoveIrreparableSegmentFromQueue(t *testing.T) {
 		}
 
 		for nodeID := range nodesToDQ {
-			disqualifyNode(t, ctx, satellitePeer, nodeID)
+			disqualifyNode(t, ctx, satellite, nodeID)
 		}
 
 		// trigger checker to add segment to repair queue
-		satellitePeer.Repair.Checker.Loop.Restart()
-		satellitePeer.Repair.Checker.Loop.TriggerWait()
-		satellitePeer.Repair.Checker.Loop.Pause()
+		satellite.Repair.Checker.Loop.Restart()
+		satellite.Repair.Checker.Loop.TriggerWait()
+		satellite.Repair.Checker.Loop.Pause()
 
 		// TODO: Verify segment is in queue by making a query to the database
 
 		// Kill nodes so that online nodes < minimum threshold
 		// This will make the segment irreparable
 		for _, piece := range remotePieces {
-			disqualifyNode(t, ctx, satellitePeer, piece.NodeId)
+			disqualifyNode(t, ctx, satellite, piece.NodeId)
 		}
 
-		count, err := satellitePeer.DB.RepairQueue().Count(ctx)
+		count, err := satellite.DB.RepairQueue().Count(ctx)
 		require.NoError(t, err)
 		require.Equal(t, count, 1)
 
 		// Run the repairer
-		satellitePeer.Repair.Repairer.Loop.Restart()
-		satellitePeer.Repair.Repairer.Loop.TriggerWait()
-		satellitePeer.Repair.Repairer.Loop.Pause()
-		satellitePeer.Repair.Repairer.Limiter.Wait()
+		satellite.Repair.Repairer.Loop.Restart()
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Repairer.Limiter.Wait()
 
-		count, err = satellitePeer.DB.RepairQueue().Count(ctx)
+		count, err = satellite.DB.RepairQueue().Count(ctx)
 		require.NoError(t, err)
 		require.Equal(t, count, 0)
 	})
@@ -403,17 +403,17 @@ func TestRepairMultipleDisqualified(t *testing.T) {
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
 		// first, upload some remote data
 		uplinkPeer := planet.Uplinks[0]
-		satellitePeer := planet.Satellites[0]
+		satellite := planet.Satellites[0]
 		// stop discovery service so that we do not get a race condition when we delete nodes from overlay
-		satellitePeer.Discovery.Service.Discovery.Stop()
-		satellitePeer.Discovery.Service.Refresh.Stop()
+		satellite.Discovery.Service.Discovery.Stop()
+		satellite.Discovery.Service.Refresh.Stop()
 
-		satellitePeer.Repair.Checker.Loop.Pause()
-		satellitePeer.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Pause()
 
 		testData := testrand.Bytes(8 * memory.KiB)
 
-		err := uplinkPeer.UploadWithConfig(ctx, satellitePeer, &uplink.RSConfig{
+		err := uplinkPeer.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
 			MinThreshold:     3,
 			RepairThreshold:  5,
 			SuccessThreshold: 7,
@@ -422,7 +422,7 @@ func TestRepairMultipleDisqualified(t *testing.T) {
 		require.NoError(t, err)
 
 		// get a remote segment from metainfo
-		metainfo := satellitePeer.Metainfo.Service
+		metainfo := satellite.Metainfo.Service
 		listResponse, _, err := metainfo.List(ctx, "", "", "", true, 0, 0)
 		require.NoError(t, err)
 
@@ -461,16 +461,16 @@ func TestRepairMultipleDisqualified(t *testing.T) {
 
 		for _, node := range planet.StorageNodes {
 			if nodesToDisqualify[node.ID()] {
-				disqualifyNode(t, ctx, satellitePeer, node.ID())
+				disqualifyNode(t, ctx, satellite, node.ID())
 			}
 		}
 
-		err = satellitePeer.Repair.Checker.RefreshReliabilityCache(ctx)
+		err = satellite.Repair.Checker.RefreshReliabilityCache(ctx)
 		require.NoError(t, err)
 
-		satellitePeer.Repair.Checker.Loop.TriggerWait()
-		satellitePeer.Repair.Repairer.Loop.TriggerWait()
-		satellitePeer.Repair.Repairer.Limiter.Wait()
+		satellite.Repair.Checker.Loop.TriggerWait()
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.Limiter.Wait()
 
 		// kill nodes kept alive to ensure repair worked
 		for _, node := range planet.StorageNodes {
@@ -478,13 +478,13 @@ func TestRepairMultipleDisqualified(t *testing.T) {
 				err = planet.StopPeer(node)
 				require.NoError(t, err)
 
-				_, err = satellitePeer.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
+				_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
 				require.NoError(t, err)
 			}
 		}
 
 		// we should be able to download data without any of the original nodes
-		newData, err := uplinkPeer.Download(ctx, satellitePeer, "testbucket", "test/path")
+		newData, err := uplinkPeer.Download(ctx, satellite, "testbucket", "test/path")
 		require.NoError(t, err)
 		require.Equal(t, newData, testData)
 
@@ -524,7 +524,7 @@ func TestDataRepairUploadLimit(t *testing.T) {
 		satellite.Discovery.Service.Discovery.Stop()
 		satellite.Discovery.Service.Refresh.Stop()
 		// stop audit to prevent possible interactions i.e. repair timeout problems
-		planet.Satellites[0].Audit.Worker.Loop.Pause()
+		satellite.Audit.Worker.Loop.Pause()
 		satellite.Repair.Checker.Loop.Pause()
 		satellite.Repair.Repairer.Loop.Pause()
 