// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.

package segmentloop_test

import (
	"context"
	"errors"
	"strconv"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
	"go.uber.org/zap/zaptest"
	"golang.org/x/sync/errgroup"

	"storj.io/common/errs2"
	"storj.io/common/memory"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/common/uuid"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/metabase/segmentloop"
)

// TestSegmentsLoop does the following:
// * upload 5 remote files with 1 segment
// * upload 2 remote files with 2 segments
// * upload 2 inline files
// * connect two observers to the segments loop
// * run the segments loop.
func TestSegmentsLoop(t *testing.T) {
	segmentSize := 50 * memory.KiB

	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
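				// keep the coalesce duration short so the loop begins its
				// iteration quickly instead of waiting on the production default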
				config.Metainfo.SegmentLoop.CoalesceDuration = 1 * time.Second
				config.Metainfo.MaxSegmentSize = segmentSize
			},
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		ul := planet.Uplinks[0]
		satellite := planet.Satellites[0]
		segmentLoop := satellite.Metabase.SegmentLoop

		// upload 5 remote objects with 1 segment
		for i := 0; i < 5; i++ {
			testData := testrand.Bytes(8 * memory.KiB)
			path := "/some/remote/path/" + strconv.Itoa(i)
			err := ul.Upload(ctx, satellite, "bucket", path, testData)
			require.NoError(t, err)
		}

		// upload 2 remote objects with 2 segments each
		for i := 0; i < 2; i++ {
			// exactly 2*segmentSize would create an inline segment at the end of the object
			testData := testrand.Bytes(2*segmentSize - 1000)
			path := "/some/other/remote/path/" + strconv.Itoa(i)
			err := ul.Upload(ctx, satellite, "bucket", path, testData)
			require.NoError(t, err)
		}

		// upload 2 inline files
		for i := 0; i < 2; i++ {
			testData := testrand.Bytes(1 * memory.KiB)
			path := "/some/inline/path/" + strconv.Itoa(i)
			err := ul.Upload(ctx, satellite, "bucket", path, testData)
			require.NoError(t, err)
		}

		// create 2 observers
		obs1 := newTestObserver(nil)
		obs2 := newTestObserver(nil)

		var group errgroup.Group
		group.Go(func() error {
			return segmentLoop.Join(ctx, obs1)
		})
		group.Go(func() error {
			return segmentLoop.Join(ctx, obs2)
		})

		err := group.Wait()
		require.NoError(t, err)
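
		// 5 objects with 1 segment + 2 objects with 2 segments = 9 remote
		// segments, plus 2 inline segments: 11 unique stream/position keys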
		for _, obs := range []*testObserver{obs1, obs2} {
			assert.EqualValues(t, 9, obs.remoteSegCount)
			assert.EqualValues(t, 2, obs.inlineSegCount)
			assert.EqualValues(t, 11, len(obs.uniqueKeys))
		}
	})
}

func TestSegmentsLoop_AllData(t *testing.T) {
	segmentSize := 8 * memory.KiB
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      3,
		Reconfigure: testplanet.Reconfigure{
			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
				config.Metainfo.SegmentLoop.CoalesceDuration = 1 * time.Second
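				// a tiny list limit forces the loop to page through the
				// segments in several small batches instead of a single listing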
				config.Metainfo.SegmentLoop.ListLimit = 2
			},
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		bucketNames := strings.Split("abc", "")

		data := testrand.Bytes(segmentSize)
		for _, up := range planet.Uplinks {
			for _, bucketName := range bucketNames {
				err := up.Upload(ctx, planet.Satellites[0], "zzz"+bucketName, "1", data)
				require.NoError(t, err)
			}
		}

		loop := planet.Satellites[0].Metabase.SegmentLoop

		obs := newTestObserver(nil)
		err := loop.Join(ctx, obs)
		require.NoError(t, err)
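
		// each uplink uploaded one single-segment object into each bucket, so
		// the observer should see len(bucketNames)*len(planet.Uplinks) segments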
		gotItems := len(obs.uniqueKeys)
		require.Equal(t, len(bucketNames)*len(planet.Uplinks), gotItems)
	})
}

// TestSegmentsLoopObserverCancel does the following:
// * upload 3 remote segments
// * hook three observers up to segments loop
// * let observer 1 run normally
// * let observer 2 return an error from one of its handlers
// * let observer 3's context be canceled
// * expect observer 1 to see all segments
// * expect observers 2 and 3 to finish with errors.
func TestSegmentsLoopObserverCancel(t *testing.T) {
	segmentSize := 8 * memory.KiB

	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
				config.Metainfo.SegmentLoop.CoalesceDuration = 1 * time.Second
			},
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		ul := planet.Uplinks[0]
		satellite := planet.Satellites[0]
		loop := satellite.Metabase.SegmentLoop

		// upload 3 remote files with 1 segment
		for i := 0; i < 3; i++ {
			testData := testrand.Bytes(segmentSize)
			path := "/some/remote/path/" + strconv.Itoa(i)
			err := ul.Upload(ctx, satellite, "bucket", path, testData)
			require.NoError(t, err)
		}

		// create 2 "good" observers
		obs1 := newTestObserver(nil)
		mon1 := newTestObserver(nil)

		// create observer that will return an error from RemoteSegment
		obs2 := newTestObserver(func(ctx context.Context) error {
			return errors.New("test error")
		})

		// create observer that will cancel its own context from RemoteSegment
		obs3Ctx, cancel := context.WithCancel(ctx)
		var once int64
		obs3 := newTestObserver(func(ctx context.Context) error {
			if atomic.AddInt64(&once, 1) == 1 {
				cancel()
				<-obs3Ctx.Done() // ensure we wait for cancellation to propagate
			} else {
				panic("multiple calls to observer after loop cancel")
			}
			return nil
		})

		var group errgroup.Group
		group.Go(func() error {
			return loop.Join(ctx, obs1)
		})
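		// mon1 is attached via loop.Monitor rather than loop.Join; the test
		// expects a monitored observer to see the same segments as a joined one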
		group.Go(func() error {
			return loop.Monitor(ctx, mon1)
		})
		group.Go(func() error {
			err := loop.Join(ctx, obs2)
			if err == nil {
				return errors.New("got no error")
			}
			if !strings.Contains(err.Error(), "test error") {
				return errors.New("expected to find error")
			}
			return nil
		})
		group.Go(func() error {
			err := loop.Join(obs3Ctx, obs3)
			if !errs2.IsCanceled(err) {
				return errors.New("expected canceled")
			}
			return nil
		})

		err := group.Wait()
		require.NoError(t, err)

		// expect that obs1 saw all three segments, but obs2 and obs3 only saw the first one
		assert.EqualValues(t, 3, obs1.remoteSegCount)
		assert.EqualValues(t, 3, mon1.remoteSegCount)
		assert.EqualValues(t, 1, obs2.remoteSegCount)
		assert.EqualValues(t, 1, obs3.remoteSegCount)
	})
}

// TestSegmentsLoopCancel does the following:
// * upload 3 remote segments
// * hook two observers up to segments loop
// * cancel loop context partway through
// * expect both observers to exit with an error and see fewer than 3 remote segments
// * expect that a new observer attempting to join at this point receives a loop closed error.
func TestSegmentsLoopCancel(t *testing.T) {
	segmentSize := 8 * memory.KiB

	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 4,
		UplinkCount:      1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		ul := planet.Uplinks[0]
		satellite := planet.Satellites[0]

		// upload 3 remote files with 1 segment
		for i := 0; i < 3; i++ {
			testData := testrand.Bytes(segmentSize)
			path := "/some/remote/path/" + strconv.Itoa(i)
			err := ul.Upload(ctx, satellite, "bucket", path, testData)
			require.NoError(t, err)
		}
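
		// run a separate loop instance rather than the satellite's own, so the
		// test can drive Run and Close directly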
		loop := segmentloop.New(zaptest.NewLogger(t), segmentloop.Config{
			CoalesceDuration: 1 * time.Second,
			ListLimit:        10000,
		}, satellite.Metabase.DB)

		// create a cancelable context to pass into loop.Run
		loopCtx, cancel := context.WithCancel(ctx)

		// create 1 normal observer
		obs1 := newTestObserver(nil)

		var once int64
		// create another normal observer that waits before returning from
		// RemoteSegment so we can sync with the context cancellation
		obs2 := newTestObserver(func(ctx context.Context) error {
			// cancel context during call to obs2.RemoteSegment inside loop
			if atomic.AddInt64(&once, 1) == 1 {
				cancel()
				<-ctx.Done() // ensure we wait for cancellation to propagate
			} else {
				panic("multiple calls to observer after loop cancel")
			}
			return nil
		})

		var group errgroup.Group

		// start loop with cancelable context
		group.Go(func() error {
			err := loop.Run(loopCtx)
			if !errs2.IsCanceled(err) {
				return errors.New("expected context canceled")
			}
			return nil
		})
		group.Go(func() error {
			err := loop.Join(ctx, obs1)
			if !errs2.IsCanceled(err) {
				return errors.New("expected context canceled")
			}
			return nil
		})
		group.Go(func() error {
			err := loop.Join(ctx, obs2)
			if !errs2.IsCanceled(err) {
				return errors.New("expected context canceled")
			}
			return nil
		})

		err := group.Wait()
		require.NoError(t, err)

		err = loop.Close()
		require.NoError(t, err)

		obs3 := newTestObserver(nil)
		err = loop.Join(ctx, obs3)
		require.Error(t, err)
		assert.Contains(t, err.Error(), "loop closed")

		// expect that obs1 and obs2 each saw fewer than three remote segments
		assert.True(t, obs1.remoteSegCount < 3)
		assert.True(t, obs2.remoteSegCount < 3)
	})
}

func TestSegmentsLoop_MonitorCancel(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		satellite := planet.Satellites[0]

		loop := segmentloop.New(zaptest.NewLogger(t), segmentloop.Config{
			CoalesceDuration: time.Nanosecond,
			ListLimit:        10000,
		}, satellite.Metabase.DB)
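
		// the nanosecond coalesce duration makes the loop start iterating
		// almost immediately, so the monitored observer is called right away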
		obs1 := newTestObserver(func(ctx context.Context) error {
			return errors.New("test error")
		})

		var group errgroup.Group

		loopCtx, loopCancel := context.WithCancel(ctx)
		group.Go(func() error {
			err := loop.Run(loopCtx)
			t.Log("segments loop stopped")
			if !errs2.IsCanceled(err) {
				return errors.New("expected context canceled")
			}
			return nil
		})

		obsCtx, obsCancel := context.WithCancel(ctx)
		group.Go(func() error {
			defer loopCancel()
			err := loop.Monitor(obsCtx, obs1)
			t.Log("observer stopped")
			if !errs2.IsCanceled(err) {
				return errors.New("expected context canceled")
			}
			return nil
		})

		obsCancel()

		err := group.Wait()
		require.NoError(t, err)

		err = loop.Close()
		require.NoError(t, err)
	})
}

type testKey struct {
	StreamID uuid.UUID
	Position metabase.SegmentPosition
}

type testObserver struct {
	remoteSegCount int
	inlineSegCount int
	uniqueKeys     map[testKey]struct{}
	onSegment      func(context.Context) error // if set, run this during RemoteSegment()
}
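
// testObserver implements the segment loop's observer interface; its counters
// are plain ints, which assumes the loop invokes a given observer serially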
func newTestObserver(onSegment func(context.Context) error) *testObserver {
	return &testObserver{
		remoteSegCount: 0,
		inlineSegCount: 0,
		uniqueKeys:     make(map[testKey]struct{}),
		onSegment:      onSegment,
	}
}

// LoopStarted is called at each start of a loop.
func (obs *testObserver) LoopStarted(ctx context.Context, info segmentloop.LoopInfo) (err error) {
	return nil
}

func (obs *testObserver) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error {
	obs.remoteSegCount++

	key := testKey{
		StreamID: segment.StreamID,
		Position: segment.Position,
	}
	if _, ok := obs.uniqueKeys[key]; ok {
		// TODO: collect the errors and check in test
		panic("Expected unique pair StreamID/Position in observer.RemoteSegment")
	}
	obs.uniqueKeys[key] = struct{}{}

	if obs.onSegment != nil {
		return obs.onSegment(ctx)
	}

	return nil
}

func (obs *testObserver) InlineSegment(ctx context.Context, segment *segmentloop.Segment) error {
	obs.inlineSegCount++
	key := testKey{
		StreamID: segment.StreamID,
		Position: segment.Position,
	}
	if _, ok := obs.uniqueKeys[key]; ok {
		// TODO: collect the errors and check in test
		panic("Expected unique pair StreamID/Position in observer.InlineSegment")
	}
	obs.uniqueKeys[key] = struct{}{}
	return nil
}