satellite/metabase: add segmentloop service

We want to move some of the current metainfo loop observers to
the segment loop. This change adds a new service, similar to the
metainfo loop, but iterating only over segments.

Change-Id: I67f7f461781723a4476e2b83377f31736d7c4870
Michał Niewrzał 2021-05-28 11:42:55 +02:00 committed by Michal Niewrzal
parent 6d9b91d435
commit 053e58b683
8 changed files with 978 additions and 7 deletions
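For orientation, a minimal sketch of how an observer could hook into the new segment loop. The countingObserver type, the joinLoop helper, and the package/import wrapper are hypothetical and only illustrate the API added by this change (the Observer interface and Service.Join):

package segmentloopexample

import (
	"context"
	"fmt"

	"storj.io/storj/satellite/metabase/segmentloop"
)

// countingObserver is a hypothetical observer that only tallies the
// segments handed to it by the loop.
type countingObserver struct {
	remote int
	inline int
}

// LoopStarted is called once at the beginning of each iteration.
func (o *countingObserver) LoopStarted(ctx context.Context, info segmentloop.LoopInfo) error {
	return nil
}

// RemoteSegment is called for every remote segment.
func (o *countingObserver) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error {
	o.remote++
	return nil
}

// InlineSegment is called for every inline segment.
func (o *countingObserver) InlineSegment(ctx context.Context, segment *segmentloop.Segment) error {
	o.inline++
	return nil
}

// joinLoop joins a running segment loop for one full iteration; Join
// blocks until that iteration completes or the context is canceled.
func joinLoop(ctx context.Context, loop *segmentloop.Service) error {
	obs := &countingObserver{}
	if err := loop.Join(ctx, obs); err != nil {
		return err
	}
	fmt.Printf("remote segments: %d, inline segments: %d\n", obs.remote, obs.inline)
	return nil
}

The loop itself is driven by Service.Run, which the core peer wires up via lifecycle in the diff below; observers simply Join (or Monitor) and wait.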


@@ -61,6 +61,8 @@ storj.io/storj/satellite/gracefulexit."graceful_exit_success" Meter
storj.io/storj/satellite/gracefulexit."graceful_exit_successful_pieces_transfer_ratio" IntVal
storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_fail" Meter
storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_success" Meter
storj.io/storj/satellite/metabase/segmentloop."segmentsProcessed" IntVal
storj.io/storj/satellite/metabase/segmentloop.*Service.RunOnce Task
storj.io/storj/satellite/metainfo."metainfo_rate_limit_exceeded" Event
storj.io/storj/satellite/metainfo/metaloop."objectsIterated" IntVal
storj.io/storj/satellite/metainfo/metaloop."objectsProcessed" IntVal


@@ -50,6 +50,7 @@ import (
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metaloop"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/expireddeletion"
"storj.io/storj/satellite/metainfo/piecedeletion"
@@ -99,10 +100,11 @@ type Satellite struct {
}
Metainfo struct {
Metabase *metabase.DB
Service *metainfo.Service
Endpoint2 *metainfo.Endpoint
Loop *metaloop.Service
Metabase *metabase.DB
Service *metainfo.Service
Endpoint2 *metainfo.Endpoint
Loop *metaloop.Service
SegmentLoop *segmentloop.Service
}
Inspector struct {
@@ -468,6 +470,9 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
CoalesceDuration: 1 * time.Second,
ListLimit: 10000,
},
SegmentLoop: segmentloop.Config{
CoalesceDuration: 1 * time.Second,
},
RateLimiter: metainfo.RateLimiterConfig{
Enabled: true,
Rate: 1000,
@@ -708,6 +713,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.Metainfo.Service = peer.Metainfo.Service
system.Metainfo.Endpoint2 = api.Metainfo.Endpoint2
system.Metainfo.Loop = peer.Metainfo.Loop
system.Metainfo.SegmentLoop = peer.Metainfo.SegmentLoop
system.Inspector.Endpoint = api.Inspector.Endpoint


@@ -35,6 +35,7 @@ import (
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metaloop"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/expireddeletion"
"storj.io/storj/satellite/metrics"
@@ -82,9 +83,10 @@ type Core struct {
}
Metainfo struct {
Metabase *metabase.DB
Service *metainfo.Service
Loop *metaloop.Service
Metabase *metabase.DB
Service *metainfo.Service
Loop *metaloop.Service
SegmentLoop *segmentloop.Service
}
Orders struct {
@@ -287,6 +289,15 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
Run: peer.Metainfo.Loop.Run,
Close: peer.Metainfo.Loop.Close,
})
peer.Metainfo.SegmentLoop = segmentloop.New(
config.Metainfo.SegmentLoop,
peer.Metainfo.Metabase,
)
peer.Services.Add(lifecycle.Item{
Name: "metainfo:segmentloop",
Run: peer.Metainfo.SegmentLoop.Run,
Close: peer.Metainfo.SegmentLoop.Close,
})
}
{ // setup datarepair


@@ -0,0 +1,437 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package segmentloop
import (
"context"
"fmt"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"golang.org/x/time/rate"
"storj.io/storj/satellite/metabase"
)
const batchsizeLimit = 2500
var (
mon = monkit.Package()
// Error is a standard error class for this component.
Error = errs.Class("segments loop")
// ErrClosed is a loop closed error.
ErrClosed = Error.New("loop closed")
)
// Segment contains information about segment metadata which will be received by observers.
type Segment metabase.LoopSegmentEntry
// Inline returns true if segment is inline.
func (s Segment) Inline() bool {
return s.Redundancy.IsZero() && len(s.Pieces) == 0
}
// Observer is an interface defining an observer that can subscribe to the segments loop.
//
// architecture: Observer
type Observer interface {
LoopStarted(context.Context, LoopInfo) error
RemoteSegment(context.Context, *Segment) error
InlineSegment(context.Context, *Segment) error
}
// LoopInfo contains information about the current loop.
type LoopInfo struct {
Started time.Time
}
// NullObserver is an observer that does nothing. This is useful for joining
// and ensuring the segments loop runs once before you use a real observer.
type NullObserver struct{}
// LoopStarted is called at each loop start.
func (NullObserver) LoopStarted(context.Context, LoopInfo) error {
return nil
}
// RemoteSegment implements the Observer interface.
func (NullObserver) RemoteSegment(context.Context, *Segment) error {
return nil
}
// InlineSegment implements the Observer interface.
func (NullObserver) InlineSegment(context.Context, *Segment) error {
return nil
}
type observerContext struct {
trigger bool
observer Observer
ctx context.Context
done chan error
remote *monkit.DurationDist
inline *monkit.DurationDist
}
func newObserverContext(ctx context.Context, obs Observer) *observerContext {
name := fmt.Sprintf("%T", obs)
key := monkit.NewSeriesKey("observer").WithTag("name", name)
return &observerContext{
observer: obs,
ctx: ctx,
done: make(chan error),
inline: monkit.NewDurationDist(key.WithTag("pointer_type", "inline")),
remote: monkit.NewDurationDist(key.WithTag("pointer_type", "remote")),
}
}
func (observer *observerContext) RemoteSegment(ctx context.Context, segment *Segment) error {
start := time.Now()
defer func() { observer.remote.Insert(time.Since(start)) }()
return observer.observer.RemoteSegment(ctx, segment)
}
func (observer *observerContext) InlineSegment(ctx context.Context, segment *Segment) error {
start := time.Now()
defer func() { observer.inline.Insert(time.Since(start)) }()
return observer.observer.InlineSegment(ctx, segment)
}
func (observer *observerContext) HandleError(err error) bool {
if err != nil {
observer.done <- err
observer.Finish()
return true
}
return false
}
func (observer *observerContext) Finish() {
close(observer.done)
name := fmt.Sprintf("%T", observer.observer)
stats := allObserverStatsCollectors.GetStats(name)
stats.Observe(observer)
}
func (observer *observerContext) Wait() error {
return <-observer.done
}
// Config contains configurable values for the segments loop.
type Config struct {
CoalesceDuration time.Duration `help:"how long to wait for new observers before starting iteration" releaseDefault:"5s" devDefault:"5s"`
RateLimit float64 `help:"rate limit (default is 0 which is unlimited segments per second)" default:"0"`
ListLimit int `help:"how many items to query in a batch" default:"2500"`
}
// MetabaseDB contains iterators for the metabase data.
type MetabaseDB interface {
// Now returns the time on the database.
Now(ctx context.Context) (time.Time, error)
// IterateLoopSegments iterates through all segments in the metabase.
IterateLoopSegments(ctx context.Context, opts metabase.IterateLoopSegments, fn func(context.Context, metabase.LoopSegmentsIterator) error) (err error)
}
// Service is a segments loop service.
//
// architecture: Service
type Service struct {
config Config
metabaseDB MetabaseDB
join chan *observerContext
done chan struct{}
}
// New creates a new segments loop service.
func New(config Config, metabaseDB MetabaseDB) *Service {
return &Service{
metabaseDB: metabaseDB,
config: config,
join: make(chan *observerContext),
done: make(chan struct{}),
}
}
// Join will join the looper for one full cycle until completion and then returns.
// Joining will trigger a new iteration after coalesce duration.
// On ctx cancel the observer will return without completely finishing.
// It returns nil only after a full, completed iteration.
// Safe to be called concurrently.
func (loop *Service) Join(ctx context.Context, observer Observer) (err error) {
return loop.joinObserver(ctx, true, observer)
}
// Monitor will join the looper for one full cycle until completion and then returns.
// Joining with monitoring won't trigger after coalesce duration.
// On ctx cancel the observer will return without completely finishing.
// It returns nil only after a full, completed iteration.
// Safe to be called concurrently.
func (loop *Service) Monitor(ctx context.Context, observer Observer) (err error) {
return loop.joinObserver(ctx, false, observer)
}
// joinObserver will join the looper for one full cycle until completion and then returns.
// On ctx cancel the observer will return without completely finishing.
// It returns nil only after a full, completed iteration.
// Safe to be called concurrently.
func (loop *Service) joinObserver(ctx context.Context, trigger bool, obs Observer) (err error) {
defer mon.Task()(&ctx)(&err)
obsctx := newObserverContext(ctx, obs)
obsctx.trigger = trigger
select {
case loop.join <- obsctx:
case <-ctx.Done():
return ctx.Err()
case <-loop.done:
return ErrClosed
}
return obsctx.Wait()
}
// Run starts the looping service.
// It can only be called once, otherwise a panic will occur.
func (loop *Service) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
for {
err := loop.RunOnce(ctx)
if err != nil {
return err
}
}
}
// Close closes the looping services.
func (loop *Service) Close() (err error) {
close(loop.done)
return nil
}
// RunOnce goes through segments one time and sends information to observers.
//
// It is not safe to call this concurrently with Run.
func (loop *Service) RunOnce(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err) //mon:locked
coalesceTimer := time.NewTimer(loop.config.CoalesceDuration)
defer coalesceTimer.Stop()
stopTimer(coalesceTimer)
earlyExit := make(chan *observerContext)
earlyExitDone := make(chan struct{})
monitorEarlyExit := func(obs *observerContext) {
select {
case <-obs.ctx.Done():
select {
case <-earlyExitDone:
case earlyExit <- obs:
}
case <-earlyExitDone:
}
}
timerStarted := false
observers := []*observerContext{}
waitformore:
for {
select {
// when the coalesce timer hits, we have waited enough for observers to join.
case <-coalesceTimer.C:
break waitformore
// wait for a new observer to join.
case obsctx := <-loop.join:
// when the observer triggers the loop and it's the first one,
// then start the coalescing timer.
if obsctx.trigger {
if !timerStarted {
coalesceTimer.Reset(loop.config.CoalesceDuration)
timerStarted = true
}
}
observers = append(observers, obsctx)
go monitorEarlyExit(obsctx)
// remove an observer from waiting when it's canceled before the loop starts.
case obsctx := <-earlyExit:
for i, obs := range observers {
if obs == obsctx {
observers = append(observers[:i], observers[i+1:]...)
break
}
}
obsctx.HandleError(obsctx.ctx.Err())
// reevaluate whether we actually need to start the loop.
timerShouldRun := false
for _, obs := range observers {
timerShouldRun = timerShouldRun || obs.trigger
}
if !timerShouldRun && timerStarted {
stopTimer(coalesceTimer)
}
// when the ctx is done, we can finish all the waiting observers.
case <-ctx.Done():
close(earlyExitDone)
errorObservers(observers, ctx.Err())
return ctx.Err()
}
}
close(earlyExitDone)
return iterateDatabase(ctx, loop.metabaseDB, observers, loop.config.ListLimit, rate.NewLimiter(rate.Limit(loop.config.RateLimit), 1))
}
func stopTimer(t *time.Timer) {
t.Stop()
// drain if it contains something
select {
case <-t.C:
default:
}
}
// Wait waits for run to be finished.
// Safe to be called concurrently.
func (loop *Service) Wait() {
<-loop.done
}
func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (err error) {
defer func() {
if err != nil {
errorObservers(observers, err)
return
}
finishObservers(observers)
}()
observers, err = iterateSegments(ctx, metabaseDB, observers, limit, rateLimiter)
if err != nil {
return Error.Wrap(err)
}
return err
}
func iterateSegments(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (_ []*observerContext, err error) {
defer mon.Task()(&ctx)(&err)
if limit <= 0 || limit > batchsizeLimit {
limit = batchsizeLimit
}
startingTime, err := metabaseDB.Now(ctx)
if err != nil {
return observers, Error.Wrap(err)
}
noObserversErr := errs.New("no observers")
observers = withObservers(ctx, observers, func(ctx context.Context, observer *observerContext) bool {
err := observer.observer.LoopStarted(ctx, LoopInfo{Started: startingTime})
return !observer.HandleError(err)
})
if len(observers) == 0 {
return observers, noObserversErr
}
var segmentsProcessed int64
err = metabaseDB.IterateLoopSegments(ctx, metabase.IterateLoopSegments{
BatchSize: limit,
AsOfSystemTime: startingTime,
}, func(ctx context.Context, iterator metabase.LoopSegmentsIterator) error {
defer mon.TaskNamed("iterateLoopSegmentsCB")(&ctx)(&err)
var entry metabase.LoopSegmentEntry
for iterator.Next(ctx, &entry) {
if err := ctx.Err(); err != nil {
return err
}
timer := mon.Timer("iterateLoopSegmentsRateLimit").Start()
if err := rateLimiter.Wait(ctx); err != nil {
// We don't really execute concurrent batches so we should never
// exceed the burst size of 1 and this should never happen.
// We can also enter here if the context is cancelled.
timer.Stop()
return err
}
timer.Stop()
observers = withObservers(ctx, observers, func(ctx context.Context, observer *observerContext) bool {
segment := Segment(entry)
return !observer.HandleError(handleSegment(ctx, observer, &segment))
})
if len(observers) == 0 {
return noObserversErr
}
segmentsProcessed++
mon.IntVal("segmentsProcessed").Observe(segmentsProcessed) //mon:locked
}
return nil
})
return observers, err
}
func withObservers(ctx context.Context, observers []*observerContext, handleObserver func(ctx context.Context, observer *observerContext) bool) []*observerContext {
defer mon.Task()(&ctx)(nil)
nextObservers := observers[:0]
for _, observer := range observers {
keepObserver := handleObserver(ctx, observer)
if keepObserver {
nextObservers = append(nextObservers, observer)
}
}
return nextObservers
}
func handleSegment(ctx context.Context, observer *observerContext, segment *Segment) (err error) {
defer mon.Task()(&ctx)(&err)
if segment.Inline() {
if err := observer.InlineSegment(ctx, segment); err != nil {
return err
}
} else {
if err := observer.RemoteSegment(ctx, segment); err != nil {
return err
}
}
return observer.ctx.Err()
}
func finishObservers(observers []*observerContext) {
for _, observer := range observers {
observer.Finish()
}
}
func errorObservers(observers []*observerContext, err error) {
for _, observer := range observers {
observer.HandleError(err)
}
}


@@ -0,0 +1,425 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package segmentloop_test
import (
"context"
"errors"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/errs2"
"storj.io/common/memory"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/segmentloop"
)
// TestSegmentsLoop does the following
// * upload 5 remote files with 1 segment
// * upload 2 remote files with 2 segments
// * upload 2 inline files
// * connect two observers to the segments loop
// * run the segments loop.
func TestSegmentsLoop(t *testing.T) {
segmentSize := 50 * memory.KiB
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 4,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Metainfo.SegmentLoop.CoalesceDuration = 1 * time.Second
config.Metainfo.MaxSegmentSize = segmentSize
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
ul := planet.Uplinks[0]
satellite := planet.Satellites[0]
segmentLoop := satellite.Metainfo.SegmentLoop
// upload 5 remote objects with 1 segment
for i := 0; i < 5; i++ {
testData := testrand.Bytes(8 * memory.KiB)
path := "/some/remote/path/" + strconv.Itoa(i)
err := ul.Upload(ctx, satellite, "bucket", path, testData)
require.NoError(t, err)
}
// upload 2 remote objects with 2 segments each
for i := 0; i < 2; i++ {
// exactly 2*segmentSize would create an inline segment at the end of the object
testData := testrand.Bytes(2*segmentSize - 1000)
path := "/some/other/remote/path/" + strconv.Itoa(i)
err := ul.Upload(ctx, satellite, "bucket", path, testData)
require.NoError(t, err)
}
// upload 2 inline files
for i := 0; i < 2; i++ {
testData := testrand.Bytes(1 * memory.KiB)
path := "/some/inline/path/" + strconv.Itoa(i)
err := ul.Upload(ctx, satellite, "bucket", path, testData)
require.NoError(t, err)
}
// create 2 observers
obs1 := newTestObserver(nil)
obs2 := newTestObserver(nil)
var group errgroup.Group
group.Go(func() error {
return segmentLoop.Join(ctx, obs1)
})
group.Go(func() error {
return segmentLoop.Join(ctx, obs2)
})
err := group.Wait()
require.NoError(t, err)
for _, obs := range []*testObserver{obs1, obs2} {
assert.EqualValues(t, 9, obs.remoteSegCount)
assert.EqualValues(t, 2, obs.inlineSegCount)
assert.EqualValues(t, 11, len(obs.uniqueKeys))
}
})
}
func TestSegmentsLoop_AllData(t *testing.T) {
segmentSize := 8 * memory.KiB
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 4,
UplinkCount: 3,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Metainfo.SegmentLoop.CoalesceDuration = 1 * time.Second
config.Metainfo.SegmentLoop.ListLimit = 2
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
bucketNames := strings.Split("abc", "")
data := testrand.Bytes(segmentSize)
for _, up := range planet.Uplinks {
for _, bucketName := range bucketNames {
err := up.Upload(ctx, planet.Satellites[0], "zzz"+bucketName, "1", data)
require.NoError(t, err)
}
}
loop := planet.Satellites[0].Metainfo.SegmentLoop
obs := newTestObserver(nil)
err := loop.Join(ctx, obs)
require.NoError(t, err)
gotItems := len(obs.uniqueKeys)
require.Equal(t, len(bucketNames)*len(planet.Uplinks), gotItems)
})
}
// TestSegmentsLoopObserverCancel does the following:
// * upload 3 remote segments
// * hook three observers up to segments loop
// * let observer 1 run normally
// * let observer 2 return an error from one of its handlers
// * let observer 3's context be canceled
// * expect observer 1 to see all segments
// * expect observers 2 and 3 to finish with errors.
func TestSegmentsLoopObserverCancel(t *testing.T) {
segmentSize := 8 * memory.KiB
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 4,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Metainfo.Loop.CoalesceDuration = 1 * time.Second
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
ul := planet.Uplinks[0]
satellite := planet.Satellites[0]
loop := satellite.Metainfo.SegmentLoop
// upload 3 remote files with 1 segment
for i := 0; i < 3; i++ {
testData := testrand.Bytes(segmentSize)
path := "/some/remote/path/" + strconv.Itoa(i)
err := ul.Upload(ctx, satellite, "bucket", path, testData)
require.NoError(t, err)
}
// create 1 "good" observer
obs1 := newTestObserver(nil)
mon1 := newTestObserver(nil)
// create observer that will return an error from RemoteSegment
obs2 := newTestObserver(func(ctx context.Context) error {
return errors.New("test error")
})
// create observer that will cancel its own context from RemoteSegment
obs3Ctx, cancel := context.WithCancel(ctx)
var once int64
obs3 := newTestObserver(func(ctx context.Context) error {
if atomic.AddInt64(&once, 1) == 1 {
cancel()
<-obs3Ctx.Done() // ensure we wait for cancellation to propagate
} else {
panic("multiple calls to observer after loop cancel")
}
return nil
})
var group errgroup.Group
group.Go(func() error {
return loop.Join(ctx, obs1)
})
group.Go(func() error {
return loop.Monitor(ctx, mon1)
})
group.Go(func() error {
err := loop.Join(ctx, obs2)
if err == nil {
return errors.New("got no error")
}
if !strings.Contains(err.Error(), "test error") {
return errors.New("expected to find error")
}
return nil
})
group.Go(func() error {
err := loop.Join(obs3Ctx, obs3)
if !errs2.IsCanceled(err) {
return errors.New("expected canceled")
}
return nil
})
err := group.Wait()
require.NoError(t, err)
// expect that obs1 saw all three segments, but obs2 and obs3 only saw the first one
assert.EqualValues(t, 3, obs1.remoteSegCount)
assert.EqualValues(t, 3, mon1.remoteSegCount)
assert.EqualValues(t, 1, obs2.remoteSegCount)
assert.EqualValues(t, 1, obs3.remoteSegCount)
})
}
// TestSegmentsLoopCancel does the following:
// * upload 3 remote segments
// * hook two observers up to segments loop
// * cancel loop context partway through
// * expect both observers to exit with an error and see fewer than 3 remote segments
// * expect that a new observer attempting to join at this point receives a loop closed error.
func TestSegmentsLoopCancel(t *testing.T) {
segmentSize := 8 * memory.KiB
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 4,
UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
ul := planet.Uplinks[0]
satellite := planet.Satellites[0]
// upload 3 remote files with 1 segment
for i := 0; i < 3; i++ {
testData := testrand.Bytes(segmentSize)
path := "/some/remote/path/" + strconv.Itoa(i)
err := ul.Upload(ctx, satellite, "bucket", path, testData)
require.NoError(t, err)
}
loop := segmentloop.New(segmentloop.Config{
CoalesceDuration: 1 * time.Second,
ListLimit: 10000,
}, satellite.Metainfo.Metabase)
// create a cancelable context to pass into loop.Run
loopCtx, cancel := context.WithCancel(ctx)
// create 1 normal observer
obs1 := newTestObserver(nil)
var once int64
// create another normal observer that will wait before returning during RemoteSegment so we can sync with context cancelation
obs2 := newTestObserver(func(ctx context.Context) error {
// cancel context during call to obs2.RemoteSegment inside loop
if atomic.AddInt64(&once, 1) == 1 {
cancel()
<-ctx.Done() // ensure we wait for cancellation to propagate
} else {
panic("multiple calls to observer after loop cancel")
}
return nil
})
var group errgroup.Group
// start loop with cancelable context
group.Go(func() error {
err := loop.Run(loopCtx)
if !errs2.IsCanceled(err) {
return errors.New("expected context canceled")
}
return nil
})
group.Go(func() error {
err := loop.Join(ctx, obs1)
if !errs2.IsCanceled(err) {
return errors.New("expected context canceled")
}
return nil
})
group.Go(func() error {
err := loop.Join(ctx, obs2)
if !errs2.IsCanceled(err) {
return errors.New("expected context canceled")
}
return nil
})
err := group.Wait()
require.NoError(t, err)
err = loop.Close()
require.NoError(t, err)
obs3 := newTestObserver(nil)
err = loop.Join(ctx, obs3)
require.Error(t, err)
assert.Contains(t, err.Error(), "loop closed")
// expect that obs1 and obs2 each saw fewer than three remote segments
assert.True(t, obs1.remoteSegCount < 3)
assert.True(t, obs2.remoteSegCount < 3)
})
}
func TestSegmentsLoop_MonitorCancel(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
loop := segmentloop.New(segmentloop.Config{
CoalesceDuration: time.Nanosecond,
ListLimit: 10000,
}, satellite.Metainfo.Metabase)
obs1 := newTestObserver(func(ctx context.Context) error {
return errors.New("test error")
})
var group errgroup.Group
loopCtx, loopCancel := context.WithCancel(ctx)
group.Go(func() error {
err := loop.Run(loopCtx)
t.Log("segments loop stopped")
if !errs2.IsCanceled(err) {
return errors.New("expected context canceled")
}
return nil
})
obsCtx, obsCancel := context.WithCancel(ctx)
group.Go(func() error {
defer loopCancel()
err := loop.Monitor(obsCtx, obs1)
t.Log("observer stopped")
if !errs2.IsCanceled(err) {
return errors.New("expected context canceled")
}
return nil
})
obsCancel()
err := group.Wait()
require.NoError(t, err)
err = loop.Close()
require.NoError(t, err)
})
}
type testKey struct {
StreamID uuid.UUID
Position metabase.SegmentPosition
}
type testObserver struct {
remoteSegCount int
inlineSegCount int
uniqueKeys map[testKey]struct{}
onSegment func(context.Context) error // if set, run this during RemoteSegment()
}
func newTestObserver(onSegment func(context.Context) error) *testObserver {
return &testObserver{
remoteSegCount: 0,
inlineSegCount: 0,
uniqueKeys: make(map[testKey]struct{}),
onSegment: onSegment,
}
}
// LoopStarted is called at each start of a loop.
func (obs *testObserver) LoopStarted(ctx context.Context, info segmentloop.LoopInfo) (err error) {
return nil
}
func (obs *testObserver) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error {
obs.remoteSegCount++
key := testKey{
StreamID: segment.StreamID,
Position: segment.Position,
}
if _, ok := obs.uniqueKeys[key]; ok {
// TODO: collect the errors and check in test
panic("Expected unique pair StreamID/Position in observer.RemoteSegment")
}
obs.uniqueKeys[key] = struct{}{}
if obs.onSegment != nil {
return obs.onSegment(ctx)
}
return nil
}
func (obs *testObserver) InlineSegment(ctx context.Context, segment *segmentloop.Segment) error {
obs.inlineSegCount++
key := testKey{
StreamID: segment.StreamID,
Position: segment.Position,
}
if _, ok := obs.uniqueKeys[key]; ok {
// TODO: collect the errors and check in test
panic("Expected unique pair StreamID/Position in observer.InlineSegment")
}
obs.uniqueKeys[key] = struct{}{}
return nil
}


@@ -0,0 +1,79 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package segmentloop
import (
"sync"
"time"
"github.com/spacemonkeygo/monkit/v3"
)
var allObserverStatsCollectors = newObserverStatsCollectors()
type observerStatsCollectors struct {
mu sync.Mutex
observer map[string]*observerStats
}
func newObserverStatsCollectors() *observerStatsCollectors {
return &observerStatsCollectors{
observer: make(map[string]*observerStats),
}
}
func (list *observerStatsCollectors) GetStats(name string) *observerStats {
list.mu.Lock()
defer list.mu.Unlock()
stats, ok := list.observer[name]
if !ok {
stats = newObserverStats(name)
mon.Chain(stats)
list.observer[name] = stats
}
return stats
}
// observerStats tracks the most recent observer stats.
type observerStats struct {
mu sync.Mutex
key monkit.SeriesKey
total time.Duration
inline *monkit.DurationDist
remote *monkit.DurationDist
}
func newObserverStats(name string) *observerStats {
return &observerStats{
key: monkit.NewSeriesKey("segment-observer").WithTag("name", name),
total: 0,
inline: nil,
remote: nil,
}
}
func (stats *observerStats) Observe(observer *observerContext) {
stats.mu.Lock()
defer stats.mu.Unlock()
stats.total = observer.inline.Sum + observer.remote.Sum
stats.inline = observer.inline
stats.remote = observer.remote
}
func (stats *observerStats) Stats(cb func(key monkit.SeriesKey, field string, val float64)) {
stats.mu.Lock()
defer stats.mu.Unlock()
cb(stats.key, "sum", stats.total.Seconds())
if stats.inline != nil {
stats.inline.Stats(cb)
}
if stats.remote != nil {
stats.remote.Stats(cb)
}
}


@@ -11,6 +11,7 @@ import (
"storj.io/common/memory"
"storj.io/storj/satellite/metabase/metaloop"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/metainfo/piecedeletion"
)
@@ -114,6 +115,7 @@ type Config struct {
Overlay bool `default:"true" help:"toggle flag if overlay is enabled"`
RS RSConfig `releaseDefault:"29/35/80/110-256B" devDefault:"4/6/8/10-256B" help:"redundancy scheme configuration in the format k/m/o/n-sharesize"`
Loop metaloop.Config `help:"loop configuration"`
SegmentLoop segmentloop.Config `help:"segment loop configuration"`
RateLimiter RateLimiterConfig `help:"rate limiter configuration"`
ProjectLimits ProjectLimitConfig `help:"project limit configuration"`
PieceDeletion piecedeletion.Config `help:"piece deletion configuration"`


@@ -421,6 +421,15 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# redundancy scheme configuration in the format k/m/o/n-sharesize
# metainfo.rs: 29/35/80/110-256 B
# how long to wait for new observers before starting iteration
# metainfo.segment-loop.coalesce-duration: 5s
# how many items to query in a batch
# metainfo.segment-loop.list-limit: 2500
# rate limit (default is 0 which is unlimited segments per second)
# metainfo.segment-loop.rate-limit: 0
# address(es) to send telemetry to (comma-separated)
# metrics.addr: collectora.storj.io:9000