diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go
index 0e81fdbf8..e023a4e2c 100644
--- a/satellite/metainfo/config.go
+++ b/satellite/metainfo/config.go
@@ -193,6 +193,8 @@ type MetabaseDB interface {
 	CommitInlineSegment(ctx context.Context, opts metabase.CommitInlineSegment) (err error)
 	// GetObjectLatestVersion returns object information for latest version.
 	GetObjectLatestVersion(ctx context.Context, opts metabase.GetObjectLatestVersion) (_ metabase.Object, err error)
+	// GetSegmentByLocation returns information about the segment at the specified location.
+	GetSegmentByLocation(ctx context.Context, opts metabase.GetSegmentByLocation) (segment metabase.Segment, err error)
 	// GetSegmentByPosition returns a information about segment which covers specified offset.
 	GetSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) (segment metabase.Segment, err error)
 	// GetLatestObjectLastSegment returns an object last segment information.
diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go
index 149ac8537..85dcc1e87 100644
--- a/satellite/repair/repair_test.go
+++ b/satellite/repair/repair_test.go
@@ -4,6 +4,7 @@
 package repair_test
 
 import (
+	"bytes"
 	"context"
 	"io"
 	"math"
@@ -161,6 +162,157 @@ func testDataRepair(t *testing.T, inMemoryRepair bool) {
 	})
 }
 
+// TestDataRepairPendingObject does the following:
+// - Starts a new multipart upload with one part of test data. Does not complete the multipart upload.
+// - Kills some nodes and disqualifies 1
+// - Triggers data repair, which repairs the data from the remaining nodes to
+//   the number of nodes determined by the upload repair max threshold
+// - Shuts down several nodes, but keeps up a number equal to the minimum
+//   threshold
+// - Completes the multipart upload.
+// - Downloads the data from the remaining nodes and checks that it's the same as the uploaded one.
+func TestDataRepairPendingObjectInMemory(t *testing.T) {
+	testDataRepairPendingObject(t, true)
+}
+func TestDataRepairPendingObjectToDisk(t *testing.T) {
+	testDataRepairPendingObject(t, false)
+}
+
+func testDataRepairPendingObject(t *testing.T, inMemoryRepair bool) {
+	const (
+		RepairMaxExcessRateOptimalThreshold = 0.05
+		minThreshold                        = 3
+		successThreshold                    = 7
+	)
+
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 14,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.Combine(
+				func(log *zap.Logger, index int, config *satellite.Config) {
+					config.Repairer.MaxExcessRateOptimalThreshold = RepairMaxExcessRateOptimalThreshold
+					config.Repairer.InMemoryRepair = inMemoryRepair
+				},
+				testplanet.ReconfigureRS(minThreshold, 5, successThreshold, 9),
+			),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+
+		// first, start a new multipart upload and upload one part with some remote data
+		uplinkPeer := planet.Uplinks[0]
+		satellite := planet.Satellites[0]
+		// stop audit to prevent possible interactions i.e. repair timeout problems
+		satellite.Audit.Worker.Loop.Pause()
+
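+		// also pause the checker and repairer loops; they are restarted and
+		// triggered manually further down, after the nodes have been killed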
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Pause()
+
+		testData := testrand.Bytes(8 * memory.KiB)
+
+		project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
+		require.NoError(t, err)
+		defer ctx.Check(project.Close)
+
+		_, err = project.EnsureBucket(ctx, "testbucket")
+		require.NoError(t, err)
+
+		// upload pending object
+		info, err := project.NewMultipartUpload(ctx, "testbucket", "test/path", nil)
+		require.NoError(t, err)
+		_, err = project.PutObjectPart(ctx, "testbucket", "test/path", info.StreamID, 7, bytes.NewReader(testData))
+		require.NoError(t, err)
+
+		segment, _ := getRemoteSegment(t, ctx, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
+
+		// calculate how many storage nodes to kill
+		redundancy := segment.Redundancy
+		minReq := redundancy.RequiredShares
+		remotePieces := segment.Pieces
+		numPieces := len(remotePieces)
+		// disqualify one storage node
+		toDisqualify := 1
+		toKill := numPieces - toDisqualify - int(minReq)
+		require.True(t, toKill >= 1)
+		maxNumRepairedPieces := int(
+			math.Ceil(
+				float64(successThreshold) * (1 + RepairMaxExcessRateOptimalThreshold),
+			),
+		)
+		numStorageNodes := len(planet.StorageNodes)
+		// Ensure that there are enough storage nodes to upload repaired segments
+		require.Falsef(t,
+			(numStorageNodes-toKill-toDisqualify) < maxNumRepairedPieces,
+			"there are not enough available nodes for repairing: need=%d, have=%d",
+			maxNumRepairedPieces, numStorageNodes-toKill-toDisqualify,
+		)
+
+		// kill nodes and track lost pieces
+		nodesToKill := make(map[storj.NodeID]bool)
+		nodesToDisqualify := make(map[storj.NodeID]bool)
+		nodesToKeepAlive := make(map[storj.NodeID]bool)
+
+		var numDisqualified int
+		for i, piece := range remotePieces {
+			if i >= toKill {
+				if numDisqualified < toDisqualify {
+					nodesToDisqualify[piece.StorageNode] = true
+					numDisqualified++
+				}
+				nodesToKeepAlive[piece.StorageNode] = true
+				continue
+			}
+			nodesToKill[piece.StorageNode] = true
+		}
+
+		for _, node := range planet.StorageNodes {
+			if nodesToDisqualify[node.ID()] {
+				err := satellite.DB.OverlayCache().DisqualifyNode(ctx, node.ID())
+				require.NoError(t, err)
+				continue
+			}
+			if nodesToKill[node.ID()] {
+				require.NoError(t, planet.StopNodeAndUpdate(ctx, node))
+			}
+		}
+
+		satellite.Repair.Checker.Loop.Restart()
+		satellite.Repair.Checker.Loop.TriggerWait()
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Restart()
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
+
+		// the repaired segment should not contain any piece on the killed or DQ nodes
+		segmentAfter, _ := getRemoteSegment(t, ctx, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
+
+		nodesToKillForMinThreshold := len(remotePieces) - minThreshold
+		remotePieces = segmentAfter.Pieces
+		for _, piece := range remotePieces {
+			require.NotContains(t, nodesToKill, piece.StorageNode, "there shouldn't be pieces in killed nodes")
+			require.NotContains(t, nodesToDisqualify, piece.StorageNode, "there shouldn't be pieces in DQ nodes")
+
+			// Kill the original nodes which were kept alive to ensure that we can
+			// download from the new nodes to which the repaired pieces have been uploaded
+			if _, ok := nodesToKeepAlive[piece.StorageNode]; ok && nodesToKillForMinThreshold > 0 {
+				require.NoError(t, planet.StopNodeAndUpdate(ctx, planet.FindNode(piece.StorageNode)))
+				nodesToKillForMinThreshold--
+			}
+		}
+
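+		// the object is still pending at this point: repair ran against an uncommitted
+		// multipart object, and completing the upload afterwards must still produce a
+		// downloadable object backed by the repaired pieces
+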
+		// complete the pending multipart upload
+		_, err = project.CompleteMultipartUpload(ctx, "testbucket", "test/path", info.StreamID, nil)
+		require.NoError(t, err)
+
+		// we should be able to download data without any of the original nodes
+		newData, err := uplinkPeer.Download(ctx, satellite, "testbucket", "test/path")
+		require.NoError(t, err)
+		require.Equal(t, newData, testData)
+	})
+}
+
 // TestCorruptDataRepair_Failed does the following:
 // - Uploads test data
 // - Kills all but the minimum number of nodes carrying the uploaded segment
@@ -1322,15 +1474,11 @@ func getRemoteSegment(
 ) (_ metabase.Segment, key metabase.SegmentKey) {
 	t.Helper()
 
-	objects, err := satellite.Metainfo.Metabase.TestingAllCommittedObjects(ctx, projectID, bucketName)
+	objects, err := satellite.Metainfo.Metabase.TestingAllObjects(ctx)
 	require.NoError(t, err)
 	require.Len(t, objects, 1)
 
-	segments, err := satellite.Metainfo.Metabase.TestingAllObjectSegments(ctx, metabase.ObjectLocation{
-		ProjectID:  projectID,
-		BucketName: bucketName,
-		ObjectKey:  objects[0].ObjectKey,
-	})
+	segments, err := satellite.Metainfo.Metabase.TestingAllSegments(ctx)
 	require.NoError(t, err)
 	require.Len(t, segments, 1)
 	require.False(t, segments[0].Inline())
diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index 6ca2c6628..92ea34d6b 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -105,28 +105,16 @@ func NewSegmentRepairer(
 func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (shouldDelete bool, err error) {
 	defer mon.Task()(&ctx, path)(&err)
 
+	// TODO extend InjuredSegment with StreamID/Position and replace path
 	segmentLocation, err := metabase.ParseSegmentKey(metabase.SegmentKey(path))
 	if err != nil {
 		return false, metainfoGetError.Wrap(err)
 	}
 
-	// TODO extend InjuredSegment with StreamID/Position and replace path
-	object, err := repairer.metabase.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
-		ObjectLocation: segmentLocation.Object(),
-	})
-	if err != nil {
-		if storj.ErrObjectNotFound.Has(err) {
-			mon.Meter("repair_unnecessary").Mark(1)            //mon:locked
-			mon.Meter("segment_deleted_before_repair").Mark(1) //mon:locked
-			repairer.log.Debug("segment was deleted")
-			return true, nil
-		}
-		return false, metainfoGetError.Wrap(err)
-	}
-
-	segment, err := repairer.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
-		StreamID: object.StreamID,
-		Position: segmentLocation.Position,
+	// TODO we should replace GetSegmentByLocation with GetSegmentByPosition when
+	// we refactor the repair queue to store metabase.SegmentPosition instead of storj.Path.
+	segment, err := repairer.metabase.GetSegmentByLocation(ctx, metabase.GetSegmentByLocation{
+		SegmentLocation: segmentLocation,
 	})
 	if err != nil {
 		if storj.ErrObjectNotFound.Has(err) {
@@ -378,7 +366,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
 	}
 
 	err = repairer.metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
-		StreamID:  object.StreamID,
+		StreamID:  segment.StreamID,
 		Position:  segmentLocation.Position,
 		OldPieces: segment.Pieces,
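
Note on the segments.go change: the repairer now resolves the segment directly by its location instead of first fetching the latest committed object and then the segment by position, so the lookup also works for segments that belong to pending (uncommitted multipart) objects, which is what the new TestDataRepairPendingObject test exercises. Below is a minimal illustrative sketch, not part of the patch, of how a caller can use the new MetabaseDB.GetSegmentByLocation method; the helper name lookupSegment and the import paths are assumptions for the example.

// Illustrative sketch only; not part of the patch.
package example

import (
	"context"

	"storj.io/common/storj"

	"storj.io/storj/satellite/metainfo"          // assumed import path for the MetabaseDB interface
	"storj.io/storj/satellite/metainfo/metabase" // assumed import path for the metabase types
)

// lookupSegment parses a repair-queue path into a SegmentLocation and fetches the
// segment directly by location, mirroring the new code path in SegmentRepairer.Repair.
func lookupSegment(ctx context.Context, db metainfo.MetabaseDB, path storj.Path) (metabase.Segment, error) {
	segmentLocation, err := metabase.ParseSegmentKey(metabase.SegmentKey(path))
	if err != nil {
		return metabase.Segment{}, err
	}
	return db.GetSegmentByLocation(ctx, metabase.GetSegmentByLocation{
		SegmentLocation: segmentLocation,
	})
}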