satellite/audit: optimize loop observer

Two things were done to optimize audit observer:
* monik call was removed as we have different way to track it
* no new allocation for audit.Segment struct inside observer

Benchmark against 'main':
name                                         old time/op    new time/op    delta
RemoteSegment/Cockroach/multiple_segments-8    5.85µs ± 1%    0.74µs ± 4%   -87.28%  (p=0.008 n=5+5)

name                                         old alloc/op   new alloc/op   delta
RemoteSegment/Cockroach/multiple_segments-8    2.72kB ± 0%    0.00kB           ~     (p=0.079 n=4+5)

name                                         old allocs/op  new allocs/op  delta
RemoteSegment/Cockroach/multiple_segments-8      50.0 ± 0%       0.0       -100.00%  (p=0.008 n=5+5)

Change-Id: Ib973e48782bad4346eee1cd5aee77f0a50f69258
This commit is contained in:
Michal Niewrzal 2022-09-23 16:00:56 +02:00 committed by Storj Robot
parent 05e57edb20
commit e37435602f
5 changed files with 72 additions and 18 deletions

View File

@ -11,6 +11,7 @@ import (
"go.uber.org/zap"
"storj.io/common/sync2"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase/segmentloop"
)
@ -59,8 +60,13 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
return nil
}
type SegmentKey struct {
StreamID uuid.UUID
Position uint64
}
var newQueue []Segment
queueSegments := make(map[Segment]struct{})
queueSegments := make(map[SegmentKey]struct{})
// Add reservoir segments to queue in pseudorandom order.
for i := 0; i < chore.config.Slots; i++ {
@ -70,12 +76,17 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
continue
}
segment := res.Segments[i]
if segment == (Segment{}) {
segmentKey := SegmentKey{
StreamID: segment.StreamID,
Position: segment.Position.Encode(),
}
if segmentKey == (SegmentKey{}) {
continue
}
if _, ok := queueSegments[segment]; !ok {
newQueue = append(newQueue, segment)
queueSegments[segment] = struct{}{}
if _, ok := queueSegments[segmentKey]; !ok {
newQueue = append(newQueue, NewSegment(segment))
queueSegments[segmentKey] = struct{}{}
}
}
}

View File

@ -11,8 +11,6 @@ import (
"storj.io/storj/satellite/metabase/segmentloop"
)
var remoteSegmentFunc = mon.Task()
var _ segmentloop.Observer = (*Collector)(nil)
// Collector uses the segment loop to add segments to node reservoirs.
@ -38,7 +36,7 @@ func (collector *Collector) LoopStarted(context.Context, segmentloop.LoopInfo) (
// RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already.
func (collector *Collector) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error {
defer remoteSegmentFunc(&ctx)(nil) // method always returns nil
// we are expliticy not adding monitoring here as we are tracking loop observers separately
for _, piece := range segment.Pieces {
res, ok := collector.Reservoirs[piece.StorageNode]
@ -46,7 +44,7 @@ func (collector *Collector) RemoteSegment(ctx context.Context, segment *segmentl
res = NewReservoir(collector.slotCount)
collector.Reservoirs[piece.StorageNode] = res
}
res.Sample(collector.rand, NewSegment(segment))
res.Sample(collector.rand, segment)
}
return nil
}

View File

@ -17,6 +17,7 @@ import (
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/metabase/segmentloop"
)
// TestAuditCollector does the following:
@ -64,10 +65,53 @@ func TestAuditCollector(t *testing.T) {
require.True(t, len(observer.Reservoirs[node.ID()].Segments) <= 3)
repeats := make(map[audit.Segment]bool)
for _, segment := range observer.Reservoirs[node.ID()].Segments {
for _, loopSegment := range observer.Reservoirs[node.ID()].Segments {
segment := audit.NewSegment(loopSegment)
assert.False(t, repeats[segment], "expected every item in reservoir to be unique")
repeats[segment] = true
}
}
})
}
func BenchmarkRemoteSegment(b *testing.B) {
testplanet.Bench(b, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(b *testing.B, ctx *testcontext.Context, planet *testplanet.Planet) {
for i := 0; i < 10; i++ {
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object"+strconv.Itoa(i), testrand.Bytes(10*memory.KiB))
require.NoError(b, err)
}
observer := audit.NewCollector(3, rand.New(rand.NewSource(time.Now().Unix())))
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(b, err)
loopSegments := []*segmentloop.Segment{}
for _, segment := range segments {
loopSegments = append(loopSegments, &segmentloop.Segment{
StreamID: segment.StreamID,
Position: segment.Position,
CreatedAt: segment.CreatedAt,
ExpiresAt: segment.ExpiresAt,
Redundancy: segment.Redundancy,
Pieces: segment.Pieces,
})
}
b.Run("multiple segments", func(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, loopSegment := range loopSegments {
err := observer.RemoteSegment(ctx, loopSegment)
if err != nil {
b.FailNow()
}
}
}
})
})
}

