satellite/repair/repairer: fix repair for pending objects

https://storjlabs.atlassian.net/browse/PG-160

Change-Id: Ice7a0dcfc591bcde85a355cf95fff1eb3411f508
Kaloyan Raev 2021-01-29 17:28:19 +02:00
parent 6f3d0c4ad5
commit 038bd0a4da
3 changed files with 162 additions and 24 deletions
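In short: the repairer used to resolve the injured segment by first fetching the latest committed object version (GetObjectLatestVersion) and then looking up the segment by position, so segments that belong to pending (uncompleted multipart) objects looked deleted to the repairer and were never repaired. The diffs below add GetSegmentByLocation to the MetabaseDB interface and make the repairer look the segment up directly by its location. A minimal sketch of the new lookup, assuming the MetabaseDB interface lives in satellite/metainfo and the metabase package path of that time (satellite/metainfo/metabase); the helper name is illustrative, not part of the commit:

package repairsketch

import (
	"context"

	"storj.io/common/storj"
	"storj.io/storj/satellite/metainfo"
	"storj.io/storj/satellite/metainfo/metabase"
)

// getSegmentForRepair mirrors the new lookup in SegmentRepairer.Repair: it goes
// straight from the segment key in the repair queue to the segment record.
func getSegmentForRepair(ctx context.Context, db metainfo.MetabaseDB, path storj.Path) (metabase.Segment, error) {
	segmentLocation, err := metabase.ParseSegmentKey(metabase.SegmentKey(path))
	if err != nil {
		return metabase.Segment{}, err
	}
	// GetSegmentByLocation does not go through the object's latest committed
	// version, so segments of pending objects are found as well.
	return db.GetSegmentByLocation(ctx, metabase.GetSegmentByLocation{
		SegmentLocation: segmentLocation,
	})
}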

View File

@@ -193,6 +193,8 @@ type MetabaseDB interface {
CommitInlineSegment(ctx context.Context, opts metabase.CommitInlineSegment) (err error)
// GetObjectLatestVersion returns object information for the latest version.
GetObjectLatestVersion(ctx context.Context, opts metabase.GetObjectLatestVersion) (_ metabase.Object, err error)
// GetSegmentByLocation returns information about the segment at the specified location.
GetSegmentByLocation(ctx context.Context, opts metabase.GetSegmentByLocation) (segment metabase.Segment, err error)
// GetSegmentByPosition returns information about the segment which covers the specified offset.
GetSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) (segment metabase.Segment, err error)
// GetLatestObjectLastSegment returns information about the last segment of an object.

View File

@@ -4,6 +4,7 @@
package repair_test
import (
"bytes"
"context"
"io"
"math"
@@ -161,6 +162,157 @@ func testDataRepair(t *testing.T, inMemoryRepair bool) {
})
}
// TestDataRepairPendingObject does the following:
// - Starts a new multipart upload with one part of test data. Does not complete the multipart upload.
// - Kills some nodes and disqualifies one.
// - Triggers data repair, which repairs the data from the remaining nodes to
// the number of nodes determined by the upload repair max threshold.
// - Shuts down several nodes, but keeps up a number equal to the minimum
// threshold.
// - Completes the multipart upload.
// - Downloads the data from the remaining nodes and checks that it's the same as the uploaded data.
func TestDataRepairPendingObjectInMemory(t *testing.T) {
testDataRepairPendingObject(t, true)
}
func TestDataRepairPendingObjectToDisk(t *testing.T) {
testDataRepairPendingObject(t, false)
}
func testDataRepairPendingObject(t *testing.T, inMemoryRepair bool) {
const (
RepairMaxExcessRateOptimalThreshold = 0.05
minThreshold = 3
successThreshold = 7
)
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 14,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.Combine(
func(log *zap.Logger, index int, config *satellite.Config) {
config.Repairer.MaxExcessRateOptimalThreshold = RepairMaxExcessRateOptimalThreshold
config.Repairer.InMemoryRepair = inMemoryRepair
},
testplanet.ReconfigureRS(minThreshold, 5, successThreshold, 9),
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// first, start a new multipart upload and upload one part with some remote data
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
// stop audit to prevent possible interactions, i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()
satellite.Repair.Repairer.Loop.Pause()
testData := testrand.Bytes(8 * memory.KiB)
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err)
defer ctx.Check(project.Close)
_, err = project.EnsureBucket(ctx, "testbucket")
require.NoError(t, err)
// upload pending object
info, err := project.NewMultipartUpload(ctx, "testbucket", "test/path", nil)
require.NoError(t, err)
_, err = project.PutObjectPart(ctx, "testbucket", "test/path", info.StreamID, 7, bytes.NewReader(testData))
require.NoError(t, err)
segment, _ := getRemoteSegment(t, ctx, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
// calculate how many storagenodes to kill
redundancy := segment.Redundancy
minReq := redundancy.RequiredShares
remotePieces := segment.Pieces
numPieces := len(remotePieces)
// disqualify one storage node
toDisqualify := 1
toKill := numPieces - toDisqualify - int(minReq)
require.True(t, toKill >= 1)
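// with successThreshold = 7 and RepairMaxExcessRateOptimalThreshold = 0.05
// this allows at most ceil(7 * 1.05) = 8 repaired pieces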
maxNumRepairedPieces := int(
math.Ceil(
float64(successThreshold) * (1 + RepairMaxExcessRateOptimalThreshold),
),
)
numStorageNodes := len(planet.StorageNodes)
// Ensure that there are enough storage nodes to upload repaired segments
require.Falsef(t,
(numStorageNodes-toKill-toDisqualify) < maxNumRepairedPieces,
"there is not enough available nodes for repairing: need= %d, have= %d",
maxNumRepairedPieces, numStorageNodes-toKill-toDisqualify,
)
// kill nodes and track lost pieces
nodesToKill := make(map[storj.NodeID]bool)
nodesToDisqualify := make(map[storj.NodeID]bool)
nodesToKeepAlive := make(map[storj.NodeID]bool)
var numDisqualified int
for i, piece := range remotePieces {
if i >= toKill {
if numDisqualified < toDisqualify {
nodesToDisqualify[piece.StorageNode] = true
numDisqualified++
}
nodesToKeepAlive[piece.StorageNode] = true
continue
}
nodesToKill[piece.StorageNode] = true
}
for _, node := range planet.StorageNodes {
if nodesToDisqualify[node.ID()] {
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, node.ID())
require.NoError(t, err)
continue
}
if nodesToKill[node.ID()] {
require.NoError(t, planet.StopNodeAndUpdate(ctx, node))
}
}
satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
satellite.Repair.Repairer.WaitForPendingRepairs()
// repaired segment should not contain any piece in the killed and DQ nodes
segmentAfter, _ := getRemoteSegment(t, ctx, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
nodesToKillForMinThreshold := len(remotePieces) - minThreshold
remotePieces = segmentAfter.Pieces
for _, piece := range remotePieces {
require.NotContains(t, nodesToKill, piece.StorageNode, "there shouldn't be pieces in killed nodes")
require.NotContains(t, nodesToDisqualify, piece.StorageNode, "there shouldn't be pieces in DQ nodes")
// Kill the original nodes which were kept alive to ensure that we can
// download from the new nodes to which the repaired pieces have been uploaded
if _, ok := nodesToKeepAlive[piece.StorageNode]; ok && nodesToKillForMinThreshold > 0 {
require.NoError(t, planet.StopNodeAndUpdate(ctx, planet.FindNode(piece.StorageNode)))
nodesToKillForMinThreshold--
}
}
// complete the pending multipart upload
_, err = project.CompleteMultipartUpload(ctx, "testbucket", "test/path", info.StreamID, nil)
require.NoError(t, err)
// we should be able to download data without any of the original nodes
newData, err := uplinkPeer.Download(ctx, satellite, "testbucket", "test/path")
require.NoError(t, err)
require.Equal(t, newData, testData)
})
}
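// Both variants above can be run with the standard Go tooling, e.g.
//   go test -v -run TestDataRepairPendingObject ./satellite/repair/...
// (the path is an assumption based on the repair_test package name).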
// TestCorruptDataRepair_Failed does the following:
// - Uploads test data
// - Kills all but the minimum number of nodes carrying the uploaded segment
@@ -1322,15 +1474,11 @@ func getRemoteSegment(
) (_ metabase.Segment, key metabase.SegmentKey) {
t.Helper()
objects, err := satellite.Metainfo.Metabase.TestingAllCommittedObjects(ctx, projectID, bucketName)
objects, err := satellite.Metainfo.Metabase.TestingAllObjects(ctx)
require.NoError(t, err)
require.Len(t, objects, 1)
segments, err := satellite.Metainfo.Metabase.TestingAllObjectSegments(ctx, metabase.ObjectLocation{
ProjectID: projectID,
BucketName: bucketName,
ObjectKey: objects[0].ObjectKey,
})
segments, err := satellite.Metainfo.Metabase.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
require.False(t, segments[0].Inline())

View File

@@ -105,28 +105,16 @@ func NewSegmentRepairer(
func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (shouldDelete bool, err error) {
defer mon.Task()(&ctx, path)(&err)
// TODO extend InjuredSegment with StreamID/Position and replace path
segmentLocation, err := metabase.ParseSegmentKey(metabase.SegmentKey(path))
if err != nil {
return false, metainfoGetError.Wrap(err)
}
// TODO extend InjuredSegment with StreamID/Position and replace path
object, err := repairer.metabase.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
ObjectLocation: segmentLocation.Object(),
})
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
mon.Meter("repair_unnecessary").Mark(1) //mon:locked
mon.Meter("segment_deleted_before_repair").Mark(1) //mon:locked
repairer.log.Debug("segment was deleted")
return true, nil
}
return false, metainfoGetError.Wrap(err)
}
segment, err := repairer.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: object.StreamID,
Position: segmentLocation.Position,
// TODO we should replace GetSegmentByLocation with GetSegmentByPosition when
// we refactor the repair queue to store metabase.SegmentPosition instead of storj.Path.
segment, err := repairer.metabase.GetSegmentByLocation(ctx, metabase.GetSegmentByLocation{
SegmentLocation: segmentLocation,
})
if err != nil {
if storj.ErrObjectNotFound.Has(err) {
@@ -378,7 +366,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (shouldDelete bool, err error) {
}
err = repairer.metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
StreamID: object.StreamID,
StreamID: segment.StreamID,
Position: segmentLocation.Position,
OldPieces: segment.Pieces,