From 27ae0d1f15d24c81536c3f08cbbce04a1d3550cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Niewrza=C5=82?= Date: Fri, 12 Mar 2021 12:23:44 +0100 Subject: [PATCH] satellite/metainfo/metabase: add NewRedundancy parameter for UpdateSegmentPieces method At some point we might try to change original segment RS values and set Pieces according to the new values. This change adds add NewRedundancy parameter for UpdateSegmentPieces method to give ability to do that. As a part of change NewPieces are validated against NewRedundancy. Change-Id: I8ea531c9060b5cd283d3bf4f6e4c320099dd5576 --- satellite/audit/reverify_test.go | 9 ++- satellite/audit/verifier_test.go | 9 ++- satellite/gracefulexit/chore_test.go | 5 +- satellite/gracefulexit/endpoint.go | 5 +- satellite/metainfo/metabase/update.go | 22 +++++- satellite/metainfo/metabase/update_test.go | 92 +++++++++++++++++----- satellite/repair/repairer/segments.go | 5 +- 7 files changed, 111 insertions(+), 36 deletions(-) diff --git a/satellite/audit/reverify_test.go b/satellite/audit/reverify_test.go index 211408889..4d071a29c 100644 --- a/satellite/audit/reverify_test.go +++ b/satellite/audit/reverify_test.go @@ -593,10 +593,11 @@ func TestReverifyModifiedSegment(t *testing.T) { // remove a piece from the file (a piece that the contained node isn't holding) audits.Verifier.OnTestingCheckSegmentAlteredHook = func() { err = satellite.Metainfo.Metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{ - StreamID: queueSegment.StreamID, - Position: queueSegment.Position, - OldPieces: segment.Pieces, - NewPieces: append([]metabase.Piece{segment.Pieces[0]}, segment.Pieces[2:]...), + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + OldPieces: segment.Pieces, + NewPieces: append([]metabase.Piece{segment.Pieces[0]}, segment.Pieces[2:]...), + NewRedundancy: segment.Redundancy, }) require.NoError(t, err) } diff --git a/satellite/audit/verifier_test.go b/satellite/audit/verifier_test.go index c2472023f..4ae602318 100644 --- a/satellite/audit/verifier_test.go +++ b/satellite/audit/verifier_test.go @@ -640,10 +640,11 @@ func TestVerifierModifiedSegment(t *testing.T) { require.NoError(t, err) err = satellite.Metainfo.Metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{ - StreamID: queueSegment.StreamID, - Position: queueSegment.Position, - OldPieces: segment.Pieces, - NewPieces: append([]metabase.Piece{segment.Pieces[0]}, segment.Pieces[2:]...), + StreamID: queueSegment.StreamID, + Position: queueSegment.Position, + OldPieces: segment.Pieces, + NewPieces: append([]metabase.Piece{segment.Pieces[0]}, segment.Pieces[2:]...), + NewRedundancy: segment.Redundancy, }) require.NoError(t, err) } diff --git a/satellite/gracefulexit/chore_test.go b/satellite/gracefulexit/chore_test.go index 174a67911..11ea80a72 100644 --- a/satellite/gracefulexit/chore_test.go +++ b/satellite/gracefulexit/chore_test.go @@ -208,8 +208,9 @@ func TestDurabilityRatio(t *testing.T) { StreamID: segment.StreamID, Position: segment.Position, - OldPieces: segment.Pieces, - NewPieces: newPieces, + OldPieces: segment.Pieces, + NewPieces: newPieces, + NewRedundancy: segment.Redundancy, }) require.NoError(t, err) } diff --git a/satellite/gracefulexit/endpoint.go b/satellite/gracefulexit/endpoint.go index 7c74d1f86..687dce23b 100644 --- a/satellite/gracefulexit/endpoint.go +++ b/satellite/gracefulexit/endpoint.go @@ -988,8 +988,9 @@ func (endpoint *Endpoint) UpdatePiecesCheckDuplicates(ctx context.Context, segme StreamID: segment.StreamID, Position: segment.Position, - OldPieces: segment.Pieces, - NewPieces: pieces, + OldPieces: segment.Pieces, + NewRedundancy: segment.Redundancy, + NewPieces: pieces, }) if err != nil { if metabase.ErrSegmentNotFound.Has(err) { diff --git a/satellite/metainfo/metabase/update.go b/satellite/metainfo/metabase/update.go index 6afddf681..3679818bb 100644 --- a/satellite/metainfo/metabase/update.go +++ b/satellite/metainfo/metabase/update.go @@ -10,6 +10,7 @@ import ( "github.com/zeebo/errs" + "storj.io/common/storj" "storj.io/common/uuid" "storj.io/storj/storage" ) @@ -20,7 +21,9 @@ type UpdateSegmentPieces struct { Position SegmentPosition OldPieces Pieces - NewPieces Pieces + + NewRedundancy storj.RedundancyScheme + NewPieces Pieces } // UpdateSegmentPieces updates pieces for specified segment. If provided old pieces @@ -39,6 +42,16 @@ func (db *DB) UpdateSegmentPieces(ctx context.Context, opts UpdateSegmentPieces) return err } + if opts.NewRedundancy.IsZero() { + return ErrInvalidRequest.New("NewRedundancy zero") + } + + // its possible that in this method we will have less pieces + // than optimal shares (e.g. after repair) + if len(opts.NewPieces) < int(opts.NewRedundancy.RepairShares) { + return ErrInvalidRequest.New("number of new pieces is less than new redundancy repair shares value") + } + if err := opts.NewPieces.Verify(); err != nil { if ErrInvalidRequest.Has(err) { return ErrInvalidRequest.New("NewPieces: %v", errs.Unwrap(err)) @@ -62,12 +75,17 @@ func (db *DB) UpdateSegmentPieces(ctx context.Context, opts UpdateSegmentPieces) remote_alias_pieces = CASE WHEN remote_alias_pieces = $3 THEN $4 ELSE remote_alias_pieces + END, + redundancy = CASE + WHEN remote_alias_pieces = $3 THEN $5 + ELSE redundancy END WHERE stream_id = $1 AND position = $2 RETURNING remote_alias_pieces - `, opts.StreamID, opts.Position, oldPieces, newPieces).Scan(&resultPieces) + `, opts.StreamID, opts.Position, oldPieces, newPieces, redundancyScheme{&opts.NewRedundancy}). + Scan(&resultPieces) if err != nil { if errors.Is(err, sql.ErrNoRows) { return ErrSegmentNotFound.New("segment missing") diff --git a/satellite/metainfo/metabase/update_test.go b/satellite/metainfo/metabase/update_test.go index 3bf747175..66f112269 100644 --- a/satellite/metainfo/metabase/update_test.go +++ b/satellite/metainfo/metabase/update_test.go @@ -112,7 +112,7 @@ func TestUpdateSegmentPieces(t *testing.T) { Verify{}.Check(ctx, t, db) }) - t.Run("NewPieces missing", func(t *testing.T) { + t.Run("NewRedundancy zero", func(t *testing.T) { defer DeleteAll{}.Check(ctx, t, db) UpdateSegmentPieces{ @@ -124,7 +124,24 @@ func TestUpdateSegmentPieces(t *testing.T) { }}, }, ErrClass: &metabase.ErrInvalidRequest, - ErrText: "NewPieces: pieces missing", + ErrText: "NewRedundancy zero", + }.Check(ctx, t, db) + }) + + t.Run("NewPieces vs NewRedundancy", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + UpdateSegmentPieces{ + Opts: metabase.UpdateSegmentPieces{ + StreamID: obj.StreamID, + OldPieces: []metabase.Piece{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + NewRedundancy: defaultTestRedundancy, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "number of new pieces is less than new redundancy repair shares value", }.Check(ctx, t, db) }) @@ -133,8 +150,9 @@ func TestUpdateSegmentPieces(t *testing.T) { UpdateSegmentPieces{ Opts: metabase.UpdateSegmentPieces{ - StreamID: obj.StreamID, - OldPieces: validPieces, + StreamID: obj.StreamID, + OldPieces: validPieces, + NewRedundancy: defaultTestRedundancy, NewPieces: []metabase.Piece{{ Number: 1, StorageNode: storj.NodeID{}, @@ -151,8 +169,9 @@ func TestUpdateSegmentPieces(t *testing.T) { UpdateSegmentPieces{ Opts: metabase.UpdateSegmentPieces{ - StreamID: obj.StreamID, - OldPieces: validPieces, + StreamID: obj.StreamID, + OldPieces: validPieces, + NewRedundancy: defaultTestRedundancy, NewPieces: []metabase.Piece{ { Number: 1, @@ -175,8 +194,9 @@ func TestUpdateSegmentPieces(t *testing.T) { UpdateSegmentPieces{ Opts: metabase.UpdateSegmentPieces{ - StreamID: obj.StreamID, - OldPieces: validPieces, + StreamID: obj.StreamID, + OldPieces: validPieces, + NewRedundancy: defaultTestRedundancy, NewPieces: []metabase.Piece{ { Number: 2, @@ -199,10 +219,11 @@ func TestUpdateSegmentPieces(t *testing.T) { UpdateSegmentPieces{ Opts: metabase.UpdateSegmentPieces{ - StreamID: obj.StreamID, - Position: metabase.SegmentPosition{Index: 1}, - OldPieces: validPieces, - NewPieces: validPieces, + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Index: 1}, + OldPieces: validPieces, + NewRedundancy: defaultTestRedundancy, + NewPieces: validPieces, }, ErrClass: &metabase.ErrSegmentNotFound, ErrText: "segment missing", @@ -212,13 +233,21 @@ func TestUpdateSegmentPieces(t *testing.T) { t.Run("segment pieces column was changed", func(t *testing.T) { defer DeleteAll{}.Check(ctx, t, db) - createObject(ctx, t, db, obj, 1) + obj := createObject(ctx, t, db, obj, 1) + + newRedundancy := storj.RedundancyScheme{ + RequiredShares: 1, + RepairShares: 1, + OptimalShares: 1, + TotalShares: 4, + } UpdateSegmentPieces{ Opts: metabase.UpdateSegmentPieces{ - StreamID: obj.StreamID, - Position: metabase.SegmentPosition{Index: 0}, - OldPieces: validPieces, + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Index: 0}, + OldPieces: validPieces, + NewRedundancy: newRedundancy, NewPieces: metabase.Pieces{ metabase.Piece{ Number: 1, @@ -229,6 +258,28 @@ func TestUpdateSegmentPieces(t *testing.T) { ErrClass: &storage.ErrValueChanged, ErrText: "segment remote_alias_pieces field was changed", }.Check(ctx, t, db) + + // verify that original pieces and redundancy did not change + Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(obj), + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + RootPieceID: storj.PieceID{1}, + CreatedAt: &now, + EncryptedKey: []byte{3}, + EncryptedKeyNonce: []byte{4}, + EncryptedSize: 1024, + PlainOffset: 0, + PlainSize: 512, + + Redundancy: defaultTestRedundancy, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + }, + }, + }.Check(ctx, t, db) }) t.Run("update pieces", func(t *testing.T) { @@ -255,10 +306,11 @@ func TestUpdateSegmentPieces(t *testing.T) { UpdateSegmentPieces{ Opts: metabase.UpdateSegmentPieces{ - StreamID: obj.StreamID, - Position: metabase.SegmentPosition{Index: 0}, - OldPieces: segment.Pieces, - NewPieces: expectedPieces, + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Index: 0}, + OldPieces: segment.Pieces, + NewRedundancy: defaultTestRedundancy, + NewPieces: expectedPieces, }, }.Check(ctx, t, db) diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go index 0c2442d9e..d854430d7 100644 --- a/satellite/repair/repairer/segments.go +++ b/satellite/repair/repairer/segments.go @@ -394,8 +394,9 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s StreamID: segment.StreamID, Position: segmentLocation.Position, - OldPieces: segment.Pieces, - NewPieces: newPieces, + OldPieces: segment.Pieces, + NewRedundancy: segment.Redundancy, + NewPieces: newPieces, }) if err != nil { return false, metainfoPutError.Wrap(err)