storj/satellite/audit/chore.go
Andrew Harding 93fad70e4b satellite/audit: prevent accessing unset reservoir segments
This change fixes the access of unset segments and keys on the reservoir
when the reservoir size is less than the max OR the number of sampled
segments is smaller than the reservoir size. It does so by tucking away
the segments and keys behind methods that return properly sized slices
into the segments/keys arrays.

It also fixes a bug in the housekeeping for the internal index variable
that holds onto how many items in the array have been populated. As part
of this fix, it changes the type of index to int8, which reduces the
size of the reservoir struct by 8 bytes.

The tests have been updated to provide better coverage for this case.

Change-Id: I3ceb17b692fe456fc4c1ca5d67d35c96aeb0a169
2022-12-14 17:43:17 -07:00

102 lines
2.3 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package audit
import (
"context"
"math/rand"
"time"
"go.uber.org/zap"
"storj.io/common/sync2"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase/segmentloop"
)
// Chore populates reservoirs and the audit queue.
//
// architecture: Chore
type Chore struct {
log *zap.Logger
rand *rand.Rand
queue VerifyQueue
Loop *sync2.Cycle
segmentLoop *segmentloop.Service
config Config
}
// NewChore instantiates Chore.
func NewChore(log *zap.Logger, queue VerifyQueue, loop *segmentloop.Service, config Config) *Chore {
if config.VerificationPushBatchSize < 1 {
config.VerificationPushBatchSize = 1
}
return &Chore{
log: log,
rand: rand.New(rand.NewSource(time.Now().Unix())),
queue: queue,
Loop: sync2.NewCycle(config.ChoreInterval),
segmentLoop: loop,
config: config,
}
}
// Run starts the chore.
func (chore *Chore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
collector := NewCollector(chore.config.Slots, chore.rand)
err = chore.segmentLoop.Join(ctx, collector)
if err != nil {
chore.log.Error("error joining segmentloop", zap.Error(err))
return nil
}
type SegmentKey struct {
StreamID uuid.UUID
Position uint64
}
var newQueue []Segment
queueSegments := make(map[SegmentKey]struct{})
// Add reservoir segments to queue in pseudorandom order.
for i := 0; i < chore.config.Slots; i++ {
for _, res := range collector.Reservoirs {
segments := res.Segments()
// Skip reservoir if no segment at this index.
if len(segments) <= i {
continue
}
segment := segments[i]
segmentKey := SegmentKey{
StreamID: segment.StreamID,
Position: segment.Position.Encode(),
}
if segmentKey == (SegmentKey{}) {
continue
}
if _, ok := queueSegments[segmentKey]; !ok {
newQueue = append(newQueue, NewSegment(segment))
queueSegments[segmentKey] = struct{}{}
}
}
}
// Push new queue to queues struct so it can be fetched by worker.
return chore.queue.Push(ctx, newQueue, chore.config.VerificationPushBatchSize)
})
}
// Close closes chore.
func (chore *Chore) Close() error {
chore.Loop.Close()
return nil
}