storj/satellite/audit/integration_test.go
Andrew Harding 590d44301c satellite/audit: implement rangedloop observer
This change implements the ranged loop observer to replace the audit
chore that builds the audit queue.

The strategy employed by this change is to use a collector for each
segment range to build separate per-node segment reservoirs that are
then merged during the join step.

In previous observer migrations, there were only a handful of tests so
the strategy was to duplicate them. In this package, there are dozens
of tests that utilize the chore. To reduce code churn and maintenance
burden until the chore is removed, this change introduces a helper that
runs tests under both the chore and observer, providing a pair of
functions that can be used to pause or run the queueing function.

https://github.com/storj/storj/issues/5232

Change-Id: I8bb4b4e55cf98b1aac9f26307e3a9a355cb3f506
2023-01-03 08:52:01 -07:00

86 lines
2.5 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package audit_test
import (
"context"
"strconv"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/audit"
)
func TestChoreAndWorkerIntegration(t *testing.T) {
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
// disable reputation write cache so changes are immediate
config.Reputation.FlushInterval = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
pauseQueueing(satellite)
ul := planet.Uplinks[0]
// Upload 2 remote files with 1 segment.
for i := 0; i < 2; i++ {
testData := testrand.Bytes(8 * memory.KiB)
path := "/some/remote/path/" + strconv.Itoa(i)
err := ul.Upload(ctx, satellite, "testbucket", path, testData)
require.NoError(t, err)
}
err := runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
uniqueSegments := make(map[audit.Segment]struct{})
var segment audit.Segment
var segmentCount int
for {
segment, err = queue.Next(ctx)
if err != nil {
break
}
segmentCount++
_, ok := uniqueSegments[segment]
require.False(t, ok, "expected unique segment in chore queue")
uniqueSegments[segment] = struct{}{}
}
require.True(t, audit.ErrEmptyQueue.Has(err), "expected empty queue error, but got error %+v", err)
require.Equal(t, 2, segmentCount)
requireAuditQueueEmpty(ctx, t, audits.VerifyQueue)
// Repopulate the queue for the worker.
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
// Make sure the worker processes the audit queue.
audits.Worker.Loop.TriggerWait()
requireAuditQueueEmpty(ctx, t, audits.VerifyQueue)
})
}
// requireAuditQueueEmpty fails the test immediately unless verifyQueue.Next
// returns an error matching audit.ErrEmptyQueue together with an empty entry.
//
// Note that it calls Next, so if the queue is unexpectedly non-empty an entry
// is consumed from it before the test fails.
func requireAuditQueueEmpty(ctx context.Context, t *testing.T, verifyQueue audit.VerifyQueue) {
	// Mark this function as a helper so failures are attributed to the calling
	// test line rather than to a line inside this function.
	t.Helper()

	entry, err := verifyQueue.Next(ctx)
	require.NotNilf(t, err, "expected empty audit queue, but got entry %+v", entry)
	require.Truef(t, audit.ErrEmptyQueue.Has(err), "expected empty audit queue error, but unexpectedly got error %v", err)
	require.Empty(t, entry)
}