From 5314c950cc01e57285e94233509304f47e6eac80 Mon Sep 17 00:00:00 2001 From: Natalie Villasana Date: Tue, 11 Jun 2019 16:14:40 -0400 Subject: [PATCH] pkg/audit: add more reverify tests (#2144) --- pkg/audit/containment_test.go | 169 ------------- pkg/audit/reverify_test.go | 435 ++++++++++++++++++++++++++++++++++ 2 files changed, 435 insertions(+), 169 deletions(-) create mode 100644 pkg/audit/reverify_test.go diff --git a/pkg/audit/containment_test.go b/pkg/audit/containment_test.go index 94e2ed35f..c917a9d22 100644 --- a/pkg/audit/containment_test.go +++ b/pkg/audit/containment_test.go @@ -6,185 +6,16 @@ package audit_test import ( "crypto/rand" "testing" - "time" "github.com/stretchr/testify/require" - "go.uber.org/zap" - "storj.io/storj/internal/memory" "storj.io/storj/internal/testcontext" "storj.io/storj/internal/testplanet" "storj.io/storj/pkg/audit" "storj.io/storj/pkg/pkcrypto" "storj.io/storj/pkg/storj" - "storj.io/storj/uplink" ) -func TestReverifySuccess(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - - // This is a bulky test but all it's doing is: - // - uploads random data - // - uses the cursor to get a stripe - // - creates pending audits for all nodes holding pieces for that stripe - // - the actual shares are downloaded to make sure ExpectedShareHash is correct - // - calls reverify on that same stripe - // - expects all six storage nodes to be marked as successes in the audit report - - err := planet.Satellites[0].Audit.Service.Close() - require.NoError(t, err) - - ul := planet.Uplinks[0] - testData := make([]byte, 1*memory.MiB) - _, err = rand.Read(testData) - require.NoError(t, err) - - err = ul.UploadWithConfig(ctx, planet.Satellites[0], &uplink.RSConfig{ - MinThreshold: 4, - RepairThreshold: 5, - SuccessThreshold: 6, - MaxThreshold: 6, - }, "testbucket", "test/path", testData) - require.NoError(t, err) - - metainfo := planet.Satellites[0].Metainfo.Service - cursor := audit.NewCursor(metainfo) - - var stripe *audit.Stripe - stripe, _, err = cursor.NextStripe(ctx) - require.NoError(t, err) - require.NotNil(t, stripe) - - overlay := planet.Satellites[0].Overlay.Service - transport := planet.Satellites[0].Transport - orders := planet.Satellites[0].Orders.Service - containment := planet.Satellites[0].DB.Containment() - minBytesPerSecond := 128 * memory.B - verifier := audit.NewVerifier(zap.L(), transport, overlay, containment, orders, planet.Satellites[0].Identity, minBytesPerSecond, 5*time.Second) - require.NotNil(t, verifier) - - projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx) - require.NoError(t, err) - - bucketID := []byte(storj.JoinPaths(projects[0].ID.String(), "testbucket")) - shareSize := stripe.Segment.GetRemote().GetRedundancy().GetErasureShareSize() - - for _, piece := range stripe.Segment.GetRemote().GetRemotePieces() { - rootPieceID := stripe.Segment.GetRemote().RootPieceId - limit, err := orders.CreateAuditOrderLimit(ctx, planet.Satellites[0].Identity.PeerIdentity(), bucketID, piece.NodeId, rootPieceID, shareSize) - require.NoError(t, err) - - share, err := verifier.GetShare(ctx, limit, stripe.Index, shareSize, int(piece.PieceNum)) - require.NoError(t, err) - - pending := &audit.PendingAudit{ - NodeID: piece.NodeId, - PieceID: rootPieceID, - StripeIndex: stripe.Index, - ShareSize: shareSize, - ExpectedShareHash: pkcrypto.SHA256Hash(share.Data), - ReverifyCount: 0, - } - - err = 
containment.IncrementPending(ctx, pending) - require.NoError(t, err) - } - - report, err := verifier.Reverify(ctx, stripe) - require.NoError(t, err) - - successes := make(map[string]bool) - for _, nodeID := range report.Successes { - successes[nodeID.String()] = true - } - - for _, piece := range stripe.Segment.GetRemote().GetRemotePieces() { - require.True(t, successes[piece.NodeId.String()]) - } - }) -} - -func TestReverifyFail(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - - // - uploads random data - // - uses the cursor to get a stripe - // - creates pending audits for all nodes holding pieces for that stripe - // - makes ExpectedShareHash have random data - // - calls reverify on that same stripe - // - expects all six storage nodes to be marked as fails in the audit report - - err := planet.Satellites[0].Audit.Service.Close() - require.NoError(t, err) - - ul := planet.Uplinks[0] - testData := make([]byte, 1*memory.MiB) - _, err = rand.Read(testData) - require.NoError(t, err) - - err = ul.UploadWithConfig(ctx, planet.Satellites[0], &uplink.RSConfig{ - MinThreshold: 4, - RepairThreshold: 5, - SuccessThreshold: 6, - MaxThreshold: 6, - }, "testbucket", "test/path", testData) - require.NoError(t, err) - - metainfo := planet.Satellites[0].Metainfo.Service - cursor := audit.NewCursor(metainfo) - - var stripe *audit.Stripe - stripe, _, err = cursor.NextStripe(ctx) - require.NoError(t, err) - require.NotNil(t, stripe) - - overlay := planet.Satellites[0].Overlay.Service - transport := planet.Satellites[0].Transport - orders := planet.Satellites[0].Orders.Service - containment := planet.Satellites[0].DB.Containment() - minBytesPerSecond := 128 * memory.B - verifier := audit.NewVerifier(zap.L(), transport, overlay, containment, orders, planet.Satellites[0].Identity, minBytesPerSecond, 5*time.Second) - require.NotNil(t, verifier) - - for _, piece := range stripe.Segment.GetRemote().GetRemotePieces() { - rootPieceID := stripe.Segment.GetRemote().RootPieceId - redundancy := stripe.Segment.GetRemote().GetRedundancy() - - randBytes := make([]byte, 10) - _, err := rand.Read(randBytes) - require.NoError(t, err) - - pending := &audit.PendingAudit{ - NodeID: piece.NodeId, - PieceID: rootPieceID, - StripeIndex: stripe.Index, - ShareSize: redundancy.ErasureShareSize, - ExpectedShareHash: pkcrypto.SHA256Hash(randBytes), - ReverifyCount: 0, - } - - err = planet.Satellites[0].DB.Containment().IncrementPending(ctx, pending) - require.NoError(t, err) - } - - report, err := verifier.Reverify(ctx, stripe) - require.NoError(t, err) - - fails := make(map[string]bool) - for _, nodeID := range report.Fails { - fails[nodeID.String()] = true - } - - for _, piece := range stripe.Segment.GetRemote().GetRemotePieces() { - require.True(t, fails[piece.NodeId.String()]) - } - }) -} - func TestContainIncrementAndGet(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, diff --git a/pkg/audit/reverify_test.go b/pkg/audit/reverify_test.go new file mode 100644 index 000000000..fb1326205 --- /dev/null +++ b/pkg/audit/reverify_test.go @@ -0,0 +1,435 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. 
+ +package audit_test + +import ( + "crypto/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "storj.io/storj/internal/memory" + "storj.io/storj/internal/testcontext" + "storj.io/storj/internal/testplanet" + "storj.io/storj/pkg/audit" + "storj.io/storj/pkg/peertls/tlsopts" + "storj.io/storj/pkg/pkcrypto" + "storj.io/storj/pkg/storj" + "storj.io/storj/pkg/transport" +) + +func TestReverifySuccess(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + + // This is a bulky test but all it's doing is: + // - uploads random data + // - uses the cursor to get a stripe + // - creates one pending audit for a node holding a piece for that stripe + // - the actual share is downloaded to make sure ExpectedShareHash is correct + // - calls reverify on that same stripe + // - expects one storage node to be marked as a success in the audit report + + err := planet.Satellites[0].Audit.Service.Close() + require.NoError(t, err) + + ul := planet.Uplinks[0] + testData := make([]byte, 1*memory.MiB) + _, err = rand.Read(testData) + require.NoError(t, err) + + err = ul.Upload(ctx, planet.Satellites[0], "testbucket", "test/path", testData) + require.NoError(t, err) + + metainfo := planet.Satellites[0].Metainfo.Service + cursor := audit.NewCursor(metainfo) + + var stripe *audit.Stripe + stripe, _, err = cursor.NextStripe(ctx) + require.NoError(t, err) + require.NotNil(t, stripe) + + orders := planet.Satellites[0].Orders.Service + containment := planet.Satellites[0].DB.Containment() + + verifier := audit.NewVerifier(zap.L(), + planet.Satellites[0].Transport, + planet.Satellites[0].Overlay.Service, + containment, + orders, + planet.Satellites[0].Identity, + 128*memory.B, + 5*time.Second) + require.NotNil(t, verifier) + + projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx) + require.NoError(t, err) + + bucketID := []byte(storj.JoinPaths(projects[0].ID.String(), "testbucket")) + shareSize := stripe.Segment.GetRemote().GetRedundancy().GetErasureShareSize() + + pieces := stripe.Segment.GetRemote().GetRemotePieces() + rootPieceID := stripe.Segment.GetRemote().RootPieceId + limit, err := orders.CreateAuditOrderLimit(ctx, planet.Satellites[0].Identity.PeerIdentity(), bucketID, pieces[0].NodeId, rootPieceID, shareSize) + require.NoError(t, err) + + share, err := verifier.GetShare(ctx, limit, stripe.Index, shareSize, int(pieces[0].PieceNum)) + require.NoError(t, err) + + pending := &audit.PendingAudit{ + NodeID: pieces[0].NodeId, + PieceID: rootPieceID, + StripeIndex: stripe.Index, + ShareSize: shareSize, + ExpectedShareHash: pkcrypto.SHA256Hash(share.Data), + ReverifyCount: 0, + } + + err = containment.IncrementPending(ctx, pending) + require.NoError(t, err) + + report, err := verifier.Reverify(ctx, stripe) + require.NoError(t, err) + + require.Len(t, report.Fails, 0) + require.Len(t, report.Offlines, 0) + require.Len(t, report.PendingAudits, 0) + require.Len(t, report.Successes, 1) + require.Equal(t, report.Successes[0], pieces[0].NodeId) + }) +} + +func TestReverifyFailMissingShare(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + + // - uploads random data + // - uses the cursor to get a stripe + // - creates one pending audit for a node holding a piece for that stripe + // - 
the actual share is downloaded to make sure ExpectedShareHash is correct + // - delete piece from node + // - calls reverify on that same stripe + // - expects one storage node to be marked as a fail in the audit report + + err := planet.Satellites[0].Audit.Service.Close() + require.NoError(t, err) + + ul := planet.Uplinks[0] + testData := make([]byte, 1*memory.MiB) + _, err = rand.Read(testData) + require.NoError(t, err) + + err = ul.Upload(ctx, planet.Satellites[0], "testbucket", "test/path", testData) + require.NoError(t, err) + + metainfo := planet.Satellites[0].Metainfo.Service + cursor := audit.NewCursor(metainfo) + + var stripe *audit.Stripe + stripe, _, err = cursor.NextStripe(ctx) + require.NoError(t, err) + require.NotNil(t, stripe) + + orders := planet.Satellites[0].Orders.Service + containment := planet.Satellites[0].DB.Containment() + + verifier := audit.NewVerifier(zap.L(), + planet.Satellites[0].Transport, + planet.Satellites[0].Overlay.Service, + containment, + orders, + planet.Satellites[0].Identity, + 128*memory.B, + 5*time.Second) + require.NotNil(t, verifier) + + projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx) + require.NoError(t, err) + + bucketID := []byte(storj.JoinPaths(projects[0].ID.String(), "testbucket")) + shareSize := stripe.Segment.GetRemote().GetRedundancy().GetErasureShareSize() + + pieces := stripe.Segment.GetRemote().GetRemotePieces() + rootPieceID := stripe.Segment.GetRemote().RootPieceId + limit, err := orders.CreateAuditOrderLimit(ctx, planet.Satellites[0].Identity.PeerIdentity(), bucketID, pieces[0].NodeId, rootPieceID, shareSize) + require.NoError(t, err) + + share, err := verifier.GetShare(ctx, limit, stripe.Index, shareSize, int(pieces[0].PieceNum)) + require.NoError(t, err) + + pending := &audit.PendingAudit{ + NodeID: pieces[0].NodeId, + PieceID: rootPieceID, + StripeIndex: stripe.Index, + ShareSize: shareSize, + ExpectedShareHash: pkcrypto.SHA256Hash(share.Data), + ReverifyCount: 0, + } + + err = containment.IncrementPending(ctx, pending) + require.NoError(t, err) + + // delete the piece from the first node + nodeID := stripe.Segment.GetRemote().GetRemotePieces()[0].NodeId + pieceID := stripe.Segment.GetRemote().RootPieceId.Derive(nodeID) + node := getStorageNode(planet, nodeID) + err = node.Storage2.Store.Delete(ctx, planet.Satellites[0].ID(), pieceID) + require.NoError(t, err) + + report, err := verifier.Reverify(ctx, stripe) + require.NoError(t, err) + + require.Len(t, report.Successes, 0) + require.Len(t, report.Offlines, 0) + require.Len(t, report.PendingAudits, 0) + require.Len(t, report.Fails, 1) + require.Equal(t, report.Fails[0], pieces[0].NodeId) + }) +} + +func TestReverifyFailBadData(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + + // - uploads random data + // - uses the cursor to get a stripe + // - creates a pending audit for a node holding a piece for that stripe + // - makes ExpectedShareHash have random data + // - calls reverify on that same stripe + // - expects one storage node to be marked as a fail in the audit report + + err := planet.Satellites[0].Audit.Service.Close() + require.NoError(t, err) + + ul := planet.Uplinks[0] + testData := make([]byte, 1*memory.MiB) + _, err = rand.Read(testData) + require.NoError(t, err) + + err = ul.Upload(ctx, planet.Satellites[0], "testbucket", "test/path", testData) + require.NoError(t, err) + + metainfo := 
planet.Satellites[0].Metainfo.Service + cursor := audit.NewCursor(metainfo) + + var stripe *audit.Stripe + stripe, _, err = cursor.NextStripe(ctx) + require.NoError(t, err) + require.NotNil(t, stripe) + + minBytesPerSecond := 128 * memory.B + + verifier := audit.NewVerifier(zap.L(), + planet.Satellites[0].Transport, + planet.Satellites[0].Overlay.Service, + planet.Satellites[0].DB.Containment(), + planet.Satellites[0].Orders.Service, + planet.Satellites[0].Identity, + minBytesPerSecond, + 5*time.Second) + require.NotNil(t, verifier) + + pieces := stripe.Segment.GetRemote().GetRemotePieces() + rootPieceID := stripe.Segment.GetRemote().RootPieceId + redundancy := stripe.Segment.GetRemote().GetRedundancy() + + randBytes := make([]byte, 10) + _, err = rand.Read(randBytes) + require.NoError(t, err) + + pending := &audit.PendingAudit{ + NodeID: pieces[0].NodeId, + PieceID: rootPieceID, + StripeIndex: stripe.Index, + ShareSize: redundancy.ErasureShareSize, + ExpectedShareHash: pkcrypto.SHA256Hash(randBytes), + ReverifyCount: 0, + } + + err = planet.Satellites[0].DB.Containment().IncrementPending(ctx, pending) + require.NoError(t, err) + + report, err := verifier.Reverify(ctx, stripe) + require.NoError(t, err) + + require.Len(t, report.Successes, 0) + require.Len(t, report.Offlines, 0) + require.Len(t, report.PendingAudits, 0) + require.Len(t, report.Fails, 1) + require.Equal(t, report.Fails[0], pieces[0].NodeId) + }) +} + +func TestReverifyOffline(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + + // - uploads random data + // - uses the cursor to get a stripe + // - creates pending audits for one node holding a piece for that stripe + // - stop the node that has the pending audit + // - calls reverify on that same stripe + // - expects one storage node to be marked as offline in the audit report + + err := planet.Satellites[0].Audit.Service.Close() + require.NoError(t, err) + + ul := planet.Uplinks[0] + testData := make([]byte, 1*memory.MiB) + _, err = rand.Read(testData) + require.NoError(t, err) + + err = ul.Upload(ctx, planet.Satellites[0], "testbucket", "test/path", testData) + require.NoError(t, err) + + cursor := audit.NewCursor(planet.Satellites[0].Metainfo.Service) + + var stripe *audit.Stripe + stripe, _, err = cursor.NextStripe(ctx) + require.NoError(t, err) + require.NotNil(t, stripe) + + minBytesPerSecond := 128 * memory.B + + verifier := audit.NewVerifier(zap.L(), + planet.Satellites[0].Transport, + planet.Satellites[0].Overlay.Service, + planet.Satellites[0].DB.Containment(), + planet.Satellites[0].Orders.Service, + planet.Satellites[0].Identity, + minBytesPerSecond, + 5*time.Second) + require.NotNil(t, verifier) + + pieces := stripe.Segment.GetRemote().GetRemotePieces() + rootPieceID := stripe.Segment.GetRemote().RootPieceId + redundancy := stripe.Segment.GetRemote().GetRedundancy() + + randBytes := make([]byte, 10) + _, err = rand.Read(randBytes) + require.NoError(t, err) + + pending := &audit.PendingAudit{ + NodeID: pieces[0].NodeId, + PieceID: rootPieceID, + StripeIndex: stripe.Index, + ShareSize: redundancy.ErasureShareSize, + ExpectedShareHash: pkcrypto.SHA256Hash(randBytes), + ReverifyCount: 0, + } + + err = planet.Satellites[0].DB.Containment().IncrementPending(ctx, pending) + require.NoError(t, err) + + err = stopStorageNode(ctx, planet, pieces[0].NodeId) + require.NoError(t, err) + + report, err := verifier.Reverify(ctx, stripe) 
+ require.NoError(t, err) + + require.Len(t, report.Successes, 0) + require.Len(t, report.Fails, 0) + require.Len(t, report.PendingAudits, 0) + require.Len(t, report.Offlines, 1) + require.Equal(t, report.Offlines[0], pieces[0].NodeId) + }) +} + +func TestReverifyOfflineDialTimeout(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + + // - uploads random data + // - uses the cursor to get a stripe + // - creates pending audit for one node holding a piece for that stripe + // - uses a slow transport client so that dial timeout will happen (an offline case) + // - calls reverify on that same stripe + // - expects one storage node to be marked as offline in the audit report + + err := planet.Satellites[0].Audit.Service.Close() + require.NoError(t, err) + + ul := planet.Uplinks[0] + testData := make([]byte, 1*memory.MiB) + _, err = rand.Read(testData) + require.NoError(t, err) + + err = ul.Upload(ctx, planet.Satellites[0], "testbucket", "test/path", testData) + require.NoError(t, err) + + metainfo := planet.Satellites[0].Metainfo.Service + cursor := audit.NewCursor(metainfo) + + var stripe *audit.Stripe + stripe, _, err = cursor.NextStripe(ctx) + require.NoError(t, err) + require.NotNil(t, stripe) + + network := &transport.SimulatedNetwork{ + DialLatency: 200 * time.Second, + BytesPerSecond: 1 * memory.KiB, + } + + tlsOpts, err := tlsopts.NewOptions(planet.Satellites[0].Identity, tlsopts.Config{}) + require.NoError(t, err) + + newTransport := transport.NewClientWithTimeouts(tlsOpts, transport.Timeouts{ + Dial: 20 * time.Millisecond, + }) + + slowClient := network.NewClient(newTransport) + require.NotNil(t, slowClient) + + // This config value will create a very short timeframe allowed for receiving + // data from storage nodes. This will cause context to cancel and start + // downloading from new nodes. + minBytesPerSecond := 100 * memory.KiB + + verifier := audit.NewVerifier(zap.L(), + slowClient, + planet.Satellites[0].Overlay.Service, + planet.Satellites[0].DB.Containment(), + planet.Satellites[0].Orders.Service, + planet.Satellites[0].Identity, + minBytesPerSecond, + 5*time.Second) + + pieces := stripe.Segment.GetRemote().GetRemotePieces() + + rootPieceID := stripe.Segment.GetRemote().RootPieceId + redundancy := stripe.Segment.GetRemote().GetRedundancy() + + randBytes := make([]byte, 10) + _, err = rand.Read(randBytes) + require.NoError(t, err) + + pending := &audit.PendingAudit{ + NodeID: pieces[0].NodeId, + PieceID: rootPieceID, + StripeIndex: stripe.Index, + ShareSize: redundancy.ErasureShareSize, + ExpectedShareHash: pkcrypto.SHA256Hash(randBytes), + ReverifyCount: 0, + } + + err = planet.Satellites[0].DB.Containment().IncrementPending(ctx, pending) + require.NoError(t, err) + + report, err := verifier.Reverify(ctx, stripe) + require.NoError(t, err) + + require.Len(t, report.Successes, 0) + require.Len(t, report.Fails, 0) + require.Len(t, report.PendingAudits, 0) + require.Len(t, report.Offlines, 1) + require.Equal(t, report.Offlines[0], pending.NodeID) + }) +}
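
Note on helpers: TestReverifyFailMissingShare and TestReverifyOffline call getStorageNode and stopStorageNode, which are not part of this diff and are expected to already exist elsewhere in the pkg/audit test package. Below is a minimal sketch of what such helpers could look like, not the package's confirmed definitions; the use of planet.StorageNodes, node.ID(), planet.StopPeer, and the overlay service's UpdateUptime are assumptions based on how other tests in this codebase interact with testplanet.

// Hypothetical sketch only; the real helpers live elsewhere in pkg/audit's tests.
package audit_test

import (
	"context"
	"fmt"

	"storj.io/storj/internal/testplanet"
	"storj.io/storj/pkg/storj"
	"storj.io/storj/storagenode"
)

// getStorageNode returns the storage node peer with the given ID, or nil if
// the planet does not contain it.
func getStorageNode(planet *testplanet.Planet, nodeID storj.NodeID) *storagenode.Peer {
	for _, node := range planet.StorageNodes {
		if node.ID() == nodeID {
			return node
		}
	}
	return nil
}

// stopStorageNode shuts the node down and records it as offline in the
// satellite's overlay, so Reverify sees it as unreachable rather than slow.
func stopStorageNode(ctx context.Context, planet *testplanet.Planet, nodeID storj.NodeID) error {
	node := getStorageNode(planet, nodeID)
	if node == nil {
		return fmt.Errorf("no such node: %s", nodeID.String())
	}

	if err := planet.StopPeer(node); err != nil {
		return err
	}

	// mark the node offline in the overlay (assumed signature:
	// UpdateUptime(ctx, nodeID, isUp), as used by other satellite tests)
	_, err := planet.Satellites[0].Overlay.Service.UpdateUptime(ctx, nodeID, false)
	return err
}

Whatever the exact implementation, the behavior TestReverifyOffline depends on is that the node is both stopped and recorded as offline before Reverify runs, so the returned report counts it under Offlines rather than Fails or PendingAudits.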