2019-01-24 20:15:10 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
2018-10-09 22:10:37 +01:00
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
2019-06-07 22:02:36 +01:00
|
|
|
package audit_test
|
2018-10-09 22:10:37 +01:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2022-03-03 00:23:11 +00:00
|
|
|
"crypto/rand"
|
2023-05-18 15:19:55 +01:00
|
|
|
"errors"
|
2022-03-03 00:23:11 +00:00
|
|
|
"fmt"
|
2023-05-18 15:19:55 +01:00
|
|
|
"io"
|
2023-04-13 17:31:27 +01:00
|
|
|
"net"
|
2023-02-03 21:03:27 +00:00
|
|
|
"os"
|
2023-04-13 17:31:27 +01:00
|
|
|
"syscall"
|
2018-10-09 22:10:37 +01:00
|
|
|
"testing"
|
2019-06-07 22:02:36 +01:00
|
|
|
"time"
|
2018-10-09 22:10:37 +01:00
|
|
|
|
2019-06-07 13:38:41 +01:00
|
|
|
"github.com/stretchr/testify/assert"
|
2019-05-24 17:57:07 +01:00
|
|
|
"github.com/stretchr/testify/require"
|
2019-06-07 22:02:36 +01:00
|
|
|
"github.com/zeebo/errs"
|
2019-07-05 17:04:15 +01:00
|
|
|
"go.uber.org/zap"
|
2023-02-03 21:03:27 +00:00
|
|
|
"golang.org/x/sync/errgroup"
|
2019-06-07 22:02:36 +01:00
|
|
|
|
2019-12-27 11:48:47 +00:00
|
|
|
"storj.io/common/errs2"
|
|
|
|
"storj.io/common/memory"
|
|
|
|
"storj.io/common/peertls/tlsopts"
|
|
|
|
"storj.io/common/rpc"
|
|
|
|
"storj.io/common/rpc/rpcstatus"
|
|
|
|
"storj.io/common/storj"
|
|
|
|
"storj.io/common/testcontext"
|
|
|
|
"storj.io/common/testrand"
|
2022-03-03 00:23:11 +00:00
|
|
|
"storj.io/common/uuid"
|
2019-11-14 19:46:15 +00:00
|
|
|
"storj.io/storj/private/testplanet"
|
2019-11-19 16:30:28 +00:00
|
|
|
"storj.io/storj/satellite"
|
2019-07-28 06:55:36 +01:00
|
|
|
"storj.io/storj/satellite/audit"
|
2021-04-21 13:42:57 +01:00
|
|
|
"storj.io/storj/satellite/metabase"
|
2019-07-05 17:04:15 +01:00
|
|
|
"storj.io/storj/storagenode"
|
2023-04-05 18:03:06 +01:00
|
|
|
"storj.io/storj/storagenode/blobstore"
|
|
|
|
"storj.io/storj/storagenode/blobstore/testblobs"
|
2018-10-09 22:10:37 +01:00
|
|
|
)
|
|
|
|
|
2019-06-07 22:02:36 +01:00
|
|
|
// TestDownloadSharesHappyPath checks that the Share.Error field of all shares
|
|
|
|
// returned by the DownloadShares method contain no error if all shares were
|
|
|
|
// downloaded successfully.
|
|
|
|
func TestDownloadSharesHappyPath(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-06-07 22:02:36 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2018-10-09 22:10:37 +01:00
|
|
|
|
2019-06-07 22:02:36 +01:00
|
|
|
uplink := planet.Uplinks[0]
|
2019-06-26 11:38:51 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2018-10-09 22:10:37 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := uplink.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
2018-10-09 22:10:37 +01:00
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-09-07 09:15:47 +01:00
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
2020-12-14 12:54:22 +00:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
2018-10-09 22:10:37 +01:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
2018-10-09 22:10:37 +01:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
shareSize := segment.Redundancy.ShareSize
|
|
|
|
|
2021-11-08 20:51:04 +00:00
|
|
|
limits, privateKey, cachedNodesInfo, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil)
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-11-08 20:51:04 +00:00
|
|
|
shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedNodesInfo, randomIndex, shareSize)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
for _, share := range shares {
|
|
|
|
assert.NoError(t, share.Error)
|
2023-04-13 17:31:27 +01:00
|
|
|
assert.Equal(t, audit.NoFailure, share.FailurePhase)
|
2018-11-28 07:33:17 +00:00
|
|
|
}
|
2019-06-07 22:02:36 +01:00
|
|
|
})
|
|
|
|
}
|
2019-05-24 17:57:07 +01:00
|
|
|
|
2019-06-07 22:02:36 +01:00
|
|
|
// TestDownloadSharesOfflineNode checks that the Share.Error field of the
|
|
|
|
// shares returned by the DownloadShares method for offline nodes contain an
|
|
|
|
// error that:
|
2019-09-19 05:46:39 +01:00
|
|
|
// - has the rpc.Error class
|
2019-06-07 22:02:36 +01:00
|
|
|
// - is not a context.DeadlineExceeded error
|
|
|
|
// - is not an RPC error
|
|
|
|
//
|
|
|
|
// If this test fails, this most probably means we made a backward-incompatible
|
|
|
|
// change that affects the audit service.
|
|
|
|
func TestDownloadSharesOfflineNode(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-06-07 22:02:36 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
|
|
|
uplink := planet.Uplinks[0]
|
2019-06-26 11:38:51 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := uplink.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
2019-05-24 17:57:07 +01:00
|
|
|
|
2021-09-07 09:15:47 +01:00
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
2020-12-14 12:54:22 +00:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
shareSize := segment.Redundancy.ShareSize
|
|
|
|
|
2021-11-08 20:51:04 +00:00
|
|
|
limits, privateKey, cachedNodesInfo, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// stop the first node in the segment
|
|
|
|
stoppedNodeID := segment.Pieces[0].StorageNode
|
2020-05-07 09:23:40 +01:00
|
|
|
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(stoppedNodeID))
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-11-08 20:51:04 +00:00
|
|
|
shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedNodesInfo, randomIndex, shareSize)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-06-11 09:00:59 +01:00
|
|
|
for _, share := range shares {
|
|
|
|
if share.NodeID == stoppedNodeID {
|
2019-09-19 05:46:39 +01:00
|
|
|
assert.True(t, rpc.Error.Has(share.Error), "unexpected error: %+v", share.Error)
|
2019-06-26 08:38:07 +01:00
|
|
|
assert.False(t, errs.Is(share.Error, context.DeadlineExceeded), "unexpected error: %+v", share.Error)
|
2019-09-19 05:46:39 +01:00
|
|
|
assert.True(t, errs2.IsRPC(share.Error, rpcstatus.Unknown), "unexpected error: %+v", share.Error)
|
2023-04-13 17:31:27 +01:00
|
|
|
assert.Equal(t, audit.DialFailure, share.FailurePhase)
|
2019-06-07 22:02:36 +01:00
|
|
|
} else {
|
|
|
|
assert.NoError(t, share.Error)
|
2023-04-13 17:31:27 +01:00
|
|
|
assert.Equal(t, audit.NoFailure, share.FailurePhase)
|
2019-06-07 22:02:36 +01:00
|
|
|
}
|
2018-10-09 22:10:37 +01:00
|
|
|
}
|
2019-06-07 22:02:36 +01:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// TestDownloadSharesMissingPiece checks that the Share.Error field of the
|
|
|
|
// shares returned by the DownloadShares method for nodes that don't have the
|
|
|
|
// audited piece contain an RPC error with code NotFound.
|
|
|
|
//
|
|
|
|
// If this test fails, this most probably means we made a backward-incompatible
|
|
|
|
// change that affects the audit service.
|
|
|
|
func TestDownloadSharesMissingPiece(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-06-07 22:02:36 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
|
|
|
uplink := planet.Uplinks[0]
|
2019-06-26 11:38:51 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := uplink.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-09-07 09:15:47 +01:00
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
2020-12-14 12:54:22 +00:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// replace the piece id of the selected stripe with a new random one
|
|
|
|
// to simulate missing piece on the storage nodes
|
2020-12-14 12:54:22 +00:00
|
|
|
segment.RootPieceID = storj.NewPieceID()
|
2019-05-24 17:57:07 +01:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
shareSize := segment.Redundancy.ShareSize
|
|
|
|
|
2021-11-08 20:51:04 +00:00
|
|
|
limits, privateKey, cachedNodesInfo, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-11-08 20:51:04 +00:00
|
|
|
shares, err := audits.Verifier.DownloadShares(ctx, limits, privateKey, cachedNodesInfo, randomIndex, shareSize)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
for _, share := range shares {
|
2019-09-19 05:46:39 +01:00
|
|
|
assert.True(t, errs2.IsRPC(share.Error, rpcstatus.NotFound), "unexpected error: %+v", share.Error)
|
2023-04-13 17:31:27 +01:00
|
|
|
assert.Equal(t, audit.RequestFailure, share.FailurePhase)
|
2019-06-07 22:02:36 +01:00
|
|
|
}
|
|
|
|
})
|
2018-10-09 22:10:37 +01:00
|
|
|
}
|
|
|
|
|
2019-06-07 22:02:36 +01:00
|
|
|
// TestDownloadSharesDialTimeout checks that the Share.Error field of the
|
|
|
|
// shares returned by the DownloadShares method for nodes that time out on
|
|
|
|
// dialing contain an error that:
|
2019-09-19 05:46:39 +01:00
|
|
|
// - has the rpc.Error class
|
2019-06-07 22:02:36 +01:00
|
|
|
// - is a context.DeadlineExceeded error
|
|
|
|
// - is not an RPC error
|
|
|
|
//
|
|
|
|
// If this test fails, this most probably means we made a backward-incompatible
|
|
|
|
// change that affects the audit service.
|
|
|
|
func TestDownloadSharesDialTimeout(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-06-07 22:02:36 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2018-10-09 22:10:37 +01:00
|
|
|
|
2019-06-07 22:02:36 +01:00
|
|
|
upl := planet.Uplinks[0]
|
2019-06-26 11:38:51 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2018-10-09 22:10:37 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := upl.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-09-07 09:15:47 +01:00
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
2020-12-14 12:54:22 +00:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
2018-10-09 22:10:37 +01:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-09-19 05:46:39 +01:00
|
|
|
tlsOptions, err := tlsopts.NewOptions(satellite.Identity, tlsopts.Config{}, nil)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-09-19 05:46:39 +01:00
|
|
|
dialer := rpc.NewDefaultDialer(tlsOptions)
|
|
|
|
dialer.DialTimeout = 20 * time.Millisecond
|
|
|
|
dialer.DialLatency = 200 * time.Second
|
2020-09-30 18:39:47 +01:00
|
|
|
|
2022-02-16 15:56:01 +00:00
|
|
|
connector := rpc.NewHybridConnector()
|
|
|
|
connector.SetTransferRate(1 * memory.KB)
|
2020-09-30 18:39:47 +01:00
|
|
|
dialer.Connector = connector
|
2019-06-07 22:02:36 +01:00
|
|
|
|
|
|
|
// This config value will create a very short timeframe allowed for receiving
|
2019-07-05 17:04:15 +01:00
|
|
|
// data from storage nodes. This will cause context to cancel with timeout.
|
2019-06-07 22:02:36 +01:00
|
|
|
minBytesPerSecond := 100 * memory.KiB
|
|
|
|
|
2019-07-01 15:02:00 +01:00
|
|
|
verifier := audit.NewVerifier(
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite.Log.Named("verifier"),
|
2021-09-07 09:15:47 +01:00
|
|
|
satellite.Metabase.DB,
|
2019-09-19 05:46:39 +01:00
|
|
|
dialer,
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite.Overlay.Service,
|
2022-11-23 15:24:30 +00:00
|
|
|
satellite.DB.Containment(),
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite.Orders.Service,
|
|
|
|
satellite.Identity,
|
2019-06-07 22:02:36 +01:00
|
|
|
minBytesPerSecond,
|
|
|
|
5*time.Second)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
shareSize := segment.Redundancy.ShareSize
|
|
|
|
|
2021-11-08 20:51:04 +00:00
|
|
|
limits, privateKey, cachedNodesInfo, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-11-08 20:51:04 +00:00
|
|
|
shares, err := verifier.DownloadShares(ctx, limits, privateKey, cachedNodesInfo, randomIndex, shareSize)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
for _, share := range shares {
|
2019-09-19 05:46:39 +01:00
|
|
|
assert.True(t, rpc.Error.Has(share.Error), "unexpected error: %+v", share.Error)
|
2019-06-26 08:38:07 +01:00
|
|
|
assert.True(t, errs.Is(share.Error, context.DeadlineExceeded), "unexpected error: %+v", share.Error)
|
2023-04-13 17:31:27 +01:00
|
|
|
assert.Equal(t, audit.DialFailure, share.FailurePhase)
|
2019-06-07 22:02:36 +01:00
|
|
|
}
|
|
|
|
})
|
2018-10-09 22:10:37 +01:00
|
|
|
}
|
2019-06-07 13:38:41 +01:00
|
|
|
|
2023-04-13 17:31:27 +01:00
|
|
|
// TestDownloadSharesDialIOTimeout checks that i/o timeout dial failures are
|
|
|
|
// handled appropriately.
|
|
|
|
//
|
|
|
|
// This test differs from TestDownloadSharesDialTimeout in that it causes the
|
|
|
|
// timeout error by replacing a storage node with a black hole TCP socket,
|
|
|
|
// causing the failure directly instead of faking it with dialer.DialLatency.
|
|
|
|
func TestDownloadSharesDialIOTimeout(t *testing.T) {
|
|
|
|
var group errgroup.Group
|
|
|
|
// we do this shutdown outside the testplanet scope, so that we can expect
|
|
|
|
// that planet has been shut down before waiting for the black hole goroutines
|
|
|
|
// to finish. (They won't finish until the remote end is closed, which happens
|
|
|
|
// during planet shutdown.)
|
|
|
|
defer func() { assert.NoError(t, group.Wait()) }()
|
|
|
|
|
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
|
|
Reconfigure: testplanet.Reconfigure{
|
|
|
|
// require all nodes for each operation
|
|
|
|
Satellite: testplanet.ReconfigureRS(4, 4, 4, 4),
|
|
|
|
},
|
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
|
|
|
pauseQueueing(satellite)
|
|
|
|
|
|
|
|
upl := planet.Uplinks[0]
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
|
|
|
|
err := upl.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
blackHoleNode := planet.StorageNodes[testrand.Intn(len(planet.StorageNodes))]
|
|
|
|
require.NoError(t, planet.StopPeer(blackHoleNode))
|
|
|
|
|
|
|
|
// create a black hole in place of the storage node: a socket that only reads
|
|
|
|
// bytes and never says anything back. A connection to here using a bare TCP Dial
|
|
|
|
// would succeed, but a TLS Dial will not be able to handshake and will time out
|
|
|
|
// or wait forever.
|
|
|
|
listener, err := net.Listen("tcp", blackHoleNode.Addr())
|
|
|
|
require.NoError(t, err)
|
|
|
|
defer func() { assert.NoError(t, listener.Close()) }()
|
|
|
|
t.Logf("black hole listening on %s", listener.Addr())
|
|
|
|
|
|
|
|
group.Go(func() error {
|
|
|
|
for {
|
|
|
|
conn, err := listener.Accept()
|
|
|
|
if err != nil {
|
|
|
|
// this is terrible, but is apparently the standard and correct way to check
|
|
|
|
// for this specific error. See parseCloseError() in net/error_test.go in the
|
|
|
|
// Go stdlib.
|
|
|
|
assert.ErrorContains(t, err, "use of closed network connection")
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
t.Logf("connection made to black hole port %s", listener.Addr())
|
|
|
|
group.Go(func() (err error) {
|
|
|
|
defer func() { assert.NoError(t, conn.Close()) }()
|
|
|
|
|
|
|
|
// black hole: just read until the socket is closed on the other end
|
|
|
|
buf := make([]byte, 1024)
|
|
|
|
for {
|
|
|
|
_, err = conn.Read(buf)
|
|
|
|
if err != nil {
|
2023-05-18 15:19:55 +01:00
|
|
|
if !errors.Is(err, syscall.ECONNRESET) && !errors.Is(err, io.EOF) {
|
|
|
|
t.Fatalf("expected econnreset or eof, got %q", err.Error())
|
|
|
|
}
|
2023-04-13 17:31:27 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
|
|
|
require.NoError(t, err)
|
|
|
|
shareSize := segment.Redundancy.ShareSize
|
|
|
|
|
|
|
|
limits, privateKey, cachedNodesInfo, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
verifier := satellite.Audit.Verifier
|
|
|
|
shares, err := verifier.DownloadShares(ctx, limits, privateKey, cachedNodesInfo, randomIndex, shareSize)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
observed := false
|
|
|
|
for _, share := range shares {
|
|
|
|
if share.NodeID.Compare(blackHoleNode.ID()) == 0 {
|
|
|
|
assert.ErrorIs(t, share.Error, context.DeadlineExceeded)
|
|
|
|
assert.Equal(t, audit.DialFailure, share.FailurePhase)
|
|
|
|
observed = true
|
|
|
|
} else {
|
|
|
|
assert.NoError(t, share.Error)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
assert.Truef(t, observed, "No node in returned shares matched expected node ID")
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2019-06-07 22:02:36 +01:00
|
|
|
// TestDownloadSharesDownloadTimeout checks that the Share.Error field of the
|
|
|
|
// shares returned by the DownloadShares method for nodes that are successfully
|
|
|
|
// dialed, but time out during the download of the share contain an error that:
|
|
|
|
// - is an RPC error with code DeadlineExceeded
|
2019-09-19 05:46:39 +01:00
|
|
|
// - does not have the rpc.Error class
|
2019-06-07 22:02:36 +01:00
|
|
|
//
|
|
|
|
// If this test fails, this most probably means we made a backward-incompatible
|
|
|
|
// change that affects the audit service.
|
|
|
|
func TestDownloadSharesDownloadTimeout(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-07-05 17:04:15 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
|
|
|
|
Reconfigure: testplanet.Reconfigure{
|
2020-03-27 16:18:19 +00:00
|
|
|
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
2019-07-05 17:04:15 +01:00
|
|
|
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
|
|
|
|
},
|
|
|
|
},
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-07-05 17:04:15 +01:00
|
|
|
storageNodeDB := planet.StorageNodes[0].DB.(*testblobs.SlowDB)
|
2019-09-11 23:37:01 +01:00
|
|
|
|
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
|
|
|
upl := planet.Uplinks[0]
|
2019-07-05 17:04:15 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := upl.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-09-07 09:15:47 +01:00
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
2020-12-14 12:54:22 +00:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// This config value will create a very short timeframe allowed for receiving
|
2019-07-05 17:04:15 +01:00
|
|
|
// data from storage nodes. This will cause context to cancel with timeout.
|
|
|
|
minBytesPerSecond := 100 * memory.KiB
|
2019-06-07 22:02:36 +01:00
|
|
|
|
2019-07-01 15:02:00 +01:00
|
|
|
verifier := audit.NewVerifier(
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite.Log.Named("verifier"),
|
2021-09-07 09:15:47 +01:00
|
|
|
satellite.Metabase.DB,
|
2019-09-19 05:46:39 +01:00
|
|
|
satellite.Dialer,
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite.Overlay.Service,
|
2022-11-23 15:24:30 +00:00
|
|
|
satellite.DB.Containment(),
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite.Orders.Service,
|
|
|
|
satellite.Identity,
|
2019-06-07 22:02:36 +01:00
|
|
|
minBytesPerSecond,
|
2019-07-10 22:45:09 +01:00
|
|
|
150*time.Millisecond)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
shareSize := segment.Redundancy.ShareSize
|
|
|
|
|
2021-11-08 20:51:04 +00:00
|
|
|
limits, privateKey, cachedNodesInfo, err := satellite.Orders.Service.CreateAuditOrderLimits(ctx, segment, nil)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-07-10 22:45:09 +01:00
|
|
|
// make downloads on storage node slower than the timeout on the satellite for downloading shares
|
|
|
|
delay := 200 * time.Millisecond
|
2019-07-05 17:04:15 +01:00
|
|
|
storageNodeDB.SetLatency(delay)
|
|
|
|
|
2021-11-08 20:51:04 +00:00
|
|
|
shares, err := verifier.DownloadShares(ctx, limits, privateKey, cachedNodesInfo, randomIndex, shareSize)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-07-05 17:04:15 +01:00
|
|
|
require.Len(t, shares, 1)
|
|
|
|
share := shares[0]
|
2019-09-19 05:46:39 +01:00
|
|
|
assert.True(t, errs2.IsRPC(share.Error, rpcstatus.DeadlineExceeded), "unexpected error: %+v", share.Error)
|
2023-04-13 17:31:27 +01:00
|
|
|
assert.Equal(t, audit.RequestFailure, share.FailurePhase)
|
2019-09-19 05:46:39 +01:00
|
|
|
assert.False(t, rpc.Error.Has(share.Error), "unexpected error: %+v", share.Error)
|
2019-06-07 22:02:36 +01:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestVerifierHappyPath(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-06-07 22:02:36 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
2019-06-26 11:38:51 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-09-07 09:15:47 +01:00
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
2020-12-14 12:54:22 +00:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
assert.Len(t, report.Successes, len(segment.Pieces))
|
2019-06-07 22:02:36 +01:00
|
|
|
assert.Len(t, report.Fails, 0)
|
2019-11-05 19:41:48 +00:00
|
|
|
assert.Len(t, report.Offlines, 0)
|
2022-11-23 15:24:30 +00:00
|
|
|
assert.Len(t, report.PendingAudits, 0)
|
2019-11-05 19:41:48 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestVerifierExpired(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-11-05 19:41:48 +00:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-11-05 19:41:48 +00:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2019-11-05 19:41:48 +00:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
err := ul.UploadWithExpiration(ctx, satellite, "testbucket", "test/path", testData, time.Now().Add(1*time.Hour))
|
2019-11-05 19:41:48 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2019-11-05 19:41:48 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// move time into the future so the segment is expired
|
|
|
|
audits.Verifier.SetNow(func() time.Time {
|
|
|
|
return time.Now().Add(2 * time.Hour)
|
|
|
|
})
|
2019-11-05 19:41:48 +00:00
|
|
|
|
2020-08-25 14:32:05 +01:00
|
|
|
// Verify should not return an error
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
|
2020-08-25 14:32:05 +01:00
|
|
|
require.NoError(t, err)
|
2019-11-05 19:41:48 +00:00
|
|
|
|
|
|
|
assert.Len(t, report.Successes, 0)
|
|
|
|
assert.Len(t, report.Fails, 0)
|
2019-06-07 22:02:36 +01:00
|
|
|
assert.Len(t, report.Offlines, 0)
|
2022-11-23 15:24:30 +00:00
|
|
|
assert.Len(t, report.PendingAudits, 0)
|
2019-06-07 22:02:36 +01:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestVerifierOfflineNode(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-06-07 22:02:36 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-07-01 15:02:00 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
2019-06-26 11:38:51 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-09-07 09:15:47 +01:00
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
2020-12-14 12:54:22 +00:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// stop the first node in the segment
|
|
|
|
stoppedNodeID := segment.Pieces[0].StorageNode
|
2020-05-07 09:23:40 +01:00
|
|
|
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(stoppedNodeID))
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
assert.Len(t, report.Successes, len(segment.Pieces)-1)
|
2019-06-07 22:02:36 +01:00
|
|
|
assert.Len(t, report.Fails, 0)
|
|
|
|
assert.Len(t, report.Offlines, 1)
|
2022-11-23 15:24:30 +00:00
|
|
|
assert.Len(t, report.PendingAudits, 0)
|
2019-06-07 22:02:36 +01:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestVerifierMissingPiece(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-06-07 22:02:36 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
2019-06-26 11:38:51 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-09-07 09:15:47 +01:00
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
2020-12-14 12:54:22 +00:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// delete the piece from the first node
|
2020-12-14 12:54:22 +00:00
|
|
|
origNumPieces := len(segment.Pieces)
|
|
|
|
piece := segment.Pieces[0]
|
|
|
|
pieceID := segment.RootPieceID.Derive(piece.StorageNode, int32(piece.Number))
|
|
|
|
node := planet.FindNode(piece.StorageNode)
|
2019-09-11 23:37:01 +01:00
|
|
|
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-07-18 19:08:15 +01:00
|
|
|
assert.Len(t, report.Successes, origNumPieces-1)
|
2019-06-07 22:02:36 +01:00
|
|
|
assert.Len(t, report.Fails, 1)
|
|
|
|
assert.Len(t, report.Offlines, 0)
|
2022-11-23 15:24:30 +00:00
|
|
|
assert.Len(t, report.PendingAudits, 0)
|
2019-06-07 22:02:36 +01:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2021-07-19 17:32:47 +01:00
|
|
|
func TestVerifierNotEnoughPieces(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2021-07-19 17:32:47 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
|
|
Reconfigure: testplanet.Reconfigure{
|
|
|
|
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
|
|
|
return testblobs.NewBadDB(log.Named("baddb"), db), nil
|
|
|
|
},
|
|
|
|
Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
|
|
|
|
},
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2021-07-19 17:32:47 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2021-07-19 17:32:47 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2021-07-19 17:32:47 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-09-07 09:15:47 +01:00
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
2021-07-19 17:32:47 +01:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// out of 4 nodes, leave one intact
|
|
|
|
// make one to be offline.
|
|
|
|
// make one to return `unknown error` when respond to `GET_AUDIT/GET` request.
|
|
|
|
// delete the piece from one node which would cause audit failure
|
|
|
|
unknownErrorNode := planet.FindNode(segment.Pieces[0].StorageNode)
|
|
|
|
offlineNode := planet.FindNode(segment.Pieces[1].StorageNode)
|
|
|
|
deletedPieceNode := planet.FindNode(segment.Pieces[2].StorageNode)
|
|
|
|
deletedPieceNum := int32(segment.Pieces[2].Number)
|
|
|
|
|
|
|
|
// return an error when the verifier attempts to download from this node
|
|
|
|
unknownErrorDB := unknownErrorNode.DB.(*testblobs.BadDB)
|
|
|
|
unknownErrorDB.SetError(errs.New("unknown error"))
|
|
|
|
|
|
|
|
// stop the offline node
|
|
|
|
err = planet.StopNodeAndUpdate(ctx, offlineNode)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// delete piece from deletedPieceNode
|
|
|
|
pieceID := segment.RootPieceID.Derive(deletedPieceNode.ID(), deletedPieceNum)
|
|
|
|
err = deletedPieceNode.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
|
|
|
|
require.True(t, audit.ErrNotEnoughShares.Has(err))
|
|
|
|
|
|
|
|
// without enough pieces to complete the audit,
|
|
|
|
// offlines and unknowns should be marked, but
|
|
|
|
// failures should not
|
|
|
|
assert.Len(t, report.Fails, 0)
|
|
|
|
assert.Len(t, report.Offlines, 1)
|
|
|
|
assert.Len(t, report.Unknown, 1)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2019-06-07 22:02:36 +01:00
|
|
|
func TestVerifierDialTimeout(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-06-07 22:02:36 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
2019-06-26 11:38:51 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2019-06-07 22:02:36 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-09-07 09:15:47 +01:00
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
2020-12-14 12:54:22 +00:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-09-19 05:46:39 +01:00
|
|
|
tlsOptions, err := tlsopts.NewOptions(satellite.Identity, tlsopts.Config{}, nil)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-09-19 05:46:39 +01:00
|
|
|
dialer := rpc.NewDefaultDialer(tlsOptions)
|
|
|
|
dialer.DialTimeout = 20 * time.Millisecond
|
|
|
|
dialer.DialLatency = 200 * time.Second
|
2020-09-30 18:39:47 +01:00
|
|
|
|
2022-02-16 15:56:01 +00:00
|
|
|
connector := rpc.NewHybridConnector()
|
|
|
|
connector.SetTransferRate(1 * memory.KB)
|
2020-09-30 18:39:47 +01:00
|
|
|
dialer.Connector = connector
|
2019-06-07 22:02:36 +01:00
|
|
|
|
|
|
|
// This config value will create a very short timeframe allowed for receiving
|
2019-07-05 17:04:15 +01:00
|
|
|
// data from storage nodes. This will cause context to cancel with timeout.
|
2019-06-07 22:02:36 +01:00
|
|
|
minBytesPerSecond := 100 * memory.KiB
|
|
|
|
|
2019-07-01 15:02:00 +01:00
|
|
|
verifier := audit.NewVerifier(
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite.Log.Named("verifier"),
|
2021-09-07 09:15:47 +01:00
|
|
|
satellite.Metabase.DB,
|
2019-09-19 05:46:39 +01:00
|
|
|
dialer,
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite.Overlay.Service,
|
2022-11-23 15:24:30 +00:00
|
|
|
satellite.DB.Containment(),
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite.Orders.Service,
|
|
|
|
satellite.Identity,
|
2019-06-07 22:02:36 +01:00
|
|
|
minBytesPerSecond,
|
|
|
|
5*time.Second)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := verifier.Verify(ctx, queueSegment, nil)
|
2019-06-07 22:02:36 +01:00
|
|
|
require.True(t, audit.ErrNotEnoughShares.Has(err), "unexpected error: %+v", err)
|
|
|
|
|
|
|
|
assert.Len(t, report.Successes, 0)
|
|
|
|
assert.Len(t, report.Fails, 0)
|
2020-12-14 12:54:22 +00:00
|
|
|
assert.Len(t, report.Offlines, len(segment.Pieces))
|
2022-11-23 15:24:30 +00:00
|
|
|
assert.Len(t, report.PendingAudits, 0)
|
2019-06-07 22:02:36 +01:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2019-06-19 10:02:25 +01:00
|
|
|
func TestVerifierDeletedSegment(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-06-19 10:02:25 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2019-06-19 10:02:25 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
2019-06-26 11:38:51 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2019-06-19 10:02:25 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
2019-06-19 10:02:25 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
segment, err := queue.Next(ctx)
|
2019-06-19 10:02:25 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// delete the file
|
2020-02-10 12:18:18 +00:00
|
|
|
err = ul.DeleteObject(ctx, satellite, "testbucket", "test/path")
|
2019-06-19 10:02:25 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-08-25 14:32:05 +01:00
|
|
|
// Verify should not return an error, but report should be empty
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Verify(ctx, segment, nil)
|
2020-08-25 14:32:05 +01:00
|
|
|
require.NoError(t, err)
|
2021-08-11 21:02:54 +01:00
|
|
|
assert.Zero(t, report.Successes)
|
|
|
|
assert.Zero(t, report.Fails)
|
|
|
|
assert.Zero(t, report.Offlines)
|
2022-11-23 15:24:30 +00:00
|
|
|
assert.Zero(t, report.PendingAudits)
|
2021-08-11 21:02:54 +01:00
|
|
|
assert.Zero(t, report.Unknown)
|
2019-06-19 10:02:25 +01:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestVerifierModifiedSegment(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-06-19 10:02:25 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2020-03-04 23:09:18 +00:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2020-03-04 23:09:18 +00:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2020-03-04 23:09:18 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-08-11 21:02:54 +01:00
|
|
|
var segment metabase.Segment
|
2020-03-04 23:09:18 +00:00
|
|
|
audits.Verifier.OnTestingCheckSegmentAlteredHook = func() {
|
|
|
|
// remove one piece from the segment so that checkIfSegmentAltered fails
|
2021-09-07 09:15:47 +01:00
|
|
|
segment, err = satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
2020-12-14 12:54:22 +00:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2020-03-04 23:09:18 +00:00
|
|
|
require.NoError(t, err)
|
2020-12-14 12:54:22 +00:00
|
|
|
|
2021-09-07 09:15:47 +01:00
|
|
|
err = satellite.Metabase.DB.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
|
2021-03-12 11:23:44 +00:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
OldPieces: segment.Pieces,
|
|
|
|
NewPieces: append([]metabase.Piece{segment.Pieces[0]}, segment.Pieces[2:]...),
|
|
|
|
NewRedundancy: segment.Redundancy,
|
2020-12-14 12:54:22 +00:00
|
|
|
})
|
2020-03-04 23:09:18 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
}
|
|
|
|
|
2020-08-25 14:32:05 +01:00
|
|
|
// Verify should not return an error, but report should be empty
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
|
2020-08-25 14:32:05 +01:00
|
|
|
require.NoError(t, err)
|
2021-08-11 21:02:54 +01:00
|
|
|
assert.Zero(t, report.Successes)
|
|
|
|
assert.Zero(t, report.Fails)
|
|
|
|
assert.Zero(t, report.Offlines)
|
2022-11-23 15:24:30 +00:00
|
|
|
assert.Zero(t, report.PendingAudits)
|
2021-08-11 21:02:54 +01:00
|
|
|
assert.Zero(t, report.Unknown)
|
2020-03-04 23:09:18 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestVerifierReplacedSegment(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2020-03-04 23:09:18 +00:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2019-06-19 10:02:25 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
2019-06-26 11:38:51 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2019-06-19 10:02:25 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
2019-06-19 10:02:25 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
segment, err := queue.Next(ctx)
|
2019-06-19 10:02:25 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
audits.Verifier.OnTestingCheckSegmentAlteredHook = func() {
|
|
|
|
// replace the file so that checkIfSegmentAltered fails
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
}
|
2019-06-19 10:02:25 +01:00
|
|
|
|
2020-08-25 14:32:05 +01:00
|
|
|
// Verify should not return an error, but report should be empty
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Verify(ctx, segment, nil)
|
2020-08-25 14:32:05 +01:00
|
|
|
require.NoError(t, err)
|
2021-08-11 21:02:54 +01:00
|
|
|
assert.Zero(t, report.Successes)
|
|
|
|
assert.Zero(t, report.Fails)
|
|
|
|
assert.Zero(t, report.Offlines)
|
2022-11-23 15:24:30 +00:00
|
|
|
assert.Zero(t, report.PendingAudits)
|
2021-08-11 21:02:54 +01:00
|
|
|
assert.Zero(t, report.Unknown)
|
2019-06-19 10:02:25 +01:00
|
|
|
})
|
|
|
|
}
|
2019-07-18 19:08:15 +01:00
|
|
|
|
|
|
|
func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-07-18 19:08:15 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2019-07-18 19:08:15 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
2019-07-18 19:08:15 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-09-07 09:15:47 +01:00
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
2020-12-14 12:54:22 +00:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-07-18 19:08:15 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// delete the piece from the first node
|
2020-12-14 12:54:22 +00:00
|
|
|
origNumPieces := len(segment.Pieces)
|
|
|
|
piece := segment.Pieces[0]
|
|
|
|
pieceID := segment.RootPieceID.Derive(piece.StorageNode, int32(piece.Number))
|
|
|
|
node := planet.FindNode(piece.StorageNode)
|
2019-09-11 23:37:01 +01:00
|
|
|
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
|
2019-07-18 19:08:15 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
|
2019-07-18 19:08:15 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
assert.Len(t, report.Successes, origNumPieces-1)
|
2022-05-07 20:04:12 +01:00
|
|
|
require.Len(t, report.Fails, 1)
|
2023-04-26 18:59:56 +01:00
|
|
|
assert.Equal(t, metabase.Piece{
|
|
|
|
StorageNode: piece.StorageNode,
|
|
|
|
Number: piece.Number,
|
|
|
|
}, report.Fails[0])
|
|
|
|
require.NotNil(t, report.Segment)
|
|
|
|
assert.Equal(t, segment.StreamID, report.Segment.StreamID)
|
|
|
|
assert.Equal(t, segment.Position, report.Segment.Position)
|
|
|
|
assert.Equal(t, segment.Redundancy, report.Segment.Redundancy)
|
|
|
|
assert.Equal(t, segment.Pieces, report.Segment.Pieces)
|
2019-07-18 19:08:15 +01:00
|
|
|
assert.Len(t, report.Offlines, 0)
|
2022-11-23 15:24:30 +00:00
|
|
|
require.Len(t, report.PendingAudits, 0)
|
2019-07-18 19:08:15 +01:00
|
|
|
})
|
|
|
|
}
|
2019-11-19 16:30:28 +00:00
|
|
|
|
|
|
|
// TestVerifierSlowDownload checks that a node that times out while sending data to the
|
2020-07-16 15:18:02 +01:00
|
|
|
// audit service gets put into containment mode.
|
2019-11-19 16:30:28 +00:00
|
|
|
func TestVerifierSlowDownload(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-11-19 16:30:28 +00:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
|
|
Reconfigure: testplanet.Reconfigure{
|
2020-03-27 16:18:19 +00:00
|
|
|
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
2019-11-19 16:30:28 +00:00
|
|
|
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
|
|
|
|
},
|
2020-10-27 17:34:59 +00:00
|
|
|
Satellite: testplanet.Combine(
|
|
|
|
func(log *zap.Logger, index int, config *satellite.Config) {
|
|
|
|
// These config values are chosen to force the slow node to time out without timing out on the three normal nodes
|
|
|
|
config.Audit.MinBytesPerSecond = 100 * memory.KiB
|
|
|
|
config.Audit.MinDownloadTimeout = 950 * time.Millisecond
|
|
|
|
},
|
|
|
|
testplanet.ReconfigureRS(2, 2, 4, 4),
|
|
|
|
),
|
2019-11-19 16:30:28 +00:00
|
|
|
},
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-11-19 16:30:28 +00:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2019-11-19 16:30:28 +00:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
|
2020-01-21 10:38:41 +00:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-09-07 09:15:47 +01:00
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
2020-12-14 12:54:22 +00:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
slowNode := planet.FindNode(segment.Pieces[0].StorageNode)
|
2020-05-07 09:23:40 +01:00
|
|
|
slowNodeDB := slowNode.DB.(*testblobs.SlowDB)
|
|
|
|
// make downloads on storage node slower than the timeout on the satellite for downloading shares
|
2022-06-28 15:03:23 +01:00
|
|
|
slowNodeDB.SetLatency(3 * time.Second)
|
2019-11-19 16:30:28 +00:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-05-07 09:23:40 +01:00
|
|
|
assert.NotContains(t, report.Successes, slowNode.ID())
|
2020-04-30 22:55:28 +01:00
|
|
|
assert.Len(t, report.Fails, 0)
|
|
|
|
assert.Len(t, report.Offlines, 0)
|
|
|
|
assert.Len(t, report.Unknown, 0)
|
2022-11-23 15:24:30 +00:00
|
|
|
require.Len(t, report.PendingAudits, 1)
|
|
|
|
assert.Equal(t, report.PendingAudits[0].Locator.NodeID, slowNode.ID())
|
2019-11-19 16:30:28 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// TestVerifierUnknownError checks that a node that returns an unknown error in response to an audit request
|
2020-07-16 15:18:02 +01:00
|
|
|
// does not get marked as successful, failed, or contained.
|
2019-11-19 16:30:28 +00:00
|
|
|
func TestVerifierUnknownError(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2019-11-19 16:30:28 +00:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
|
|
Reconfigure: testplanet.Reconfigure{
|
2020-03-27 16:18:19 +00:00
|
|
|
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
2019-11-19 16:30:28 +00:00
|
|
|
return testblobs.NewBadDB(log.Named("baddb"), db), nil
|
|
|
|
},
|
2020-01-21 10:38:41 +00:00
|
|
|
Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
|
2019-11-19 16:30:28 +00:00
|
|
|
},
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2019-11-19 16:30:28 +00:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2019-11-19 16:30:28 +00:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
|
2020-01-21 10:38:41 +00:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-11 23:11:40 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-09-07 09:15:47 +01:00
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
2020-12-14 12:54:22 +00:00
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
badNode := planet.FindNode(segment.Pieces[0].StorageNode)
|
2020-05-07 09:23:40 +01:00
|
|
|
badNodeDB := badNode.DB.(*testblobs.BadDB)
|
|
|
|
// return an error when the verifier attempts to download from this node
|
|
|
|
badNodeDB.SetError(errs.New("unknown error"))
|
2019-11-19 16:30:28 +00:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Verify(ctx, queueSegment, nil)
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2021-08-11 21:02:54 +01:00
|
|
|
assert.Len(t, report.Successes, 3)
|
|
|
|
assert.Len(t, report.Fails, 0)
|
|
|
|
assert.Len(t, report.Offlines, 0)
|
2022-11-23 15:24:30 +00:00
|
|
|
assert.Len(t, report.PendingAudits, 0)
|
2022-05-07 20:04:12 +01:00
|
|
|
require.Len(t, report.Unknown, 1)
|
2021-08-11 21:02:54 +01:00
|
|
|
assert.Equal(t, report.Unknown[0], badNode.ID())
|
2019-11-19 16:30:28 +00:00
|
|
|
})
|
|
|
|
}
|
2022-03-03 00:23:11 +00:00
|
|
|
|
|
|
|
func TestAuditRepairedSegmentInExcludedCountries(t *testing.T) {
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
|
|
|
SatelliteCount: 1,
|
|
|
|
StorageNodeCount: 20,
|
|
|
|
UplinkCount: 1,
|
|
|
|
Reconfigure: testplanet.Reconfigure{
|
|
|
|
Satellite: testplanet.Combine(
|
|
|
|
func(log *zap.Logger, index int, config *satellite.Config) {
|
|
|
|
config.Repairer.InMemoryRepair = true
|
|
|
|
},
|
|
|
|
testplanet.ReconfigureRS(3, 5, 8, 10),
|
|
|
|
testplanet.RepairExcludedCountryCodes([]string{"FR", "BE"}),
|
|
|
|
),
|
|
|
|
},
|
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
|
|
uplinkPeer := planet.Uplinks[0]
|
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
// stop audit to prevent possible interactions i.e. repair timeout problems
|
|
|
|
satellite.Audit.Worker.Loop.Pause()
|
|
|
|
|
2023-04-25 09:40:22 +01:00
|
|
|
satellite.RangedLoop.RangedLoop.Service.Loop.Stop()
|
2022-03-03 00:23:11 +00:00
|
|
|
satellite.Repair.Repairer.Loop.Pause()
|
|
|
|
|
|
|
|
var testData = testrand.Bytes(8 * memory.KiB)
|
|
|
|
bucket := "testbucket"
|
|
|
|
// first, upload some remote data
|
|
|
|
err := uplinkPeer.Upload(ctx, satellite, bucket, "test/path", testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
segment, _ := getRemoteSegment(ctx, t, satellite, uplinkPeer.Projects[0].ID, bucket)
|
|
|
|
|
|
|
|
remotePieces := segment.Pieces
|
|
|
|
|
|
|
|
numExcluded := 5
|
|
|
|
var nodesInExcluded storj.NodeIDList
|
|
|
|
for i := 0; i < numExcluded; i++ {
|
|
|
|
err = planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, remotePieces[i].StorageNode, "FR")
|
|
|
|
require.NoError(t, err)
|
|
|
|
nodesInExcluded = append(nodesInExcluded, remotePieces[i].StorageNode)
|
|
|
|
}
|
|
|
|
|
|
|
|
// make extra pieces after optimal bad
|
|
|
|
for i := int(segment.Redundancy.OptimalShares); i < len(remotePieces); i++ {
|
|
|
|
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(remotePieces[i].StorageNode))
|
|
|
|
require.NoError(t, err)
|
|
|
|
}
|
|
|
|
|
2023-04-25 09:40:22 +01:00
|
|
|
// trigger repair checker with ranged loop to add segment to repair queue
|
|
|
|
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
|
|
|
|
require.NoError(t, err)
|
2022-03-03 00:23:11 +00:00
|
|
|
|
|
|
|
count, err := satellite.DB.RepairQueue().Count(ctx)
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, 1, count)
|
|
|
|
|
|
|
|
satellite.Repair.Repairer.Loop.Restart()
|
|
|
|
satellite.Repair.Repairer.Loop.TriggerWait()
|
|
|
|
satellite.Repair.Repairer.Loop.Pause()
|
|
|
|
satellite.Repair.Repairer.WaitForPendingRepairs()
|
|
|
|
|
|
|
|
// Verify that the segment was removed
|
|
|
|
count, err = satellite.DB.RepairQueue().Count(ctx)
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.Zero(t, count)
|
|
|
|
|
|
|
|
// Verify the segment has been repaired
|
|
|
|
segmentAfterRepair, _ := getRemoteSegment(ctx, t, satellite, planet.Uplinks[0].Projects[0].ID, bucket)
|
|
|
|
require.NotEqual(t, segment.Pieces, segmentAfterRepair.Pieces)
|
|
|
|
require.Equal(t, 10, len(segmentAfterRepair.Pieces))
|
|
|
|
|
|
|
|
// check excluded area nodes still exist
|
|
|
|
for i, n := range nodesInExcluded {
|
|
|
|
var found bool
|
|
|
|
for _, p := range segmentAfterRepair.Pieces {
|
|
|
|
if p.StorageNode == n {
|
|
|
|
found = true
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
require.True(t, found, fmt.Sprintf("node %s not in segment, but should be\n", segmentAfterRepair.Pieces[i].StorageNode.String()))
|
|
|
|
}
|
|
|
|
nodesInPointer := make(map[storj.NodeID]bool)
|
|
|
|
for _, n := range segmentAfterRepair.Pieces {
|
|
|
|
// check for duplicates
|
|
|
|
_, ok := nodesInPointer[n.StorageNode]
|
|
|
|
require.False(t, ok)
|
|
|
|
nodesInPointer[n.StorageNode] = true
|
|
|
|
}
|
|
|
|
|
|
|
|
lastPieceIndex := segmentAfterRepair.Pieces.Len() - 1
|
|
|
|
lastPiece := segmentAfterRepair.Pieces[lastPieceIndex]
|
|
|
|
for _, n := range planet.StorageNodes {
|
|
|
|
if n.ID() == lastPiece.StorageNode {
|
|
|
|
pieceID := segmentAfterRepair.RootPieceID.Derive(n.ID(), int32(lastPiece.Number))
|
|
|
|
corruptPieceData(ctx, t, planet, n, pieceID)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// now audit
|
|
|
|
report, err := satellite.Audit.Verifier.Verify(ctx, audit.Segment{
|
|
|
|
StreamID: segmentAfterRepair.StreamID,
|
|
|
|
Position: segmentAfterRepair.Position,
|
|
|
|
ExpiresAt: segmentAfterRepair.ExpiresAt,
|
|
|
|
EncryptedSize: segmentAfterRepair.EncryptedSize,
|
|
|
|
}, nil)
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.Len(t, report.Fails, 1)
|
2023-04-26 18:59:56 +01:00
|
|
|
require.Equal(t, metabase.Piece{
|
|
|
|
StorageNode: lastPiece.StorageNode,
|
|
|
|
Number: lastPiece.Number,
|
|
|
|
}, report.Fails[0])
|
|
|
|
require.NotNil(t, report.Segment)
|
|
|
|
assert.Equal(t, segmentAfterRepair.StreamID, report.Segment.StreamID)
|
|
|
|
assert.Equal(t, segmentAfterRepair.Position, report.Segment.Position)
|
|
|
|
assert.Equal(t, segmentAfterRepair.Redundancy, report.Segment.Redundancy)
|
|
|
|
assert.Equal(t, segmentAfterRepair.Pieces, report.Segment.Pieces)
|
2022-03-03 00:23:11 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// getRemoteSegment returns a remote pointer its path from satellite.
|
2022-10-11 12:47:02 +01:00
|
|
|
//
|
|
|
|
//nolint:golint
|
2022-03-03 00:23:11 +00:00
|
|
|
func getRemoteSegment(
|
|
|
|
ctx context.Context, t *testing.T, satellite *testplanet.Satellite, projectID uuid.UUID, bucketName string,
|
|
|
|
) (_ metabase.Segment, key metabase.SegmentKey) {
|
|
|
|
t.Helper()
|
|
|
|
|
|
|
|
objects, err := satellite.Metabase.DB.TestingAllObjects(ctx)
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.Len(t, objects, 1)
|
|
|
|
|
|
|
|
segments, err := satellite.Metabase.DB.TestingAllSegments(ctx)
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.Len(t, segments, 1)
|
|
|
|
require.False(t, segments[0].Inline())
|
|
|
|
|
|
|
|
return segments[0], metabase.SegmentLocation{
|
|
|
|
ProjectID: projectID,
|
|
|
|
BucketName: bucketName,
|
|
|
|
ObjectKey: objects[0].ObjectKey,
|
|
|
|
Position: segments[0].Position,
|
|
|
|
}.Encode()
|
|
|
|
}
|
|
|
|
|
|
|
|
// corruptPieceData manipulates piece data on a storage node.
|
|
|
|
func corruptPieceData(ctx context.Context, t *testing.T, planet *testplanet.Planet, corruptedNode *testplanet.StorageNode, corruptedPieceID storj.PieceID) {
|
|
|
|
t.Helper()
|
|
|
|
|
2023-04-05 18:03:06 +01:00
|
|
|
blobRef := blobstore.BlobRef{
|
2022-03-03 00:23:11 +00:00
|
|
|
Namespace: planet.Satellites[0].ID().Bytes(),
|
|
|
|
Key: corruptedPieceID.Bytes(),
|
|
|
|
}
|
|
|
|
|
|
|
|
// get currently stored piece data from storagenode
|
|
|
|
reader, err := corruptedNode.Storage2.BlobsCache.Open(ctx, blobRef)
|
|
|
|
require.NoError(t, err)
|
|
|
|
pieceSize, err := reader.Size()
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.True(t, pieceSize > 0)
|
|
|
|
pieceData := make([]byte, pieceSize)
|
|
|
|
|
|
|
|
// delete piece data
|
|
|
|
err = corruptedNode.Storage2.BlobsCache.Delete(ctx, blobRef)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// create new random data
|
|
|
|
_, err = rand.Read(pieceData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// corrupt piece data (not PieceHeader) and write back to storagenode
|
|
|
|
// this means repair downloading should fail during piece hash verification
|
|
|
|
pieceData[pieceSize-1]++ // if we don't do this, this test should fail
|
|
|
|
writer, err := corruptedNode.Storage2.BlobsCache.Create(ctx, blobRef, pieceSize)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
n, err := writer.Write(pieceData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.EqualValues(t, n, pieceSize)
|
|
|
|
|
|
|
|
err = writer.Commit(ctx)
|
|
|
|
require.NoError(t, err)
|
|
|
|
}
|
2022-11-22 22:34:55 +00:00
|
|
|
|
|
|
|
func TestIdentifyContainedNodes(t *testing.T) {
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2022-11-22 22:34:55 +00:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2022-12-15 00:37:37 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
2022-11-22 22:34:55 +00:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2022-12-15 00:37:37 +00:00
|
|
|
pauseQueueing(satellite)
|
2022-11-22 22:34:55 +00:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-12-15 00:37:37 +00:00
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2022-11-22 22:34:55 +00:00
|
|
|
queue := audits.VerifyQueue
|
|
|
|
queueSegment, err := queue.Next(ctx)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// mark a node as contained
|
|
|
|
containedNode := segment.Pieces[0].StorageNode
|
2022-11-23 15:24:30 +00:00
|
|
|
containment := satellite.DB.Containment()
|
2022-11-22 22:34:55 +00:00
|
|
|
err = containment.Insert(ctx, &audit.PieceLocator{
|
|
|
|
StreamID: testrand.UUID(),
|
|
|
|
NodeID: containedNode,
|
|
|
|
})
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
gotContainedNodes, err := audits.Verifier.IdentifyContainedNodes(ctx, audit.Segment{
|
|
|
|
StreamID: segment.StreamID,
|
|
|
|
Position: segment.Position,
|
|
|
|
})
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.Len(t, gotContainedNodes, 1)
|
|
|
|
_, ok := gotContainedNodes[containedNode]
|
|
|
|
require.True(t, ok, "expected node to be indicated as contained, but it was not")
|
|
|
|
})
|
|
|
|
}
|
2023-02-03 21:03:27 +00:00
|
|
|
|
|
|
|
func TestConcurrentAuditsSuccess(t *testing.T) {
|
|
|
|
const (
|
|
|
|
numConcurrentAudits = 10
|
|
|
|
minPieces = 5
|
|
|
|
)
|
|
|
|
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2023-02-03 21:03:27 +00:00
|
|
|
SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1,
|
|
|
|
Reconfigure: testplanet.Reconfigure{
|
|
|
|
// every segment gets a piece on every node, so that every segment audit
|
|
|
|
// hits the same set of nodes, and every node is touched by every audit
|
|
|
|
Satellite: testplanet.ReconfigureRS(minPieces, minPieces, minPieces, minPieces),
|
|
|
|
},
|
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
|
|
|
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
|
|
|
audits.ReverifyWorker.Loop.Pause()
|
|
|
|
pauseQueueing(satellite)
|
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
|
|
|
|
for n := 0; n < numConcurrentAudits; n++ {
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", fmt.Sprintf("test/path/%d", n), testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
listResult, err := satellite.Metabase.DB.ListVerifySegments(ctx, metabase.ListVerifySegments{Limit: numConcurrentAudits})
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.Len(t, listResult.Segments, numConcurrentAudits)
|
|
|
|
|
|
|
|
// do all the audits at the same time; at least some nodes will get more than one at the same time
|
|
|
|
group, auditCtx := errgroup.WithContext(ctx)
|
|
|
|
reports := make([]audit.Report, numConcurrentAudits)
|
|
|
|
|
|
|
|
for n, seg := range listResult.Segments {
|
|
|
|
n := n
|
|
|
|
seg := seg
|
|
|
|
group.Go(func() error {
|
|
|
|
report, err := audits.Verifier.Verify(auditCtx, audit.Segment{
|
|
|
|
StreamID: seg.StreamID,
|
|
|
|
Position: seg.Position,
|
|
|
|
}, nil)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
reports[n] = report
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|
|
|
|
err = group.Wait()
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
for _, report := range reports {
|
|
|
|
require.Len(t, report.Fails, 0)
|
|
|
|
require.Len(t, report.Unknown, 0)
|
|
|
|
require.Len(t, report.PendingAudits, 0)
|
|
|
|
require.Len(t, report.Offlines, 0)
|
|
|
|
require.Equal(t, len(report.Successes), minPieces)
|
|
|
|
|
|
|
|
// apply the audit results, as the audit worker would have done
|
|
|
|
audits.Reporter.RecordAudits(ctx, report)
|
|
|
|
}
|
|
|
|
|
|
|
|
// nothing should be in the reverify queue
|
|
|
|
_, err = audits.ReverifyQueue.GetNextJob(ctx, time.Minute)
|
|
|
|
require.Error(t, err)
|
|
|
|
require.True(t, audit.ErrEmptyQueue.Has(err), err)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestConcurrentAuditsUnknownError(t *testing.T) {
|
|
|
|
const (
|
|
|
|
numConcurrentAudits = 10
|
|
|
|
minPieces = 5
|
|
|
|
badNodes = minPieces / 2
|
|
|
|
)
|
|
|
|
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2023-02-03 21:03:27 +00:00
|
|
|
SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1,
|
|
|
|
Reconfigure: testplanet.Reconfigure{
|
|
|
|
// every segment gets a piece on every node, so that every segment audit
|
|
|
|
// hits the same set of nodes, and every node is touched by every audit
|
|
|
|
Satellite: testplanet.ReconfigureRS(minPieces-badNodes, minPieces, minPieces, minPieces),
|
|
|
|
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
2023-04-11 14:09:32 +01:00
|
|
|
return testblobs.NewBadDB(log.Named("baddb"), db), nil
|
2023-02-03 21:03:27 +00:00
|
|
|
},
|
|
|
|
},
|
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
|
|
|
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
|
|
|
audits.ReverifyWorker.Loop.Pause()
|
|
|
|
pauseQueueing(satellite)
|
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
|
|
|
|
for n := 0; n < numConcurrentAudits; n++ {
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", fmt.Sprintf("test/path/%d", n), testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
listResult, err := satellite.Metabase.DB.ListVerifySegments(ctx, metabase.ListVerifySegments{Limit: numConcurrentAudits})
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.Len(t, listResult.Segments, numConcurrentAudits)
|
|
|
|
|
|
|
|
// make ~half of the nodes time out on all responses
|
|
|
|
for n := 0; n < badNodes; n++ {
|
|
|
|
planet.StorageNodes[n].DB.(*testblobs.BadDB).SetError(fmt.Errorf("an unrecognized error"))
|
|
|
|
}
|
|
|
|
|
|
|
|
// do all the audits at the same time; at least some nodes will get more than one at the same time
|
|
|
|
group, auditCtx := errgroup.WithContext(ctx)
|
|
|
|
reports := make([]audit.Report, numConcurrentAudits)
|
|
|
|
|
|
|
|
for n, seg := range listResult.Segments {
|
|
|
|
n := n
|
|
|
|
seg := seg
|
|
|
|
group.Go(func() error {
|
|
|
|
report, err := audits.Verifier.Verify(auditCtx, audit.Segment{
|
|
|
|
StreamID: seg.StreamID,
|
|
|
|
Position: seg.Position,
|
|
|
|
}, nil)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
reports[n] = report
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|
|
|
|
err = group.Wait()
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
for _, report := range reports {
|
|
|
|
require.Len(t, report.Fails, 0)
|
|
|
|
require.Len(t, report.Unknown, badNodes)
|
|
|
|
require.Len(t, report.PendingAudits, 0)
|
|
|
|
require.Len(t, report.Offlines, 0)
|
|
|
|
require.Equal(t, len(report.Successes), minPieces-badNodes)
|
|
|
|
|
|
|
|
// apply the audit results, as the audit worker would have done
|
|
|
|
audits.Reporter.RecordAudits(ctx, report)
|
|
|
|
}
|
|
|
|
|
|
|
|
// nothing should be in the reverify queue
|
|
|
|
_, err = audits.ReverifyQueue.GetNextJob(ctx, time.Minute)
|
|
|
|
require.Error(t, err)
|
|
|
|
require.True(t, audit.ErrEmptyQueue.Has(err), err)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestConcurrentAuditsFailure(t *testing.T) {
|
|
|
|
const (
|
|
|
|
numConcurrentAudits = 10
|
|
|
|
minPieces = 5
|
|
|
|
badNodes = minPieces / 2
|
|
|
|
)
|
|
|
|
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2023-02-03 21:03:27 +00:00
|
|
|
SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1,
|
|
|
|
Reconfigure: testplanet.Reconfigure{
|
|
|
|
// every segment gets a piece on every node, so that every segment audit
|
|
|
|
// hits the same set of nodes, and every node is touched by every audit
|
|
|
|
Satellite: testplanet.ReconfigureRS(minPieces-badNodes, minPieces, minPieces, minPieces),
|
|
|
|
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
2023-04-11 14:09:32 +01:00
|
|
|
return testblobs.NewBadDB(log.Named("baddb"), db), nil
|
2023-02-03 21:03:27 +00:00
|
|
|
},
|
|
|
|
},
|
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
|
|
|
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
|
|
|
audits.ReverifyWorker.Loop.Pause()
|
|
|
|
pauseQueueing(satellite)
|
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
|
|
|
|
for n := 0; n < numConcurrentAudits; n++ {
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", fmt.Sprintf("test/path/%d", n), testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
listResult, err := satellite.Metabase.DB.ListVerifySegments(ctx, metabase.ListVerifySegments{Limit: numConcurrentAudits})
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.Len(t, listResult.Segments, numConcurrentAudits)
|
|
|
|
|
|
|
|
// make ~half of the nodes return a Not Found error on all responses
|
|
|
|
for n := 0; n < badNodes; n++ {
|
|
|
|
// Can't make _all_ calls return errors, or the directory read verification will fail
|
|
|
|
// (as it is triggered explicitly when ErrNotExist is returned from Open) and cause the
|
|
|
|
// node to panic before the test is done.
|
|
|
|
planet.StorageNodes[n].DB.(*testblobs.BadDB).SetError(os.ErrNotExist)
|
|
|
|
}
|
|
|
|
|
|
|
|
// do all the audits at the same time; at least some nodes will get more than one at the same time
|
|
|
|
group, auditCtx := errgroup.WithContext(ctx)
|
|
|
|
reports := make([]audit.Report, numConcurrentAudits)
|
|
|
|
|
|
|
|
for n, seg := range listResult.Segments {
|
|
|
|
n := n
|
|
|
|
seg := seg
|
|
|
|
group.Go(func() error {
|
|
|
|
report, err := audits.Verifier.Verify(auditCtx, audit.Segment{
|
|
|
|
StreamID: seg.StreamID,
|
|
|
|
Position: seg.Position,
|
|
|
|
}, nil)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
reports[n] = report
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|
|
|
|
err = group.Wait()
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
for n, report := range reports {
|
|
|
|
require.Len(t, report.Unknown, 0, n)
|
|
|
|
require.Len(t, report.PendingAudits, 0, n)
|
|
|
|
require.Len(t, report.Offlines, 0, n)
|
|
|
|
require.Len(t, report.Fails, badNodes, n)
|
|
|
|
require.Equal(t, len(report.Successes), minPieces-badNodes, n)
|
|
|
|
|
|
|
|
// apply the audit results, as the audit worker would have done
|
|
|
|
audits.Reporter.RecordAudits(ctx, report)
|
|
|
|
}
|
|
|
|
|
|
|
|
// nothing should be in the reverify queue
|
|
|
|
_, err = audits.ReverifyQueue.GetNextJob(ctx, time.Minute)
|
|
|
|
require.Error(t, err)
|
|
|
|
require.True(t, audit.ErrEmptyQueue.Has(err), err)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestConcurrentAuditsTimeout(t *testing.T) {
|
|
|
|
const (
|
|
|
|
numConcurrentAudits = 10
|
|
|
|
minPieces = 5
|
|
|
|
slowNodes = minPieces / 2
|
2023-02-09 23:03:20 +00:00
|
|
|
retryInterval = 5 * time.Minute
|
2023-02-03 21:03:27 +00:00
|
|
|
)
|
|
|
|
|
2023-04-24 11:07:16 +01:00
|
|
|
testWithRangedLoop(t, testplanet.Config{
|
2023-02-03 21:03:27 +00:00
|
|
|
SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1,
|
|
|
|
Reconfigure: testplanet.Reconfigure{
|
|
|
|
// every segment should get a piece on every node, so that every segment audit
|
|
|
|
// hits the same set of nodes, and every node is touched by every audit
|
|
|
|
Satellite: testplanet.Combine(
|
|
|
|
func(log *zap.Logger, index int, config *satellite.Config) {
|
|
|
|
// These config values are chosen to cause relatively quick timeouts
|
|
|
|
// while allowing the non-slow nodes to complete operations
|
|
|
|
config.Audit.MinBytesPerSecond = 100 * memory.KiB
|
|
|
|
config.Audit.MinDownloadTimeout = time.Second
|
|
|
|
},
|
|
|
|
testplanet.ReconfigureRS(minPieces-slowNodes, minPieces, minPieces, minPieces),
|
|
|
|
),
|
|
|
|
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
|
|
|
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
|
|
|
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
|
|
|
audits.ReverifyWorker.Loop.Pause()
|
|
|
|
pauseQueueing(satellite)
|
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
|
|
|
|
for n := 0; n < numConcurrentAudits; n++ {
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", fmt.Sprintf("test/path/%d", n), testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
listResult, err := satellite.Metabase.DB.ListVerifySegments(ctx, metabase.ListVerifySegments{Limit: numConcurrentAudits})
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.Len(t, listResult.Segments, numConcurrentAudits)
|
|
|
|
|
|
|
|
// make ~half of the nodes time out on all responses
|
|
|
|
for n := 0; n < slowNodes; n++ {
|
|
|
|
planet.StorageNodes[n].DB.(*testblobs.SlowDB).SetLatency(time.Hour)
|
|
|
|
}
|
|
|
|
|
|
|
|
// do all the audits at the same time; at least some nodes will get more than one at the same time
|
|
|
|
group, auditCtx := errgroup.WithContext(ctx)
|
|
|
|
reports := make([]audit.Report, numConcurrentAudits)
|
|
|
|
|
|
|
|
for n, seg := range listResult.Segments {
|
|
|
|
n := n
|
|
|
|
seg := seg
|
|
|
|
group.Go(func() error {
|
|
|
|
report, err := audits.Verifier.Verify(auditCtx, audit.Segment{
|
|
|
|
StreamID: seg.StreamID,
|
|
|
|
Position: seg.Position,
|
|
|
|
}, nil)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
reports[n] = report
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|
|
|
|
err = group.Wait()
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2023-02-09 23:03:20 +00:00
|
|
|
rq := audits.ReverifyQueue.(interface {
|
|
|
|
audit.ReverifyQueue
|
|
|
|
TestingFudgeUpdateTime(ctx context.Context, pendingAudit *audit.PieceLocator, updateTime time.Time) error
|
|
|
|
})
|
|
|
|
|
2023-02-03 21:03:27 +00:00
|
|
|
for _, report := range reports {
|
|
|
|
require.Len(t, report.Fails, 0)
|
|
|
|
require.Len(t, report.Unknown, 0)
|
|
|
|
require.Len(t, report.PendingAudits, slowNodes)
|
|
|
|
require.Len(t, report.Offlines, 0)
|
|
|
|
require.Equal(t, len(report.Successes), minPieces-slowNodes)
|
|
|
|
|
|
|
|
// apply the audit results, as the audit worker would have done
|
|
|
|
audits.Reporter.RecordAudits(ctx, report)
|
2023-02-09 23:03:20 +00:00
|
|
|
|
|
|
|
// fudge the insert time backward by retryInterval so the jobs will be available to GetNextJob
|
|
|
|
for _, pending := range report.PendingAudits {
|
|
|
|
err := rq.TestingFudgeUpdateTime(ctx, &pending.Locator, time.Now().Add(-retryInterval))
|
|
|
|
require.NoError(t, err)
|
|
|
|
}
|
2023-02-03 21:03:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// the slow nodes should have been added to the reverify queue multiple times;
|
|
|
|
// once for each timed-out piece fetch
|
|
|
|
queuedReverifies := make([]*audit.ReverificationJob, 0, numConcurrentAudits*slowNodes)
|
|
|
|
for {
|
2023-02-09 23:03:20 +00:00
|
|
|
job, err := audits.ReverifyQueue.GetNextJob(ctx, retryInterval)
|
2023-02-03 21:03:27 +00:00
|
|
|
if err != nil {
|
|
|
|
if audit.ErrEmptyQueue.Has(err) {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
require.NoError(t, err)
|
|
|
|
}
|
|
|
|
queuedReverifies = append(queuedReverifies, job)
|
|
|
|
}
|
|
|
|
require.Len(t, queuedReverifies, numConcurrentAudits*slowNodes)
|
|
|
|
|
|
|
|
appearancesPerNode := make(map[storj.NodeID]int)
|
|
|
|
for _, job := range queuedReverifies {
|
|
|
|
appearancesPerNode[job.Locator.NodeID]++
|
|
|
|
}
|
|
|
|
|
|
|
|
require.Len(t, appearancesPerNode, slowNodes)
|
|
|
|
for n := 0; n < slowNodes; n++ {
|
|
|
|
require.EqualValues(t, appearancesPerNode[planet.StorageNodes[n].ID()], numConcurrentAudits)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|