1aa24b9f0d
We are switching completely to ranged loop. https://github.com/storj/storj/issues/5368 Change-Id: I9cec0ac454f40f19d52c078a8b1870c4d192bd7a
1001 lines
34 KiB
Go
1001 lines
34 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package audit_test
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zaptest"
|
|
|
|
"storj.io/common/memory"
|
|
"storj.io/common/peertls/tlsopts"
|
|
"storj.io/common/rpc"
|
|
"storj.io/common/sync2"
|
|
"storj.io/common/testcontext"
|
|
"storj.io/common/testrand"
|
|
"storj.io/storj/private/testplanet"
|
|
"storj.io/storj/satellite"
|
|
"storj.io/storj/satellite/audit"
|
|
"storj.io/storj/satellite/metabase"
|
|
"storj.io/storj/storagenode"
|
|
"storj.io/storj/storagenode/blobstore/testblobs"
|
|
)
|
|
|
|
func TestReverifySuccess(t *testing.T) {
|
|
testWithRangedLoop(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
// This is a bulky test but all it's doing is:
|
|
// - uploads random data
|
|
// - uses the cursor to get a stripe
|
|
// - calls reverify on that stripe
|
|
// - expects one storage node to be marked as a success in the audit report
|
|
|
|
satellite := planet.Satellites[0]
|
|
audits := satellite.Audit
|
|
|
|
audits.Worker.Loop.Pause()
|
|
pauseQueueing(satellite)
|
|
|
|
ul := planet.Uplinks[0]
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
require.NoError(t, err)
|
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
require.NoError(t, err)
|
|
|
|
queue := audits.VerifyQueue
|
|
queueSegment, err := queue.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
StreamID: queueSegment.StreamID,
|
|
Position: queueSegment.Position,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
pieceIndex := testrand.Intn(len(segment.Pieces))
|
|
piece := segment.Pieces[pieceIndex]
|
|
|
|
containment := satellite.DB.Containment()
|
|
|
|
pending := &audit.PieceLocator{
|
|
NodeID: piece.StorageNode,
|
|
StreamID: segment.StreamID,
|
|
Position: segment.Position,
|
|
PieceNum: int(piece.Number),
|
|
}
|
|
|
|
err = audits.Reporter.ReportReverificationNeeded(ctx, pending)
|
|
require.NoError(t, err)
|
|
|
|
outcome, reputation := audits.Reverifier.ReverifyPiece(ctx, zaptest.NewLogger(t), pending)
|
|
require.Equal(t, audit.OutcomeSuccess, outcome)
|
|
|
|
err = audits.Reporter.RecordReverificationResult(ctx, &audit.ReverificationJob{Locator: *pending}, outcome, reputation)
|
|
require.NoError(t, err)
|
|
|
|
// make sure that pending audit is removed
|
|
_, err = containment.Get(ctx, pending.NodeID)
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
|
})
|
|
}
|
|
|
|
func TestReverifyFailMissingShare(t *testing.T) {
|
|
testWithRangedLoop(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
// - uploads random data
|
|
// - uses the cursor to get a stripe
|
|
// - creates one pending audit for a node holding a piece for that stripe
|
|
// - delete piece from node
|
|
// - calls reverify on that piece
|
|
// - expects one storage node to be marked as a fail in the audit report
|
|
|
|
satellite := planet.Satellites[0]
|
|
audits := satellite.Audit
|
|
reporter := audits.Reporter
|
|
audits.Worker.Loop.Pause()
|
|
pauseQueueing(satellite)
|
|
|
|
ul := planet.Uplinks[0]
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
require.NoError(t, err)
|
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
require.NoError(t, err)
|
|
|
|
queue := audits.VerifyQueue
|
|
queueSegment, err := queue.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
StreamID: queueSegment.StreamID,
|
|
Position: queueSegment.Position,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
containment := satellite.DB.Containment()
|
|
|
|
pieceIndex := testrand.Intn(len(segment.Pieces))
|
|
piece := segment.Pieces[pieceIndex]
|
|
rootPieceID := segment.RootPieceID
|
|
|
|
pending := &audit.PieceLocator{
|
|
NodeID: piece.StorageNode,
|
|
StreamID: segment.StreamID,
|
|
Position: segment.Position,
|
|
PieceNum: int(piece.Number),
|
|
}
|
|
|
|
err = reporter.ReportReverificationNeeded(ctx, pending)
|
|
require.NoError(t, err)
|
|
|
|
// delete the piece from the first node
|
|
pieceID := rootPieceID.Derive(piece.StorageNode, int32(piece.Number))
|
|
node := planet.FindNode(piece.StorageNode)
|
|
err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
|
|
require.NoError(t, err)
|
|
|
|
outcome, reputation := audits.Reverifier.ReverifyPiece(ctx, zaptest.NewLogger(t), pending)
|
|
require.Equal(t, audit.OutcomeFailure, outcome)
|
|
|
|
err = reporter.RecordReverificationResult(ctx, &audit.ReverificationJob{Locator: *pending}, outcome, reputation)
|
|
require.NoError(t, err)
|
|
|
|
// make sure that pending audit is removed
|
|
_, err = containment.Get(ctx, pending.NodeID)
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
|
})
|
|
}
|
|
|
|
func TestReverifyOffline(t *testing.T) {
|
|
testWithRangedLoop(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
// - uploads random data
|
|
// - uses the cursor to get a stripe
|
|
// - creates pending audits for one node holding a piece for that stripe
|
|
// - stop the node that has the pending audit
|
|
// - calls reverify on that same stripe
|
|
// - expects one storage node to be marked as offline in the audit report
|
|
|
|
satellite := planet.Satellites[0]
|
|
audits := satellite.Audit
|
|
reporter := audits.Reporter
|
|
|
|
audits.Worker.Loop.Pause()
|
|
pauseQueueing(satellite)
|
|
|
|
ul := planet.Uplinks[0]
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
require.NoError(t, err)
|
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
require.NoError(t, err)
|
|
|
|
queue := audits.VerifyQueue
|
|
queueSegment, err := queue.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
StreamID: queueSegment.StreamID,
|
|
Position: queueSegment.Position,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
pieceIndex := testrand.Intn(len(segment.Pieces))
|
|
piece := segment.Pieces[pieceIndex]
|
|
|
|
pending := &audit.PieceLocator{
|
|
NodeID: piece.StorageNode,
|
|
StreamID: segment.StreamID,
|
|
Position: segment.Position,
|
|
PieceNum: int(piece.Number),
|
|
}
|
|
|
|
err = reporter.ReportReverificationNeeded(ctx, pending)
|
|
require.NoError(t, err)
|
|
|
|
err = planet.StopNodeAndUpdate(ctx, planet.FindNode(piece.StorageNode))
|
|
require.NoError(t, err)
|
|
|
|
outcome, reputation := audits.Reverifier.ReverifyPiece(ctx, zaptest.NewLogger(t), pending)
|
|
require.Equal(t, audit.OutcomeNodeOffline, outcome)
|
|
|
|
err = reporter.RecordReverificationResult(ctx, &audit.ReverificationJob{Locator: *pending}, outcome, reputation)
|
|
require.NoError(t, err)
|
|
|
|
// make sure that pending audit is not removed
|
|
containment := satellite.DB.Containment()
|
|
_, err = containment.Get(ctx, pending.NodeID)
|
|
require.NoError(t, err)
|
|
})
|
|
}
|
|
|
|
func TestReverifyOfflineDialTimeout(t *testing.T) {
|
|
testWithRangedLoop(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
// - uploads random data
|
|
// - uses the cursor to get a stripe
|
|
// - creates pending audit for one node holding a piece for that stripe
|
|
// - uses a slow transport client so that dial timeout will happen (an offline case)
|
|
// - calls reverify on that same stripe
|
|
// - expects the reverification to be alive still
|
|
|
|
satellite := planet.Satellites[0]
|
|
audits := satellite.Audit
|
|
|
|
audits.Worker.Loop.Pause()
|
|
pauseQueueing(satellite)
|
|
|
|
ul := planet.Uplinks[0]
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
require.NoError(t, err)
|
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
require.NoError(t, err)
|
|
|
|
queue := audits.VerifyQueue
|
|
queueSegment, err := queue.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
StreamID: queueSegment.StreamID,
|
|
Position: queueSegment.Position,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
tlsOptions, err := tlsopts.NewOptions(satellite.Identity, tlsopts.Config{}, nil)
|
|
require.NoError(t, err)
|
|
|
|
dialer := rpc.NewDefaultDialer(tlsOptions)
|
|
dialer.DialTimeout = 20 * time.Millisecond
|
|
dialer.DialLatency = 200 * time.Second
|
|
|
|
connector := rpc.NewHybridConnector()
|
|
connector.SetTransferRate(1 * memory.KB)
|
|
dialer.Connector = connector
|
|
|
|
// This config value will create a very short timeframe allowed for receiving
|
|
// data from storage nodes. This will cause context to cancel and start
|
|
// downloading from new nodes.
|
|
minBytesPerSecond := 100 * memory.KiB
|
|
|
|
verifier := audit.NewVerifier(
|
|
satellite.Log.Named("verifier"),
|
|
satellite.Metabase.DB,
|
|
dialer,
|
|
satellite.Overlay.Service,
|
|
satellite.DB.Containment(),
|
|
satellite.Orders.Service,
|
|
satellite.Identity,
|
|
minBytesPerSecond,
|
|
5*time.Second)
|
|
reverifier := audit.NewReverifier(
|
|
satellite.Log.Named("reverifier"),
|
|
verifier,
|
|
satellite.DB.ReverifyQueue(),
|
|
audit.Config{})
|
|
|
|
pieceIndex := testrand.Intn(len(segment.Pieces))
|
|
piece := segment.Pieces[pieceIndex]
|
|
|
|
pending := &audit.PieceLocator{
|
|
NodeID: piece.StorageNode,
|
|
StreamID: segment.StreamID,
|
|
Position: segment.Position,
|
|
PieceNum: int(piece.Number),
|
|
}
|
|
|
|
err = audits.Reporter.ReportReverificationNeeded(ctx, pending)
|
|
require.NoError(t, err)
|
|
|
|
outcome, reputation := reverifier.ReverifyPiece(ctx, zaptest.NewLogger(t), pending)
|
|
require.Equal(t, audit.OutcomeTimedOut, outcome)
|
|
|
|
err = audits.Reporter.RecordReverificationResult(ctx, &audit.ReverificationJob{Locator: *pending}, outcome, reputation)
|
|
require.NoError(t, err)
|
|
|
|
// make sure that pending audit is not removed
|
|
containment := satellite.DB.Containment()
|
|
_, err = containment.Get(ctx, pending.NodeID)
|
|
require.NoError(t, err)
|
|
})
|
|
}
|
|
|
|
func TestReverifyDeletedSegment(t *testing.T) {
|
|
testWithRangedLoop(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
// - uploads random data to all nodes
|
|
// - gets a segment from the audit queue
|
|
// - creates one pending audit for a node holding a piece for that segment
|
|
// - deletes the file
|
|
// - calls reverify on the deleted file
|
|
// - expects reverification to pass successfully and the storage node to be not in containment mode
|
|
|
|
satellite := planet.Satellites[0]
|
|
audits := satellite.Audit
|
|
|
|
audits.Worker.Loop.Pause()
|
|
pauseQueueing(satellite)
|
|
|
|
ul := planet.Uplinks[0]
|
|
testData1 := testrand.Bytes(8 * memory.KiB)
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
|
|
require.NoError(t, err)
|
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
require.NoError(t, err)
|
|
|
|
queue := audits.VerifyQueue
|
|
queueSegment, err := queue.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
StreamID: queueSegment.StreamID,
|
|
Position: queueSegment.Position,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
pieceIndex := testrand.Intn(len(segment.Pieces))
|
|
piece := segment.Pieces[pieceIndex]
|
|
|
|
pending := &audit.PieceLocator{
|
|
NodeID: piece.StorageNode,
|
|
StreamID: segment.StreamID,
|
|
Position: segment.Position,
|
|
PieceNum: int(piece.Number),
|
|
}
|
|
|
|
err = audits.Reporter.ReportReverificationNeeded(ctx, pending)
|
|
require.NoError(t, err)
|
|
|
|
// delete the file
|
|
err = ul.DeleteObject(ctx, satellite, "testbucket", "test/path1")
|
|
require.NoError(t, err)
|
|
|
|
// call reverify on the deleted file and expect OutcomeNotNecessary
|
|
outcome, reputation := audits.Reverifier.ReverifyPiece(ctx, zaptest.NewLogger(t), pending)
|
|
require.Equal(t, audit.OutcomeNotNecessary, outcome)
|
|
|
|
err = audits.Reporter.RecordReverificationResult(ctx, &audit.ReverificationJob{Locator: *pending}, outcome, reputation)
|
|
require.NoError(t, err)
|
|
|
|
// expect that the node was removed from containment since the segment it was contained for has been deleted
|
|
containment := satellite.DB.Containment()
|
|
_, err = containment.Get(ctx, piece.StorageNode)
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
|
})
|
|
}
|
|
|
|
func cloneAndDropPiece(ctx context.Context, metabaseDB *metabase.DB, segment *metabase.Segment, pieceNum int) error {
|
|
newPieces := make([]metabase.Piece, len(segment.Pieces))
|
|
copy(newPieces, segment.Pieces)
|
|
return metabaseDB.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
|
|
StreamID: segment.StreamID,
|
|
Position: segment.Position,
|
|
OldPieces: segment.Pieces,
|
|
NewPieces: append(newPieces[:pieceNum], newPieces[pieceNum+1:]...),
|
|
NewRedundancy: segment.Redundancy,
|
|
})
|
|
}
|
|
|
|
func TestReverifyModifiedSegment(t *testing.T) {
|
|
testWithRangedLoop(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
// - uploads random data to an object on all nodes
|
|
// - creates a pending audit for a particular piece of that object
|
|
// - removes a (different) piece from the object so that the segment is modified
|
|
// - expects reverification to pass with OutcomeNotNecessary and the storage node to be not in containment mode
|
|
|
|
satellite := planet.Satellites[0]
|
|
audits := satellite.Audit
|
|
|
|
audits.Worker.Loop.Pause()
|
|
pauseQueueing(satellite)
|
|
|
|
ul := planet.Uplinks[0]
|
|
testData1 := testrand.Bytes(8 * memory.KiB)
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
|
|
require.NoError(t, err)
|
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
require.NoError(t, err)
|
|
|
|
queue := audits.VerifyQueue
|
|
queueSegment, err := queue.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
StreamID: queueSegment.StreamID,
|
|
Position: queueSegment.Position,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
pieceIndex := testrand.Intn(len(segment.Pieces))
|
|
piece := segment.Pieces[pieceIndex]
|
|
|
|
pending := &audit.PieceLocator{
|
|
NodeID: piece.StorageNode,
|
|
StreamID: segment.StreamID,
|
|
Position: segment.Position,
|
|
PieceNum: int(piece.Number),
|
|
}
|
|
|
|
containment := satellite.DB.Containment()
|
|
|
|
err = audits.Reporter.ReportReverificationNeeded(ctx, pending)
|
|
require.NoError(t, err)
|
|
|
|
otherPiece := (pieceIndex + 1) % len(segment.Pieces)
|
|
|
|
// remove a piece from the file (a piece that the contained node isn't holding)
|
|
audits.Verifier.OnTestingCheckSegmentAlteredHook = func() {
|
|
err := cloneAndDropPiece(ctx, satellite.Metabase.DB, &segment, otherPiece)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// try reverifying the piece we just removed
|
|
outcome, reputation := audits.Reverifier.ReverifyPiece(ctx, zaptest.NewLogger(t), &audit.PieceLocator{
|
|
StreamID: segment.StreamID,
|
|
Position: segment.Position,
|
|
NodeID: segment.Pieces[otherPiece].StorageNode,
|
|
PieceNum: int(segment.Pieces[otherPiece].Number),
|
|
})
|
|
require.Equal(t, audit.OutcomeNotNecessary, outcome)
|
|
|
|
err = audits.Reporter.RecordReverificationResult(ctx, &audit.ReverificationJob{Locator: *pending}, outcome, reputation)
|
|
require.NoError(t, err)
|
|
require.Equal(t, audit.OutcomeNotNecessary, outcome)
|
|
|
|
// expect that the node was removed from containment since the piece it was contained for is no longer part of the segment
|
|
_, err = containment.Get(ctx, segment.Pieces[otherPiece].StorageNode)
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
|
})
|
|
}
|
|
|
|
func TestReverifyReplacedSegment(t *testing.T) {
|
|
testWithRangedLoop(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
// - uploads random data to an object on all nodes
|
|
// - creates a pending audit for a particular piece of that object
|
|
// - re-uploads the object (with the same contents) so that the segment is modified
|
|
// - expects reverification to pass with OutcomeNotNecessary and the storage node to be not in containment
|
|
|
|
satellite := planet.Satellites[0]
|
|
audits := satellite.Audit
|
|
|
|
audits.Worker.Loop.Pause()
|
|
pauseQueueing(satellite)
|
|
|
|
ul := planet.Uplinks[0]
|
|
testData1 := testrand.Bytes(8 * memory.KiB)
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
|
|
require.NoError(t, err)
|
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
require.NoError(t, err)
|
|
|
|
queue := audits.VerifyQueue
|
|
queueSegment, err := queue.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
StreamID: queueSegment.StreamID,
|
|
Position: queueSegment.Position,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
pieceIndex := testrand.Intn(len(segment.Pieces))
|
|
piece := segment.Pieces[pieceIndex]
|
|
|
|
pending := &audit.PieceLocator{
|
|
NodeID: piece.StorageNode,
|
|
StreamID: segment.StreamID,
|
|
Position: segment.Position,
|
|
PieceNum: int(piece.Number),
|
|
}
|
|
|
|
containment := satellite.DB.Containment()
|
|
|
|
err = audits.Reporter.ReportReverificationNeeded(ctx, pending)
|
|
require.NoError(t, err)
|
|
|
|
// replace the file
|
|
err = ul.Upload(ctx, satellite, "testbucket", "test/path1", testData1)
|
|
require.NoError(t, err)
|
|
|
|
// reverify the segment that was not modified
|
|
outcome, reputation := audits.Reverifier.ReverifyPiece(ctx, zaptest.NewLogger(t), pending)
|
|
require.Equal(t, audit.OutcomeNotNecessary, outcome)
|
|
|
|
err = audits.Reporter.RecordReverificationResult(ctx, &audit.ReverificationJob{Locator: *pending}, outcome, reputation)
|
|
require.NoError(t, err)
|
|
|
|
// expect that the node was removed from containment since the segment it was contained for has been changed
|
|
_, err = containment.Get(ctx, piece.StorageNode)
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
|
})
|
|
}
|
|
|
|
// TestReverifyExpired tests the case where the segment passed into Reverify is expired.
|
|
func TestReverifyExpired(t *testing.T) {
|
|
testWithRangedLoop(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
satellite := planet.Satellites[0]
|
|
audits := satellite.Audit
|
|
|
|
audits.Worker.Loop.Pause()
|
|
pauseQueueing(satellite)
|
|
|
|
ul := planet.Uplinks[0]
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
err := ul.UploadWithExpiration(ctx, satellite, "testbucket", "test/path", testData, time.Now().Add(1*time.Hour))
|
|
require.NoError(t, err)
|
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
require.NoError(t, err)
|
|
|
|
queue := audits.VerifyQueue
|
|
queueSegment, err := queue.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
StreamID: queueSegment.StreamID,
|
|
Position: queueSegment.Position,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// move time into the future so the segment is expired
|
|
audits.Reverifier.SetNow(func() time.Time {
|
|
return time.Now().Add(2 * time.Hour)
|
|
})
|
|
|
|
pieceIndex := testrand.Intn(len(segment.Pieces))
|
|
piece := segment.Pieces[pieceIndex]
|
|
|
|
pending := &audit.PieceLocator{
|
|
StreamID: segment.StreamID,
|
|
Position: segment.Position,
|
|
NodeID: piece.StorageNode,
|
|
PieceNum: int(piece.Number),
|
|
}
|
|
|
|
// Reverify should not return an error
|
|
outcome, reputation := audits.Reverifier.ReverifyPiece(ctx, zaptest.NewLogger(t), pending)
|
|
require.Equal(t, audit.OutcomeNotNecessary, outcome)
|
|
|
|
err = audits.Reporter.RecordReverificationResult(ctx, &audit.ReverificationJob{Locator: *pending}, outcome, reputation)
|
|
require.NoError(t, err)
|
|
|
|
// expect that the node was removed from containment since the segment it was
|
|
// contained for has expired
|
|
_, err = satellite.DB.Containment().Get(ctx, piece.StorageNode)
|
|
require.True(t, audit.ErrContainedNotFound.Has(err))
|
|
})
|
|
}
|
|
|
|
// TestReverifySlowDownload checks that a node that times out while sending data to the
|
|
// audit service gets put into containment mode.
|
|
func TestReverifySlowDownload(t *testing.T) {
|
|
const auditTimeout = time.Second
|
|
testWithRangedLoop(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
|
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
|
|
},
|
|
Satellite: testplanet.Combine(
|
|
func(log *zap.Logger, index int, config *satellite.Config) {
|
|
// These config values are chosen to force the slow node to time out without timing out on the three normal nodes
|
|
config.Audit.MinBytesPerSecond = 100 * memory.KiB
|
|
config.Audit.MinDownloadTimeout = auditTimeout
|
|
},
|
|
testplanet.ReconfigureRS(2, 2, 4, 4),
|
|
),
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
satellite := planet.Satellites[0]
|
|
audits := satellite.Audit
|
|
|
|
audits.Worker.Loop.Pause()
|
|
pauseQueueing(satellite)
|
|
|
|
ul := planet.Uplinks[0]
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
require.NoError(t, err)
|
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
require.NoError(t, err)
|
|
|
|
queue := audits.VerifyQueue
|
|
queueSegment, err := queue.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
StreamID: queueSegment.StreamID,
|
|
Position: queueSegment.Position,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
slowPiece := segment.Pieces[0]
|
|
slowNode := slowPiece.StorageNode
|
|
containment := satellite.DB.Containment()
|
|
|
|
pending := &audit.PieceLocator{
|
|
NodeID: slowNode,
|
|
StreamID: segment.StreamID,
|
|
Position: segment.Position,
|
|
PieceNum: int(slowPiece.Number),
|
|
}
|
|
|
|
err = audits.Reporter.ReportReverificationNeeded(ctx, pending)
|
|
require.NoError(t, err)
|
|
|
|
node := planet.FindNode(slowNode)
|
|
slowNodeDB := node.DB.(*testblobs.SlowDB)
|
|
// make downloads on storage node slower than the timeout on the satellite for downloading shares
|
|
delay := 10 * auditTimeout
|
|
slowNodeDB.SetLatency(delay)
|
|
|
|
outcome, reputation := audits.Reverifier.ReverifyPiece(ctx, zaptest.NewLogger(t), pending)
|
|
require.Equal(t, audit.OutcomeTimedOut, outcome)
|
|
|
|
err = audits.Reporter.RecordReverificationResult(ctx, &audit.ReverificationJob{Locator: *pending}, outcome, reputation)
|
|
require.NoError(t, err)
|
|
require.Equal(t, audit.OutcomeTimedOut, outcome)
|
|
|
|
// expect that the node is still in containment
|
|
_, err = containment.Get(ctx, slowNode)
|
|
require.NoError(t, err)
|
|
})
|
|
}
|
|
|
|
// TestReverifyUnknownError checks that a node that returns an unknown error during an audit does not get marked as successful, failed, or contained.
|
|
func TestReverifyUnknownError(t *testing.T) {
|
|
testWithRangedLoop(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
|
return testblobs.NewBadDB(log.Named("baddb"), db), nil
|
|
},
|
|
Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
satellite := planet.Satellites[0]
|
|
audits := satellite.Audit
|
|
|
|
audits.Worker.Loop.Pause()
|
|
pauseQueueing(satellite)
|
|
|
|
ul := planet.Uplinks[0]
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
require.NoError(t, err)
|
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
require.NoError(t, err)
|
|
|
|
queue := audits.VerifyQueue
|
|
queueSegment, err := queue.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
StreamID: queueSegment.StreamID,
|
|
Position: queueSegment.Position,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
badPiece := segment.Pieces[0]
|
|
badNode := badPiece.StorageNode
|
|
containment := satellite.DB.Containment()
|
|
|
|
pending := &audit.PieceLocator{
|
|
NodeID: badNode,
|
|
StreamID: segment.StreamID,
|
|
Position: segment.Position,
|
|
PieceNum: int(badPiece.Number),
|
|
}
|
|
|
|
err = audits.Reporter.ReportReverificationNeeded(ctx, pending)
|
|
require.NoError(t, err)
|
|
|
|
node := planet.FindNode(badNode)
|
|
badNodeDB := node.DB.(*testblobs.BadDB)
|
|
// return an error when the satellite requests a share
|
|
badNodeDB.SetError(errs.New("unknown error"))
|
|
|
|
outcome, reputation := audits.Reverifier.ReverifyPiece(ctx, zaptest.NewLogger(t), pending)
|
|
require.Equal(t, audit.OutcomeUnknownError, outcome)
|
|
|
|
err = audits.Reporter.RecordReverificationResult(ctx, &audit.ReverificationJob{Locator: *pending}, outcome, reputation)
|
|
require.NoError(t, err)
|
|
|
|
// make sure that pending audit is removed
|
|
_, err = containment.Get(ctx, pending.NodeID)
|
|
require.Truef(t, audit.ErrContainedNotFound.Has(err), "expected ErrContainedNotFound but got error %+v", err)
|
|
})
|
|
}
|
|
|
|
func TestMaxReverifyCount(t *testing.T) {
|
|
const auditTimeout = time.Second
|
|
testWithRangedLoop(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
|
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
|
|
},
|
|
Satellite: testplanet.Combine(
|
|
func(log *zap.Logger, index int, config *satellite.Config) {
|
|
// These config values are chosen to force the slow node to time out without timing out on the three normal nodes
|
|
config.Audit.MinBytesPerSecond = 100 * memory.KiB
|
|
config.Audit.MinDownloadTimeout = auditTimeout
|
|
// disable reputation write cache so changes are immediate
|
|
config.Reputation.FlushInterval = 0
|
|
},
|
|
testplanet.ReconfigureRS(2, 2, 4, 4),
|
|
),
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
satellite := planet.Satellites[0]
|
|
audits := satellite.Audit
|
|
|
|
audits.Worker.Loop.Pause()
|
|
audits.ReverifyWorker.Loop.Pause()
|
|
pauseQueueing(satellite)
|
|
|
|
ul := planet.Uplinks[0]
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
require.NoError(t, err)
|
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
require.NoError(t, err)
|
|
|
|
queue := audits.VerifyQueue
|
|
queueSegment, err := queue.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
StreamID: queueSegment.StreamID,
|
|
Position: queueSegment.Position,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
slowPiece := segment.Pieces[0]
|
|
slowNode := slowPiece.StorageNode
|
|
containment := satellite.DB.Containment()
|
|
|
|
pending := &audit.PieceLocator{
|
|
NodeID: slowNode,
|
|
StreamID: segment.StreamID,
|
|
Position: segment.Position,
|
|
PieceNum: int(slowPiece.Number),
|
|
}
|
|
|
|
err = audits.Reporter.ReportReverificationNeeded(ctx, pending)
|
|
require.NoError(t, err)
|
|
|
|
node := planet.FindNode(slowNode)
|
|
slowNodeDB := node.DB.(*testblobs.SlowDB)
|
|
// make downloads on storage node slower than the timeout on the satellite for downloading shares
|
|
delay := 10 * auditTimeout
|
|
slowNodeDB.SetLatency(delay)
|
|
|
|
oldRep, err := satellite.Reputation.Service.Get(ctx, slowNode)
|
|
require.NoError(t, err)
|
|
|
|
rq := audits.ReverifyQueue.(interface {
|
|
TestingFudgeUpdateTime(ctx context.Context, pendingAudit *audit.PieceLocator, updateTime time.Time) error
|
|
})
|
|
|
|
// give node enough timeouts to reach max
|
|
for i := 0; i < satellite.Config.Audit.MaxReverifyCount; i++ {
|
|
// run the reverify worker; each loop should complete once there are
|
|
// no more reverifications to do in the queue
|
|
audits.ReverifyWorker.Loop.TriggerWait()
|
|
|
|
// make sure the node is still contained
|
|
info, err := containment.Get(ctx, slowNode)
|
|
require.NoError(t, err)
|
|
|
|
err = rq.TestingFudgeUpdateTime(ctx, pending, info.LastAttempt.Add(-satellite.Config.Audit.ReverificationRetryInterval))
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// final timeout should trigger failure and removal from containment
|
|
audits.ReverifyWorker.Loop.TriggerWait()
|
|
|
|
_, err = containment.Get(ctx, slowNode)
|
|
require.Truef(t, audit.ErrContainedNotFound.Has(err), "expected ErrContainedNotFound but got error %+v", err)
|
|
|
|
newRep, err := satellite.Reputation.Service.Get(ctx, slowNode)
|
|
require.NoError(t, err)
|
|
require.Less(t, oldRep.AuditReputationBeta, newRep.AuditReputationBeta)
|
|
})
|
|
}
|
|
|
|
func TestTimeDelayBeforeReverifies(t *testing.T) {
|
|
const (
|
|
auditTimeout = time.Second
|
|
reverifyInterval = time.Second / 4
|
|
)
|
|
testWithRangedLoop(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
|
|
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
|
|
},
|
|
Satellite: testplanet.Combine(
|
|
func(log *zap.Logger, index int, config *satellite.Config) {
|
|
// These config values are chosen to force the slow node to time out without timing out on the three normal nodes
|
|
config.Audit.MinBytesPerSecond = 100 * memory.KiB
|
|
config.Audit.MinDownloadTimeout = auditTimeout
|
|
// disable reputation write cache so changes are immediate
|
|
config.Reputation.FlushInterval = 0
|
|
},
|
|
testplanet.ReconfigureRS(2, 2, 4, 4),
|
|
),
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
|
|
satellite := planet.Satellites[0]
|
|
audits := satellite.Audit
|
|
|
|
audits.Worker.Loop.Pause()
|
|
pauseQueueing(satellite)
|
|
|
|
ul := planet.Uplinks[0]
|
|
testData := testrand.Bytes(8 * memory.KiB)
|
|
|
|
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
|
require.NoError(t, err)
|
|
|
|
err = runQueueingOnce(ctx, satellite)
|
|
require.NoError(t, err)
|
|
|
|
queue := audits.VerifyQueue
|
|
queueSegment, err := queue.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
|
|
StreamID: queueSegment.StreamID,
|
|
Position: queueSegment.Position,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
slowPiece := segment.Pieces[0]
|
|
slowNode := planet.FindNode(slowPiece.StorageNode)
|
|
slowNode.DB.(*testblobs.SlowDB).SetLatency(10 * auditTimeout)
|
|
|
|
report, err := audits.Verifier.Verify(ctx, audit.Segment{
|
|
StreamID: segment.StreamID,
|
|
Position: segment.Position,
|
|
}, nil)
|
|
require.NoError(t, err)
|
|
|
|
approximateQueueTime := time.Now()
|
|
audits.Reporter.RecordAudits(ctx, report)
|
|
node, err := satellite.Overlay.DB.Get(ctx, slowNode.ID())
|
|
require.NoError(t, err)
|
|
require.True(t, node.Contained)
|
|
pendingJob, err := satellite.DB.Containment().Get(ctx, slowNode.ID())
|
|
require.NoError(t, err)
|
|
require.NotNil(t, pendingJob)
|
|
dbQueueTime := pendingJob.InsertedAt // note this is not necessarily comparable with times from time.Now()
|
|
|
|
reverifyQueue := satellite.Audit.ReverifyQueue
|
|
|
|
// To demonstrate that a Reverify won't happen until reverifyInterval has elapsed, we will
|
|
// call reverifyQueue.GetNextJob up to 10 times, evenly spaced within reverifyInterval,
|
|
// asserting that the reverification job is still there, unchanged, and that the node
|
|
// is still contained, until after reverifyInterval.
|
|
//
|
|
// Yes, this is unfortunately dependent on the system clock and on sleep()s. But I've tried
|
|
// to make it as independent of actual timing as I can.
|
|
const (
|
|
numCallsTarget = 10
|
|
callInterval = reverifyInterval / numCallsTarget
|
|
)
|
|
|
|
for {
|
|
// reverify queue won't let us get the job yet
|
|
nextJob, err := reverifyQueue.GetNextJob(ctx, reverifyInterval)
|
|
if err == nil {
|
|
// unless reverifyInterval has elapsed
|
|
if time.Since(approximateQueueTime) >= reverifyInterval {
|
|
// in which case, it's good to get this
|
|
require.Equal(t, slowNode.ID(), nextJob.Locator.NodeID)
|
|
require.True(t, dbQueueTime.Equal(nextJob.InsertedAt), nextJob)
|
|
break
|
|
}
|
|
require.Failf(t, "Got no error", "only %s has elapsed. nextJob=%+v", time.Since(approximateQueueTime), nextJob)
|
|
}
|
|
require.Error(t, err)
|
|
require.True(t, audit.ErrEmptyQueue.Has(err), err)
|
|
require.Nil(t, nextJob)
|
|
|
|
// reverification job is still in the queue, though
|
|
pendingJob, err := reverifyQueue.GetByNodeID(ctx, slowNode.ID())
|
|
require.NoError(t, err)
|
|
require.Equal(t, slowNode.ID(), pendingJob.Locator.NodeID)
|
|
require.True(t, dbQueueTime.Equal(pendingJob.InsertedAt), pendingJob)
|
|
|
|
// and the node is still contained
|
|
node, err := satellite.Overlay.DB.Get(ctx, slowNode.ID())
|
|
require.NoError(t, err)
|
|
require.True(t, node.Contained)
|
|
|
|
// wait a bit
|
|
sync2.Sleep(ctx, callInterval)
|
|
require.NoError(t, ctx.Err())
|
|
}
|
|
|
|
// Now we need to demonstrate that a second Reverify won't happen until reverifyInterval
|
|
// has elapsed again. This code will be largely the same as the first time around.
|
|
|
|
for {
|
|
// reverify queue won't let us get the job yet
|
|
nextJob, err := reverifyQueue.GetNextJob(ctx, reverifyInterval)
|
|
if err == nil {
|
|
// unless 2*reverifyInterval has elapsed
|
|
if time.Since(approximateQueueTime) >= 2*reverifyInterval {
|
|
// in which case, it's good to get this
|
|
require.Equal(t, slowNode.ID(), nextJob.Locator.NodeID)
|
|
require.True(t, dbQueueTime.Equal(nextJob.InsertedAt), nextJob)
|
|
break
|
|
}
|
|
}
|
|
require.Error(t, err)
|
|
require.True(t, audit.ErrEmptyQueue.Has(err), err)
|
|
require.Nil(t, nextJob)
|
|
|
|
// reverification job is still in the queue, though
|
|
pendingJob, err := reverifyQueue.GetByNodeID(ctx, slowNode.ID())
|
|
require.NoError(t, err)
|
|
require.Equal(t, slowNode.ID(), pendingJob.Locator.NodeID)
|
|
require.True(t, dbQueueTime.Equal(pendingJob.InsertedAt), pendingJob)
|
|
require.True(t, pendingJob.LastAttempt.After(dbQueueTime), pendingJob)
|
|
|
|
// and the node is still contained
|
|
node, err := satellite.Overlay.DB.Get(ctx, slowNode.ID())
|
|
require.NoError(t, err)
|
|
require.True(t, node.Contained)
|
|
|
|
// wait a bit
|
|
sync2.Sleep(ctx, callInterval)
|
|
require.NoError(t, ctx.Err())
|
|
}
|
|
})
|
|
}
|