satellite/audit: add tests for concurrent audits

This commit introduces tests that perform multiple concurrent audits
against the same set of storage nodes, to make sure that doing so does
not produce incorrect outcomes.

Refs: https://github.com/storj/storj/issues/5495
Change-Id: Iaae49e042306bfa59bdf04c1a1540667488e51e5
paul cannon 2023-02-03 15:03:27 -06:00 committed by Storj Robot
parent 4cb825a6ea
commit 362d73f783
2 changed files with 392 additions and 4 deletions
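
Each test below uploads numConcurrentAudits segments, pauses the audit workers and queueing, and then fans the verifications out so that the same small set of nodes receives overlapping audit requests. A condensed sketch of the fan-out pattern shared by all four tests (segments stands in here for the listed verify segments):

	group, auditCtx := errgroup.WithContext(ctx)
	reports := make([]audit.Report, len(segments))
	for n, seg := range segments {
		n, seg := n, seg // capture loop variables for the goroutine
		group.Go(func() error {
			// each Verify call runs concurrently against the same nodes
			report, err := audits.Verifier.Verify(auditCtx, audit.Segment{
				StreamID: seg.StreamID,
				Position: seg.Position,
			}, nil)
			if err != nil {
				return err
			}
			reports[n] = report
			return nil
		})
	}
	require.NoError(t, group.Wait())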

File 1 of 2

@@ -15,10 +15,18 @@ import (
"storj.io/storj/storagenode"
)
// ErrorBlobs is the interface of storage.Blobs with the SetError method added.
// This allows the BadDB{}.Blobs member to be replaced with something that has
// specific behavior changes.
type ErrorBlobs interface {
storage.Blobs
SetError(err error)
}
// BadDB implements a bad storage node DB.
type BadDB struct {
storagenode.DB
-	blobs *BadBlobs
+	Blobs ErrorBlobs
log *zap.Logger
}
@@ -27,19 +35,19 @@ type BadDB struct {
func NewBadDB(log *zap.Logger, db storagenode.DB) *BadDB {
return &BadDB{
DB: db,
-	blobs: newBadBlobs(log, db.Pieces()),
+	Blobs: newBadBlobs(log, db.Pieces()),
log: log,
}
}
// Pieces returns the blob store.
func (bad *BadDB) Pieces() storage.Blobs {
-	return bad.blobs
+	return bad.Blobs
}
// SetError sets an error to be returned for all piece operations.
func (bad *BadDB) SetError(err error) {
-	bad.blobs.SetError(err)
+	bad.Blobs.SetError(err)
}
// BadBlobs implements a bad blob store.
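
Exporting Blobs as an ErrorBlobs means a test can substitute its own blob-store wrapper instead of being limited to the stock BadBlobs behavior. A hypothetical sketch of both uses (customBlobs is an invented type; the test file below does the real version of this with badBlobsAllowVerify):

	// wrap a storage node DB so piece operations can fail on demand
	badDB := testblobs.NewBadDB(log.Named("baddb"), nodeDB)

	// stock behavior: all piece operations now return the given error
	badDB.SetError(os.ErrNotExist)

	// or swap in a custom ErrorBlobs with more specific behavior
	badDB.Blobs = &customBlobs{ErrorBlobs: badDB.Blobs} // customBlobs is hypothetical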

File 2 of 2

@@ -7,6 +7,7 @@ import (
"context"
"crypto/rand"
"fmt"
"os"
"testing"
"time"
@@ -14,6 +15,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/errs2"
"storj.io/common/memory"
@@ -1189,3 +1191,381 @@ func TestIdentifyContainedNodes(t *testing.T) {
require.True(t, ok, "expected node to be indicated as contained, but it was not")
})
}
func TestConcurrentAuditsSuccess(t *testing.T) {
const (
numConcurrentAudits = 10
minPieces = 5
)
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
// every segment gets a piece on every node, so that every segment audit
// hits the same set of nodes, and every node is touched by every audit
Satellite: testplanet.ReconfigureRS(minPieces, minPieces, minPieces, minPieces),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.ReverifyWorker.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
for n := 0; n < numConcurrentAudits; n++ {
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", fmt.Sprintf("test/path/%d", n), testData)
require.NoError(t, err)
}
listResult, err := satellite.Metabase.DB.ListVerifySegments(ctx, metabase.ListVerifySegments{Limit: numConcurrentAudits})
require.NoError(t, err)
require.Len(t, listResult.Segments, numConcurrentAudits)
// run all the audits concurrently; at least some nodes will receive more than one request at a time
group, auditCtx := errgroup.WithContext(ctx)
reports := make([]audit.Report, numConcurrentAudits)
for n, seg := range listResult.Segments {
n := n
seg := seg
group.Go(func() error {
report, err := audits.Verifier.Verify(auditCtx, audit.Segment{
StreamID: seg.StreamID,
Position: seg.Position,
}, nil)
if err != nil {
return err
}
reports[n] = report
return nil
})
}
err = group.Wait()
require.NoError(t, err)
for _, report := range reports {
require.Len(t, report.Fails, 0)
require.Len(t, report.Unknown, 0)
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Offlines, 0)
require.Equal(t, len(report.Successes), minPieces)
// apply the audit results, as the audit worker would have done
audits.Reporter.RecordAudits(ctx, report)
}
// nothing should be in the reverify queue
_, err = audits.ReverifyQueue.GetNextJob(ctx, time.Minute)
require.Error(t, err)
require.True(t, audit.ErrEmptyQueue.Has(err), err)
})
}
func TestConcurrentAuditsUnknownError(t *testing.T) {
const (
numConcurrentAudits = 10
minPieces = 5
badNodes = minPieces / 2
)
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
// every segment gets a piece on every node, so that every segment audit
// hits the same set of nodes, and every node is touched by every audit
Satellite: testplanet.ReconfigureRS(minPieces-badNodes, minPieces, minPieces, minPieces),
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return newBadBlobsAllowVerify(log.Named("baddb"), db), nil
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.ReverifyWorker.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
for n := 0; n < numConcurrentAudits; n++ {
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", fmt.Sprintf("test/path/%d", n), testData)
require.NoError(t, err)
}
listResult, err := satellite.Metabase.DB.ListVerifySegments(ctx, metabase.ListVerifySegments{Limit: numConcurrentAudits})
require.NoError(t, err)
require.Len(t, listResult.Segments, numConcurrentAudits)
// make ~half of the nodes return an unrecognized error on all piece operations
for n := 0; n < badNodes; n++ {
planet.StorageNodes[n].DB.(*testblobs.BadDB).SetError(fmt.Errorf("an unrecognized error"))
}
// run all the audits concurrently; at least some nodes will receive more than one request at a time
group, auditCtx := errgroup.WithContext(ctx)
reports := make([]audit.Report, numConcurrentAudits)
for n, seg := range listResult.Segments {
n := n
seg := seg
group.Go(func() error {
report, err := audits.Verifier.Verify(auditCtx, audit.Segment{
StreamID: seg.StreamID,
Position: seg.Position,
}, nil)
if err != nil {
return err
}
reports[n] = report
return nil
})
}
err = group.Wait()
require.NoError(t, err)
for _, report := range reports {
require.Len(t, report.Fails, 0)
require.Len(t, report.Unknown, badNodes)
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Offlines, 0)
require.Equal(t, len(report.Successes), minPieces-badNodes)
// apply the audit results, as the audit worker would have done
audits.Reporter.RecordAudits(ctx, report)
}
// nothing should be in the reverify queue
_, err = audits.ReverifyQueue.GetNextJob(ctx, time.Minute)
require.Error(t, err)
require.True(t, audit.ErrEmptyQueue.Has(err), err)
})
}
func TestConcurrentAuditsFailure(t *testing.T) {
const (
numConcurrentAudits = 10
minPieces = 5
badNodes = minPieces / 2
)
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
// every segment gets a piece on every node, so that every segment audit
// hits the same set of nodes, and every node is touched by every audit
Satellite: testplanet.ReconfigureRS(minPieces-badNodes, minPieces, minPieces, minPieces),
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return newBadBlobsAllowVerify(log.Named("baddb"), db), nil
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.ReverifyWorker.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
for n := 0; n < numConcurrentAudits; n++ {
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", fmt.Sprintf("test/path/%d", n), testData)
require.NoError(t, err)
}
listResult, err := satellite.Metabase.DB.ListVerifySegments(ctx, metabase.ListVerifySegments{Limit: numConcurrentAudits})
require.NoError(t, err)
require.Len(t, listResult.Segments, numConcurrentAudits)
// make ~half of the nodes return a Not Found error on all responses
for n := 0; n < badNodes; n++ {
// Can't make _all_ calls return errors, or the directory read verification will fail
// (as it is triggered explicitly when ErrNotExist is returned from Open) and cause the
// node to panic before the test is done.
planet.StorageNodes[n].DB.(*testblobs.BadDB).SetError(os.ErrNotExist)
}
// run all the audits concurrently; at least some nodes will receive more than one request at a time
group, auditCtx := errgroup.WithContext(ctx)
reports := make([]audit.Report, numConcurrentAudits)
for n, seg := range listResult.Segments {
n := n
seg := seg
group.Go(func() error {
report, err := audits.Verifier.Verify(auditCtx, audit.Segment{
StreamID: seg.StreamID,
Position: seg.Position,
}, nil)
if err != nil {
return err
}
reports[n] = report
return nil
})
}
err = group.Wait()
require.NoError(t, err)
for n, report := range reports {
require.Len(t, report.Unknown, 0, n)
require.Len(t, report.PendingAudits, 0, n)
require.Len(t, report.Offlines, 0, n)
require.Len(t, report.Fails, badNodes, n)
require.Equal(t, len(report.Successes), minPieces-badNodes, n)
// apply the audit results, as the audit worker would have done
audits.Reporter.RecordAudits(ctx, report)
}
// nothing should be in the reverify queue
_, err = audits.ReverifyQueue.GetNextJob(ctx, time.Minute)
require.Error(t, err)
require.True(t, audit.ErrEmptyQueue.Has(err), err)
})
}
func TestConcurrentAuditsTimeout(t *testing.T) {
const (
numConcurrentAudits = 10
minPieces = 5
slowNodes = minPieces / 2
)
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: minPieces, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
// every segment should get a piece on every node, so that every segment audit
// hits the same set of nodes, and every node is touched by every audit
Satellite: testplanet.Combine(
func(log *zap.Logger, index int, config *satellite.Config) {
// These config values are chosen to cause relatively quick timeouts
// while allowing the non-slow nodes to complete operations
config.Audit.MinBytesPerSecond = 100 * memory.KiB
config.Audit.MinDownloadTimeout = time.Second
},
testplanet.ReconfigureRS(minPieces-slowNodes, minPieces, minPieces, minPieces),
),
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.ReverifyWorker.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
for n := 0; n < numConcurrentAudits; n++ {
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, satellite, "testbucket", fmt.Sprintf("test/path/%d", n), testData)
require.NoError(t, err)
}
listResult, err := satellite.Metabase.DB.ListVerifySegments(ctx, metabase.ListVerifySegments{Limit: numConcurrentAudits})
require.NoError(t, err)
require.Len(t, listResult.Segments, numConcurrentAudits)
// make ~half of the nodes time out on all responses
for n := 0; n < slowNodes; n++ {
planet.StorageNodes[n].DB.(*testblobs.SlowDB).SetLatency(time.Hour)
}
// run all the audits concurrently; at least some nodes will receive more than one request at a time
group, auditCtx := errgroup.WithContext(ctx)
reports := make([]audit.Report, numConcurrentAudits)
for n, seg := range listResult.Segments {
n := n
seg := seg
group.Go(func() error {
report, err := audits.Verifier.Verify(auditCtx, audit.Segment{
StreamID: seg.StreamID,
Position: seg.Position,
}, nil)
if err != nil {
return err
}
reports[n] = report
return nil
})
}
err = group.Wait()
require.NoError(t, err)
for _, report := range reports {
require.Len(t, report.Fails, 0)
require.Len(t, report.Unknown, 0)
require.Len(t, report.PendingAudits, slowNodes)
require.Len(t, report.Offlines, 0)
require.Equal(t, len(report.Successes), minPieces-slowNodes)
// apply the audit results, as the audit worker would have done
audits.Reporter.RecordAudits(ctx, report)
}
// the slow nodes should have been added to the reverify queue multiple times;
// once for each timed-out piece fetch
queuedReverifies := make([]*audit.ReverificationJob, 0, numConcurrentAudits*slowNodes)
dummyRetryInterval := 5 * time.Minute
for {
job, err := audits.ReverifyQueue.GetNextJob(ctx, dummyRetryInterval)
if err != nil {
if audit.ErrEmptyQueue.Has(err) {
break
}
require.NoError(t, err)
}
queuedReverifies = append(queuedReverifies, job)
}
require.Len(t, queuedReverifies, numConcurrentAudits*slowNodes)
appearancesPerNode := make(map[storj.NodeID]int)
for _, job := range queuedReverifies {
appearancesPerNode[job.Locator.NodeID]++
}
require.Len(t, appearancesPerNode, slowNodes)
for n := 0; n < slowNodes; n++ {
require.EqualValues(t, appearancesPerNode[planet.StorageNodes[n].ID()], numConcurrentAudits)
}
})
}
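// newBadBlobsAllowVerify wraps nodeDB in a testblobs.BadDB whose blob store
// can be made to fail piece operations, while storage-dir verification and
// writability checks still go to the real blob store underneath.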
func newBadBlobsAllowVerify(log *zap.Logger, nodeDB storagenode.DB) storagenode.DB {
badBlobsDB := testblobs.NewBadDB(log.Named("baddb"), nodeDB)
badBlobsDB.Blobs = &badBlobsAllowVerify{ErrorBlobs: badBlobsDB.Blobs, goodBlobs: nodeDB.Pieces()}
return badBlobsDB
}
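// badBlobsAllowVerify delegates the storage-dir verification and writability
// checks to the real blob store, so the storage node passes its own startup
// checks while all other piece operations go through the wrapped ErrorBlobs.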
type badBlobsAllowVerify struct {
testblobs.ErrorBlobs
goodBlobs storage.Blobs
}
func (b *badBlobsAllowVerify) VerifyStorageDir(ctx context.Context, id storj.NodeID) error {
return b.goodBlobs.VerifyStorageDir(ctx, id)
}
func (b *badBlobsAllowVerify) CreateVerificationFile(ctx context.Context, id storj.NodeID) error {
return b.goodBlobs.CreateVerificationFile(ctx, id)
}
func (b *badBlobsAllowVerify) CheckWritability(ctx context.Context) error {
return b.goodBlobs.CheckWritability(ctx)
}