View File

@ -16,7 +16,7 @@ const maxReservoirSize = 3
// Reservoir holds a certain number of segments to reflect a random sample.
type Reservoir struct {
Segments [maxReservoirSize]Segment
Segments [maxReservoirSize]segmentloop.Segment
size int8
index int64
wSum int64
@ -41,9 +41,9 @@ func NewReservoir(size int) *Reservoir {
// select uniformly a random segment reservoir.Segments[rand(0..i)] to replace with
// segment. See https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Chao
// for the algorithm used.
func (reservoir *Reservoir) Sample(r *rand.Rand, segment Segment) {
func (reservoir *Reservoir) Sample(r *rand.Rand, segment *segmentloop.Segment) {
if reservoir.index < int64(reservoir.size) {
reservoir.Segments[reservoir.index] = segment
reservoir.Segments[reservoir.index] = *segment
reservoir.wSum += int64(segment.EncryptedSize)
} else {
reservoir.wSum += int64(segment.EncryptedSize)
@ -51,7 +51,7 @@ func (reservoir *Reservoir) Sample(r *rand.Rand, segment Segment) {
random := r.Float64()
if random < p {
index := r.Int31n(int32(reservoir.size))
reservoir.Segments[index] = segment
reservoir.Segments[index] = *segment
}
}
reservoir.index++
@ -66,7 +66,7 @@ type Segment struct {
}
// NewSegment creates a new segment to audit from a metainfo loop segment.
func NewSegment(loopSegment *segmentloop.Segment) Segment {
func NewSegment(loopSegment segmentloop.Segment) Segment {
return Segment{
StreamID: loopSegment.StreamID,
Position: loopSegment.Position,

View File

@ -13,20 +13,21 @@ import (
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/segmentloop"
)
func TestReservoir(t *testing.T) {
rng := rand.New(rand.NewSource(0))
r := NewReservoir(3)
seg := func(n byte) Segment { return Segment{StreamID: uuid.UUID{0: n}} }
seg := func(n byte) *segmentloop.Segment { return &segmentloop.Segment{StreamID: uuid.UUID{0: n}} }
// if we sample 3 segments, we should record all 3
r.Sample(rng, seg(1))
r.Sample(rng, seg(2))
r.Sample(rng, seg(3))
require.Equal(t, r.Segments[:], []Segment{seg(1), seg(2), seg(3)})
require.Equal(t, r.Segments[:], []segmentloop.Segment{*seg(1), *seg(2), *seg(3)})
}
func TestReservoirBias(t *testing.T) {
@ -41,7 +42,7 @@ func TestReservoirBias(t *testing.T) {
weight1StreamID: 0,
}
segments := []Segment{
segments := []*segmentloop.Segment{
{
StreamID: weight10StreamID,
Position: metabase.SegmentPosition{},