storj/satellite/audit/verifier_test.go
Jeff Wendling 098cbc9c67 all: use pkg/rpc instead of pkg/transport
all of the packages and tests work with both grpc and
drpc. we'll probably need to do some jenkins pipelines
to run the tests with drpc as well.

most of the changes are really due to a bit of cleanup
of the pkg/transport.Client api into an rpc.Dialer in
the spirit of a net.Dialer. now that we don't need
observers, we can pass around stateless configuration
to everything rather than stateful things that issue
observations. it also adds a DialAddressID for the
case where we don't have a pb.Node, but we do have an
address and want to assert some ID. this happened
pretty frequently, and now there's no more weird
contortions creating custom tls options, etc.

a lot of the other changes are being consistent/using
the abstractions in the rpc package to do rpc style
things like finding peer information, or checking
status codes.

Change-Id: Ief62875e21d80a21b3c56a5a37f45887679f9412
2019-09-25 15:37:06 -06:00

640 lines
20 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package audit_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/testblobs"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/internal/testrand"
"storj.io/storj/pkg/peertls/tlsopts"
"storj.io/storj/pkg/rpc"
"storj.io/storj/pkg/rpc/rpcstatus"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/audit"
"storj.io/storj/storagenode"
)
// TestDownloadSharesHappyPath uploads a small object, picks a random stripe of
// the queued segment and verifies that none of the shares returned by
// DownloadShares has its Share.Error field set.
func TestDownloadSharesHappyPath(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]
		auditSvc := sat.Audit
		auditQueue := auditSvc.Queue

		// The worker loop is paused; the test calls the verifier directly.
		auditSvc.Worker.Loop.Pause()

		ul := planet.Uplinks[0]
		payload := testrand.Bytes(8 * memory.KiB)
		require.NoError(t, ul.Upload(ctx, sat, "testbucket", "test/path", payload))

		// The bucket ID is "<project id>/<bucket name>".
		projects, err := sat.DB.Console().Projects().GetAll(ctx)
		require.NoError(t, err)
		bucketPath := storj.JoinPaths(projects[0].ID.String(), "testbucket")
		bucketID := []byte(bucketPath)

		// Let the chore fill the queue, then take the path it enqueued.
		auditSvc.Chore.Loop.TriggerWait()
		segmentPath, err := auditQueue.Next()
		require.NoError(t, err)

		pointer, err := sat.Metainfo.Service.Get(ctx, segmentPath)
		require.NoError(t, err)

		stripeIndex, err := audit.GetRandomStripe(ctx, pointer)
		require.NoError(t, err)

		redundancy := pointer.GetRemote().GetRedundancy()
		shareSize := redundancy.GetErasureShareSize()

		limits, privateKey, err := sat.Orders.Service.CreateAuditOrderLimits(ctx, bucketID, pointer, nil)
		require.NoError(t, err)

		shares, err := auditSvc.Verifier.DownloadShares(ctx, limits, privateKey, stripeIndex, shareSize)
		require.NoError(t, err)

		for _, s := range shares {
			assert.NoError(t, s.Error)
		}
	})
}
// TestDownloadSharesOfflineNode stops the node holding the first remote piece
// and checks the shares returned by DownloadShares. The share of the stopped
// node must have a Share.Error that:
//   - has the rpc.Error class
//   - is not a context.DeadlineExceeded error
//   - is reported with the rpcstatus.Unknown code
//
// Shares of all other nodes must carry no error.
//
// If this test fails, this most probably means we made a backward-incompatible
// change that affects the audit service.
func TestDownloadSharesOfflineNode(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]
		auditSvc := sat.Audit
		auditQueue := auditSvc.Queue

		// The worker loop is paused; the test calls the verifier directly.
		auditSvc.Worker.Loop.Pause()

		ul := planet.Uplinks[0]
		payload := testrand.Bytes(8 * memory.KiB)
		require.NoError(t, ul.Upload(ctx, sat, "testbucket", "test/path", payload))

		projects, err := sat.DB.Console().Projects().GetAll(ctx)
		require.NoError(t, err)
		bucketPath := storj.JoinPaths(projects[0].ID.String(), "testbucket")
		bucketID := []byte(bucketPath)

		auditSvc.Chore.Loop.TriggerWait()
		segmentPath, err := auditQueue.Next()
		require.NoError(t, err)

		pointer, err := sat.Metainfo.Service.Get(ctx, segmentPath)
		require.NoError(t, err)

		stripeIndex, err := audit.GetRandomStripe(ctx, pointer)
		require.NoError(t, err)

		shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()

		limits, privateKey, err := sat.Orders.Service.CreateAuditOrderLimits(ctx, bucketID, pointer, nil)
		require.NoError(t, err)

		// Take the node that holds the first piece of the pointer offline.
		offlineID := pointer.GetRemote().GetRemotePieces()[0].NodeId
		err = stopStorageNode(ctx, planet, offlineID)
		require.NoError(t, err)

		shares, err := auditSvc.Verifier.DownloadShares(ctx, limits, privateKey, stripeIndex, shareSize)
		require.NoError(t, err)

		for _, s := range shares {
			if s.NodeID != offlineID {
				// Nodes that are still running must serve their share.
				assert.NoError(t, s.Error)
				continue
			}

			assert.True(t, rpc.Error.Has(s.Error), "unexpected error: %+v", s.Error)
			assert.False(t, errs.Is(s.Error, context.DeadlineExceeded), "unexpected error: %+v", s.Error)
			assert.True(t, errs2.IsRPC(s.Error, rpcstatus.Unknown), "unexpected error: %+v", s.Error)
		}
	})
}
// TestDownloadSharesMissingPiece swaps the pointer's root piece ID for a fresh
// random one before creating the order limits, so the storage nodes are asked
// for a piece they do not hold. Every share returned by DownloadShares must
// then carry an RPC error with code NotFound in its Share.Error field.
//
// If this test fails, this most probably means we made a backward-incompatible
// change that affects the audit service.
func TestDownloadSharesMissingPiece(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]
		auditSvc := sat.Audit
		auditQueue := auditSvc.Queue

		// The worker loop is paused; the test calls the verifier directly.
		auditSvc.Worker.Loop.Pause()

		ul := planet.Uplinks[0]
		payload := testrand.Bytes(8 * memory.KiB)
		require.NoError(t, ul.Upload(ctx, sat, "testbucket", "test/path", payload))

		auditSvc.Chore.Loop.TriggerWait()
		segmentPath, err := auditQueue.Next()
		require.NoError(t, err)

		pointer, err := sat.Metainfo.Service.Get(ctx, segmentPath)
		require.NoError(t, err)

		stripeIndex, err := audit.GetRandomStripe(ctx, pointer)
		require.NoError(t, err)

		projects, err := sat.DB.Console().Projects().GetAll(ctx)
		require.NoError(t, err)
		bucketPath := storj.JoinPaths(projects[0].ID.String(), "testbucket")
		bucketID := []byte(bucketPath)

		// Overwrite the root piece ID with a new random one to simulate
		// the piece missing on the storage nodes.
		remote := pointer.GetRemote()
		remote.RootPieceId = storj.NewPieceID()

		shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()

		limits, privateKey, err := sat.Orders.Service.CreateAuditOrderLimits(ctx, bucketID, pointer, nil)
		require.NoError(t, err)

		shares, err := auditSvc.Verifier.DownloadShares(ctx, limits, privateKey, stripeIndex, shareSize)
		require.NoError(t, err)

		for _, s := range shares {
			notFound := errs2.IsRPC(s.Error, rpcstatus.NotFound)
			assert.True(t, notFound, "unexpected error: %+v", s.Error)
		}
	})
}
// TestDownloadSharesDialTimeout builds a verifier around a dialer whose
// DialTimeout (20ms) is far below its DialLatency (200s) and checks that the
// Share.Error field of every share returned by DownloadShares:
//   - has the rpc.Error class
//   - is a context.DeadlineExceeded error
//
// If this test fails, this most probably means we made a backward-incompatible
// change that affects the audit service.
func TestDownloadSharesDialTimeout(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]
		auditSvc := sat.Audit
		auditQueue := auditSvc.Queue

		// The worker loop is paused; the test calls the verifier directly.
		auditSvc.Worker.Loop.Pause()

		ul := planet.Uplinks[0]
		payload := testrand.Bytes(8 * memory.KiB)
		require.NoError(t, ul.Upload(ctx, sat, "testbucket", "test/path", payload))

		auditSvc.Chore.Loop.TriggerWait()
		segmentPath, err := auditQueue.Next()
		require.NoError(t, err)

		pointer, err := sat.Metainfo.Service.Get(ctx, segmentPath)
		require.NoError(t, err)

		stripeIndex, err := audit.GetRandomStripe(ctx, pointer)
		require.NoError(t, err)

		projects, err := sat.DB.Console().Projects().GetAll(ctx)
		require.NoError(t, err)
		bucketPath := storj.JoinPaths(projects[0].ID.String(), "testbucket")
		bucketID := []byte(bucketPath)

		tlsOptions, err := tlsopts.NewOptions(sat.Identity, tlsopts.Config{}, nil)
		require.NoError(t, err)

		// Dialer whose configured latency dwarfs its dial timeout.
		slowDialer := rpc.NewDefaultDialer(tlsOptions)
		slowDialer.DialTimeout = 20 * time.Millisecond
		slowDialer.DialLatency = 200 * time.Second
		slowDialer.TransferRate = 1 * memory.KB

		// This config value will create a very short timeframe allowed for
		// receiving data from storage nodes. This will cause context to
		// cancel with timeout.
		minRate := 100 * memory.KiB

		timeoutVerifier := audit.NewVerifier(
			sat.Log.Named("verifier"),
			sat.Metainfo.Service,
			slowDialer,
			sat.Overlay.Service,
			sat.DB.Containment(),
			sat.Orders.Service,
			sat.Identity,
			minRate,
			5*time.Second,
		)

		shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()

		limits, privateKey, err := sat.Orders.Service.CreateAuditOrderLimits(ctx, bucketID, pointer, nil)
		require.NoError(t, err)

		shares, err := timeoutVerifier.DownloadShares(ctx, limits, privateKey, stripeIndex, shareSize)
		require.NoError(t, err)

		for _, s := range shares {
			assert.True(t, rpc.Error.Has(s.Error), "unexpected error: %+v", s.Error)
			assert.True(t, errs.Is(s.Error, context.DeadlineExceeded), "unexpected error: %+v", s.Error)
		}
	})
}
// TestDownloadSharesDownloadTimeout runs a single storage node backed by a
// testblobs.SlowDB whose latency (200ms) exceeds the last duration argument
// given to the verifier (150ms). The single share returned by DownloadShares
// must have a Share.Error that:
//   - is an RPC error with code DeadlineExceeded
//   - does not have the rpc.Error class
//
// If this test fails, this most probably means we made a backward-incompatible
// change that affects the audit service.
func TestDownloadSharesDownloadTimeout(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 1,
		UplinkCount:      1,
		Reconfigure: testplanet.Reconfigure{
			// Wrap every storage node database in a SlowDB.
			NewStorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
				return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
			},
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		slowDB := planet.StorageNodes[0].DB.(*testblobs.SlowDB)

		sat := planet.Satellites[0]
		auditSvc := sat.Audit
		auditQueue := auditSvc.Queue

		// The worker loop is paused; the test calls the verifier directly.
		auditSvc.Worker.Loop.Pause()

		ul := planet.Uplinks[0]
		payload := testrand.Bytes(8 * memory.KiB)
		require.NoError(t, ul.Upload(ctx, sat, "testbucket", "test/path", payload))

		projects, err := sat.DB.Console().Projects().GetAll(ctx)
		require.NoError(t, err)
		bucketPath := storj.JoinPaths(projects[0].ID.String(), "testbucket")
		bucketID := []byte(bucketPath)

		auditSvc.Chore.Loop.TriggerWait()
		segmentPath, err := auditQueue.Next()
		require.NoError(t, err)

		pointer, err := sat.Metainfo.Service.Get(ctx, segmentPath)
		require.NoError(t, err)

		stripeIndex, err := audit.GetRandomStripe(ctx, pointer)
		require.NoError(t, err)

		// This config value will create a very short timeframe allowed for
		// receiving data from storage nodes. This will cause context to
		// cancel with timeout.
		minRate := 100 * memory.KiB

		timeoutVerifier := audit.NewVerifier(
			sat.Log.Named("verifier"),
			sat.Metainfo.Service,
			sat.Dialer,
			sat.Overlay.Service,
			sat.DB.Containment(),
			sat.Orders.Service,
			sat.Identity,
			minRate,
			150*time.Millisecond,
		)

		shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()

		limits, privateKey, err := sat.Orders.Service.CreateAuditOrderLimits(ctx, bucketID, pointer, nil)
		require.NoError(t, err)

		// Make downloads on the storage node slower than the satellite's
		// timeout for downloading shares.
		slowDB.SetLatency(200 * time.Millisecond)

		shares, err := timeoutVerifier.DownloadShares(ctx, limits, privateKey, stripeIndex, shareSize)
		require.NoError(t, err)
		require.Len(t, shares, 1)

		only := shares[0]
		assert.True(t, errs2.IsRPC(only.Error, rpcstatus.DeadlineExceeded), "unexpected error: %+v", only.Error)
		assert.False(t, rpc.Error.Has(only.Error), "unexpected error: %+v", only.Error)
	})
}
// TestVerifierHappyPath checks that Verify, run against an untouched upload,
// reports one success per remote piece of the pointer and no fails, offlines
// or pending audits.
func TestVerifierHappyPath(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]
		auditSvc := sat.Audit
		auditQueue := auditSvc.Queue

		// The worker loop is paused; the test calls the verifier directly.
		auditSvc.Worker.Loop.Pause()

		client := planet.Uplinks[0]
		payload := testrand.Bytes(8 * memory.KiB)
		require.NoError(t, client.Upload(ctx, sat, "testbucket", "test/path", payload))

		auditSvc.Chore.Loop.TriggerWait()
		segmentPath, err := auditQueue.Next()
		require.NoError(t, err)

		pointer, err := sat.Metainfo.Service.Get(ctx, segmentPath)
		require.NoError(t, err)

		report, err := auditSvc.Verifier.Verify(ctx, segmentPath, nil)
		require.NoError(t, err)

		pieceCount := len(pointer.GetRemote().GetRemotePieces())
		assert.Len(t, report.Successes, pieceCount)
		assert.Len(t, report.Fails, 0)
		assert.Len(t, report.Offlines, 0)
		assert.Len(t, report.PendingAudits, 0)
	})
}
// TestVerifierOfflineNode stops the node holding the first remote piece and
// checks that Verify reports that one node as offline, all remaining pieces as
// successes, and no fails or pending audits.
func TestVerifierOfflineNode(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]
		auditSvc := sat.Audit
		auditQueue := auditSvc.Queue

		// The worker loop is paused; the test calls the verifier directly.
		auditSvc.Worker.Loop.Pause()

		client := planet.Uplinks[0]
		payload := testrand.Bytes(8 * memory.KiB)
		require.NoError(t, client.Upload(ctx, sat, "testbucket", "test/path", payload))

		auditSvc.Chore.Loop.TriggerWait()
		segmentPath, err := auditQueue.Next()
		require.NoError(t, err)

		pointer, err := sat.Metainfo.Service.Get(ctx, segmentPath)
		require.NoError(t, err)

		// Take the node that holds the first piece of the pointer offline.
		offlineID := pointer.GetRemote().GetRemotePieces()[0].NodeId
		err = stopStorageNode(ctx, planet, offlineID)
		require.NoError(t, err)

		report, err := auditSvc.Verifier.Verify(ctx, segmentPath, nil)
		require.NoError(t, err)

		pieceCount := len(pointer.GetRemote().GetRemotePieces())
		assert.Len(t, report.Successes, pieceCount-1)
		assert.Len(t, report.Fails, 0)
		assert.Len(t, report.Offlines, 1)
		assert.Len(t, report.PendingAudits, 0)
	})
}
// TestVerifierMissingPiece deletes the first remote piece from the store of
// the node holding it and checks that Verify reports exactly one fail, all
// remaining pieces as successes, and no offlines or pending audits.
func TestVerifierMissingPiece(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]
		auditSvc := sat.Audit
		auditQueue := auditSvc.Queue

		// The worker loop is paused; the test calls the verifier directly.
		auditSvc.Worker.Loop.Pause()

		client := planet.Uplinks[0]
		payload := testrand.Bytes(8 * memory.KiB)
		require.NoError(t, client.Upload(ctx, sat, "testbucket", "test/path", payload))

		auditSvc.Chore.Loop.TriggerWait()
		segmentPath, err := auditQueue.Next()
		require.NoError(t, err)

		pointer, err := sat.Metainfo.Service.Get(ctx, segmentPath)
		require.NoError(t, err)

		// Remove the first piece from the node that stores it.
		pieceCount := len(pointer.GetRemote().GetRemotePieces())
		victim := pointer.GetRemote().GetRemotePieces()[0]
		derivedID := pointer.GetRemote().RootPieceId.Derive(victim.NodeId, victim.PieceNum)
		holder := getStorageNode(planet, victim.NodeId)
		err = holder.Storage2.Store.Delete(ctx, sat.ID(), derivedID)
		require.NoError(t, err)

		report, err := auditSvc.Verifier.Verify(ctx, segmentPath, nil)
		require.NoError(t, err)

		assert.Len(t, report.Successes, pieceCount-1)
		assert.Len(t, report.Fails, 1)
		assert.Len(t, report.Offlines, 0)
		assert.Len(t, report.PendingAudits, 0)
	})
}
// TestVerifierDialTimeout builds a verifier around a dialer whose DialTimeout
// (20ms) is far below its DialLatency (200s) and checks that Verify returns an
// audit.ErrNotEnoughShares error together with a report that lists every
// remote piece's node as offline and has no successes, fails or pending audits.
func TestVerifierDialTimeout(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]
		auditSvc := sat.Audit
		auditQueue := auditSvc.Queue

		// The worker loop is paused; the test calls the verifier directly.
		auditSvc.Worker.Loop.Pause()

		client := planet.Uplinks[0]
		payload := testrand.Bytes(8 * memory.KiB)
		require.NoError(t, client.Upload(ctx, sat, "testbucket", "test/path", payload))

		auditSvc.Chore.Loop.TriggerWait()
		segmentPath, err := auditQueue.Next()
		require.NoError(t, err)

		pointer, err := sat.Metainfo.Service.Get(ctx, segmentPath)
		require.NoError(t, err)

		tlsOptions, err := tlsopts.NewOptions(sat.Identity, tlsopts.Config{}, nil)
		require.NoError(t, err)

		// Dialer whose configured latency dwarfs its dial timeout.
		slowDialer := rpc.NewDefaultDialer(tlsOptions)
		slowDialer.DialTimeout = 20 * time.Millisecond
		slowDialer.DialLatency = 200 * time.Second
		slowDialer.TransferRate = 1 * memory.KB

		// This config value will create a very short timeframe allowed for
		// receiving data from storage nodes. This will cause context to
		// cancel with timeout.
		minRate := 100 * memory.KiB

		timeoutVerifier := audit.NewVerifier(
			sat.Log.Named("verifier"),
			sat.Metainfo.Service,
			slowDialer,
			sat.Overlay.Service,
			sat.DB.Containment(),
			sat.Orders.Service,
			sat.Identity,
			minRate,
			5*time.Second,
		)

		report, err := timeoutVerifier.Verify(ctx, segmentPath, nil)
		require.True(t, audit.ErrNotEnoughShares.Has(err), "unexpected error: %+v", err)

		pieceCount := len(pointer.GetRemote().GetRemotePieces())
		assert.Len(t, report.Successes, 0)
		assert.Len(t, report.Fails, 0)
		assert.Len(t, report.Offlines, pieceCount)
		assert.Len(t, report.PendingAudits, 0)
	})
}
// TestVerifierDeletedSegment deletes the uploaded object after its path has
// been taken from the audit queue and checks that Verify returns an
// audit.ErrSegmentDeleted error and an empty report.
func TestVerifierDeletedSegment(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]
		auditSvc := sat.Audit
		auditQueue := auditSvc.Queue

		// The worker loop is paused; the test calls the verifier directly.
		auditSvc.Worker.Loop.Pause()

		client := planet.Uplinks[0]
		payload := testrand.Bytes(8 * memory.KiB)
		require.NoError(t, client.Upload(ctx, sat, "testbucket", "test/path", payload))

		auditSvc.Chore.Loop.TriggerWait()
		segmentPath, err := auditQueue.Next()
		require.NoError(t, err)

		// Remove the object before it gets audited.
		err = client.Delete(ctx, sat, "testbucket", "test/path")
		require.NoError(t, err)

		report, err := auditSvc.Verifier.Verify(ctx, segmentPath, nil)
		segmentDeleted := audit.ErrSegmentDeleted.Has(err)
		require.True(t, segmentDeleted)
		assert.Empty(t, report)
	})
}
// TestVerifierModifiedSegment installs an OnTestingCheckSegmentAlteredHook
// that re-uploads the object, and checks that Verify then returns an
// audit.ErrSegmentDeleted error and an empty report.
func TestVerifierModifiedSegment(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]
		auditSvc := sat.Audit
		auditQueue := auditSvc.Queue

		// The worker loop is paused; the test calls the verifier directly.
		auditSvc.Worker.Loop.Pause()

		client := planet.Uplinks[0]
		payload := testrand.Bytes(8 * memory.KiB)
		require.NoError(t, client.Upload(ctx, sat, "testbucket", "test/path", payload))

		auditSvc.Chore.Loop.TriggerWait()
		segmentPath, err := auditQueue.Next()
		require.NoError(t, err)

		// Replace the file from inside the hook so that the verifier's
		// segment-altered check fails.
		auditSvc.Verifier.OnTestingCheckSegmentAlteredHook = func() {
			require.NoError(t, client.Upload(ctx, sat, "testbucket", "test/path", payload))
		}

		report, err := auditSvc.Verifier.Verify(ctx, segmentPath, nil)
		segmentDeleted := audit.ErrSegmentDeleted.Has(err)
		require.True(t, segmentDeleted)
		assert.Empty(t, report)
	})
}
// TestVerifierModifiedSegmentFailsOnce deletes the first remote piece from the
// node holding it and runs Verify twice on the same path.
//
// The first run must report that node as the single fail. The pointer fetched
// afterwards must no longer reference that node, and the second run must
// report no fails at all.
func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		satellite := planet.Satellites[0]
		audits := satellite.Audit
		queue := audits.Queue

		// The worker loop is paused; the test calls the verifier directly.
		audits.Worker.Loop.Pause()

		ul := planet.Uplinks[0]
		testData := testrand.Bytes(8 * memory.KiB)

		err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
		require.NoError(t, err)

		audits.Chore.Loop.TriggerWait()
		path, err := queue.Next()
		require.NoError(t, err)

		pointer, err := satellite.Metainfo.Service.Get(ctx, path)
		require.NoError(t, err)

		// delete the piece from the first node
		origNumPieces := len(pointer.GetRemote().GetRemotePieces())
		piece := pointer.GetRemote().GetRemotePieces()[0]
		pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum)
		node := getStorageNode(planet, piece.NodeId)
		err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
		require.NoError(t, err)

		report, err := audits.Verifier.Verify(ctx, path, nil)
		require.NoError(t, err)

		assert.Len(t, report.Successes, origNumPieces-1)
		// require (not assert): Fails[0] is indexed right below, so a wrong
		// length must stop the test instead of panicking.
		require.Len(t, report.Fails, 1)
		assert.Equal(t, piece.NodeId, report.Fails[0])
		assert.Len(t, report.Offlines, 0)
		require.Len(t, report.PendingAudits, 0)

		// refetch the pointer
		pointerAgain, err := satellite.Metainfo.Service.Get(ctx, path)
		require.NoError(t, err)

		report, err = audits.Verifier.Verify(ctx, path, nil)
		require.NoError(t, err)

		// verify no failures: the refetched pointer should no longer
		// reference the node that failed the first audit
		assert.Len(t, report.Successes, origNumPieces-1)
		assert.Len(t, report.Fails, 0)
		assert.Len(t, report.Offlines, 0)
		require.Len(t, report.PendingAudits, 0)

		for _, newPiece := range pointerAgain.GetRemote().GetRemotePieces() {
			assert.NotEqual(t, piece.NodeId, newPiece.NodeId)
		}
	})
}