2019-06-11 21:14:40 +01:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package audit_test
|
|
|
|
|
|
|
|
import (
|
|
|
|
"testing"
|
|
|
|
"time"
|
|
|
|
|
2019-06-19 10:02:25 +01:00
|
|
|
"github.com/stretchr/testify/assert"
|
2019-06-11 21:14:40 +01:00
|
|
|
"github.com/stretchr/testify/require"
|
2019-11-19 16:30:28 +00:00
|
|
|
"github.com/zeebo/errs"
|
|
|
|
"go.uber.org/zap"
|
2019-06-11 21:14:40 +01:00
|
|
|
|
2019-12-27 11:48:47 +00:00
|
|
|
"storj.io/common/memory"
|
|
|
|
"storj.io/common/peertls/tlsopts"
|
|
|
|
"storj.io/common/pkcrypto"
|
|
|
|
"storj.io/common/rpc"
|
|
|
|
"storj.io/common/storj"
|
|
|
|
"storj.io/common/testcontext"
|
|
|
|
"storj.io/common/testrand"
|
2019-11-19 16:30:28 +00:00
|
|
|
"storj.io/storj/private/testblobs"
|
2019-11-14 19:46:15 +00:00
|
|
|
"storj.io/storj/private/testplanet"
|
2019-11-19 16:30:28 +00:00
|
|
|
"storj.io/storj/satellite"
|
2019-07-28 06:55:36 +01:00
|
|
|
"storj.io/storj/satellite/audit"
|
2020-08-28 12:56:09 +01:00
|
|
|
"storj.io/storj/satellite/metainfo/metabase"
|
2019-11-19 16:30:28 +00:00
|
|
|
"storj.io/storj/storagenode"
|
2019-06-11 21:14:40 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
func TestReverifySuccess(t *testing.T) {
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
2019-07-01 15:02:00 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2019-06-11 21:14:40 +01:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
|
|
// This is a bulky test but all it's doing is:
|
|
|
|
// - uploads random data
|
|
|
|
// - uses the cursor to get a stripe
|
|
|
|
// - creates one pending audit for a node holding a piece for that stripe
|
|
|
|
// - the actual share is downloaded to make sure ExpectedShareHash is correct
|
|
|
|
// - calls reverify on that same stripe
|
|
|
|
// - expects one storage node to be marked as a success in the audit report
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2020-07-13 23:24:15 +01:00
|
|
|
audits.Chore.Loop.Pause()
|
2019-06-11 21:14:40 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
2019-07-01 15:02:00 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2019-06-11 21:14:40 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue := audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment, err := queue.Next()
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
orders := satellite.Orders.Service
|
|
|
|
containment := satellite.DB.Containment()
|
2019-06-11 21:14:40 +01:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
shareSize := segment.Redundancy.ShareSize
|
|
|
|
pieces := segment.Pieces
|
|
|
|
rootPieceID := segment.RootPieceID
|
2019-06-11 21:14:40 +01:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), pieces[0].StorageNode, pieces[0].Number, rootPieceID, shareSize)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].Number))
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
pending := &audit.PendingAudit{
|
2020-12-14 12:54:22 +00:00
|
|
|
NodeID: pieces[0].StorageNode,
|
2019-06-11 21:14:40 +01:00
|
|
|
PieceID: rootPieceID,
|
2019-09-11 23:37:01 +01:00
|
|
|
StripeIndex: randomIndex,
|
2019-06-11 21:14:40 +01:00
|
|
|
ShareSize: shareSize,
|
|
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
|
|
|
|
ReverifyCount: 0,
|
2020-12-14 12:54:22 +00:00
|
|
|
Segment: queueSegment.SegmentLocation,
|
2019-06-11 21:14:40 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
err = containment.IncrementPending(ctx, pending)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Reverify(ctx, queueSegment)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
require.Len(t, report.Fails, 0)
|
|
|
|
require.Len(t, report.Offlines, 0)
|
|
|
|
require.Len(t, report.PendingAudits, 0)
|
|
|
|
require.Len(t, report.Successes, 1)
|
2020-12-14 12:54:22 +00:00
|
|
|
require.Equal(t, report.Successes[0], pieces[0].StorageNode)
|
2020-03-04 23:09:18 +00:00
|
|
|
|
|
|
|
// record audit
|
2020-12-14 12:54:22 +00:00
|
|
|
_, err = audits.Reporter.RecordAudits(ctx, report)
|
2020-03-04 23:09:18 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// make sure that pending audit is removed by the reporter when audit is recorded
|
|
|
|
_, err = containment.Get(ctx, pending.NodeID)
|
|
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
2019-06-11 21:14:40 +01:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestReverifyFailMissingShare(t *testing.T) {
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
2019-07-01 15:02:00 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2019-06-11 21:14:40 +01:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
|
|
// - uploads random data
|
|
|
|
// - uses the cursor to get a stripe
|
|
|
|
// - creates one pending audit for a node holding a piece for that stripe
|
|
|
|
// - the actual share is downloaded to make sure ExpectedShareHash is correct
|
|
|
|
// - delete piece from node
|
|
|
|
// - calls reverify on that same stripe
|
|
|
|
// - expects one storage node to be marked as a fail in the audit report
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
audits.Worker.Loop.Pause()
|
2020-07-13 23:24:15 +01:00
|
|
|
audits.Chore.Loop.Pause()
|
2019-06-11 21:14:40 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
2019-07-01 15:02:00 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2019-06-11 21:14:40 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue := audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment, err := queue.Next()
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
orders := satellite.Orders.Service
|
|
|
|
containment := satellite.DB.Containment()
|
2019-06-11 21:14:40 +01:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
shareSize := segment.Redundancy.ShareSize
|
|
|
|
pieces := segment.Pieces
|
|
|
|
rootPieceID := segment.RootPieceID
|
2019-06-11 21:14:40 +01:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), pieces[0].StorageNode, pieces[0].Number, rootPieceID, shareSize)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].Number))
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
pending := &audit.PendingAudit{
|
2020-12-14 12:54:22 +00:00
|
|
|
NodeID: pieces[0].StorageNode,
|
2019-06-11 21:14:40 +01:00
|
|
|
PieceID: rootPieceID,
|
2019-09-11 23:37:01 +01:00
|
|
|
StripeIndex: randomIndex,
|
2019-06-11 21:14:40 +01:00
|
|
|
ShareSize: shareSize,
|
|
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
|
|
|
|
ReverifyCount: 0,
|
2020-12-14 12:54:22 +00:00
|
|
|
Segment: queueSegment.SegmentLocation,
|
2019-06-11 21:14:40 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
err = containment.IncrementPending(ctx, pending)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// delete the piece from the first node
|
2020-12-14 12:54:22 +00:00
|
|
|
piece := pieces[0]
|
|
|
|
pieceID := rootPieceID.Derive(piece.StorageNode, int32(piece.Number))
|
|
|
|
node := planet.FindNode(piece.StorageNode)
|
2019-09-11 23:37:01 +01:00
|
|
|
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Reverify(ctx, queueSegment)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
require.Len(t, report.Successes, 0)
|
|
|
|
require.Len(t, report.Offlines, 0)
|
|
|
|
require.Len(t, report.PendingAudits, 0)
|
|
|
|
require.Len(t, report.Fails, 1)
|
2020-12-14 12:54:22 +00:00
|
|
|
require.Equal(t, report.Fails[0], pieces[0].StorageNode)
|
2020-03-04 23:09:18 +00:00
|
|
|
|
|
|
|
// record audit
|
2020-12-14 12:54:22 +00:00
|
|
|
_, err = audits.Reporter.RecordAudits(ctx, report)
|
2020-03-04 23:09:18 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// make sure that pending audit is removed by the reporter when audit is recorded
|
|
|
|
_, err = containment.Get(ctx, pending.NodeID)
|
|
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
2019-06-11 21:14:40 +01:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestReverifyFailBadData(t *testing.T) {
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
2019-07-01 15:02:00 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2019-06-11 21:14:40 +01:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
|
|
// - uploads random data
|
|
|
|
// - uses the cursor to get a stripe
|
|
|
|
// - creates a pending audit for a node holding a piece for that stripe
|
|
|
|
// - makes ExpectedShareHash have random data
|
|
|
|
// - calls reverify on that same stripe
|
|
|
|
// - expects one storage node to be marked as a fail in the audit report
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2020-07-13 23:24:15 +01:00
|
|
|
audits.Chore.Loop.Pause()
|
2019-06-11 21:14:40 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
2019-07-01 15:02:00 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2019-06-11 21:14:40 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue := audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment, err := queue.Next()
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
pieces := segment.Pieces
|
|
|
|
rootPieceID := segment.RootPieceID
|
|
|
|
redundancy := segment.Redundancy
|
2019-06-11 21:14:40 +01:00
|
|
|
|
|
|
|
pending := &audit.PendingAudit{
|
2020-12-14 12:54:22 +00:00
|
|
|
NodeID: pieces[0].StorageNode,
|
2019-06-11 21:14:40 +01:00
|
|
|
PieceID: rootPieceID,
|
2019-09-11 23:37:01 +01:00
|
|
|
StripeIndex: randomIndex,
|
2020-12-14 12:54:22 +00:00
|
|
|
ShareSize: redundancy.ShareSize,
|
2019-07-01 15:02:00 +01:00
|
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(nil),
|
2019-06-11 21:14:40 +01:00
|
|
|
ReverifyCount: 0,
|
2020-12-14 12:54:22 +00:00
|
|
|
Segment: queueSegment.SegmentLocation,
|
2019-06-11 21:14:40 +01:00
|
|
|
}
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err = satellite.DB.Containment().IncrementPending(ctx, pending)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
nodeID := pieces[0].StorageNode
|
|
|
|
report, err := audits.Verifier.Reverify(ctx, queueSegment)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
require.Len(t, report.Successes, 0)
|
|
|
|
require.Len(t, report.Offlines, 0)
|
|
|
|
require.Len(t, report.PendingAudits, 0)
|
|
|
|
require.Len(t, report.Fails, 1)
|
2019-07-18 19:08:15 +01:00
|
|
|
require.Equal(t, report.Fails[0], nodeID)
|
2020-03-04 23:09:18 +00:00
|
|
|
|
|
|
|
// record audit
|
2020-12-14 12:54:22 +00:00
|
|
|
_, err = audits.Reporter.RecordAudits(ctx, report)
|
2020-03-04 23:09:18 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// make sure that pending audit is removed by the reporter when audit is recorded
|
|
|
|
containment := satellite.DB.Containment()
|
|
|
|
_, err = containment.Get(ctx, pending.NodeID)
|
|
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
2019-06-11 21:14:40 +01:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestReverifyOffline(t *testing.T) {
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
2019-07-01 15:02:00 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2019-06-11 21:14:40 +01:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
|
|
// - uploads random data
|
|
|
|
// - uses the cursor to get a stripe
|
|
|
|
// - creates pending audits for one node holding a piece for that stripe
|
|
|
|
// - stop the node that has the pending audit
|
|
|
|
// - calls reverify on that same stripe
|
|
|
|
// - expects one storage node to be marked as offline in the audit report
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2020-07-13 23:24:15 +01:00
|
|
|
audits.Chore.Loop.Pause()
|
2019-06-11 21:14:40 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
2019-07-01 15:02:00 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2019-06-11 21:14:40 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue := audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment, err := queue.Next()
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
pieces := segment.Pieces
|
|
|
|
rootPieceID := segment.RootPieceID
|
|
|
|
redundancy := segment.Redundancy
|
2019-06-11 21:14:40 +01:00
|
|
|
|
|
|
|
pending := &audit.PendingAudit{
|
2020-12-14 12:54:22 +00:00
|
|
|
NodeID: pieces[0].StorageNode,
|
2019-06-11 21:14:40 +01:00
|
|
|
PieceID: rootPieceID,
|
2019-09-11 23:37:01 +01:00
|
|
|
StripeIndex: randomIndex,
|
2020-12-14 12:54:22 +00:00
|
|
|
ShareSize: redundancy.ShareSize,
|
2019-06-26 11:38:51 +01:00
|
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(testrand.Bytes(10)),
|
2019-06-11 21:14:40 +01:00
|
|
|
ReverifyCount: 0,
|
2020-12-14 12:54:22 +00:00
|
|
|
Segment: queueSegment.SegmentLocation,
|
2019-06-11 21:14:40 +01:00
|
|
|
}
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err = satellite.DB.Containment().IncrementPending(ctx, pending)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(pieces[0].StorageNode))
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Reverify(ctx, queueSegment)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
require.Len(t, report.Successes, 0)
|
|
|
|
require.Len(t, report.Fails, 0)
|
|
|
|
require.Len(t, report.PendingAudits, 0)
|
|
|
|
require.Len(t, report.Offlines, 1)
|
2020-12-14 12:54:22 +00:00
|
|
|
require.Equal(t, report.Offlines[0], pieces[0].StorageNode)
|
2020-03-04 23:09:18 +00:00
|
|
|
|
|
|
|
// make sure that pending audit is not removed
|
|
|
|
containment := satellite.DB.Containment()
|
|
|
|
_, err = containment.Get(ctx, pending.NodeID)
|
|
|
|
require.NoError(t, err)
|
2019-06-11 21:14:40 +01:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestReverifyOfflineDialTimeout(t *testing.T) {
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
2019-07-01 15:02:00 +01:00
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2019-06-11 21:14:40 +01:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
|
|
// - uploads random data
|
|
|
|
// - uses the cursor to get a stripe
|
|
|
|
// - creates pending audit for one node holding a piece for that stripe
|
|
|
|
// - uses a slow transport client so that dial timeout will happen (an offline case)
|
|
|
|
// - calls reverify on that same stripe
|
|
|
|
// - expects one storage node to be marked as offline in the audit report
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2020-07-13 23:24:15 +01:00
|
|
|
audits.Chore.Loop.Pause()
|
2019-06-11 21:14:40 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
2019-07-01 15:02:00 +01:00
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2019-06-11 21:14:40 +01:00
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue := audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment, err := queue.Next()
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-09-19 05:46:39 +01:00
|
|
|
tlsOptions, err := tlsopts.NewOptions(satellite.Identity, tlsopts.Config{}, nil)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-09-19 05:46:39 +01:00
|
|
|
dialer := rpc.NewDefaultDialer(tlsOptions)
|
|
|
|
dialer.DialTimeout = 20 * time.Millisecond
|
|
|
|
dialer.DialLatency = 200 * time.Second
|
2020-09-30 18:39:47 +01:00
|
|
|
|
|
|
|
connector := rpc.NewDefaultTCPConnector(nil)
|
|
|
|
connector.TransferRate = 1 * memory.KB
|
|
|
|
dialer.Connector = connector
|
2019-06-11 21:14:40 +01:00
|
|
|
|
|
|
|
// This config value will create a very short timeframe allowed for receiving
|
|
|
|
// data from storage nodes. This will cause context to cancel and start
|
|
|
|
// downloading from new nodes.
|
|
|
|
minBytesPerSecond := 100 * memory.KiB
|
|
|
|
|
2019-07-01 15:02:00 +01:00
|
|
|
verifier := audit.NewVerifier(
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite.Log.Named("verifier"),
|
2020-12-14 12:54:22 +00:00
|
|
|
satellite.Metainfo.Metabase,
|
2019-09-19 05:46:39 +01:00
|
|
|
dialer,
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite.Overlay.Service,
|
|
|
|
satellite.DB.Containment(),
|
|
|
|
satellite.Orders.Service,
|
|
|
|
satellite.Identity,
|
2019-06-11 21:14:40 +01:00
|
|
|
minBytesPerSecond,
|
|
|
|
5*time.Second)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
pieces := segment.Pieces
|
|
|
|
rootPieceID := segment.RootPieceID
|
|
|
|
redundancy := segment.Redundancy
|
2019-06-11 21:14:40 +01:00
|
|
|
|
|
|
|
pending := &audit.PendingAudit{
|
2020-12-14 12:54:22 +00:00
|
|
|
NodeID: pieces[0].StorageNode,
|
2019-06-11 21:14:40 +01:00
|
|
|
PieceID: rootPieceID,
|
2019-09-11 23:37:01 +01:00
|
|
|
StripeIndex: randomIndex,
|
2020-12-14 12:54:22 +00:00
|
|
|
ShareSize: redundancy.ShareSize,
|
2019-07-01 15:02:00 +01:00
|
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(nil),
|
2019-06-11 21:14:40 +01:00
|
|
|
ReverifyCount: 0,
|
2020-12-14 12:54:22 +00:00
|
|
|
Segment: queueSegment.SegmentLocation,
|
2019-06-11 21:14:40 +01:00
|
|
|
}
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
err = satellite.DB.Containment().IncrementPending(ctx, pending)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := verifier.Reverify(ctx, queueSegment)
|
2019-06-11 21:14:40 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
require.Len(t, report.Successes, 0)
|
|
|
|
require.Len(t, report.Fails, 0)
|
|
|
|
require.Len(t, report.PendingAudits, 0)
|
|
|
|
require.Len(t, report.Offlines, 1)
|
|
|
|
require.Equal(t, report.Offlines[0], pending.NodeID)
|
2020-03-04 23:09:18 +00:00
|
|
|
|
|
|
|
// make sure that pending audit is not removed
|
|
|
|
containment := satellite.DB.Containment()
|
|
|
|
_, err = containment.Get(ctx, pending.NodeID)
|
|
|
|
require.NoError(t, err)
|
2019-06-11 21:14:40 +01:00
|
|
|
})
|
|
|
|
}
|
2019-06-19 10:02:25 +01:00
|
|
|
|
|
|
|
func TestReverifyDeletedSegment(t *testing.T) {
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2020-01-21 10:38:41 +00:00
|
|
|
Reconfigure: testplanet.Reconfigure{
|
|
|
|
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
|
|
|
|
},
|
2019-06-19 10:02:25 +01:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
2019-09-29 03:03:15 +01:00
|
|
|
// - uploads random data to all nodes
|
2020-12-14 12:54:22 +00:00
|
|
|
// - gets a segment from the audit queue
|
2019-09-11 23:37:01 +01:00
|
|
|
// - creates one pending audit for a node holding a piece for that segment
|
2019-06-19 10:02:25 +01:00
|
|
|
// - deletes the file
|
2019-09-29 03:03:15 +01:00
|
|
|
// - calls reverify on the deleted file
|
|
|
|
// - expects reverification to return a segment deleted error, and expects the storage node to still be in containment
|
|
|
|
// - uploads a new file and calls reverify on it
|
2019-06-19 10:02:25 +01:00
|
|
|
// - expects reverification to pass successufully and the storage node to be not in containment mode
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2020-07-13 23:24:15 +01:00
|
|
|
audits.Chore.Loop.Pause()
|
2019-06-19 10:02:25 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
2019-09-29 03:03:15 +01:00
|
|
|
testData1 := testrand.Bytes(8 * memory.KiB)
|
2020-01-21 10:38:41 +00:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
|
2019-06-19 10:02:25 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue := audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment, err := queue.Next()
|
2019-06-19 10:02:25 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
nodeID := segment.Pieces[0].StorageNode
|
2019-06-19 10:02:25 +01:00
|
|
|
pending := &audit.PendingAudit{
|
|
|
|
NodeID: nodeID,
|
2020-12-14 12:54:22 +00:00
|
|
|
PieceID: segment.RootPieceID,
|
2019-09-11 23:37:01 +01:00
|
|
|
StripeIndex: randomIndex,
|
2020-12-14 12:54:22 +00:00
|
|
|
ShareSize: segment.Redundancy.ShareSize,
|
2019-06-19 10:02:25 +01:00
|
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(nil),
|
|
|
|
ReverifyCount: 0,
|
2020-12-14 12:54:22 +00:00
|
|
|
Segment: queueSegment.SegmentLocation,
|
2019-06-19 10:02:25 +01:00
|
|
|
}
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
containment := satellite.DB.Containment()
|
2019-06-19 10:02:25 +01:00
|
|
|
err = containment.IncrementPending(ctx, pending)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// delete the file
|
2020-02-10 12:18:18 +00:00
|
|
|
err = ul.DeleteObject(ctx, satellite, "testbucket", "test/path1")
|
2019-06-19 10:02:25 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-08-25 14:32:05 +01:00
|
|
|
// call reverify on the deleted file and expect no error
|
2019-09-29 03:03:15 +01:00
|
|
|
// but expect that the node is still in containment
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Reverify(ctx, queueSegment)
|
2020-08-25 14:32:05 +01:00
|
|
|
require.NoError(t, err)
|
2019-06-19 10:02:25 +01:00
|
|
|
assert.Empty(t, report)
|
|
|
|
|
2019-09-29 03:03:15 +01:00
|
|
|
_, err = containment.Get(ctx, nodeID)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// upload a new file to call reverify on
|
|
|
|
testData2 := testrand.Bytes(8 * memory.KiB)
|
2020-01-21 10:38:41 +00:00
|
|
|
err = ul.Upload(ctx, satellite, "testbucket", "test/path2", testData2)
|
2019-09-29 03:03:15 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue = audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment, err = queue.Next()
|
2019-09-29 03:03:15 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// reverify the new segment
|
|
|
|
report, err = audits.Verifier.Reverify(ctx, queueSegment)
|
2019-09-29 03:03:15 +01:00
|
|
|
require.NoError(t, err)
|
2019-10-07 21:06:10 +01:00
|
|
|
assert.Empty(t, report.Fails)
|
|
|
|
assert.Empty(t, report.Successes)
|
|
|
|
assert.Empty(t, report.PendingAudits)
|
2019-09-29 03:03:15 +01:00
|
|
|
|
|
|
|
// expect that the node was removed from containment since the segment it was contained for has been deleted
|
2019-06-19 10:02:25 +01:00
|
|
|
_, err = containment.Get(ctx, nodeID)
|
|
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestReverifyModifiedSegment(t *testing.T) {
|
2020-03-04 23:09:18 +00:00
|
|
|
testplanet.Run(t, testplanet.Config{
|
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
|
|
Reconfigure: testplanet.Reconfigure{
|
|
|
|
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
|
|
|
|
},
|
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
|
|
// - uploads random data to a file on all nodes
|
|
|
|
// - creates a pending audit for a particular node in that file
|
|
|
|
// - removes a piece from the file so that the segment is modified
|
|
|
|
// - uploads a new file to all nodes and calls reverify on it
|
|
|
|
// - expects reverification to pass successufully and the storage node to be not in containment mode
|
|
|
|
|
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2020-07-13 23:24:15 +01:00
|
|
|
audits.Chore.Loop.Pause()
|
2020-03-04 23:09:18 +00:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
testData1 := testrand.Bytes(8 * memory.KiB)
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue := audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment, err := queue.Next()
|
2020-03-04 23:09:18 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2020-03-04 23:09:18 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2020-03-04 23:09:18 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
nodeID := segment.Pieces[0].StorageNode
|
2020-03-04 23:09:18 +00:00
|
|
|
pending := &audit.PendingAudit{
|
|
|
|
NodeID: nodeID,
|
2020-12-14 12:54:22 +00:00
|
|
|
PieceID: segment.RootPieceID,
|
2020-03-04 23:09:18 +00:00
|
|
|
StripeIndex: randomIndex,
|
2020-12-14 12:54:22 +00:00
|
|
|
ShareSize: segment.Redundancy.ShareSize,
|
2020-03-04 23:09:18 +00:00
|
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(nil),
|
|
|
|
ReverifyCount: 0,
|
2020-12-14 12:54:22 +00:00
|
|
|
Segment: queueSegment.SegmentLocation,
|
2020-03-04 23:09:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
containment := satellite.DB.Containment()
|
|
|
|
|
|
|
|
err = containment.IncrementPending(ctx, pending)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// remove a piece from the file (a piece that the contained node isn't holding)
|
|
|
|
audits.Verifier.OnTestingCheckSegmentAlteredHook = func() {
|
2020-12-14 12:54:22 +00:00
|
|
|
err = satellite.Metainfo.Metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
|
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
OldPieces: segment.Pieces,
|
|
|
|
NewPieces: append([]metabase.Piece{segment.Pieces[0]}, segment.Pieces[2:]...),
|
|
|
|
})
|
2020-03-04 23:09:18 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// upload another file to call reverify on
|
|
|
|
testData2 := testrand.Bytes(8 * memory.KiB)
|
|
|
|
err = ul.Upload(ctx, satellite, "testbucket", "test/path2", testData2)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// select the segment that was not used for the pending audit
|
2020-03-04 23:09:18 +00:00
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue = audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment1, err := queue.Next()
|
2020-03-04 23:09:18 +00:00
|
|
|
require.NoError(t, err)
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment2, err := queue.Next()
|
2020-03-04 23:09:18 +00:00
|
|
|
require.NoError(t, err)
|
2020-12-14 12:54:22 +00:00
|
|
|
reverifySegment := queueSegment1
|
|
|
|
if queueSegment1 == queueSegment {
|
|
|
|
reverifySegment = queueSegment2
|
2020-03-04 23:09:18 +00:00
|
|
|
}
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// reverify the segment that was not modified
|
|
|
|
report, err := audits.Verifier.Reverify(ctx, reverifySegment)
|
2020-03-04 23:09:18 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
assert.Empty(t, report.Fails)
|
|
|
|
assert.Empty(t, report.Successes)
|
|
|
|
assert.Empty(t, report.PendingAudits)
|
|
|
|
|
|
|
|
// expect that the node was removed from containment since the segment it was contained for has been changed
|
|
|
|
_, err = containment.Get(ctx, nodeID)
|
|
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestReverifyReplacedSegment(t *testing.T) {
|
2019-06-19 10:02:25 +01:00
|
|
|
testplanet.Run(t, testplanet.Config{
|
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2020-01-21 10:38:41 +00:00
|
|
|
Reconfigure: testplanet.Reconfigure{
|
|
|
|
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
|
|
|
|
},
|
2019-06-19 10:02:25 +01:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
2019-09-29 03:03:15 +01:00
|
|
|
// - uploads random data to a file on all nodes
|
|
|
|
// - creates a pending audit for a particular node in that file
|
|
|
|
// - re-uploads the file so that the segment is modified
|
|
|
|
// - uploads a new file to all nodes and calls reverify on it
|
2019-06-19 10:02:25 +01:00
|
|
|
// - expects reverification to pass successufully and the storage node to be not in containment mode
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2020-07-13 23:24:15 +01:00
|
|
|
audits.Chore.Loop.Pause()
|
2019-06-19 10:02:25 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
2019-09-29 03:03:15 +01:00
|
|
|
testData1 := testrand.Bytes(8 * memory.KiB)
|
2020-01-21 10:38:41 +00:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue := audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment, err := queue.Next()
|
2019-09-11 23:37:01 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-06-19 10:02:25 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2019-06-19 10:02:25 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
nodeID := segment.Pieces[0].StorageNode
|
2019-06-19 10:02:25 +01:00
|
|
|
pending := &audit.PendingAudit{
|
|
|
|
NodeID: nodeID,
|
2020-12-14 12:54:22 +00:00
|
|
|
PieceID: segment.RootPieceID,
|
2019-09-11 23:37:01 +01:00
|
|
|
StripeIndex: randomIndex,
|
2020-12-14 12:54:22 +00:00
|
|
|
ShareSize: segment.Redundancy.ShareSize,
|
2019-06-19 10:02:25 +01:00
|
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(nil),
|
|
|
|
ReverifyCount: 0,
|
2020-12-14 12:54:22 +00:00
|
|
|
Segment: queueSegment.SegmentLocation,
|
2019-06-19 10:02:25 +01:00
|
|
|
}
|
|
|
|
|
2019-09-11 23:37:01 +01:00
|
|
|
containment := satellite.DB.Containment()
|
2019-06-19 10:02:25 +01:00
|
|
|
|
|
|
|
err = containment.IncrementPending(ctx, pending)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// replace the file
|
2019-09-29 03:03:15 +01:00
|
|
|
err = ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
|
2019-06-19 10:02:25 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2019-09-29 03:03:15 +01:00
|
|
|
// upload another file to call reverify on
|
|
|
|
testData2 := testrand.Bytes(8 * memory.KiB)
|
2020-01-21 10:38:41 +00:00
|
|
|
err = ul.Upload(ctx, satellite, "testbucket", "test/path2", testData2)
|
2019-09-29 03:03:15 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// select the segment that was not used for the pending audit
|
2019-09-29 03:03:15 +01:00
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue = audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment1, err := queue.Next()
|
2019-09-29 03:03:15 +01:00
|
|
|
require.NoError(t, err)
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment2, err := queue.Next()
|
2019-09-29 03:03:15 +01:00
|
|
|
require.NoError(t, err)
|
2020-12-14 12:54:22 +00:00
|
|
|
reverifySegment := queueSegment1
|
|
|
|
if queueSegment1 == queueSegment {
|
|
|
|
reverifySegment = queueSegment2
|
2019-09-29 03:03:15 +01:00
|
|
|
}
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// reverify the segment that was not modified
|
|
|
|
report, err := audits.Verifier.Reverify(ctx, reverifySegment)
|
2019-06-19 10:02:25 +01:00
|
|
|
require.NoError(t, err)
|
2019-10-07 21:06:10 +01:00
|
|
|
assert.Empty(t, report.Fails)
|
|
|
|
assert.Empty(t, report.Successes)
|
|
|
|
assert.Empty(t, report.PendingAudits)
|
2019-06-19 10:02:25 +01:00
|
|
|
|
2019-09-29 03:03:15 +01:00
|
|
|
// expect that the node was removed from containment since the segment it was contained for has been changed
|
2019-06-19 10:02:25 +01:00
|
|
|
_, err = containment.Get(ctx, nodeID)
|
|
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
|
|
|
})
|
|
|
|
}
|
2019-09-19 00:45:15 +01:00
|
|
|
|
|
|
|
func TestReverifyDifferentShare(t *testing.T) {
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2020-01-21 10:38:41 +00:00
|
|
|
Reconfigure: testplanet.Reconfigure{
|
|
|
|
// upload to three nodes so there is definitely at least one node overlap between the two files
|
|
|
|
Satellite: testplanet.ReconfigureRS(1, 2, 3, 3),
|
|
|
|
},
|
2019-09-19 00:45:15 +01:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
|
|
// - uploads random data to two files
|
|
|
|
// - get a random stripe to audit from file 1
|
|
|
|
// - creates one pending audit for a node holding a piece for that stripe
|
|
|
|
// - the actual share is downloaded to make sure ExpectedShareHash is correct
|
|
|
|
// - delete piece for file 1 from the selected node
|
|
|
|
// - calls reverify on some stripe from file 2
|
|
|
|
// - expects one storage node to be marked as a fail in the audit report
|
|
|
|
// - (if file 2 is used during reverify, the node will pass the audit and the test should fail)
|
|
|
|
|
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2020-07-13 23:24:15 +01:00
|
|
|
audits.Chore.Loop.Pause()
|
2019-09-19 00:45:15 +01:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
testData1 := testrand.Bytes(8 * memory.KiB)
|
|
|
|
testData2 := testrand.Bytes(8 * memory.KiB)
|
2020-01-21 10:38:41 +00:00
|
|
|
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
|
2019-09-19 00:45:15 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-01-21 10:38:41 +00:00
|
|
|
err = ul.Upload(ctx, satellite, "testbucket", "test/path2", testData2)
|
2019-09-19 00:45:15 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue := audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment1, err := queue.Next()
|
2019-09-19 00:45:15 +01:00
|
|
|
require.NoError(t, err)
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment2, err := queue.Next()
|
2019-09-19 00:45:15 +01:00
|
|
|
require.NoError(t, err)
|
2020-12-14 12:54:22 +00:00
|
|
|
require.NotEqual(t, queueSegment1, queueSegment2)
|
2019-09-19 00:45:15 +01:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
segment1, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment1.StreamID,
|
|
|
|
Position: queueSegment1.Position,
|
|
|
|
})
|
2019-09-19 00:45:15 +01:00
|
|
|
require.NoError(t, err)
|
2020-12-14 12:54:22 +00:00
|
|
|
|
|
|
|
segment2, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment2.StreamID,
|
|
|
|
Position: queueSegment2.Position,
|
|
|
|
})
|
2019-09-19 00:45:15 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// find a node that contains a piece for both files
|
2020-12-14 12:54:22 +00:00
|
|
|
// save that node ID and the piece number associated with it for segment1
|
2019-09-19 00:45:15 +01:00
|
|
|
var selectedNode storj.NodeID
|
2020-12-14 12:54:22 +00:00
|
|
|
var selectedPieceNum uint16
|
|
|
|
p1Nodes := make(map[storj.NodeID]uint16)
|
|
|
|
for _, piece := range segment1.Pieces {
|
|
|
|
p1Nodes[piece.StorageNode] = piece.Number
|
2019-09-19 00:45:15 +01:00
|
|
|
}
|
2020-12-14 12:54:22 +00:00
|
|
|
for _, piece := range segment2.Pieces {
|
|
|
|
pieceNum, ok := p1Nodes[piece.StorageNode]
|
2019-09-19 00:45:15 +01:00
|
|
|
if ok {
|
2020-12-14 12:54:22 +00:00
|
|
|
selectedNode = piece.StorageNode
|
2019-09-19 00:45:15 +01:00
|
|
|
selectedPieceNum = pieceNum
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
require.NotEqual(t, selectedNode, storj.NodeID{})
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment1)
|
2019-09-19 00:45:15 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
orders := satellite.Orders.Service
|
|
|
|
containment := satellite.DB.Containment()
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
shareSize := segment1.Redundancy.ShareSize
|
|
|
|
rootPieceID := segment1.RootPieceID
|
2019-09-19 00:45:15 +01:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment1.Bucket(), selectedNode, selectedPieceNum, rootPieceID, shareSize)
|
2019-09-19 00:45:15 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
satellite/audit: use LastIPAndPort preferentially
This preserves the last_ip_and_port field from node lookups through
CreateAuditOrderLimits() and CreateAuditOrderLimit(), so that later
calls to (*Verifier).GetShare() can try to use that IP and port. If a
connection to the given IP and port cannot be made, or the connection
cannot be verified and secured with the target node identity, an
attempt is made to connect to the original node address instead.
A similar change is not necessary to the other Create*OrderLimits
functions, because they already replace node addresses with the cached
IP and port as appropriate. We might want to consider making a similar
change to CreateGetRepairOrderLimits(), though.
The audit situation is unique because the ramifications are especially
powerful when we get the address wrong. Failing a single audit can have
a heavy cost to a storage node. We need to make extra effort in order
to avoid imposing that cost unfairly.
Situation 1: If an audit fails because the repair worker failed to make
a DNS query (which might well be the fault on the satellite side), and
we have last_ip_and_port information available for the target node, it
would be unfair not to try connecting to that last_ip_and_port address.
Situation 2: If a node has changed addresses recently and the operator
correctly changed its DNS entry, but we don't bother querying DNS, it
would be unfair to penalize the node for our failure to connect to it.
So the audit worker must try both last_ip_and_port _and_ the node
address as supplied by the SNO.
We elect here to try last_ip_and_port first, on the grounds that (a) it
is expected to work in the large majority of cases, and (b) there
should not be any security concerns with connecting to an out-or-date
address, and (c) avoiding DNS queries on the satellite side helps
alleviate satellite operational load.
Change-Id: I9bf6c6c79866d879adecac6144a6c346f4f61200
2020-09-30 05:53:43 +01:00
|
|
|
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(selectedPieceNum))
|
2019-09-19 00:45:15 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
pending := &audit.PendingAudit{
|
|
|
|
NodeID: selectedNode,
|
|
|
|
PieceID: rootPieceID,
|
|
|
|
StripeIndex: randomIndex,
|
|
|
|
ShareSize: shareSize,
|
|
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
|
|
|
|
ReverifyCount: 0,
|
2020-12-14 12:54:22 +00:00
|
|
|
Segment: queueSegment1.SegmentLocation,
|
2019-09-19 00:45:15 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
err = containment.IncrementPending(ctx, pending)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// delete the piece for segment1 from the selected node
|
|
|
|
pieceID := segment1.RootPieceID.Derive(selectedNode, int32(selectedPieceNum))
|
2020-03-27 14:46:40 +00:00
|
|
|
node := planet.FindNode(selectedNode)
|
2019-09-19 00:45:15 +01:00
|
|
|
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// reverify with segment2. Since the selected node was put in containment for segment1,
|
|
|
|
// it should be audited for segment1 and fail
|
|
|
|
report, err := audits.Verifier.Reverify(ctx, queueSegment2)
|
2019-09-19 00:45:15 +01:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
require.Len(t, report.Successes, 0)
|
|
|
|
require.Len(t, report.Offlines, 0)
|
|
|
|
require.Len(t, report.PendingAudits, 0)
|
|
|
|
require.Len(t, report.Fails, 1)
|
|
|
|
require.Equal(t, report.Fails[0], selectedNode)
|
2020-03-04 23:09:18 +00:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// record audit
|
|
|
|
_, err = audits.Reporter.RecordAudits(ctx, report)
|
2020-03-04 23:09:18 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// make sure that pending audit is removed by the reporter when audit is recorded
|
|
|
|
_, err = containment.Get(ctx, pending.NodeID)
|
|
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
2019-09-19 00:45:15 +01:00
|
|
|
})
|
|
|
|
}
|
2019-11-05 19:41:48 +00:00
|
|
|
|
2020-07-16 15:18:02 +01:00
|
|
|
// TestReverifyExpired1 tests the case where the segment passed into Reverify is expired.
|
2019-11-05 19:41:48 +00:00
|
|
|
func TestReverifyExpired1(t *testing.T) {
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2020-07-13 23:24:15 +01:00
|
|
|
audits.Chore.Loop.Pause()
|
2019-11-05 19:41:48 +00:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
err := ul.UploadWithExpiration(ctx, satellite, "testbucket", "test/path", testData, time.Now().Add(1*time.Hour))
|
2019-11-05 19:41:48 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue := audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment, err := queue.Next()
|
2019-11-05 19:41:48 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// move time into the future so the segment is expired
|
|
|
|
audits.Verifier.SetNow(func() time.Time {
|
|
|
|
return time.Now().Add(2 * time.Hour)
|
|
|
|
})
|
2019-11-05 19:41:48 +00:00
|
|
|
|
2020-08-25 14:32:05 +01:00
|
|
|
// Reverify should not return an error
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Reverify(ctx, queueSegment)
|
2020-08-25 14:32:05 +01:00
|
|
|
require.NoError(t, err)
|
2019-11-05 19:41:48 +00:00
|
|
|
|
|
|
|
assert.Len(t, report.Successes, 0)
|
|
|
|
assert.Len(t, report.Fails, 0)
|
|
|
|
assert.Len(t, report.Offlines, 0)
|
|
|
|
assert.Len(t, report.PendingAudits, 0)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// TestReverifyExpired2 tests the case where the segment passed into Reverify is not expired,
|
|
|
|
// but the segment a node is contained for has expired.
|
|
|
|
func TestReverifyExpired2(t *testing.T) {
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
2020-01-21 10:38:41 +00:00
|
|
|
Reconfigure: testplanet.Reconfigure{
|
|
|
|
// upload to three nodes so there is definitely at least one node overlap between the two files
|
|
|
|
Satellite: testplanet.ReconfigureRS(1, 2, 3, 3),
|
|
|
|
},
|
2019-11-05 19:41:48 +00:00
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2020-07-13 23:24:15 +01:00
|
|
|
audits.Chore.Loop.Pause()
|
2019-11-05 19:41:48 +00:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
testData1 := testrand.Bytes(8 * memory.KiB)
|
|
|
|
testData2 := testrand.Bytes(8 * memory.KiB)
|
2020-01-21 10:38:41 +00:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
err := ul.UploadWithExpiration(ctx, satellite, "testbucket", "test/path1", testData1, time.Now().Add(1*time.Hour))
|
2019-11-05 19:41:48 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-01-21 10:38:41 +00:00
|
|
|
err = ul.Upload(ctx, satellite, "testbucket", "test/path2", testData2)
|
2019-11-05 19:41:48 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue := audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment1, err := queue.Next()
|
2019-11-05 19:41:48 +00:00
|
|
|
require.NoError(t, err)
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment2, err := queue.Next()
|
2019-11-05 19:41:48 +00:00
|
|
|
require.NoError(t, err)
|
2020-12-14 12:54:22 +00:00
|
|
|
require.NotEqual(t, queueSegment1, queueSegment2)
|
2019-11-05 19:41:48 +00:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// make sure queueSegment1 is the one with the expiration date
|
|
|
|
if queueSegment1.ExpirationDate.IsZero() {
|
|
|
|
queueSegment1, queueSegment2 = queueSegment2, queueSegment1
|
|
|
|
}
|
|
|
|
|
|
|
|
segment1, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment1.StreamID,
|
|
|
|
Position: queueSegment1.Position,
|
|
|
|
})
|
2019-11-05 19:41:48 +00:00
|
|
|
require.NoError(t, err)
|
2020-12-14 12:54:22 +00:00
|
|
|
|
|
|
|
segment2, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment2.StreamID,
|
|
|
|
Position: queueSegment2.Position,
|
|
|
|
})
|
2019-11-05 19:41:48 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// find a node that contains a piece for both files
|
2020-12-14 12:54:22 +00:00
|
|
|
// save that node ID and the piece number associated with it for segment1
|
2019-11-05 19:41:48 +00:00
|
|
|
var selectedNode storj.NodeID
|
2020-12-14 12:54:22 +00:00
|
|
|
var selectedPieceNum uint16
|
|
|
|
p1Nodes := make(map[storj.NodeID]uint16)
|
|
|
|
for _, piece := range segment1.Pieces {
|
|
|
|
p1Nodes[piece.StorageNode] = piece.Number
|
2019-11-05 19:41:48 +00:00
|
|
|
}
|
2020-12-14 12:54:22 +00:00
|
|
|
for _, piece := range segment2.Pieces {
|
|
|
|
pieceNum, ok := p1Nodes[piece.StorageNode]
|
2019-11-05 19:41:48 +00:00
|
|
|
if ok {
|
2020-12-14 12:54:22 +00:00
|
|
|
selectedNode = piece.StorageNode
|
2019-11-05 19:41:48 +00:00
|
|
|
selectedPieceNum = pieceNum
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
require.NotEqual(t, selectedNode, storj.NodeID{})
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment1)
|
2019-11-05 19:41:48 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
orders := satellite.Orders.Service
|
|
|
|
containment := satellite.DB.Containment()
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
shareSize := segment1.Redundancy.ShareSize
|
|
|
|
rootPieceID := segment1.RootPieceID
|
2019-11-05 19:41:48 +00:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment1.Bucket(), selectedNode, selectedPieceNum, rootPieceID, shareSize)
|
2019-11-05 19:41:48 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
satellite/audit: use LastIPAndPort preferentially
This preserves the last_ip_and_port field from node lookups through
CreateAuditOrderLimits() and CreateAuditOrderLimit(), so that later
calls to (*Verifier).GetShare() can try to use that IP and port. If a
connection to the given IP and port cannot be made, or the connection
cannot be verified and secured with the target node identity, an
attempt is made to connect to the original node address instead.
A similar change is not necessary to the other Create*OrderLimits
functions, because they already replace node addresses with the cached
IP and port as appropriate. We might want to consider making a similar
change to CreateGetRepairOrderLimits(), though.
The audit situation is unique because the ramifications are especially
powerful when we get the address wrong. Failing a single audit can have
a heavy cost to a storage node. We need to make extra effort in order
to avoid imposing that cost unfairly.
Situation 1: If an audit fails because the repair worker failed to make
a DNS query (which might well be the fault on the satellite side), and
we have last_ip_and_port information available for the target node, it
would be unfair not to try connecting to that last_ip_and_port address.
Situation 2: If a node has changed addresses recently and the operator
correctly changed its DNS entry, but we don't bother querying DNS, it
would be unfair to penalize the node for our failure to connect to it.
So the audit worker must try both last_ip_and_port _and_ the node
address as supplied by the SNO.
We elect here to try last_ip_and_port first, on the grounds that (a) it
is expected to work in the large majority of cases, and (b) there
should not be any security concerns with connecting to an out-or-date
address, and (c) avoiding DNS queries on the satellite side helps
alleviate satellite operational load.
Change-Id: I9bf6c6c79866d879adecac6144a6c346f4f61200
2020-09-30 05:53:43 +01:00
|
|
|
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(selectedPieceNum))
|
2019-11-05 19:41:48 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
pending := &audit.PendingAudit{
|
|
|
|
NodeID: selectedNode,
|
|
|
|
PieceID: rootPieceID,
|
|
|
|
StripeIndex: randomIndex,
|
|
|
|
ShareSize: shareSize,
|
|
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
|
|
|
|
ReverifyCount: 0,
|
2020-12-14 12:54:22 +00:00
|
|
|
Segment: queueSegment1.SegmentLocation,
|
2019-11-05 19:41:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
err = containment.IncrementPending(ctx, pending)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// move time into the future so segment1 is expired
|
|
|
|
audits.Verifier.SetNow(func() time.Time {
|
|
|
|
return time.Now().Add(2 * time.Hour)
|
|
|
|
})
|
2019-11-05 19:41:48 +00:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// reverify with segment2. Since the selected node was put in containment for segment1,
|
|
|
|
// it should be audited for segment1
|
|
|
|
// since segment1 has expired, we expect no failure and we expect that the segment has been deleted
|
2019-11-05 19:41:48 +00:00
|
|
|
// and that the selected node has been removed from containment mode
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Reverify(ctx, queueSegment2)
|
2019-11-05 19:41:48 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
require.Len(t, report.Successes, 0)
|
|
|
|
require.Len(t, report.Offlines, 0)
|
|
|
|
require.Len(t, report.PendingAudits, 0)
|
|
|
|
require.Len(t, report.Fails, 0)
|
|
|
|
|
|
|
|
// Reverify should remove the node from containment mode
|
|
|
|
_, err = containment.Get(ctx, pending.NodeID)
|
2020-12-14 12:54:22 +00:00
|
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
2019-11-05 19:41:48 +00:00
|
|
|
})
|
|
|
|
}
|
2019-11-19 16:30:28 +00:00
|
|
|
|
|
|
|
// TestReverifySlowDownload checks that a node that times out while sending data to the
|
2020-07-16 15:18:02 +01:00
|
|
|
// audit service gets put into containment mode.
|
2019-11-19 16:30:28 +00:00
|
|
|
func TestReverifySlowDownload(t *testing.T) {
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
|
|
Reconfigure: testplanet.Reconfigure{
|
2020-03-27 16:18:19 +00:00
|
|
|
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
2019-11-19 16:30:28 +00:00
|
|
|
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
|
|
|
|
},
|
2020-10-27 17:34:59 +00:00
|
|
|
Satellite: testplanet.Combine(
|
|
|
|
func(log *zap.Logger, index int, config *satellite.Config) {
|
|
|
|
// These config values are chosen to force the slow node to time out without timing out on the three normal nodes
|
|
|
|
config.Audit.MinBytesPerSecond = 100 * memory.KiB
|
|
|
|
config.Audit.MinDownloadTimeout = 1 * time.Second
|
|
|
|
},
|
|
|
|
testplanet.ReconfigureRS(2, 2, 4, 4),
|
|
|
|
),
|
2019-11-19 16:30:28 +00:00
|
|
|
},
|
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2020-07-13 23:24:15 +01:00
|
|
|
audits.Chore.Loop.Pause()
|
2019-11-19 16:30:28 +00:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2020-01-21 10:38:41 +00:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue := audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment, err := queue.Next()
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
slowPiece := segment.Pieces[0]
|
|
|
|
slowNode := slowPiece.StorageNode
|
2019-11-19 16:30:28 +00:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
orders := satellite.Orders.Service
|
|
|
|
containment := satellite.DB.Containment()
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
shareSize := segment.Redundancy.ShareSize
|
|
|
|
rootPieceID := segment.RootPieceID
|
2019-11-19 16:30:28 +00:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), slowNode, slowPiece.Number, rootPieceID, shareSize)
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(slowPiece.Number))
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
pending := &audit.PendingAudit{
|
|
|
|
NodeID: slowNode,
|
2020-12-14 12:54:22 +00:00
|
|
|
PieceID: rootPieceID,
|
2019-11-19 16:30:28 +00:00
|
|
|
StripeIndex: randomIndex,
|
|
|
|
ShareSize: shareSize,
|
|
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
|
|
|
|
ReverifyCount: 0,
|
2020-12-14 12:54:22 +00:00
|
|
|
Segment: queueSegment.SegmentLocation,
|
2019-11-19 16:30:28 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
err = containment.IncrementPending(ctx, pending)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-05-07 09:23:40 +01:00
|
|
|
node := planet.FindNode(slowNode)
|
|
|
|
slowNodeDB := node.DB.(*testblobs.SlowDB)
|
|
|
|
// make downloads on storage node slower than the timeout on the satellite for downloading shares
|
|
|
|
delay := 1 * time.Second
|
|
|
|
slowNodeDB.SetLatency(delay)
|
2019-11-19 16:30:28 +00:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Reverify(ctx, queueSegment)
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
require.Len(t, report.Successes, 0)
|
|
|
|
require.Len(t, report.Fails, 0)
|
|
|
|
require.Len(t, report.Offlines, 0)
|
|
|
|
require.Len(t, report.PendingAudits, 1)
|
|
|
|
require.Len(t, report.Unknown, 0)
|
|
|
|
require.Equal(t, report.PendingAudits[0].NodeID, slowNode)
|
2020-03-04 23:09:18 +00:00
|
|
|
|
|
|
|
_, err = containment.Get(ctx, slowNode)
|
|
|
|
require.NoError(t, err)
|
2019-11-19 16:30:28 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// TestReverifyUnknownError checks that a node that returns an unknown error during an audit does not get marked as successful, failed, or contained.
|
|
|
|
func TestReverifyUnknownError(t *testing.T) {
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
|
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
|
|
Reconfigure: testplanet.Reconfigure{
|
2020-03-27 16:18:19 +00:00
|
|
|
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
2019-11-19 16:30:28 +00:00
|
|
|
return testblobs.NewBadDB(log.Named("baddb"), db), nil
|
|
|
|
},
|
2020-01-21 10:38:41 +00:00
|
|
|
Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
|
2019-11-19 16:30:28 +00:00
|
|
|
},
|
|
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
|
|
satellite := planet.Satellites[0]
|
|
|
|
audits := satellite.Audit
|
|
|
|
|
|
|
|
audits.Worker.Loop.Pause()
|
2020-07-13 23:24:15 +01:00
|
|
|
audits.Chore.Loop.Pause()
|
2019-11-19 16:30:28 +00:00
|
|
|
|
|
|
|
ul := planet.Uplinks[0]
|
|
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
2020-01-21 10:38:41 +00:00
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
audits.Chore.Loop.TriggerWait()
|
2020-08-20 14:29:02 +01:00
|
|
|
queue := audits.Queues.Fetch()
|
2020-12-14 12:54:22 +00:00
|
|
|
queueSegment, err := queue.Next()
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
segment, err := satellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
|
|
StreamID: queueSegment.StreamID,
|
|
|
|
Position: queueSegment.Position,
|
|
|
|
})
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
badPiece := segment.Pieces[0]
|
|
|
|
badNode := badPiece.StorageNode
|
2019-11-19 16:30:28 +00:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
randomIndex, err := audit.GetRandomStripe(ctx, segment)
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
orders := satellite.Orders.Service
|
|
|
|
containment := satellite.DB.Containment()
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
shareSize := segment.Redundancy.ShareSize
|
|
|
|
rootPieceID := segment.RootPieceID
|
2019-11-19 16:30:28 +00:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, queueSegment.Bucket(), badNode, badPiece.Number, rootPieceID, shareSize)
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(badPiece.Number))
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
pending := &audit.PendingAudit{
|
|
|
|
NodeID: badNode,
|
2020-12-14 12:54:22 +00:00
|
|
|
PieceID: rootPieceID,
|
2019-11-19 16:30:28 +00:00
|
|
|
StripeIndex: randomIndex,
|
|
|
|
ShareSize: shareSize,
|
|
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
|
|
|
|
ReverifyCount: 0,
|
2020-12-14 12:54:22 +00:00
|
|
|
Segment: queueSegment.SegmentLocation,
|
2019-11-19 16:30:28 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
err = containment.IncrementPending(ctx, pending)
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
2020-05-07 09:23:40 +01:00
|
|
|
node := planet.FindNode(badNode)
|
|
|
|
badNodeDB := node.DB.(*testblobs.BadDB)
|
|
|
|
// return an error when the satellite requests a share
|
|
|
|
badNodeDB.SetError(errs.New("unknown error"))
|
2019-11-19 16:30:28 +00:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
report, err := audits.Verifier.Reverify(ctx, queueSegment)
|
2019-11-19 16:30:28 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
require.Len(t, report.Successes, 0)
|
|
|
|
require.Len(t, report.Fails, 0)
|
|
|
|
require.Len(t, report.Offlines, 0)
|
|
|
|
require.Len(t, report.PendingAudits, 0)
|
|
|
|
require.Len(t, report.Unknown, 1)
|
|
|
|
require.Equal(t, report.Unknown[0], badNode)
|
2020-03-04 23:09:18 +00:00
|
|
|
|
2020-12-14 12:54:22 +00:00
|
|
|
// record audit
|
|
|
|
_, err = audits.Reporter.RecordAudits(ctx, report)
|
2020-03-09 15:35:54 +00:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
// make sure that pending audit is removed by the reporter when audit is recorded
|
|
|
|
_, err = containment.Get(ctx, pending.NodeID)
|
|
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
2019-11-19 16:30:28 +00:00
|
|
|
})
|
|
|
|
}
|