diff --git a/satellite/audit/reverify_test.go b/satellite/audit/reverify_test.go index 1b3b7b656..75270e610 100644 --- a/satellite/audit/reverify_test.go +++ b/satellite/audit/reverify_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -14,11 +15,13 @@ import ( "storj.io/storj/internal/testcontext" "storj.io/storj/internal/testplanet" "storj.io/storj/internal/testrand" + "storj.io/storj/pkg/pb" "storj.io/storj/pkg/peertls/tlsopts" "storj.io/storj/pkg/pkcrypto" "storj.io/storj/pkg/rpc" "storj.io/storj/pkg/storj" "storj.io/storj/satellite/audit" + "storj.io/storj/storage" "storj.io/storj/uplink" ) @@ -776,3 +779,179 @@ func TestReverifyDifferentShare(t *testing.T) { require.Equal(t, report.Fails[0], selectedNode) }) } + +// TestReverifyExpired1 tests the case where the segment passed into Reverify is expired +func TestReverifyExpired1(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + satellite := planet.Satellites[0] + audits := satellite.Audit + queue := audits.Queue + + audits.Worker.Loop.Pause() + + ul := planet.Uplinks[0] + testData := testrand.Bytes(8 * memory.KiB) + + err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData) + require.NoError(t, err) + + audits.Chore.Loop.TriggerWait() + path, err := queue.Next() + require.NoError(t, err) + + // set pointer's expiration date to be already expired + pointer, err := satellite.Metainfo.Service.Get(ctx, path) + require.NoError(t, err) + oldPointerBytes, err := proto.Marshal(pointer) + require.NoError(t, err) + newPointer := &pb.Pointer{} + err = proto.Unmarshal(oldPointerBytes, newPointer) + require.NoError(t, err) + newPointer.ExpirationDate = time.Now().UTC().Add(-1 * time.Hour) + newPointerBytes, err := proto.Marshal(newPointer) + require.NoError(t, err) + err = satellite.Metainfo.Service.DB.CompareAndSwap(ctx, storage.Key(path), oldPointerBytes, newPointerBytes) + require.NoError(t, err) + + report, err := audits.Verifier.Reverify(ctx, path) + require.Error(t, err) + require.True(t, audit.ErrSegmentExpired.Has(err)) + + // Reverify should delete the expired segment + pointer, err = satellite.Metainfo.Service.Get(ctx, path) + require.Error(t, err) + require.Nil(t, pointer) + + assert.Len(t, report.Successes, 0) + assert.Len(t, report.Fails, 0) + assert.Len(t, report.Offlines, 0) + assert.Len(t, report.PendingAudits, 0) + }) +} + +// TestReverifyExpired2 tests the case where the segment passed into Reverify is not expired, +// but the segment a node is contained for has expired. +func TestReverifyExpired2(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + satellite := planet.Satellites[0] + audits := satellite.Audit + queue := audits.Queue + + audits.Worker.Loop.Pause() + + ul := planet.Uplinks[0] + testData1 := testrand.Bytes(8 * memory.KiB) + testData2 := testrand.Bytes(8 * memory.KiB) + // upload to three nodes so there is definitely at least one node overlap between the two files + rs := &uplink.RSConfig{ + MinThreshold: 1, + RepairThreshold: 2, + SuccessThreshold: 3, + MaxThreshold: 3, + } + err := ul.UploadWithConfig(ctx, satellite, rs, "testbucket", "test/path1", testData1) + require.NoError(t, err) + + err = ul.UploadWithConfig(ctx, satellite, rs, "testbucket", "test/path2", testData2) + require.NoError(t, err) + + audits.Chore.Loop.TriggerWait() + path1, err := queue.Next() + require.NoError(t, err) + path2, err := queue.Next() + require.NoError(t, err) + require.NotEqual(t, path1, path2) + + pointer1, err := satellite.Metainfo.Service.Get(ctx, path1) + require.NoError(t, err) + pointer2, err := satellite.Metainfo.Service.Get(ctx, path2) + require.NoError(t, err) + + // find a node that contains a piece for both files + // save that node ID and the piece number associated with it for pointer1 + var selectedNode storj.NodeID + var selectedPieceNum int32 + p1Nodes := make(map[storj.NodeID]int32) + for _, piece := range pointer1.GetRemote().GetRemotePieces() { + p1Nodes[piece.NodeId] = piece.PieceNum + } + for _, piece := range pointer2.GetRemote().GetRemotePieces() { + pieceNum, ok := p1Nodes[piece.NodeId] + if ok { + selectedNode = piece.NodeId + selectedPieceNum = pieceNum + break + } + } + require.NotEqual(t, selectedNode, storj.NodeID{}) + + randomIndex, err := audit.GetRandomStripe(ctx, pointer1) + require.NoError(t, err) + + orders := satellite.Orders.Service + containment := satellite.DB.Containment() + + projects, err := satellite.DB.Console().Projects().GetAll(ctx) + require.NoError(t, err) + + bucketID := []byte(storj.JoinPaths(projects[0].ID.String(), "testbucket")) + shareSize := pointer1.GetRemote().GetRedundancy().GetErasureShareSize() + + rootPieceID := pointer1.GetRemote().RootPieceId + limit, privateKey, err := orders.CreateAuditOrderLimit(ctx, bucketID, selectedNode, selectedPieceNum, rootPieceID, shareSize) + require.NoError(t, err) + + share, err := audits.Verifier.GetShare(ctx, limit, privateKey, randomIndex, shareSize, int(selectedPieceNum)) + require.NoError(t, err) + + pending := &audit.PendingAudit{ + NodeID: selectedNode, + PieceID: rootPieceID, + StripeIndex: randomIndex, + ShareSize: shareSize, + ExpectedShareHash: pkcrypto.SHA256Hash(share.Data), + ReverifyCount: 0, + Path: path1, + } + + err = containment.IncrementPending(ctx, pending) + require.NoError(t, err) + + // update pointer1 to be expired + oldPointerBytes, err := proto.Marshal(pointer1) + require.NoError(t, err) + newPointer := &pb.Pointer{} + err = proto.Unmarshal(oldPointerBytes, newPointer) + require.NoError(t, err) + newPointer.ExpirationDate = time.Now().UTC().Add(-1 * time.Hour) + newPointerBytes, err := proto.Marshal(newPointer) + require.NoError(t, err) + err = satellite.Metainfo.Service.DB.CompareAndSwap(ctx, storage.Key(path1), oldPointerBytes, newPointerBytes) + require.NoError(t, err) + + // reverify with path 2. Since the selected node was put in containment for path1, + // it should be audited for path1 + // since path1 has expired, we expect no failure and we expect that the pointer has been deleted + // and that the selected node has been removed from containment mode + report, err := audits.Verifier.Reverify(ctx, path2) + require.NoError(t, err) + + require.Len(t, report.Successes, 0) + require.Len(t, report.Offlines, 0) + require.Len(t, report.PendingAudits, 0) + require.Len(t, report.Fails, 0) + + // Reverify should delete the expired segment + pointer, err := satellite.Metainfo.Service.Get(ctx, path1) + require.Error(t, err) + require.Nil(t, pointer) + + // Reverify should remove the node from containment mode + _, err = containment.Get(ctx, pending.NodeID) + require.Error(t, err) + }) +} diff --git a/satellite/audit/verifier.go b/satellite/audit/verifier.go index 305d365a8..ede7b2b24 100644 --- a/satellite/audit/verifier.go +++ b/satellite/audit/verifier.go @@ -39,6 +39,8 @@ var ( ErrNotEnoughShares = errs.Class("not enough shares for successful audit") // ErrSegmentDeleted is the errs class when the audited segment was deleted during the audit ErrSegmentDeleted = errs.Class("segment deleted during audit") + // ErrSegmentExpired is the errs class used when a segment to audit has already expired. + ErrSegmentExpired = errs.Class("segment expired before audit") ) // Share represents required information about an audited share @@ -92,6 +94,13 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[ } return Report{}, err } + if pointer.ExpirationDate != (time.Time{}) && pointer.ExpirationDate.Before(time.Now().UTC()) { + errDelete := verifier.metainfo.Delete(ctx, path) + if errDelete != nil { + return Report{}, Error.Wrap(errDelete) + } + return Report{}, ErrSegmentExpired.New("Segment expired before Verify") + } defer func() { // if piece hashes have not been verified for this segment, do not mark nodes as failing audit @@ -359,6 +368,13 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report } return Report{}, err } + if pointer.ExpirationDate != (time.Time{}) && pointer.ExpirationDate.Before(time.Now().UTC()) { + errDelete := verifier.metainfo.Delete(ctx, path) + if errDelete != nil { + return Report{}, Error.Wrap(errDelete) + } + return Report{}, ErrSegmentExpired.New("Segment expired before Reverify") + } pieceHashesVerified := make(map[storj.NodeID]bool) pieceHashesVerifiedMutex := &sync.Mutex{} @@ -420,6 +436,19 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report verifier.log.Debug("Reverify: error getting pending pointer from metainfo", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(err)) return } + if pendingPointer.ExpirationDate != (time.Time{}) && pendingPointer.ExpirationDate.Before(time.Now().UTC()) { + errDelete := verifier.metainfo.Delete(ctx, pending.Path) + if errDelete != nil { + verifier.log.Debug("Reverify: error deleting expired segment", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(errDelete)) + } + _, errDelete = verifier.containment.Delete(ctx, pending.NodeID) + if errDelete != nil { + verifier.log.Debug("Error deleting node from containment db", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(errDelete)) + } + verifier.log.Debug("Reverify: segment already expired", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID)) + ch <- result{nodeID: pending.NodeID, status: skipped} + return + } // set whether piece hashes have been verified for this segment so we know whether to report a failed or pending audit for this node pieceHashesVerifiedMutex.Lock() diff --git a/satellite/audit/verifier_test.go b/satellite/audit/verifier_test.go index 0f6224486..c3de162b6 100644 --- a/satellite/audit/verifier_test.go +++ b/satellite/audit/verifier_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/zeebo/errs" @@ -19,11 +20,13 @@ import ( "storj.io/storj/internal/testcontext" "storj.io/storj/internal/testplanet" "storj.io/storj/internal/testrand" + "storj.io/storj/pkg/pb" "storj.io/storj/pkg/peertls/tlsopts" "storj.io/storj/pkg/rpc" "storj.io/storj/pkg/rpc/rpcstatus" "storj.io/storj/pkg/storj" "storj.io/storj/satellite/audit" + "storj.io/storj/storage" "storj.io/storj/storagenode" ) @@ -381,6 +384,56 @@ func TestVerifierHappyPath(t *testing.T) { }) } +func TestVerifierExpired(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + satellite := planet.Satellites[0] + audits := satellite.Audit + queue := audits.Queue + + audits.Worker.Loop.Pause() + + ul := planet.Uplinks[0] + testData := testrand.Bytes(8 * memory.KiB) + + err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData) + require.NoError(t, err) + + audits.Chore.Loop.TriggerWait() + path, err := queue.Next() + require.NoError(t, err) + + // set pointer's expiration date to be already expired + pointer, err := satellite.Metainfo.Service.Get(ctx, path) + require.NoError(t, err) + oldPointerBytes, err := proto.Marshal(pointer) + require.NoError(t, err) + newPointer := &pb.Pointer{} + err = proto.Unmarshal(oldPointerBytes, newPointer) + require.NoError(t, err) + newPointer.ExpirationDate = time.Now().UTC().Add(-1 * time.Hour) + newPointerBytes, err := proto.Marshal(newPointer) + require.NoError(t, err) + err = satellite.Metainfo.Service.DB.CompareAndSwap(ctx, storage.Key(path), oldPointerBytes, newPointerBytes) + require.NoError(t, err) + + report, err := audits.Verifier.Verify(ctx, path, nil) + require.Error(t, err) + require.True(t, audit.ErrSegmentExpired.Has(err)) + + // Verify should delete the expired segment + pointer, err = satellite.Metainfo.Service.Get(ctx, path) + require.Error(t, err) + require.Nil(t, pointer) + + assert.Len(t, report.Successes, 0) + assert.Len(t, report.Fails, 0) + assert.Len(t, report.Offlines, 0) + assert.Len(t, report.PendingAudits, 0) + }) +} + func TestVerifierOfflineNode(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,