satellite/repairer: handle excluded countries
For nodes in excluded areas, we don't necessarily want to remove them from the pointer, but we do want to increase the number of pieces in the segment in case those excluded-area nodes go down. To do that, we increase the number of pieces repaired by the number of pieces held in excluded areas.

Change-Id: I0424f1bcd7e93f33eb3eeeec79dbada3b3ea1f3a
Parent: 35290d1890
Commit: 29fd36a20e
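The arithmetic at the heart of this change (see CreatePutRepairOrderLimits and SegmentRepairer.Repair below) can be sketched as a small self-contained Go program; the function name and parameters here are illustrative, not the satellite's actual API:

package main

import (
	"fmt"
	"math"
)

// repairTargetPieces sketches the adjusted repair target: the usual
// ceil(optimal * multiplier) pieces, plus one extra piece for every
// healthy piece currently held in an excluded country, clamped to the
// total piece count of the redundancy scheme.
func repairTargetPieces(optimalThreshold, totalPieces int, multiplier float64, piecesInExcludedCountries int) int {
	target := int(math.Ceil(float64(optimalThreshold)*multiplier)) + piecesInExcludedCountries
	if target > totalPieces {
		target = totalPieces
	}
	return target
}

func main() {
	// With RS(3, 5, 8, 10) as in the tests below, a multiplier of 1.0,
	// and 5 pieces in excluded countries: ceil(8*1.0)+5 = 13, clamped to 10.
	fmt.Println(repairTargetPieces(8, 10, 1.0, 5))
}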
@@ -74,6 +74,20 @@ var ReconfigureRS = func(minThreshold, repairThreshold, successThreshold, totalT
	}
}

// RepairExcludedCountryCodes returns function to change satellite repair excluded country codes.
var RepairExcludedCountryCodes = func(repairExcludedCountryCodes []string) func(log *zap.Logger, index int, config *satellite.Config) {
	return func(log *zap.Logger, index int, config *satellite.Config) {
		config.Overlay.RepairExcludedCountryCodes = repairExcludedCountryCodes
	}
}

// UploadExcludedCountryCodes returns function to change satellite upload excluded country codes.
var UploadExcludedCountryCodes = func(uploadExcludedCountryCodes []string) func(log *zap.Logger, index int, config *satellite.Config) {
	return func(log *zap.Logger, index int, config *satellite.Config) {
		config.Overlay.Node.UploadExcludedCountryCodes = uploadExcludedCountryCodes
	}
}

// MaxSegmentSize returns function to change satellite max segment size value.
var MaxSegmentSize = func(maxSegmentSize memory.Size) func(log *zap.Logger, index int, config *satellite.Config) {
	return func(log *zap.Logger, index int, config *satellite.Config) {
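These helpers are designed to compose through testplanet.Combine; the repair tests later in this diff configure a satellite exactly like this:

Reconfigure: testplanet.Reconfigure{
	Satellite: testplanet.Combine(
		func(log *zap.Logger, index int, config *satellite.Config) {
			config.Repairer.InMemoryRepair = true
		},
		testplanet.ReconfigureRS(3, 5, 8, 10),
		testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}),
	),
},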
@@ -5,6 +5,8 @@ package audit_test

import (
	"context"
	"crypto/rand"
	"fmt"
	"testing"
	"time"

@@ -21,11 +23,13 @@ import (
	"storj.io/common/storj"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/common/uuid"
	"storj.io/storj/private/testblobs"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite"
	"storj.io/storj/satellite/audit"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/storage"
	"storj.io/storj/storagenode"
)
@@ -927,3 +931,178 @@ func TestVerifierUnknownError(t *testing.T) {
		assert.Equal(t, report.Unknown[0], badNode.ID())
	})
}

func TestAuditRepairedSegmentInExcludedCountries(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 20,
		UplinkCount:      1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.Combine(
				func(log *zap.Logger, index int, config *satellite.Config) {
					config.Repairer.InMemoryRepair = true
				},
				testplanet.ReconfigureRS(3, 5, 8, 10),
				testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}),
			),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		uplinkPeer := planet.Uplinks[0]
		satellite := planet.Satellites[0]
		// stop audit to prevent possible interactions, e.g. repair timeout problems
		satellite.Audit.Worker.Loop.Pause()

		satellite.Repair.Checker.Loop.Pause()
		satellite.Repair.Repairer.Loop.Pause()

		var testData = testrand.Bytes(8 * memory.KiB)
		bucket := "testbucket"
		// first, upload some remote data
		err := uplinkPeer.Upload(ctx, satellite, bucket, "test/path", testData)
		require.NoError(t, err)

		segment, _ := getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, bucket)

		remotePieces := segment.Pieces

		numExcluded := 5
		var nodesInExcluded storj.NodeIDList
		for i := 0; i < numExcluded; i++ {
			err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, remotePieces[i].StorageNode, "FR")
			require.NoError(t, err)
			nodesInExcluded = append(nodesInExcluded, remotePieces[i].StorageNode)
		}

		// make extra pieces after optimal bad
		for i := int(segment.Redundancy.OptimalShares); i < len(remotePieces); i++ {
			err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[i].StorageNode))
			require.NoError(t, err)
		}

		// trigger checker to add segment to repair queue
		satellite.Repair.Checker.Loop.Restart()
		satellite.Repair.Checker.Loop.TriggerWait()
		satellite.Repair.Checker.Loop.Pause()

		count, err := satellite.DB.RepairQueue().Count(ctx)
		require.NoError(t, err)
		require.Equal(t, 1, count)

		satellite.Repair.Repairer.Loop.Restart()
		satellite.Repair.Repairer.Loop.TriggerWait()
		satellite.Repair.Repairer.Loop.Pause()
		satellite.Repair.Repairer.WaitForPendingRepairs()

		// Verify that the segment was removed from the repair queue
		count, err = satellite.DB.RepairQueue().Count(ctx)
		require.NoError(t, err)
		require.Zero(t, count)

		// Verify the segment has been repaired
		segmentAfterRepair, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, bucket)
		require.NotEqual(t, segment.Pieces, segmentAfterRepair.Pieces)
		require.Equal(t, 10, len(segmentAfterRepair.Pieces))

		// check that pieces held by excluded-area nodes are still in the segment
		for _, n := range nodesInExcluded {
			var found bool
			for _, p := range segmentAfterRepair.Pieces {
				if p.StorageNode == n {
					found = true
					break
				}
			}
			require.True(t, found, fmt.Sprintf("node %s not in segment, but should be", n.String()))
		}
		nodesInPointer := make(map[storj.NodeID]bool)
		for _, n := range segmentAfterRepair.Pieces {
			// check for duplicates
			_, ok := nodesInPointer[n.StorageNode]
			require.False(t, ok)
			nodesInPointer[n.StorageNode] = true
		}

		lastPieceIndex := segmentAfterRepair.Pieces.Len() - 1
		lastPiece := segmentAfterRepair.Pieces[lastPieceIndex]
		for _, n := range planet.StorageNodes {
			if n.ID() == lastPiece.StorageNode {
				pieceID := segmentAfterRepair.RootPieceID.Derive(n.ID(), int32(lastPiece.Number))
				corruptPieceData(ctx, t, planet, n, pieceID)
			}
		}

		// now audit
		report, err := satellite.Audit.Verifier.Verify(ctx, audit.Segment{
			StreamID:      segmentAfterRepair.StreamID,
			Position:      segmentAfterRepair.Position,
			ExpiresAt:     segmentAfterRepair.ExpiresAt,
			EncryptedSize: segmentAfterRepair.EncryptedSize,
		}, nil)
		require.NoError(t, err)
		require.Len(t, report.Fails, 1)
		require.Equal(t, report.Fails[0], lastPiece.StorageNode)
	})
}

// getRemoteSegment returns the remote segment and its segment key from the satellite.
// nolint:golint
func getRemoteSegment(
	ctx context.Context, t *testing.T, satellite *testplanet.Satellite, projectID uuid.UUID, bucketName string,
) (_ metabase.Segment, key metabase.SegmentKey) {
	t.Helper()

	objects, err := satellite.Metabase.DB.TestingAllObjects(ctx)
	require.NoError(t, err)
	require.Len(t, objects, 1)

	segments, err := satellite.Metabase.DB.TestingAllSegments(ctx)
	require.NoError(t, err)
	require.Len(t, segments, 1)
	require.False(t, segments[0].Inline())

	return segments[0], metabase.SegmentLocation{
		ProjectID:  projectID,
		BucketName: bucketName,
		ObjectKey:  objects[0].ObjectKey,
		Position:   segments[0].Position,
	}.Encode()
}

// corruptPieceData manipulates piece data on a storage node.
func corruptPieceData(ctx context.Context, t *testing.T, planet *testplanet.Planet, corruptedNode *testplanet.StorageNode, corruptedPieceID storj.PieceID) {
	t.Helper()

	blobRef := storage.BlobRef{
		Namespace: planet.Satellites[0].ID().Bytes(),
		Key:       corruptedPieceID.Bytes(),
	}

	// get currently stored piece data from storagenode
	reader, err := corruptedNode.Storage2.BlobsCache.Open(ctx, blobRef)
	require.NoError(t, err)
	pieceSize, err := reader.Size()
	require.NoError(t, err)
	require.True(t, pieceSize > 0)
	pieceData := make([]byte, pieceSize)

	// delete piece data
	err = corruptedNode.Storage2.BlobsCache.Delete(ctx, blobRef)
	require.NoError(t, err)

	// create new random data
	_, err = rand.Read(pieceData)
	require.NoError(t, err)

	// corrupt piece data (not PieceHeader) and write back to storagenode
	// this means repair downloading should fail during piece hash verification
	pieceData[pieceSize-1]++ // flip the last byte so the stored data is guaranteed to differ
	writer, err := corruptedNode.Storage2.BlobsCache.Create(ctx, blobRef, pieceSize)
	require.NoError(t, err)

	n, err := writer.Write(pieceData)
	require.NoError(t, err)
	require.EqualValues(t, n, pieceSize)

	err = writer.Commit(ctx)
	require.NoError(t, err)
}

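corruptPieceData changes a single byte of the stored data, which is enough to make any cryptographic hash of the piece disagree with the expected piece hash. A self-contained illustration, using SHA-256 as a stand-in for the actual piece hash algorithm:

package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

func main() {
	piece := []byte("example piece data")
	orig := sha256.Sum256(piece)

	piece[len(piece)-1]++ // same single-byte corruption as in corruptPieceData
	tampered := sha256.Sum256(piece)

	fmt.Println(bytes.Equal(orig[:], tampered[:])) // false: hash verification fails
}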
@@ -34,6 +34,9 @@ func (c *Criteria) MatchInclude(node *Node) bool {
	}

	for _, code := range c.ExcludedCountryCodes {
		if code.String() == "" {
			continue
		}
		if node.CountryCode == code {
			return false
		}
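The new MatchInclude clause rejects a candidate node whose country code appears on the exclusion list. A stripped-down, self-contained version of the same check, using plain strings in place of the satellite's country-code type:

package main

import "fmt"

// excludedCountry reports whether countryCode appears in excluded,
// skipping empty entries the same way MatchInclude does above.
func excludedCountry(countryCode string, excluded []string) bool {
	for _, code := range excluded {
		if code == "" {
			continue
		}
		if countryCode == code {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(excludedCountry("FR", []string{"FR", "BE"})) // true: node filtered out
	fmt.Println(excludedCountry("US", []string{"FR", "BE"})) // false: node may be selected
}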
@@ -434,7 +434,7 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m
}

// CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of segment to newNodes.
-func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*overlay.SelectedNode, optimalThresholdMultiplier float64) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
+func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*overlay.SelectedNode, optimalThresholdMultiplier float64, numPiecesInExcludedCountries int) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
	defer mon.Task()(&ctx)(&err)

	// Create the order limits for being used to upload the repaired pieces
@@ -445,7 +445,8 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket m
	pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)

	totalPieces := redundancy.TotalCount()
-	totalPiecesAfterRepair := int(math.Ceil(float64(redundancy.OptimalThreshold()) * optimalThresholdMultiplier))
+	totalPiecesAfterRepair := int(math.Ceil(float64(redundancy.OptimalThreshold())*optimalThresholdMultiplier)) + numPiecesInExcludedCountries

	if totalPiecesAfterRepair > totalPieces {
		totalPiecesAfterRepair = totalPieces
	}
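Worked example with the redundancy scheme the tests below use (ReconfigureRS(3, 5, 8, 10), so optimal threshold 8 and total 10) and an assumed optimalThresholdMultiplier of 1.0 (the actual value comes from the repairer configuration): with 5 healthy pieces in excluded countries, totalPiecesAfterRepair = ceil(8 * 1.0) + 5 = 13, which exceeds totalPieces = 10 and is clamped back to 10. That clamp is why the repaired segments in the tests end up with exactly 10 pieces.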
@@ -60,6 +60,8 @@ type DB interface {
	KnownOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
	// KnownUnreliableOrOffline filters a set of nodes to unhealthy or offline nodes, independent of new
	KnownUnreliableOrOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
	// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
	KnownReliableInExcludedCountries(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
	// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
	KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) ([]*pb.Node, error)
	// Reliable returns all nodes that are reliable
@@ -489,6 +491,17 @@ func (service *Service) KnownUnreliableOrOffline(ctx context.Context, nodeIds st
	return service.db.KnownUnreliableOrOffline(ctx, criteria, nodeIds)
}

// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
func (service *Service) KnownReliableInExcludedCountries(ctx context.Context, nodeIds storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) {
	defer mon.Task()(&ctx)(&err)

	criteria := &NodeCriteria{
		OnlineWindow:      service.config.Node.OnlineWindow,
		ExcludedCountries: service.config.RepairExcludedCountryCodes,
	}
	return service.db.KnownReliableInExcludedCountries(ctx, criteria, nodeIds)
}

// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) {
	defer mon.Task()(&ctx)(&err)
@@ -599,7 +612,7 @@ func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo,
	return nil
}

-// GetMissingPieces returns the list of offline nodes.
+// GetMissingPieces returns the list of offline nodes and the corresponding pieces.
func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pieces) (missingPieces []uint16, err error) {
	defer mon.Task()(&ctx)(&err)
	var nodeIDs storj.NodeIDList
@@ -621,6 +634,28 @@ func (service *Service) GetMissingPieces(ctx context.Context, pieces metabase.Pi
	return missingPieces, nil
}

// GetReliablePiecesInExcludedCountries returns the list of pieces held by nodes located in excluded countries.
func (service *Service) GetReliablePiecesInExcludedCountries(ctx context.Context, pieces metabase.Pieces) (piecesInExcluded []uint16, err error) {
	defer mon.Task()(&ctx)(&err)
	var nodeIDs storj.NodeIDList
	for _, p := range pieces {
		nodeIDs = append(nodeIDs, p.StorageNode)
	}
	inExcluded, err := service.KnownReliableInExcludedCountries(ctx, nodeIDs)
	if err != nil {
		return nil, Error.New("error getting nodes %s", err)
	}

	for _, p := range pieces {
		for _, nodeID := range inExcluded {
			if nodeID == p.StorageNode {
				piecesInExcluded = append(piecesInExcluded, p.Number)
			}
		}
	}
	return piecesInExcluded, nil
}

// DisqualifyNode disqualifies a storage node.
func (service *Service) DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error) {
	defer mon.Task()(&ctx)(&err)

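GetReliablePiecesInExcludedCountries intersects the piece list with the excluded-country node list using a nested loop, which is fine at segment piece counts. If the lists ever grew, an equivalent map-based intersection would drop the quadratic factor; a sketch only, reusing the storj and metabase types from the code above (imports "storj.io/common/storj" and "storj.io/storj/satellite/metabase" assumed):

// piecesHeldBy returns the numbers of the pieces whose storage node
// appears in nodeIDs, building a set first so the intersection runs in
// O(len(pieces) + len(nodeIDs)). The satellite code above keeps the
// simpler nested loop.
func piecesHeldBy(pieces metabase.Pieces, nodeIDs storj.NodeIDList) (numbers []uint16) {
	inSet := make(map[storj.NodeID]bool, len(nodeIDs))
	for _, id := range nodeIDs {
		inSet[id] = true
	}
	for _, p := range pieces {
		if inSet[p.StorageNode] {
			numbers = append(numbers, p.Number)
		}
	}
	return numbers
}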
@@ -845,3 +845,25 @@ func TestReliable(t *testing.T) {
		require.NotEqual(t, node.ID(), nodes[0])
	})
}

func TestKnownReliableInExcludedCountries(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		service := planet.Satellites[0].Overlay.Service
		node := planet.StorageNodes[0]

		nodes, err := service.Reliable(ctx)
		require.NoError(t, err)
		require.Len(t, nodes, 2)

		err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "FR")
		require.NoError(t, err)

		// the first node is now in an excluded country, so it should be the
		// only node returned by KnownReliableInExcludedCountries
		nodes, err = service.KnownReliableInExcludedCountries(ctx, nodes)
		require.NoError(t, err)
		require.Len(t, nodes, 1)
		require.Equal(t, node.ID(), nodes[0])
	})
}

@@ -11,7 +11,6 @@ import (

	"github.com/stretchr/testify/require"

	"storj.io/common/memory"
	"storj.io/common/storj"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
@@ -84,61 +83,6 @@ func TestIdentifyInjuredSegments(t *testing.T) {
	})
}

func TestInjuredsSegmentWhenPiecesAreInExcludedCountries(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.ReconfigureRS(2, 3, 4, 4),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		checker := planet.Satellites[0].Repair.Checker

		checker.Loop.Pause()
		planet.Satellites[0].Repair.Repairer.Loop.Pause()

		err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "key", testrand.Bytes(5*memory.KiB))
		require.NoError(t, err)

		objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
		require.NoError(t, err)
		require.Len(t, objects, 1)

		segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
		require.NoError(t, err)
		require.Len(t, segments, 1)
		require.False(t, segments[0].Inline())

		err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, planet.StorageNodes[0].ID(), "FR")
		require.NoError(t, err)
		err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, planet.StorageNodes[1].ID(), "FR")
		require.NoError(t, err)

		checker.Loop.TriggerWait()

		// check that the otherwise healthy segment was added to the repair
		// queue because some of its nodes have a country code on the
		// exclusion list
		count, err := planet.Satellites[0].DB.RepairQueue().Count(ctx)
		require.NoError(t, err)
		require.Equal(t, 1, count)

		// trigger repairer to process the repair queue
		planet.Satellites[0].Repair.Repairer.Loop.Restart()
		planet.Satellites[0].Repair.Repairer.Loop.TriggerWait()
		planet.Satellites[0].Repair.Repairer.Loop.Pause()
		planet.Satellites[0].Repair.Repairer.WaitForPendingRepairs()

		count, err = planet.Satellites[0].DB.RepairQueue().Count(ctx)
		require.NoError(t, err)
		require.Equal(t, 0, count)

		segmentsAfterRepair, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
		require.NoError(t, err)

		require.Equal(t, segments[0].Pieces, segmentsAfterRepair[0].Pieces)
	})
}

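Worked numbers for the checker test above, assuming the checker applies the same excluded-country discount that the repairer hunks below apply: ReconfigureRS(2, 3, 4, 4) sets the repair threshold to 3, and the segment starts with 4 healthy pieces. Marking 2 of the 4 nodes as "FR" leaves 4 - 2 = 2 pieces counted as healthy for threshold purposes, at or below the threshold of 3, so the otherwise healthy segment is queued. The final assertion shows the repair is effectively a no-op: every node is still reliable, so the piece set comes out unchanged.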
func TestIdentifyIrreparableSegments(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1,

@@ -3076,3 +3076,185 @@ func TestECRepairerGetPrefersCachedIPPort(t *testing.T) {
		require.NotContains(t, mock.addressesDialed, realAddresses)
	})
}

func TestSegmentInExcludedCountriesRepair(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 20,
		UplinkCount:      1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.Combine(
				func(log *zap.Logger, index int, config *satellite.Config) {
					config.Repairer.InMemoryRepair = true
				},
				testplanet.ReconfigureRS(3, 5, 8, 10),
				testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}),
			),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		uplinkPeer := planet.Uplinks[0]
		satellite := planet.Satellites[0]
		// stop audit to prevent possible interactions, e.g. repair timeout problems
		satellite.Audit.Worker.Loop.Pause()

		satellite.Repair.Checker.Loop.Pause()
		satellite.Repair.Repairer.Loop.Pause()

		var testData = testrand.Bytes(8 * memory.KiB)
		// first, upload some remote data
		err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData)
		require.NoError(t, err)

		segment, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
		require.Equal(t, 3, int(segment.Redundancy.RequiredShares))

		remotePieces := segment.Pieces

		numExcluded := 5
		var nodesInExcluded storj.NodeIDList
		for i := 0; i < numExcluded; i++ {
			err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, remotePieces[i].StorageNode, "FR")
			require.NoError(t, err)
			nodesInExcluded = append(nodesInExcluded, remotePieces[i].StorageNode)
		}
		// make extra pieces after optimal bad
		for i := int(segment.Redundancy.OptimalShares); i < len(remotePieces); i++ {
			err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[i].StorageNode))
			require.NoError(t, err)
		}

		// trigger checker to add segment to repair queue
		satellite.Repair.Checker.Loop.Restart()
		satellite.Repair.Checker.Loop.TriggerWait()
		satellite.Repair.Checker.Loop.Pause()

		count, err := satellite.DB.RepairQueue().Count(ctx)
		require.NoError(t, err)
		require.Equal(t, 1, count)

		satellite.Repair.Repairer.Loop.Restart()
		satellite.Repair.Repairer.Loop.TriggerWait()
		satellite.Repair.Repairer.Loop.Pause()
		satellite.Repair.Repairer.WaitForPendingRepairs()

		// Verify that the segment was removed from the repair queue
		count, err = satellite.DB.RepairQueue().Count(ctx)
		require.NoError(t, err)
		require.Zero(t, count)

		// Verify the segment has been repaired
		segmentAfterRepair, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
		require.NotEqual(t, segment.Pieces, segmentAfterRepair.Pieces)
		require.Equal(t, 10, len(segmentAfterRepair.Pieces))

		// check that pieces held by excluded-area nodes are still in the segment
		for _, n := range nodesInExcluded {
			var found bool
			for _, p := range segmentAfterRepair.Pieces {
				if p.StorageNode == n {
					found = true
					break
				}
			}
			require.True(t, found, fmt.Sprintf("node %s not in segment, but should be", n.String()))
		}
		nodesInPointer := make(map[storj.NodeID]bool)
		for _, n := range segmentAfterRepair.Pieces {
			// check for duplicates
			_, ok := nodesInPointer[n.StorageNode]
			require.False(t, ok)
			nodesInPointer[n.StorageNode] = true
		}
	})
}

func TestSegmentInExcludedCountriesRepairIrreparable(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 20,
		UplinkCount:      1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.Combine(
				func(log *zap.Logger, index int, config *satellite.Config) {
					config.Repairer.InMemoryRepair = true
				},
				testplanet.ReconfigureRS(3, 5, 8, 10),
				testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}),
			),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		uplinkPeer := planet.Uplinks[0]
		satellite := planet.Satellites[0]
		// stop audit to prevent possible interactions, e.g. repair timeout problems
		satellite.Audit.Worker.Loop.Pause()

		satellite.Repair.Checker.Loop.Pause()
		satellite.Repair.Repairer.Loop.Pause()

		var testData = testrand.Bytes(8 * memory.KiB)
		// first, upload some remote data
		err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData)
		require.NoError(t, err)

		segment, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
		require.Equal(t, 3, int(segment.Redundancy.RequiredShares))

		remotePieces := segment.Pieces

		numExcluded := 6
		var nodesInExcluded storj.NodeIDList
		for i := 0; i < numExcluded; i++ {
			err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, remotePieces[i].StorageNode, "FR")
			require.NoError(t, err)
			nodesInExcluded = append(nodesInExcluded, remotePieces[i].StorageNode)
		}
		// make the rest unhealthy
		for i := numExcluded; i < len(remotePieces); i++ {
			err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[i].StorageNode))
			require.NoError(t, err)
		}

		// trigger checker to add segment to repair queue
		satellite.Repair.Checker.Loop.Restart()
		satellite.Repair.Checker.Loop.TriggerWait()
		satellite.Repair.Checker.Loop.Pause()

		count, err := satellite.DB.RepairQueue().Count(ctx)
		require.NoError(t, err)
		require.Equal(t, 1, count)

		satellite.Repair.Repairer.Loop.Restart()
		satellite.Repair.Repairer.Loop.TriggerWait()
		satellite.Repair.Repairer.Loop.Pause()
		satellite.Repair.Repairer.WaitForPendingRepairs()

		// Verify that the segment was removed from the repair queue
		count, err = satellite.DB.RepairQueue().Count(ctx)
		require.NoError(t, err)
		require.Zero(t, count)

		// Verify the segment has been repaired
		segmentAfterRepair, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
		require.NotEqual(t, segment.Pieces, segmentAfterRepair.Pieces)
		require.Equal(t, 10, len(segmentAfterRepair.Pieces))

		// check that pieces held by excluded-area nodes are still in the segment
		for _, n := range nodesInExcluded {
			var found bool
			for _, p := range segmentAfterRepair.Pieces {
				if p.StorageNode == n {
					found = true
					break
				}
			}
			require.True(t, found, fmt.Sprintf("node %s not in segment, but should be", n.String()))
		}
		nodesInPointer := make(map[storj.NodeID]bool)
		for _, n := range segmentAfterRepair.Pieces {
			// check for duplicates
			_, ok := nodesInPointer[n.StorageNode]
			require.False(t, ok)
			nodesInPointer[n.StorageNode] = true
		}
	})
}

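Worked numbers for the two repairer tests above, both under RS(3, 5, 8, 10) (required 3, repair threshold 5, optimal 8, total 10). In TestSegmentInExcludedCountriesRepair, 5 pieces are marked "FR", so the check in SegmentRepairer.Repair below sees numHealthy minus 5, which drops to or below the threshold of 5 and triggers a repair; the clamp in CreatePutRepairOrderLimits then caps the result at the total of 10, matching the require.Equal(t, 10, ...) assertions. In the irreparable variant, 6 pieces are marked "FR" and every remaining piece's node is stopped, so only excluded-country pieces survive; 6 pieces still meet the required share count of 3, so the segment is reconstructed to 10 pieces with all of the excluded-country pieces kept.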
@@ -189,6 +189,13 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
		return false, nil
	}

	piecesInExcludedCountries, err := repairer.overlay.GetReliablePiecesInExcludedCountries(ctx, pieces)
	if err != nil {
		return false, overlayQueryError.New("error identifying pieces in excluded countries: %w", err)
	}

	numHealthyInExcludedCountries := len(piecesInExcludedCountries)

	// ensure we get values, even if only zero values, so that redash can have an alert based on this
	mon.Counter("repairer_segments_below_min_req").Inc(0) //mon:locked
	stats.repairerSegmentsBelowMinReq.Inc(0)
@@ -207,7 +214,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
	}

	// repair not needed
-	if numHealthy > int(repairThreshold) {
+	if numHealthy-numHealthyInExcludedCountries > int(repairThreshold) {
		mon.Meter("repair_unnecessary").Mark(1) //mon:locked
		stats.repairUnnecessary.Mark(1)
		repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", numHealthy), zap.Int32("repairThreshold", repairThreshold))
@@ -268,8 +275,8 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
	var minSuccessfulNeeded int
	{
		totalNeeded := math.Ceil(float64(redundancy.OptimalThreshold()) * repairer.multiplierOptimalThreshold)
-		requestCount = int(totalNeeded) - len(healthyPieces)
-		minSuccessfulNeeded = redundancy.OptimalThreshold() - len(healthyPieces)
+		requestCount = int(totalNeeded) - len(healthyPieces) + numHealthyInExcludedCountries
+		minSuccessfulNeeded = redundancy.OptimalThreshold() - len(healthyPieces) + numHealthyInExcludedCountries
	}

	// Request Overlay for n-h new storage nodes
@@ -283,7 +290,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
	}

	// Create the order limits for the PUT_REPAIR action
-	putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, getOrderLimits, newNodes, repairer.multiplierOptimalThreshold)
+	putLimits, putPrivateKey, err := repairer.orders.CreatePutRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, getOrderLimits, newNodes, repairer.multiplierOptimalThreshold, numHealthyInExcludedCountries)
	if err != nil {
		return false, orderLimitFailureError.New("could not create PUT_REPAIR order limits: %w", err)
	}

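The decision change above is small but easy to misread; a minimal self-contained sketch of the adjusted check, with illustrative names (pieces in excluded countries are treated as at-risk and subtracted before comparing against the threshold):

package main

import "fmt"

// needsRepair sketches the adjusted decision: pieces in excluded
// countries are discounted from the healthy count before comparing
// against the repair threshold. Illustrative names, not the
// repairer's actual helper.
func needsRepair(numHealthy, numHealthyInExcludedCountries, repairThreshold int) bool {
	return numHealthy-numHealthyInExcludedCountries <= repairThreshold
}

func main() {
	// 8 healthy pieces, 5 of them in excluded countries, threshold 5:
	// 8 - 5 = 3 <= 5, so the segment is repaired even though 8 > 5.
	fmt.Println(needsRepair(8, 5, 5)) // true
}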
@@ -405,6 +405,80 @@ func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteri
	return badNodes, err
}

// KnownReliableInExcludedCountries filters healthy nodes that are in excluded countries.
func (cache *overlaycache) KnownReliableInExcludedCountries(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) {
	for {
		reliableInExcluded, err = cache.knownReliableInExcludedCountries(ctx, criteria, nodeIDs)
		if err != nil {
			if cockroachutil.NeedsRetry(err) {
				continue
			}
			return reliableInExcluded, err
		}
		break
	}

	return reliableInExcluded, err
}

func (cache *overlaycache) knownReliableInExcludedCountries(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (reliableInExcluded storj.NodeIDList, err error) {
	defer mon.Task()(&ctx)(&err)

	if len(nodeIDs) == 0 {
		return nil, Error.New("no ids provided")
	}

	args := []interface{}{
		pgutil.NodeIDArray(nodeIDs),
		time.Now().Add(-criteria.OnlineWindow),
	}

	// When this config is not set, it's a string slice with one empty string.
	// These checks also guard against a nil or empty slice, just in case.
	if len(criteria.ExcludedCountries) == 0 {
		return reliableInExcluded, nil
	}

	if criteria.ExcludedCountries[0] == "" {
		return reliableInExcluded, nil
	}

	excludedCountriesCondition := "AND country_code IN (SELECT UNNEST($3::TEXT[]))"
	args = append(args, pgutil.TextArray(criteria.ExcludedCountries))

	// get reliable and online nodes
	var rows tagsql.Rows
	rows, err = cache.db.Query(ctx, cache.db.Rebind(`
		SELECT id
		FROM nodes
		`+cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+`
		WHERE id = any($1::bytea[])
			AND disqualified IS NULL
			AND unknown_audit_suspended IS NULL
			AND offline_suspended IS NULL
			AND exit_finished_at IS NULL
			AND last_contact_success > $2
		`+excludedCountriesCondition+`
	`), args...,
	)
	if err != nil {
		return nil, err
	}
	defer func() { err = errs.Combine(err, rows.Close()) }()

	for rows.Next() {
		var id storj.NodeID
		err = rows.Scan(&id)
		if err != nil {
			return nil, err
		}
		reliableInExcluded = append(reliableInExcluded, id)
	}

	return reliableInExcluded, Error.Wrap(rows.Err())
}

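With the string pieces above concatenated (and AsOfSystemInterval rendering as an empty string on plain Postgres), the query this function issues is, in effect:

SELECT id
FROM nodes
WHERE id = any($1::bytea[])
	AND disqualified IS NULL
	AND unknown_audit_suspended IS NULL
	AND offline_suspended IS NULL
	AND exit_finished_at IS NULL
	AND last_contact_success > $2
	AND country_code IN (SELECT UNNEST($3::TEXT[]))

Here $1 is the node id array, $2 the online-window cutoff, and $3 the excluded country list.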
func (cache *overlaycache) knownUnreliableOrOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (badNodes storj.NodeIDList, err error) {
	defer mon.Task()(&ctx)(&err)

@@ -520,6 +594,18 @@ func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeC
}

func (cache *overlaycache) reliable(ctx context.Context, criteria *overlay.NodeCriteria) (nodes storj.NodeIDList, err error) {
	args := []interface{}{
		time.Now().Add(-criteria.OnlineWindow),
	}

	// When this config is not set, it's a string slice with one empty string.
	// These sanity checks make sure we never index an empty slice.
	var excludedCountriesCondition string
	if len(criteria.ExcludedCountries) != 0 && criteria.ExcludedCountries[0] != "" {
		excludedCountriesCondition = "AND country_code NOT IN (SELECT UNNEST($2::TEXT[]))"
		args = append(args, pgutil.TextArray(criteria.ExcludedCountries))
	}

	// get reliable and online nodes
	rows, err := cache.db.Query(ctx, cache.db.Rebind(`
		SELECT id
@@ -530,8 +616,8 @@ func (cache *overlaycache) reliable(ctx context.Context, criteria *overlay.NodeC
			AND offline_suspended IS NULL
			AND exit_finished_at IS NULL
			AND last_contact_success > $1
-			AND country_code NOT IN (SELECT UNNEST($2::TEXT[]))
-	`), time.Now().Add(-criteria.OnlineWindow), pgutil.TextArray(criteria.ExcludedCountries))
+		`+excludedCountriesCondition+`
+	`), args...)
	if err != nil {
		return nil, err
	}
