satellite/audit: implement reservoir struct and RemoteSegment observer method (#2744)
This commit is contained in:
parent
476fbf919a
commit
243cedb628
49
satellite/audit/pathcollector.go
Normal file
49
satellite/audit/pathcollector.go
Normal file
@ -0,0 +1,49 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package audit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
// PathCollector uses the metainfo loop to add paths to node reservoirs.
// As the loop iterates over segments, RemoteSegment samples each path into
// the reservoir of every node that stores a piece of that segment.
type PathCollector struct {
	// Reservoirs maps each node ID to its reservoir of sampled paths.
	Reservoirs map[storj.NodeID]*Reservoir
	// slotCount is the requested number of reservoir slots per node
	// (passed to NewReservoir, which clamps it to [1, maxReservoirSize]).
	slotCount int
	// rand is the shared randomness source used for reservoir sampling.
	rand *rand.Rand
}
|
||||
|
||||
// NewPathCollector instantiates a path collector
|
||||
func NewPathCollector(reservoirSlots int, r *rand.Rand) *PathCollector {
|
||||
return &PathCollector{
|
||||
Reservoirs: make(map[storj.NodeID]*Reservoir),
|
||||
slotCount: reservoirSlots,
|
||||
rand: r,
|
||||
}
|
||||
}
|
||||
|
||||
// RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already
|
||||
func (collector *PathCollector) RemoteSegment(ctx context.Context, path storj.Path, pointer *pb.Pointer) (err error) {
|
||||
for _, piece := range pointer.GetRemote().GetRemotePieces() {
|
||||
if _, ok := collector.Reservoirs[piece.NodeId]; !ok {
|
||||
collector.Reservoirs[piece.NodeId] = NewReservoir(collector.slotCount)
|
||||
}
|
||||
collector.Reservoirs[piece.NodeId].Sample(collector.rand, path)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoteObject returns nil because the audit service does not interact with
// remote objects; only individual segments are sampled (see RemoteSegment).
func (collector *PathCollector) RemoteObject(ctx context.Context, path storj.Path, pointer *pb.Pointer) (err error) {
	return nil
}
|
||||
|
||||
// InlineSegment returns nil because we're only auditing for storage nodes for now;
// inline segments live on the satellite and have no storage node pieces to audit.
func (collector *PathCollector) InlineSegment(ctx context.Context, path storj.Path, pointer *pb.Pointer) (err error) {
	return nil
}
|
78
satellite/audit/pathcollector_test.go
Normal file
78
satellite/audit/pathcollector_test.go
Normal file
@ -0,0 +1,78 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package audit_test
|
||||
|
||||
import (
	"math/rand"
	"strconv"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"storj.io/storj/internal/memory"
	"storj.io/storj/internal/testcontext"
	"storj.io/storj/internal/testplanet"
	"storj.io/storj/internal/testrand"
	"storj.io/storj/pkg/storj"
	"storj.io/storj/satellite/audit"
	"storj.io/storj/uplink"
)
|
||||
|
||||
// TestAuditPathCollector does the following:
|
||||
// - start testplanet with 5 nodes and a reservoir size of 3
|
||||
// - upload 5 files
|
||||
// - iterate over all the segments in satellite.Metainfo and store them in allPieces map
|
||||
// - create a audit observer and call metaloop.Join(auditObs)
|
||||
//
|
||||
// Then for every node in testplanet:
|
||||
// - expect that there is a reservoir for that node on the audit observer
|
||||
// - that the reservoir size is <= 2 (the maxReservoirSize)
|
||||
// - that every item in the reservoir is unique
|
||||
func TestAuditPathCollector(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
audits := planet.Satellites[0].Audit
|
||||
satellite := planet.Satellites[0]
|
||||
err := audits.Service.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
ul := planet.Uplinks[0]
|
||||
|
||||
// upload 5 remote files with 1 segment
|
||||
for i := 0; i < 5; i++ {
|
||||
testData := testrand.Bytes(8 * memory.KiB)
|
||||
path := "/some/remote/path/" + string(i)
|
||||
err := ul.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
|
||||
MinThreshold: 3,
|
||||
RepairThreshold: 4,
|
||||
SuccessThreshold: 5,
|
||||
MaxThreshold: 5,
|
||||
}, "testbucket", path, testData)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
r := rand.New(rand.NewSource(time.Now().Unix()))
|
||||
observer := audit.NewPathCollector(3, r)
|
||||
err = audits.ReservoirService.MetainfoLoop.Join(ctx, observer)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, node := range planet.StorageNodes {
|
||||
// expect a reservoir for every node
|
||||
require.NotNil(t, observer.Reservoirs[node.ID()])
|
||||
require.True(t, len(observer.Reservoirs[node.ID()].Paths) > 1)
|
||||
|
||||
// Require that len paths are <= 2 even though the PathCollector was instantiated with 3
|
||||
// because the maxReservoirSize is currently 2.
|
||||
require.True(t, len(observer.Reservoirs[node.ID()].Paths) <= 2)
|
||||
|
||||
repeats := make(map[storj.Path]bool)
|
||||
for _, path := range observer.Reservoirs[node.ID()].Paths {
|
||||
assert.False(t, repeats[path], "expected every item in reservoir to be unique")
|
||||
repeats[path] = true
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
46
satellite/audit/reservoir.go
Normal file
46
satellite/audit/reservoir.go
Normal file
@ -0,0 +1,46 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package audit
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
// maxReservoirSize is the hard upper bound on the number of slots in a reservoir.
const maxReservoirSize = 2

// Reservoir holds a certain number of segments to reflect a random sample.
type Reservoir struct {
	// Paths is the fixed-size backing array of sampled paths; only the
	// first `size` entries are ever used.
	Paths [maxReservoirSize]storj.Path
	// size is the effective number of slots, clamped to [1, maxReservoirSize].
	size int8
	// index counts the paths offered to Sample so far.
	index int64
}
|
||||
|
||||
// NewReservoir instantiates a Reservoir
|
||||
func NewReservoir(size int) *Reservoir {
|
||||
if size < 1 {
|
||||
size = 1
|
||||
} else if size > maxReservoirSize {
|
||||
size = maxReservoirSize
|
||||
}
|
||||
return &Reservoir{
|
||||
size: int8(size),
|
||||
index: 0,
|
||||
}
|
||||
}
|
||||
|
||||
// Sample makes sure that for every segment in metainfo from index i=size..n-1,
|
||||
// pick a random number r = rand(0..i), and if r < size, replace reservoir.Segments[r] with segment
|
||||
func (reservoir *Reservoir) Sample(r *rand.Rand, path storj.Path) {
|
||||
reservoir.index++
|
||||
if reservoir.index < int64(reservoir.size) {
|
||||
reservoir.Paths[reservoir.index] = path
|
||||
} else {
|
||||
random := r.Int63n(reservoir.index)
|
||||
if random < int64(reservoir.size) {
|
||||
reservoir.Paths[random] = path
|
||||
}
|
||||
}
|
||||
}
|
@ -5,6 +5,7 @@ package audit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
@ -30,6 +31,8 @@ type Config struct {
|
||||
MinBytesPerSecond memory.Size `help:"the minimum acceptable bytes that storage nodes can transfer per second to the satellite" default:"128B"`
|
||||
MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"25s"`
|
||||
MaxReverifyCount int `help:"limit above which we consider an audit is failed" default:"3"`
|
||||
|
||||
Slots int `help:"number of reservoir slots allotted for nodes, currently capped at 2" default:"1"`
|
||||
}
|
||||
|
||||
// Service helps coordinate Cursor and Verifier to run the audit process continuously
|
||||
@ -58,13 +61,58 @@ func NewService(log *zap.Logger, config Config, metainfo *metainfo.Service,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ReservoirService is a temp name for the service struct during the audit 2.0 refactor.
// Once V3-2363 and V3-2364 are implemented, ReservoirService will replace the existing
// Service struct.
type ReservoirService struct {
	log *zap.Logger

	// reservoirSlots is the configured number of reservoir slots per node.
	reservoirSlots int
	// Reservoirs holds the most recent sample produced by a metainfo loop pass.
	Reservoirs map[storj.NodeID]*Reservoir
	// rand is the randomness source used for reservoir sampling.
	rand *rand.Rand

	// MetainfoLoop is joined once per cycle to collect paths.
	MetainfoLoop *metainfo.Loop
	// Loop drives the periodic collection cycle.
	Loop sync2.Cycle
}
|
||||
|
||||
// NewReservoirService instantiates Service2
|
||||
func NewReservoirService(log *zap.Logger, metaLoop *metainfo.Loop, config Config) (*ReservoirService, error) {
|
||||
return &ReservoirService{
|
||||
log: log,
|
||||
|
||||
reservoirSlots: config.Slots,
|
||||
rand: rand.New(rand.NewSource(time.Now().Unix())),
|
||||
|
||||
MetainfoLoop: metaLoop,
|
||||
Loop: *sync2.NewCycle(config.Interval),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run runs auditing service 2.0
|
||||
func (service *ReservoirService) Run(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
service.log.Info("audit 2.0 is starting up")
|
||||
|
||||
return service.Loop.Run(ctx, func(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
pathCollector := NewPathCollector(service.reservoirSlots, service.rand)
|
||||
err = service.MetainfoLoop.Join(ctx, pathCollector)
|
||||
if err != nil {
|
||||
service.log.Error("error joining metainfoloop", zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
service.Reservoirs = pathCollector.Reservoirs
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Run runs auditing service
|
||||
func (service *Service) Run(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
service.log.Info("Audit cron is starting up")
|
||||
service.log.Info("audit 1.0 is starting up")
|
||||
|
||||
return service.Loop.Run(ctx, func(ctx context.Context) error {
|
||||
err := service.process(ctx)
|
||||
return service.Loop.Run(ctx, func(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
err = service.process(ctx)
|
||||
if err != nil {
|
||||
service.log.Error("process", zap.Error(err))
|
||||
}
|
||||
|
@ -15,7 +15,7 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
"gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/internal/errs2"
|
||||
"storj.io/storj/internal/post"
|
||||
@ -187,7 +187,8 @@ type Peer struct {
|
||||
Inspector *irreparable.Inspector
|
||||
}
|
||||
Audit struct {
|
||||
Service *audit.Service
|
||||
Service *audit.Service
|
||||
ReservoirService *audit.ReservoirService
|
||||
}
|
||||
|
||||
GarbageCollection struct {
|
||||
@ -477,6 +478,15 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
if err != nil {
|
||||
return nil, errs.Combine(err, peer.Close())
|
||||
}
|
||||
|
||||
// setup audit 2.0
|
||||
peer.Audit.ReservoirService, err = audit.NewReservoirService(peer.Log.Named("reservoir service"),
|
||||
peer.Metainfo.Loop,
|
||||
config,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errs.Combine(err, peer.Close())
|
||||
}
|
||||
}
|
||||
|
||||
{ // setup garbage collection
|
||||
@ -689,6 +699,9 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
|
||||
group.Go(func() error {
|
||||
return errs2.IgnoreCanceled(peer.Audit.Service.Run(ctx))
|
||||
})
|
||||
group.Go(func() error {
|
||||
return errs2.IgnoreCanceled(peer.Audit.ReservoirService.Run(ctx))
|
||||
})
|
||||
group.Go(func() error {
|
||||
return errs2.IgnoreCanceled(peer.GarbageCollection.Service.Run(ctx))
|
||||
})
|
||||
|
3
scripts/testdata/satellite-config.yaml.lock
vendored
3
scripts/testdata/satellite-config.yaml.lock
vendored
@ -13,6 +13,9 @@
|
||||
# the minimum duration for downloading a share from storage nodes before timing out
|
||||
# audit.min-download-timeout: 25s
|
||||
|
||||
# number of reservoir slots allotted for nodes, currently capped at 2
|
||||
# audit.slots: 1
|
||||
|
||||
# how frequently checker should check for bad segments
|
||||
# checker.interval: 30s
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user