From 29fd36a20e6613a79fd1390efa71bcf75ef8e737 Mon Sep 17 00:00:00 2001 From: Fadila Khadar Date: Thu, 3 Mar 2022 01:23:11 +0100 Subject: [PATCH] satellite/repairer: handle excluded countries For nodes in excluded areas, we don't necessarily want to remove them from the pointer, but we do want to increase the number of pieces in the segment in case those excluded area nodes go down. To do that, we increase the number of pieces repaired by the number of pieces in excluded areas. Change-Id: I0424f1bcd7e93f33eb3eeeec79dbada3b3ea1f3a --- private/testplanet/reconfigure.go | 14 ++ satellite/audit/verifier_test.go | 179 +++++++++++++++++ .../nodeselection/uploadselection/criteria.go | 3 + satellite/orders/service.go | 5 +- satellite/overlay/service.go | 37 +++- satellite/overlay/service_test.go | 22 +++ satellite/repair/checker/checker_test.go | 56 ------ satellite/repair/repair_test.go | 182 ++++++++++++++++++ satellite/repair/repairer/segments.go | 15 +- satellite/satellitedb/overlaycache.go | 90 ++++++++- 10 files changed, 538 insertions(+), 65 deletions(-) diff --git a/private/testplanet/reconfigure.go b/private/testplanet/reconfigure.go index 2fbd1399b..186d28a20 100644 --- a/private/testplanet/reconfigure.go +++ b/private/testplanet/reconfigure.go @@ -74,6 +74,20 @@ var ReconfigureRS = func(minThreshold, repairThreshold, successThreshold, totalT } } +// RepairExcludedCountryCodes returns function to change satellite repair excluded country codes. +var RepairExcludedCountryCodes = func(repairExcludedCountryCodes []string) func(log *zap.Logger, index int, config *satellite.Config) { + return func(log *zap.Logger, index int, config *satellite.Config) { + config.Overlay.RepairExcludedCountryCodes = repairExcludedCountryCodes + } +} + +// UploadExcludedCountryCodes returns function to change satellite upload excluded country codes. +var UploadExcludedCountryCodes = func(uploadExcludedCountryCodes []string) func(log *zap.Logger, index int, config *satellite.Config) { + return func(log *zap.Logger, index int, config *satellite.Config) { + config.Overlay.Node.UploadExcludedCountryCodes = uploadExcludedCountryCodes + } +} + // MaxSegmentSize returns function to change satellite max segment size value. 
var MaxSegmentSize = func(maxSegmentSize memory.Size) func(log *zap.Logger, index int, config *satellite.Config) { return func(log *zap.Logger, index int, config *satellite.Config) { diff --git a/satellite/audit/verifier_test.go b/satellite/audit/verifier_test.go index bc02f5bbd..6a1659c71 100644 --- a/satellite/audit/verifier_test.go +++ b/satellite/audit/verifier_test.go @@ -5,6 +5,8 @@ package audit_test import ( "context" + "crypto/rand" + "fmt" "testing" "time" @@ -21,11 +23,13 @@ import ( "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" + "storj.io/common/uuid" "storj.io/storj/private/testblobs" "storj.io/storj/private/testplanet" "storj.io/storj/satellite" "storj.io/storj/satellite/audit" "storj.io/storj/satellite/metabase" + "storj.io/storj/storage" "storj.io/storj/storagenode" ) @@ -927,3 +931,178 @@ func TestVerifierUnknownError(t *testing.T) { assert.Equal(t, report.Unknown[0], badNode.ID()) }) } + +func TestAuditRepairedSegmentInExcludedCountries(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, + StorageNodeCount: 20, + UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: testplanet.Combine( + func(log *zap.Logger, index int, config *satellite.Config) { + config.Repairer.InMemoryRepair = true + }, + testplanet.ReconfigureRS(3, 5, 8, 10), + testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}), + ), + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + uplinkPeer := planet.Uplinks[0] + satellite := planet.Satellites[0] + // stop audit to prevent possible interactions i.e. repair timeout problems + satellite.Audit.Worker.Loop.Pause() + + satellite.Repair.Checker.Loop.Pause() + satellite.Repair.Repairer.Loop.Pause() + + var testData = testrand.Bytes(8 * memory.KiB) + bucket := "testbucket" + // first, upload some remote data + err := uplinkPeer.Upload(ctx, satellite, bucket, "test/path", testData) + require.NoError(t, err) + + segment, _ := getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, bucket) + + remotePieces := segment.Pieces + + numExcluded := 5 + var nodesInExcluded storj.NodeIDList + for i := 0; i < numExcluded; i++ { + err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, remotePieces[i].StorageNode, "FR") + require.NoError(t, err) + nodesInExcluded = append(nodesInExcluded, remotePieces[i].StorageNode) + } + + // make extra pieces after optimal bad + for i := int(segment.Redundancy.OptimalShares); i < len(remotePieces); i++ { + err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[i].StorageNode)) + require.NoError(t, err) + } + + // trigger checker to add segment to repair queue + satellite.Repair.Checker.Loop.Restart() + satellite.Repair.Checker.Loop.TriggerWait() + satellite.Repair.Checker.Loop.Pause() + + count, err := satellite.DB.RepairQueue().Count(ctx) + require.NoError(t, err) + require.Equal(t, 1, count) + + satellite.Repair.Repairer.Loop.Restart() + satellite.Repair.Repairer.Loop.TriggerWait() + satellite.Repair.Repairer.Loop.Pause() + satellite.Repair.Repairer.WaitForPendingRepairs() + + // Verify that the segment was removed + count, err = satellite.DB.RepairQueue().Count(ctx) + require.NoError(t, err) + require.Zero(t, count) + + // Verify the segment has been repaired + segmentAfterRepair, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, bucket) + require.NotEqual(t, segment.Pieces, segmentAfterRepair.Pieces) + require.Equal(t, 10, len(segmentAfterRepair.Pieces)) + + // check 
excluded area nodes still exist
+		for _, n := range nodesInExcluded {
+			var found bool
+			for _, p := range segmentAfterRepair.Pieces {
+				if p.StorageNode == n {
+					found = true
+					break
+				}
+			}
+			require.True(t, found, fmt.Sprintf("node %s not in segment, but should be", n.String()))
+		}
+		nodesInPointer := make(map[storj.NodeID]bool)
+		for _, n := range segmentAfterRepair.Pieces {
+			// check for duplicates
+			_, ok := nodesInPointer[n.StorageNode]
+			require.False(t, ok)
+			nodesInPointer[n.StorageNode] = true
+		}
+
+		lastPieceIndex := segmentAfterRepair.Pieces.Len() - 1
+		lastPiece := segmentAfterRepair.Pieces[lastPieceIndex]
+		for _, n := range planet.StorageNodes {
+			if n.ID() == lastPiece.StorageNode {
+				pieceID := segmentAfterRepair.RootPieceID.Derive(n.ID(), int32(lastPiece.Number))
+				corruptPieceData(ctx, t, planet, n, pieceID)
+			}
+		}
+
+		// now audit
+		report, err := satellite.Audit.Verifier.Verify(ctx, audit.Segment{
+			StreamID:      segmentAfterRepair.StreamID,
+			Position:      segmentAfterRepair.Position,
+			ExpiresAt:     segmentAfterRepair.ExpiresAt,
+			EncryptedSize: segmentAfterRepair.EncryptedSize,
+		}, nil)
+		require.NoError(t, err)
+		require.Len(t, report.Fails, 1)
+		require.Equal(t, report.Fails[0], lastPiece.StorageNode)
+	})
+}
+
+// getRemoteSegment returns a remote segment and its segment key from the satellite.
+// nolint:golint
+func getRemoteSegment(
+	ctx context.Context, t *testing.T, satellite *testplanet.Satellite, projectID uuid.UUID, bucketName string,
+) (_ metabase.Segment, key metabase.SegmentKey) {
+	t.Helper()
+
+	objects, err := satellite.Metabase.DB.TestingAllObjects(ctx)
+	require.NoError(t, err)
+	require.Len(t, objects, 1)
+
+	segments, err := satellite.Metabase.DB.TestingAllSegments(ctx)
+	require.NoError(t, err)
+	require.Len(t, segments, 1)
+	require.False(t, segments[0].Inline())
+
+	return segments[0], metabase.SegmentLocation{
+		ProjectID:  projectID,
+		BucketName: bucketName,
+		ObjectKey:  objects[0].ObjectKey,
+		Position:   segments[0].Position,
+	}.Encode()
+}
+
+// corruptPieceData manipulates piece data on a storage node.
+func corruptPieceData(ctx context.Context, t *testing.T, planet *testplanet.Planet, corruptedNode *testplanet.StorageNode, corruptedPieceID storj.PieceID) {
+	t.Helper()
+
+	blobRef := storage.BlobRef{
+		Namespace: planet.Satellites[0].ID().Bytes(),
+		Key:       corruptedPieceID.Bytes(),
+	}
+
+	// get the size of the currently stored piece from the storagenode
+	reader, err := corruptedNode.Storage2.BlobsCache.Open(ctx, blobRef)
+	require.NoError(t, err)
+	pieceSize, err := reader.Size()
+	require.NoError(t, err)
+	require.True(t, pieceSize > 0)
+	pieceData := make([]byte, pieceSize)
+
+	// delete piece data
+	err = corruptedNode.Storage2.BlobsCache.Delete(ctx, blobRef)
+	require.NoError(t, err)
+
+	// create new random data
+	_, err = rand.Read(pieceData)
+	require.NoError(t, err)
+
+	// corrupt piece data (not PieceHeader) and write back to storagenode
+	// this means repair downloading should fail during piece hash verification
+	pieceData[pieceSize-1]++ // ensure the stored bytes actually change, even in the unlikely case rand reproduced the original data
+	writer, err := corruptedNode.Storage2.BlobsCache.Create(ctx, blobRef, pieceSize)
+	require.NoError(t, err)
+
+	n, err := writer.Write(pieceData)
+	require.NoError(t, err)
+	require.EqualValues(t, n, pieceSize)
+
+	err = writer.Commit(ctx)
+	require.NoError(t, err)
+}
diff --git a/satellite/nodeselection/uploadselection/criteria.go b/satellite/nodeselection/uploadselection/criteria.go
index bd94f893c..d1f477959 100644
--- a/satellite/nodeselection/uploadselection/criteria.go
+++ b/satellite/nodeselection/uploadselection/criteria.go
@@ -34,6 +34,9 @@ func (c *Criteria) MatchInclude(node *Node) bool {
 	}
 
 	for _, code := range c.ExcludedCountryCodes {
+		if code.String() == "" {
+			continue
+		}
 		if node.CountryCode == code {
 			return false
 		}
diff --git a/satellite/orders/service.go b/satellite/orders/service.go
index 2060ee7fd..1a4265ce6 100644
--- a/satellite/orders/service.go
+++ b/satellite/orders/service.go
@@ -434,7 +434,7 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m
 }
 
 // CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of segment to newNodes.
-func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*overlay.SelectedNode, optimalThresholdMultiplier float64) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
+func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*overlay.SelectedNode, optimalThresholdMultiplier float64, numPiecesInExcludedCountries int) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	// Create the order limits for being used to upload the repaired pieces
@@ -445,7 +445,8 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket m
 	pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
 
 	totalPieces := redundancy.TotalCount()
-	totalPiecesAfterRepair := int(math.Ceil(float64(redundancy.OptimalThreshold()) * optimalThresholdMultiplier))
+	totalPiecesAfterRepair := int(math.Ceil(float64(redundancy.OptimalThreshold())*optimalThresholdMultiplier)) + numPiecesInExcludedCountries
+
 	if totalPiecesAfterRepair > totalPieces {
 		totalPiecesAfterRepair = totalPieces
 	}
diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go
index b4e75e691..0e74fa488 100644
--- a/satellite/overlay/service.go
+++ b/satellite/overlay/service.go
@@ -60,6 +60,8 @@ type DB interface {
 	KnownOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
 	// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new
 	KnownUnreliableOrOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
+	// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
+	KnownReliableInExcludedCountries(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
 	// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
 	KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) ([]*pb.Node, error)
 	// Reliable returns all nodes that are reliable
@@ -489,6 +491,17 @@ func (service *Service) KnownUnreliableOrOffline(ctx context.Context, nodeIds st
 	return service.db.KnownUnreliableOrOffline(ctx, criteria, nodeIds)
 }
 
+// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
+func (service *Service) KnownReliableInExcludedCountries(ctx context.Context, nodeIds storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	criteria := &NodeCriteria{
+		OnlineWindow:      service.config.Node.OnlineWindow,
+		ExcludedCountries: service.config.RepairExcludedCountryCodes,
+	}
+	return service.db.KnownReliableInExcludedCountries(ctx, criteria, nodeIds)
+}
+
 // KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
 func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) {
 	defer mon.Task()(&ctx)(&err)
@@ -599,7 +612,7 @@ func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo,
 	return nil
 }
 
-// GetMissingPieces returns the list of offline nodes.
+// GetMissingPieces returns the piece numbers of pieces held by offline nodes.
 func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pieces) (missingPieces []uint16, err error) {
 	defer mon.Task()(&ctx)(&err)
 	var nodeIDs storj.NodeIDList
@@ -621,6 +634,28 @@ func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pi
 	return missingPieces, nil
 }
 
+// GetReliablePiecesInExcludedCountries returns the piece numbers of pieces held by reliable nodes located in excluded countries.
+func (service *Service) GetReliablePiecesInExcludedCountries(ctx context.Context, pieces metabase.Pieces) (piecesInExcluded []uint16, err error) {
+	defer mon.Task()(&ctx)(&err)
+	var nodeIDs storj.NodeIDList
+	for _, p := range pieces {
+		nodeIDs = append(nodeIDs, p.StorageNode)
+	}
+	inExcluded, err := service.KnownReliableInExcludedCountries(ctx, nodeIDs)
+	if err != nil {
+		return nil, Error.New("error getting nodes %s", err)
+	}
+
+	for _, p := range pieces {
+		for _, nodeID := range inExcluded {
+			if nodeID == p.StorageNode {
+				piecesInExcluded = append(piecesInExcluded, p.Number)
+			}
+		}
+	}
+	return piecesInExcluded, nil
+}
+
 // DisqualifyNode disqualifies a storage node.
 func (service *Service) DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error) {
 	defer mon.Task()(&ctx)(&err)
diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go
index 644f086bb..e9ba1ee33 100644
--- a/satellite/overlay/service_test.go
+++ b/satellite/overlay/service_test.go
@@ -845,3 +845,25 @@ func TestReliable(t *testing.T) {
 		require.NotEqual(t, node.ID(), nodes[0])
 	})
 }
+
+func TestKnownReliableInExcludedCountries(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		service := planet.Satellites[0].Overlay.Service
+		node := planet.StorageNodes[0]
+
+		nodes, err := service.Reliable(ctx)
+		require.NoError(t, err)
+		require.Len(t, nodes, 2)
+
+		err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "FR")
+		require.NoError(t, err)
+
+		// the node with the excluded country code should be the only one returned by KnownReliableInExcludedCountries
+		nodes, err = service.KnownReliableInExcludedCountries(ctx, nodes)
+		require.NoError(t, err)
+		require.Len(t, nodes, 1)
+		require.Equal(t, node.ID(), nodes[0])
+	})
+}
diff --git a/satellite/repair/checker/checker_test.go b/satellite/repair/checker/checker_test.go
index a30b653b2..8c428c89c 100644
--- a/satellite/repair/checker/checker_test.go
+++ b/satellite/repair/checker/checker_test.go
@@ -11,7 +11,6 @@ import (
 
 	"github.com/stretchr/testify/require"
 
-	"storj.io/common/memory"
 	"storj.io/common/storj"
 	"storj.io/common/testcontext"
 	"storj.io/common/testrand"
@@ -84,61 +83,6 @@ func TestIdentifyInjuredSegments(t *testing.T) {
 	})
 }
 
-func TestInjuredsSegmentWhenPiecesAreInExcludedCountries(t *testing.T) {
-	testplanet.Run(t, testplanet.Config{
-		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
-		Reconfigure: testplanet.Reconfigure{
-			Satellite: testplanet.ReconfigureRS(2, 3, 4, 4),
-		},
-	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
-		checker := planet.Satellites[0].Repair.Checker
-
-		checker.Loop.Pause()
-		planet.Satellites[0].Repair.Repairer.Loop.Pause()
-
-		err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "key", testrand.Bytes(5*memory.KiB))
-		require.NoError(t, err)
-
-		objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
-		require.NoError(t, err)
-		require.Len(t, objects, 1)
-
-		segments, err := 
planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segments, 1) - require.False(t, segments[0].Inline()) - - err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, planet.StorageNodes[0].ID(), "FR") - require.NoError(t, err) - err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, planet.StorageNodes[1].ID(), "FR") - require.NoError(t, err) - - checker.Loop.TriggerWait() - - // check that the healthy segments was added to repair queue - // because of part of nodes have country code value on exclude - // list - count, err := planet.Satellites[0].DB.RepairQueue().Count(ctx) - require.NoError(t, err) - require.Equal(t, 1, count) - - // trigger checker to add segment to repair queue - planet.Satellites[0].Repair.Repairer.Loop.Restart() - planet.Satellites[0].Repair.Repairer.Loop.TriggerWait() - planet.Satellites[0].Repair.Repairer.Loop.Pause() - planet.Satellites[0].Repair.Repairer.WaitForPendingRepairs() - - count, err = planet.Satellites[0].DB.RepairQueue().Count(ctx) - require.NoError(t, err) - require.Equal(t, 0, count) - - segmentsAfterRepair, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) - require.NoError(t, err) - - require.Equal(t, segments[0].Pieces, segmentsAfterRepair[0].Pieces) - }) -} - func TestIdentifyIrreparableSegments(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1, diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go index 4148dc0e2..65ac45ad0 100644 --- a/satellite/repair/repair_test.go +++ b/satellite/repair/repair_test.go @@ -3076,3 +3076,185 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) { require.NotContains(t, mock.addressesDialed, realAddresses) }) } + +func TestSegmentInExcludedCountriesRepair(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, + StorageNodeCount: 20, + UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: testplanet.Combine( + func(log *zap.Logger, index int, config *satellite.Config) { + config.Repairer.InMemoryRepair = true + }, + testplanet.ReconfigureRS(3, 5, 8, 10), + testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}), + ), + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + uplinkPeer := planet.Uplinks[0] + satellite := planet.Satellites[0] + // stop audit to prevent possible interactions i.e. 
repair timeout problems
+		satellite.Audit.Worker.Loop.Pause()
+
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Pause()
+
+		var testData = testrand.Bytes(8 * memory.KiB)
+		// first, upload some remote data
+		err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData)
+		require.NoError(t, err)
+
+		segment, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
+		require.Equal(t, 3, int(segment.Redundancy.RequiredShares))
+
+		remotePieces := segment.Pieces
+
+		numExcluded := 5
+		var nodesInExcluded storj.NodeIDList
+		for i := 0; i < numExcluded; i++ {
+			err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, remotePieces[i].StorageNode, "FR")
+			require.NoError(t, err)
+			nodesInExcluded = append(nodesInExcluded, remotePieces[i].StorageNode)
+		}
+		// make extra pieces after optimal bad
+		for i := int(segment.Redundancy.OptimalShares); i < len(remotePieces); i++ {
+			err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[i].StorageNode))
+			require.NoError(t, err)
+		}
+
+		// trigger checker to add segment to repair queue
+		satellite.Repair.Checker.Loop.Restart()
+		satellite.Repair.Checker.Loop.TriggerWait()
+		satellite.Repair.Checker.Loop.Pause()
+
+		count, err := satellite.DB.RepairQueue().Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, 1, count)
+
+		satellite.Repair.Repairer.Loop.Restart()
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
+
+		// Verify that the segment was removed
+		count, err = satellite.DB.RepairQueue().Count(ctx)
+		require.NoError(t, err)
+		require.Zero(t, count)
+
+		// Verify the segment has been repaired
+		segmentAfterRepair, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
+		require.NotEqual(t, segment.Pieces, segmentAfterRepair.Pieces)
+		require.Equal(t, 10, len(segmentAfterRepair.Pieces))
+
+		// check excluded area nodes still exist
+		for _, n := range nodesInExcluded {
+			var found bool
+			for _, p := range segmentAfterRepair.Pieces {
+				if p.StorageNode == n {
+					found = true
+					break
+				}
+			}
+			require.True(t, found, fmt.Sprintf("node %s not in segment, but should be", n.String()))
+		}
+		nodesInPointer := make(map[storj.NodeID]bool)
+		for _, n := range segmentAfterRepair.Pieces {
+			// check for duplicates
+			_, ok := nodesInPointer[n.StorageNode]
+			require.False(t, ok)
+			nodesInPointer[n.StorageNode] = true
+		}
+	})
+}
+
+func TestSegmentInExcludedCountriesRepairIrreparable(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 20,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.Combine(
+				func(log *zap.Logger, index int, config *satellite.Config) {
+					config.Repairer.InMemoryRepair = true
+				},
+				testplanet.ReconfigureRS(3, 5, 8, 10),
+				testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}),
+			),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		uplinkPeer := planet.Uplinks[0]
+		satellite := planet.Satellites[0]
+		// stop audit to prevent possible interactions i.e. repair timeout problems
+		satellite.Audit.Worker.Loop.Pause()
+
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Pause()
+
+		var testData = testrand.Bytes(8 * memory.KiB)
+		// first, upload some remote data
+		err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData)
+		require.NoError(t, err)
+
+		segment, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
+		require.Equal(t, 3, int(segment.Redundancy.RequiredShares))
+
+		remotePieces := segment.Pieces
+
+		numExcluded := 6
+		var nodesInExcluded storj.NodeIDList
+		for i := 0; i < numExcluded; i++ {
+			err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, remotePieces[i].StorageNode, "FR")
+			require.NoError(t, err)
+			nodesInExcluded = append(nodesInExcluded, remotePieces[i].StorageNode)
+		}
+		// make the rest unhealthy
+		for i := numExcluded; i < len(remotePieces); i++ {
+			err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[i].StorageNode))
+			require.NoError(t, err)
+		}
+
+		// trigger checker to add segment to repair queue
+		satellite.Repair.Checker.Loop.Restart()
+		satellite.Repair.Checker.Loop.TriggerWait()
+		satellite.Repair.Checker.Loop.Pause()
+
+		count, err := satellite.DB.RepairQueue().Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, 1, count)
+
+		satellite.Repair.Repairer.Loop.Restart()
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
+
+		// Verify that the segment was removed
+		count, err = satellite.DB.RepairQueue().Count(ctx)
+		require.NoError(t, err)
+		require.Zero(t, count)
+
+		// Verify the segment has been repaired
+		segmentAfterRepair, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
+		require.NotEqual(t, segment.Pieces, segmentAfterRepair.Pieces)
+		require.Equal(t, 10, len(segmentAfterRepair.Pieces))
+
+		// check excluded area nodes still exist
+		for _, n := range nodesInExcluded {
+			var found bool
+			for _, p := range segmentAfterRepair.Pieces {
+				if p.StorageNode == n {
+					found = true
+					break
+				}
+			}
+			require.True(t, found, fmt.Sprintf("node %s not in segment, but should be", n.String()))
+		}
+		nodesInPointer := make(map[storj.NodeID]bool)
+		for _, n := range segmentAfterRepair.Pieces {
+			// check for duplicates
+			_, ok := nodesInPointer[n.StorageNode]
+			require.False(t, ok)
+			nodesInPointer[n.StorageNode] = true
+		}
+	})
+}
diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index d0e619ec0..5d5eeccff 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -189,6 +189,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 		return false, nil
 	}
 
+	piecesInExcludedCountries, err := repairer.overlay.GetReliablePiecesInExcludedCountries(ctx, pieces)
+	if err != nil {
+		return false, overlayQueryError.New("error identifying pieces in excluded countries: %w", err)
+	}
+
+	numHealthyInExcludedCountries := len(piecesInExcludedCountries)
+
 	// ensure we get values, even if only zero values, so that redash can have an alert based on this
 	mon.Counter("repairer_segments_below_min_req").Inc(0) //mon:locked
 	stats.repairerSegmentsBelowMinReq.Inc(0)
@@ -207,7 +214,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
 	}
 
 	// repair not needed
-	if numHealthy > int(repairThreshold) {
+	if 
numHealthy-numHealthyInExcludedCountries > int(repairThreshold) { mon.Meter("repair_unnecessary").Mark(1) //mon:locked stats.repairUnnecessary.Mark(1) repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold)) @@ -268,8 +275,8 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue var minSuccessfulNeeded int { totalNeeded := math.Ceil(float64(redundancy.OptimalThreshold()) * repairer.multiplierOptimalThreshold) - requestCount = int(totalNeeded) - len(healthyPieces) - minSuccessfulNeeded = redundancy.OptimalThreshold() - len(healthyPieces) + requestCount = int(totalNeeded) - len(healthyPieces) + numHealthyInExcludedCountries + minSuccessfulNeeded = redundancy.OptimalThreshold() - len(healthyPieces) + numHealthyInExcludedCountries } // Request Overlay for n-h new storage nodes @@ -283,7 +290,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue } // Create the order limits for the PUT_REPAIR action - putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, getOrderLimits, newNodes, repairer.multiplierOptimalThreshold) + putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, getOrderLimits, newNodes, repairer.multiplierOptimalThreshold, numHealthyInExcludedCountries) if err != nil { return false, orderLimitFailureError.New("could not create PUT_REPAIR order limits: %w", err) } diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index 49086a507..6ccbda065 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -405,6 +405,80 @@ func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteri return badNodes, err } +// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries. +func (cache *overlaycache) KnownReliableInExcludedCountries(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) { + for { + reliableInExcluded, err = cache.knownReliableInExcludedCountries(ctx, criteria, nodeIDs) + if err != nil { + if cockroachutil.NeedsRetry(err) { + continue + } + return reliableInExcluded, err + } + break + } + + return reliableInExcluded, err +} + +func (cache *overlaycache) knownReliableInExcludedCountries(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) { + defer mon.Task()(&ctx)(&err) + + if len(nodeIDs) == 0 { + return nil, Error.New("no ids provided") + } + + args := []interface{}{ + pgutil.NodeIDArray(nodeIDs), + time.Now().Add(-criteria.OnlineWindow), + } + + // When this config is not set, it's a string slice with one empty string. This is a sanity check just + // in case for some reason it's nil or has no elements. 
+	if len(criteria.ExcludedCountries) == 0 {
+		return reliableInExcluded, nil
+	}
+
+	var excludedCountriesCondition string
+	if criteria.ExcludedCountries[0] == "" {
+		return reliableInExcluded, nil
+	}
+
+	excludedCountriesCondition = "AND country_code IN (SELECT UNNEST($3::TEXT[]))"
+	args = append(args, pgutil.TextArray(criteria.ExcludedCountries))
+
+	// get reliable and online nodes
+	var rows tagsql.Rows
+	rows, err = cache.db.Query(ctx, cache.db.Rebind(`
+		SELECT id
+		FROM nodes
+		`+cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+`
+		WHERE id = any($1::bytea[])
+		AND disqualified IS NULL
+		AND unknown_audit_suspended IS NULL
+		AND offline_suspended IS NULL
+		AND exit_finished_at IS NULL
+		AND last_contact_success > $2
+		`+excludedCountriesCondition+`
+	`), args...,
+	)
+	if err != nil {
+		return nil, err
+	}
+	defer func() { err = errs.Combine(err, rows.Close()) }()
+
+	for rows.Next() {
+		var id storj.NodeID
+		err = rows.Scan(&id)
+		if err != nil {
+			return nil, err
+		}
+		reliableInExcluded = append(reliableInExcluded, id)
+	}
+
+	return reliableInExcluded, Error.Wrap(rows.Err())
+}
+
 func (cache *overlaycache) knownUnreliableOrOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (badNodes storj.NodeIDList, err error) {
 	defer mon.Task()(&ctx)(&err)
 
@@ -520,6 +594,18 @@ func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeC
 }
 
 func (cache *overlaycache) reliable(ctx context.Context, criteria *overlay.NodeCriteria) (nodes storj.NodeIDList, err error) {
+	args := []interface{}{
+		time.Now().Add(-criteria.OnlineWindow),
+	}
+
+	// When this config is not set, it's a string slice with one empty string. Guard against a nil or
+	// empty slice as well, so we never index an element that doesn't exist.
+	var excludedCountriesCondition string
+	if len(criteria.ExcludedCountries) != 0 && criteria.ExcludedCountries[0] != "" {
+		excludedCountriesCondition = "AND country_code NOT IN (SELECT UNNEST($2::TEXT[]))"
+		args = append(args, pgutil.TextArray(criteria.ExcludedCountries))
+	}
+
 	// get reliable and online nodes
 	rows, err := cache.db.Query(ctx, cache.db.Rebind(`
 		SELECT id
@@ -530,8 +616,8 @@ func (cache *overlaycache) reliable(ctx context.Context, criteria *overlay.NodeC
 		AND offline_suspended IS NULL
 		AND exit_finished_at IS NULL
 		AND last_contact_success > $1
-		AND country_code NOT IN (SELECT UNNEST($2::TEXT[]))
-	`), time.Now().Add(-criteria.OnlineWindow), pgutil.TextArray(criteria.ExcludedCountries))
+		`+excludedCountriesCondition+`
+	`), args...)
 	if err != nil {
 		return nil, err
 	}
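
Editor's note (illustrative sketch, not part of the patch): the heart of this change is counting. Pieces held by reliable nodes in repair-excluded countries stay in the segment but are no longer treated as dependable redundancy, so every repair target above grows by their number. The following self-contained Go program mirrors the arithmetic in CreatePutRepairOrderLimits and in the requestCount/minSuccessfulNeeded block of segments.go; the repairCounts helper and its inputs are hypothetical, chosen to match the tests in this patch.

package main

import (
	"fmt"
	"math"
)

// repairCounts mirrors the patched arithmetic. optimal and total come from the
// redundancy scheme, numHealthy counts reliable pieces, and numExcluded counts
// the healthy pieces sitting on nodes in repair-excluded countries.
func repairCounts(optimal, total, numHealthy, numExcluded int, multiplier float64) (afterRepair, requestCount, minSuccessful int) {
	target := int(math.Ceil(float64(optimal) * multiplier))

	// CreatePutRepairOrderLimits: grow the target piece count by the number
	// of excluded-country pieces, capped at the total erasure-share count.
	afterRepair = target + numExcluded
	if afterRepair > total {
		afterRepair = total
	}

	// SegmentRepairer.Repair: request (and require) that many extra uploads.
	requestCount = target - numHealthy + numExcluded
	minSuccessful = optimal - numHealthy + numExcluded
	return afterRepair, requestCount, minSuccessful
}

func main() {
	// RS (3, 5, 8, 10) as in the tests: 5 healthy pieces, all 5 in "FR".
	fmt.Println(repairCounts(8, 10, 5, 5, 1.0)) // prints: 10 8 8
}

With the test's numbers the repairer now aims for all 10 shares and uploads 8 new pieces instead of 3, so the segment survives even if every excluded-country node later goes down.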
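The repair trigger changes the same way: the threshold comparison in segments.go subtracts excluded-country pieces before deciding a segment is healthy enough to skip. A minimal runnable sketch of that decision (the needsRepair helper name is hypothetical; the comparison itself is taken from the patch):

package main

import "fmt"

// needsRepair mirrors the patched check in SegmentRepairer.Repair: pieces in
// excluded countries are subtracted from the healthy count before comparing
// against the repair threshold.
func needsRepair(numHealthy, numHealthyInExcluded, repairThreshold int) bool {
	return numHealthy-numHealthyInExcluded <= repairThreshold
}

func main() {
	// TestSegmentInExcludedCountriesRepairIrreparable: 6 healthy pieces, all
	// in "FR", repair threshold 5. Previously 6 > 5 meant "repair unnecessary".
	fmt.Println(needsRepair(6, 6, 5)) // true: the segment is repaired anyway
}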