storj/satellite/audit/verifier_test.go
Kaloyan Raev, commit b409b53f7f: cmd/satellite: command for verifying piece hashes
Jira: https://storjlabs.atlassian.net/browse/PG-69

There are a number of segments with piece_hashes_verified = false in
their metadata on the US-Central-1, Europe-West-1, and Asia-East-1
satellites. Most probably, this happened due to a bug we had in the
past. We want to verify them before executing the main migration to
metabase, which leaves one less issue to think about during that
migration.

Change-Id: I8831af1a254c560d45bb87d7104e49abd8242236
2020-09-29 10:58:24 +00:00


// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package audit_test

import (
	"context"
	"errors"
	"fmt"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/errs2"
	"storj.io/common/memory"
	"storj.io/common/pb"
	"storj.io/common/peertls/tlsopts"
	"storj.io/common/rpc"
	"storj.io/common/rpc/rpcstatus"
	"storj.io/common/storj"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/storj/private/testblobs"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite"
	"storj.io/storj/satellite/audit"
	"storj.io/storj/satellite/metainfo/metabase"
	"storj.io/storj/storage"
	"storj.io/storj/storagenode"
)
// TestDownloadSharesHappyPath checks that the Share.Error field of every share
// returned by the DownloadShares method contains no error if all shares were
// downloaded successfully.
func TestDownloadSharesHappyPath(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
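// pause the audit worker and chore loops so the test controls exactly when audits run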
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
uplink := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := uplink.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
bucket := metabase.BucketLocation{ProjectID: uplink.Projects[0].ID, BucketName: "testbucket"}
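// run the audit chore once so it populates the audit queue with the uploaded segment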
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
require.NoError(t, err)
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
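// create audit order limits addressed to every node that stores a piece of the segment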
limits, privateKey, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, pointer, nil)
require.NoError(t, err)
shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, randomIndex, shareSize)
require.NoError(t, err)
for _, share := range shares {
assert.NoError(t, share.Error)
}
})
}
// TestDownloadSharesOfflineNode checks that the Share.Error field of the
// shares returned by the DownloadShares method for offline nodes contains an
// error that:
// - has the rpc.Error class
// - is not a context.DeadlineExceeded error
// - is not an RPC error (its status code resolves to Unknown)
//
// If this test fails, this most probably means we made a backward-incompatible
// change that affects the audit service.
func TestDownloadSharesOfflineNode(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
uplink := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := uplink.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
bucket := metabase.BucketLocation{ProjectID: uplink.Projects[0].ID, BucketName: "testbucket"}
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
require.NoError(t, err)
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
limits, privateKey, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, pointer, nil)
require.NoError(t, err)
// stop the first node in the pointer
stoppedNodeID := pointer.GetRemote().GetRemotePieces()[0].NodeId
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(stoppedNodeID))
require.NoError(t, err)
shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, randomIndex, shareSize)
require.NoError(t, err)
for _, share := range shares {
if share.NodeID == stoppedNodeID {
assert.True(t, rpc.Error.Has(share.Error), "unexpected error: %+v", share.Error)
assert.False(t, errs.Is(share.Error, context.DeadlineExceeded), "unexpected error: %+v", share.Error)
assert.True(t, errs2.IsRPC(share.Error, rpcstatus.Unknown), "unexpected error: %+v", share.Error)
} else {
assert.NoError(t, share.Error)
}
}
})
}
// TestDownloadSharesMissingPiece checks that the Share.Error field of the
// shares returned by the DownloadShares method for nodes that don't have the
// audited piece contains an RPC error with code NotFound.
//
// If this test fails, this most probably means we made a backward-incompatible
// change that affects the audit service.
func TestDownloadSharesMissingPiece(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
uplink := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := uplink.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
require.NoError(t, err)
bucket := metabase.BucketLocation{ProjectID: uplink.Projects[0].ID, BucketName: "testbucket"}
// replace the segment's root piece ID with a new random one
// to simulate a missing piece on the storage nodes
pointer.GetRemote().RootPieceId = storj.NewPieceID()
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
limits, privateKey, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, pointer, nil)
require.NoError(t, err)
shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, randomIndex, shareSize)
require.NoError(t, err)
for _, share := range shares {
assert.True(t, errs2.IsRPC(share.Error, rpcstatus.NotFound), "unexpected error: %+v", share.Error)
}
})
}
// TestDownloadSharesDialTimeout checks that the Share.Error field of the
// shares returned by the DownloadShares method for nodes that time out on
// dialing contains an error that:
// - has the rpc.Error class
// - is a context.DeadlineExceeded error
// - is not an RPC error
//
// If this test fails, this most probably means we made a backward-incompatible
// change that affects the audit service.
func TestDownloadSharesDialTimeout(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
upl := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := upl.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
require.NoError(t, err)
bucket := metabase.BucketLocation{ProjectID: upl.Projects[0].ID, BucketName: "testbucket"}
tlsOptions, err := tlsopts.NewOptions(satellite.Identity, tlsopts.Config{}, nil)
require.NoError(t, err)
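// configure a dialer whose simulated latency (200s) far exceeds its dial
// timeout (20ms), so every dial attempt fails with context.DeadlineExceeded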
dialer := rpc.NewDefaultDialer(tlsOptions)
dialer.DialTimeout = 20 * time.Millisecond
dialer.DialLatency = 200 * time.Second
dialer.TransferRate = 1 * memory.KB
// This config value creates a very short window for receiving data from
// storage nodes, which causes the context to be canceled with a timeout.
minBytesPerSecond := 100 * memory.KiB
verifier := audit.NewVerifier(
satellite.Log.Named("verifier"),
satellite.Metainfo.Service,
dialer,
satellite.Overlay.Service,
satellite.DB.Containment(),
satellite.Orders.Service,
satellite.Identity,
minBytesPerSecond,
5*time.Second)
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
limits, privateKey, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, pointer, nil)
require.NoError(t, err)
shares, err := verifier.DownloadShares(ctx, limits, privateKey, randomIndex, shareSize)
require.NoError(t, err)
for _, share := range shares {
assert.True(t, rpc.Error.Has(share.Error), "unexpected error: %+v", share.Error)
assert.True(t, errs.Is(share.Error, context.DeadlineExceeded), "unexpected error: %+v", share.Error)
}
})
}
// TestDownloadSharesDownloadTimeout checks that the Share.Error field of the
// shares returned by the DownloadShares method for nodes that are dialed
// successfully but time out during the share download contains an error that:
// - is an RPC error with code DeadlineExceeded
// - does not have the rpc.Error class
//
// If this test fails, this most probably means we made a backward-incompatible
// change that affects the audit service.
func TestDownloadSharesDownloadTimeout(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
storageNodeDB := planet.StorageNodes[0].DB.(*testblobs.SlowDB)
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
upl := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := upl.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
bucket := metabase.BucketLocation{ProjectID: upl.Projects[0].ID, BucketName: "testbucket"}
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
require.NoError(t, err)
// This config value creates a very short window for receiving data from
// storage nodes, which causes the context to be canceled with a timeout.
minBytesPerSecond := 100 * memory.KiB
verifier := audit.NewVerifier(
satellite.Log.Named("verifier"),
satellite.Metainfo.Service,
satellite.Dialer,
satellite.Overlay.Service,
satellite.DB.Containment(),
satellite.Orders.Service,
satellite.Identity,
minBytesPerSecond,
150*time.Millisecond)
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
limits, privateKey, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, bucket, pointer, nil)
require.NoError(t, err)
// make downloads on the storage node slower than the satellite's timeout for downloading shares
delay := 200 * time.Millisecond
storageNodeDB.SetLatency(delay)
shares, err := verifier.DownloadShares(ctx, limits, privateKey, randomIndex, shareSize)
require.NoError(t, err)
require.Len(t, shares, 1)
share := shares[0]
assert.True(t, errs2.IsRPC(share.Error, rpcstatus.DeadlineExceeded), "unexpected error: %+v", share.Error)
assert.False(t, rpc.Error.Has(share.Error), "unexpected error: %+v", share.Error)
})
}
func TestVerifierHappyPath(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
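// with all nodes online and storing their pieces, every node in the pointer should audit successfully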
report, err := audits.Verifier.Verify(ctx, path, nil)
require.NoError(t, err)
assert.Len(t, report.Successes, len(pointer.GetRemote().GetRemotePieces()))
assert.Len(t, report.Fails, 0)
assert.Len(t, report.Offlines, 0)
assert.Len(t, report.PendingAudits, 0)
})
}
func TestVerifierExpired(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
// set the pointer's expiration date to the past so it is already expired
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
oldPointerBytes, err := pb.Marshal(pointer)
require.NoError(t, err)
newPointer := &pb.Pointer{}
err = pb.Unmarshal(oldPointerBytes, newPointer)
require.NoError(t, err)
newPointer.ExpirationDate = time.Now().Add(-1 * time.Hour)
newPointerBytes, err := pb.Marshal(newPointer)
require.NoError(t, err)
err = satellite.Metainfo.Database.CompareAndSwap(ctx, storage.Key(path), oldPointerBytes, newPointerBytes)
require.NoError(t, err)
// Verify should not return an error
report, err := audits.Verifier.Verify(ctx, path, nil)
require.NoError(t, err)
// Verify should delete the expired segment
pointer, err = satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.Error(t, err)
require.Nil(t, pointer)
assert.Len(t, report.Successes, 0)
assert.Len(t, report.Fails, 0)
assert.Len(t, report.Offlines, 0)
assert.Len(t, report.PendingAudits, 0)
})
}
func TestVerifierOfflineNode(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
// stop the first node in the pointer
stoppedNodeID := pointer.GetRemote().GetRemotePieces()[0].NodeId
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(stoppedNodeID))
require.NoError(t, err)
report, err := audits.Verifier.Verify(ctx, path, nil)
require.NoError(t, err)
assert.Len(t, report.Successes, len(pointer.GetRemote().GetRemotePieces())-1)
assert.Len(t, report.Fails, 0)
assert.Len(t, report.Offlines, 1)
assert.Len(t, report.PendingAudits, 0)
})
}
func TestVerifierMissingPiece(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
// delete the piece from the first node
origNumPieces := len(pointer.GetRemote().GetRemotePieces())
piece := pointer.GetRemote().GetRemotePieces()[0]
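// derive the piece ID stored on this node from the segment's root piece ID and the node's piece number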
pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum)
node := planet.FindNode(piece.NodeId)
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
require.NoError(t, err)
report, err := audits.Verifier.Verify(ctx, path, nil)
require.NoError(t, err)
assert.Len(t, report.Successes, origNumPieces-1)
assert.Len(t, report.Fails, 1)
assert.Len(t, report.Offlines, 0)
assert.Len(t, report.PendingAudits, 0)
})
}
// TestVerifierMissingPieceHashesNotVerified tests that if piece hashes were not verified for a pointer,
// a node that fails an audit for that pointer does not get marked as failing an audit, but is removed from
// the pointer.
func TestVerifierMissingPieceHashesNotVerified(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
// update the pointer to have PieceHashesVerified = false
err = satellite.Metainfo.Service.UnsynchronizedDelete(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
pointer.PieceHashesVerified = false
err = satellite.Metainfo.Service.Put(ctx, metabase.SegmentKey(path), pointer)
require.NoError(t, err)
// delete the piece from the first node
origNumPieces := len(pointer.GetRemote().GetRemotePieces())
piece := pointer.GetRemote().GetRemotePieces()[0]
pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum)
node := planet.FindNode(piece.NodeId)
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
require.NoError(t, err)
report, err := audits.Verifier.Verify(ctx, path, nil)
require.NoError(t, err)
assert.Len(t, report.Successes, origNumPieces-1)
// expect no failed audit
assert.Len(t, report.Fails, 0)
assert.Len(t, report.Offlines, 0)
assert.Len(t, report.PendingAudits, 0)
})
}
func TestVerifierDialTimeout(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
tlsOptions, err := tlsopts.NewOptions(satellite.Identity, tlsopts.Config{}, nil)
require.NoError(t, err)
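// as in TestDownloadSharesDialTimeout, the simulated dial latency far exceeds
// the dial timeout, so every node should be reported as offline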
dialer := rpc.NewDefaultDialer(tlsOptions)
dialer.DialTimeout = 20 * time.Millisecond
dialer.DialLatency = 200 * time.Second
dialer.TransferRate = 1 * memory.KB
// This config value creates a very short window for receiving data from
// storage nodes, which causes the context to be canceled with a timeout.
minBytesPerSecond := 100 * memory.KiB
verifier := audit.NewVerifier(
satellite.Log.Named("verifier"),
satellite.Metainfo.Service,
dialer,
satellite.Overlay.Service,
satellite.DB.Containment(),
satellite.Orders.Service,
satellite.Identity,
minBytesPerSecond,
5*time.Second)
report, err := verifier.Verify(ctx, path, nil)
require.True(t, audit.ErrNotEnoughShares.Has(err), "unexpected error: %+v", err)
assert.Len(t, report.Successes, 0)
assert.Len(t, report.Fails, 0)
assert.Len(t, report.Offlines, len(pointer.GetRemote().GetRemotePieces()))
assert.Len(t, report.PendingAudits, 0)
})
}
func TestVerifierDeletedSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
// delete the file
err = ul.DeleteObject(ctx, satellite, "testbucket", "test/path")
require.NoError(t, err)
// Verify should not return an error, but the report should be empty
report, err := audits.Verifier.Verify(ctx, path, nil)
require.NoError(t, err)
assert.Empty(t, report)
})
}
func TestVerifierModifiedSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
metainfo := satellite.Metainfo.Service
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
audits.Verifier.OnTestingCheckSegmentAlteredHook = func() {
// remove one piece from the segment so that checkIfSegmentAltered fails
pointer, err := metainfo.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
pieceToRemove := pointer.Remote.RemotePieces[0]
_, err = metainfo.UpdatePieces(ctx, metabase.SegmentKey(path), pointer, nil, []*pb.RemotePiece{pieceToRemove})
require.NoError(t, err)
}
// Verify should not return an error, but the report should be empty
report, err := audits.Verifier.Verify(ctx, path, nil)
require.NoError(t, err)
assert.Empty(t, report)
})
}
func TestVerifierReplacedSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
audits.Verifier.OnTestingCheckSegmentAlteredHook = func() {
// replace the file so that checkIfSegmentAltered fails
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
}
// Verify should not return an error, but the report should be empty
report, err := audits.Verifier.Verify(ctx, path, nil)
require.NoError(t, err)
assert.Empty(t, report)
})
}
func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
// delete the piece from the first node
origNumPieces := len(pointer.GetRemote().GetRemotePieces())
piece := pointer.GetRemote().GetRemotePieces()[0]
pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum)
node := planet.FindNode(piece.NodeId)
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
require.NoError(t, err)
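// the node whose piece was deleted should fail the audit, and only that node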
report, err := audits.Verifier.Verify(ctx, path, nil)
require.NoError(t, err)
assert.Len(t, report.Successes, origNumPieces-1)
assert.Len(t, report.Fails, 1)
assert.Equal(t, report.Fails[0], piece.NodeId)
assert.Len(t, report.Offlines, 0)
require.Len(t, report.PendingAudits, 0)
})
}
// TestVerifierSlowDownload checks that a node that times out while sending data to the
// audit service gets put into containment mode.
func TestVerifierSlowDownload(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
},
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
// These config values are chosen to force the slow node to time out without timing out on the three normal nodes
config.Audit.MinBytesPerSecond = 100 * memory.KiB
config.Audit.MinDownloadTimeout = 950 * time.Millisecond
config.Metainfo.RS.MinThreshold = 2
config.Metainfo.RS.RepairThreshold = 2
config.Metainfo.RS.SuccessThreshold = 4
config.Metainfo.RS.TotalThreshold = 4
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
slowNode := planet.FindNode(pointer.Remote.RemotePieces[0].NodeId)
slowNodeDB := slowNode.DB.(*testblobs.SlowDB)
// make downloads on the storage node slower than the satellite's timeout for downloading shares
delay := 1 * time.Second
slowNodeDB.SetLatency(delay)
report, err := audits.Verifier.Verify(ctx, path, nil)
require.NoError(t, err)
assert.NotContains(t, report.Successes, slowNode.ID())
assert.Len(t, report.Fails, 0)
assert.Len(t, report.Offlines, 0)
assert.Len(t, report.Unknown, 0)
assert.Len(t, report.PendingAudits, 1)
assert.Equal(t, report.PendingAudits[0].NodeID, slowNode.ID())
})
}
// TestVerifierUnknownError checks that a node that returns an unknown error in response to an audit request
// does not get marked as successful, failed, or contained.
func TestVerifierUnknownError(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return testblobs.NewBadDB(log.Named("baddb"), db), nil
},
Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
badNode := planet.FindNode(pointer.Remote.RemotePieces[0].NodeId)
badNodeDB := badNode.DB.(*testblobs.BadDB)
// return an error when the verifier attempts to download from this node
badNodeDB.SetError(errs.New("unknown error"))
report, err := audits.Verifier.Verify(ctx, path, nil)
require.NoError(t, err)
require.Len(t, report.Successes, 3)
require.Len(t, report.Fails, 0)
require.Len(t, report.Offlines, 0)
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Unknown, 1)
require.Equal(t, report.Unknown[0], badNode.ID())
})
}
func TestVerifyPieceHashes(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 2, 6, 6),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
nodes := storj.NodeIDList{
planet.StorageNodes[0].ID(),
planet.StorageNodes[1].ID(),
planet.StorageNodes[2].ID(),
planet.StorageNodes[3].ID(),
planet.StorageNodes[4].ID(),
planet.StorageNodes[5].ID(),
}
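// Each test case below feeds a canned audit.Report (and optional error) into
// VerifyPieceHashes through the testing mock, then checks whether the segment's
// piece_hashes_verified flag is updated and which pieces survive in the pointer.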
// happy path test cases
for i, tt := range []struct {
report audit.Report
err error
changed bool
}{
{ // an empty report is sometimes returned if the segment was expired or deleted.
report: audit.Report{},
changed: false,
},
{ // all nodes from the pointer responded successfully to the audit
report: audit.Report{Successes: nodes},
changed: true,
},
{ // one node failed the audit
report: audit.Report{Successes: nodes[1:], Fails: nodes[:1]},
changed: true,
},
{ // 4 nodes failed the audit
report: audit.Report{Successes: nodes[4:], Fails: nodes[:4]},
changed: true,
},
{ // one node was offline
report: audit.Report{Successes: nodes[1:], Offlines: nodes[:1]},
changed: true,
},
{ // 4 nodes were offline
report: audit.Report{Successes: nodes[4:], Offlines: nodes[:4]},
changed: true,
},
{ // one node was contained and scheduled for reverification
report: audit.Report{Successes: nodes[1:], PendingAudits: []*audit.PendingAudit{{NodeID: nodes[0]}}},
changed: true,
},
{ // 4 nodes were contained and scheduled for reverification
report: audit.Report{Successes: nodes[4:], PendingAudits: []*audit.PendingAudit{{NodeID: nodes[0]}, {NodeID: nodes[1]}, {NodeID: nodes[2]}, {NodeID: nodes[3]}}},
changed: true,
},
{ // one node returned unknown error
report: audit.Report{Successes: nodes[1:], Unknown: nodes[:1]},
changed: true,
},
{ // 4 nodes returned unknown error
report: audit.Report{Successes: nodes[4:], Unknown: nodes[:4]},
changed: true,
},
{ // one node failed the audit and 2 nodes were offline
report: audit.Report{Successes: nodes[3:], Fails: nodes[:1], Offlines: nodes[1:3]},
changed: true,
},
{ // one node failed the audit, one was offline, one was contained, and one returned unknown error
report: audit.Report{Successes: nodes[4:], Fails: nodes[:1], Offlines: nodes[1:2], PendingAudits: []*audit.PendingAudit{{NodeID: nodes[2]}}, Unknown: nodes[3:4]},
changed: true,
},
{ // remaining nodes are below repair threshold
report: audit.Report{Successes: nodes[5:], Offlines: nodes[:5]},
changed: false,
},
{ // Verify returns an error
report: audit.Report{},
err: errors.New("test error"),
changed: false,
},
} {
errTag := fmt.Sprintf("%d. %+v", i, tt)
testReport := tt.report
testErr := tt.err
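// stub out the verifier so VerifyPieceHashes receives this test case's
// canned report and error instead of running a real audit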
audits.Verifier.OnTestingVerifyMockFunc = func() (audit.Report, error) {
return testReport, testErr
}
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)
keys, err := satellite.Metainfo.Database.List(ctx, nil, 1)
require.NoError(t, err, errTag)
require.Equal(t, 1, len(keys))
key := metabase.SegmentKey(keys[0])
// verifying segments with piece_hashes_verified = true should return no error and changed = false
changed, err := audits.Verifier.VerifyPieceHashes(ctx, string(key), false)
require.NoError(t, err, errTag)
assert.False(t, changed, errTag)
// assert that piece_hashes_verified = true before setting it to false
pointer, err := satellite.Metainfo.Service.Get(ctx, key)
require.NoError(t, err, errTag)
require.True(t, pointer.PieceHashesVerified, errTag)
// set piece_hashes_verified to false and store the updated pointer
pointer.PieceHashesVerified = false
err = satellite.Metainfo.Service.UnsynchronizedPut(ctx, key, pointer)
require.NoError(t, err, errTag)
// a dry run on a segment with piece_hashes_verified = false should return this case's expected error and changed values
changed, err = audits.Verifier.VerifyPieceHashes(ctx, string(key), true)
assert.Equal(t, tt.err, err, errTag)
assert.Equal(t, tt.changed, changed, errTag)
// assert that piece_hashes_verified is still false after the dry run
dryRunPointer, err := satellite.Metainfo.Service.Get(ctx, key)
require.NoError(t, err, errTag)
assert.False(t, dryRunPointer.PieceHashesVerified, errTag)
// assert that no piece was removed from the pointer by the dry run
for i, piece := range dryRunPointer.Remote.RemotePieces {
require.Greater(t, len(pointer.Remote.RemotePieces), i, errTag)
assert.Equal(t, pointer.Remote.RemotePieces[i].NodeId, piece.NodeId, errTag)
}
// a real run (no dry run) on a segment with piece_hashes_verified = false should return this case's expected error and changed values
changed, err = audits.Verifier.VerifyPieceHashes(ctx, string(key), false)
assert.Equal(t, tt.err, err, errTag)
assert.Equal(t, tt.changed, changed, errTag)
// assert that piece_hashes_verified = true if the segment was verified
verifiedPointer, err := satellite.Metainfo.Service.Get(ctx, key)
require.NoError(t, err, errTag)
assert.Equal(t, tt.changed, verifiedPointer.PieceHashesVerified, errTag)
if changed {
// assert the remaining pieces in the pointer are the expected ones
for _, piece := range verifiedPointer.Remote.RemotePieces {
assert.Contains(t, tt.report.Successes, piece.NodeId, errTag)
assert.NotContains(t, tt.report.Fails, piece.NodeId, errTag)
assert.NotContains(t, tt.report.Offlines, piece.NodeId, errTag)
assert.NotContains(t, tt.report.Unknown, piece.NodeId, errTag)
for _, pending := range tt.report.PendingAudits {
assert.NotEqual(t, pending.NodeID, piece.NodeId, errTag)
}
}
} else {
// assert that no piece was removed from the pointer if it wasn't verified
for i, piece := range verifiedPointer.Remote.RemotePieces {
require.Greater(t, len(pointer.Remote.RemotePieces), i, errTag)
assert.Equal(t, pointer.Remote.RemotePieces[i].NodeId, piece.NodeId, errTag)
}
}
// fixing a non-existent object should return no error and changed = false
err = satellite.Metainfo.Service.UnsynchronizedDelete(ctx, key)
require.NoError(t, err, errTag)
changed, err = audits.Verifier.VerifyPieceHashes(ctx, string(key), false)
require.NoError(t, err, errTag)
assert.False(t, changed, errTag)
}
})
}