satellite/metainfo: select segments in batches for metainfo loop
Previously the loop read segments one object at a time, issuing a separate ListSegments query per object. Now the loop gathers a batch of objects and reads all segments for the whole batch with a single query.

Change-Id: Idaf19bbe4d4b095065d59399dd326e22c57499a6
parent d995fb497f
commit 95b78e8011
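The change is purely an access-pattern optimization: instead of issuing one segment query per object, the loop now gathers up to a batch limit of stream IDs and fetches all of their segments with a single query. A minimal runnable sketch of the before/after pattern (fetchPerObject, fetchBatched, queryOne, and queryMany are illustrative names, not from this commit):

	package main

	import "fmt"

	// Before: N objects cost N segment queries.
	func fetchPerObject(ids []string, queryOne func(string)) {
		for _, id := range ids {
			queryOne(id)
		}
	}

	// After: N objects cost ceil(N/limit) segment queries.
	func fetchBatched(ids []string, limit int, queryMany func([]string)) {
		for start := 0; start < len(ids); start += limit {
			end := start + limit
			if end > len(ids) {
				end = len(ids)
			}
			queryMany(ids[start:end])
		}
	}

	func main() {
		ids := []string{"a", "b", "c", "d", "e"}
		fetchPerObject(ids, func(id string) { fmt.Println("SELECT ... WHERE stream_id =", id) })
		fetchBatched(ids, 2, func(batch []string) { fmt.Println("SELECT ... WHERE stream_id = ANY", batch) })
	}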
@@ -205,6 +205,8 @@ type MetabaseDB interface {
 	GetLatestObjectLastSegment(ctx context.Context, opts metabase.GetLatestObjectLastSegment) (segment metabase.Segment, err error)
 	// ListSegments lists specified stream segments.
 	ListSegments(ctx context.Context, opts metabase.ListSegments) (result metabase.ListSegmentsResult, err error)
+	// ListObjectsSegments lists multiple streams segments.
+	ListObjectsSegments(ctx context.Context, opts metabase.ListObjectsSegments) (result metabase.ListObjectsSegmentsResult, err error)
 	// IterateObjectsAllVersions iterates through all versions of all objects.
 	IterateObjectsAllVersions(ctx context.Context, opts metabase.IterateObjects, fn func(context.Context, metabase.ObjectsIterator) error) (err error)
 	// IterateObjectsAllVersionsWithStatus iterates through all versions of all objects with specified status.
@@ -5,6 +5,7 @@ package metainfo
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"time"
 
@@ -300,7 +301,98 @@ func iterateDatabase(ctx context.Context, metabaseDB MetabaseDB, observers []*ob
 func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (_ []*observerContext, err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	// TODO we should improve performance here, this is just most straightforward solution
+	noObserversErr := errs.New("no observers")
+
+	// TODO we may consider keeping only expiration time as its
+	// only thing we need to handle segments
+	objectsMap := make(map[uuid.UUID]metabase.FullObjectEntry)
+	ids := make([]uuid.UUID, 0, limit)
+
+	processBatch := func() error {
+		if len(objectsMap) == 0 {
+			return nil
+		}
+
+		segments, err := metabaseDB.ListObjectsSegments(ctx, metabase.ListObjectsSegments{
+			StreamIDs: ids,
+		})
+		if err != nil {
+			return err
+		}
+
+		var lastObject metabase.FullObjectEntry
+		for _, segment := range segments.Segments {
+			if err := rateLimiter.Wait(ctx); err != nil {
+				// We don't really execute concurrent batches so we should never
+				// exceed the burst size of 1 and this should never happen.
+				// We can also enter here if the context is cancelled.
+				return err
+			}
+
+			if segment.StreamID != lastObject.StreamID {
+				var ok bool
+				lastObject, ok = objectsMap[segment.StreamID]
+				if !ok {
+					return errs.New("unable to find corresponding object: %v", segment.StreamID)
+				}
+
+				delete(objectsMap, lastObject.StreamID)
+
+				// TODO should we move this directly to iterator to have object
+				// state as close as possible to time of reading
+				observers = withObservers(observers, func(observer *observerContext) bool {
+					return handleObject(ctx, observer, lastObject)
+				})
+				if len(observers) == 0 {
+					return noObserversErr
+				}
+
+				// if context has been canceled exit. Otherwise, continue
+				if err := ctx.Err(); err != nil {
+					return err
+				}
+			}
+
+			location := metabase.SegmentLocation{
+				ProjectID:  lastObject.ProjectID,
+				BucketName: lastObject.BucketName,
+				ObjectKey:  lastObject.ObjectKey,
+				Position:   segment.Position,
+			}
+			segment := segment
+			observers = withObservers(observers, func(observer *observerContext) bool {
+				return handleSegment(ctx, observer, location, segment, lastObject.ExpiresAt)
+			})
+			if len(observers) == 0 {
+				return noObserversErr
+			}
+
+			// if context has been canceled exit. Otherwise, continue
+			if err := ctx.Err(); err != nil {
+				return err
+			}
+		}
+
+		// we have now only objects without segments
+		for id, object := range objectsMap {
+			delete(objectsMap, id)
+
+			object := object
+			observers = withObservers(observers, func(observer *observerContext) bool {
+				return handleObject(ctx, observer, object)
+			})
+			if len(observers) == 0 {
+				return noObserversErr
+			}
+
+			// if context has been canceled exit. Otherwise, continue
+			if err := ctx.Err(); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+
 	err = metabaseDB.FullIterateObjects(ctx, metabase.FullIterateObjects{
 		BatchSize: limit,
 	}, func(ctx context.Context, it metabase.FullObjectsIterator) error {
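Note on the hunk above: processBatch relies on ListObjectsSegments returning rows ordered by stream_id, so a change in StreamID marks an object boundary, and whatever remains in objectsMap afterwards is an object with zero segments. The boundary-detection idiom in isolation (a runnable sketch with simplified stand-in types):

	package main

	import "fmt"

	type seg struct {
		StreamID string
		Position int
	}

	func main() {
		// already sorted by StreamID, as the batched query guarantees
		segments := []seg{{"aaaa", 0}, {"aaaa", 1}, {"bbbb", 0}}
		last := ""
		for _, s := range segments {
			if s.StreamID != last {
				last = s.StreamID
				fmt.Println("object:", last) // handleObject fires once per object
			}
			fmt.Println("  segment:", s.Position) // handleSegment fires per segment
		}
	}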
@@ -313,82 +405,46 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
 				return err
 			}
 
-			nextObservers := observers[:0]
-			for _, observer := range observers {
-				keepObserver := handleObject(ctx, observer, entry)
-				if keepObserver {
-					nextObservers = append(nextObservers, observer)
-				}
-			}
-
-			observers = nextObservers
-			if len(observers) == 0 {
-				return nil
-			}
-
-			// if context has been canceled exit. Otherwise, continue
-			if err := ctx.Err(); err != nil {
-				return err
-			}
-
-			more := true
-			cursor := metabase.SegmentPosition{}
-			for more {
-				if err := rateLimiter.Wait(ctx); err != nil {
-					// We don't really execute concurrent batches so we should never
-					// exceed the burst size of 1 and this should never happen.
-					// We can also enter here if the context is cancelled.
-					return err
-				}
-
-				segments, err := metabaseDB.ListSegments(ctx, metabase.ListSegments{
-					StreamID: entry.StreamID,
-					Cursor:   cursor,
-					Limit:    limit,
-				})
-				if err != nil {
-					return err
-				}
-
-				for _, segment := range segments.Segments {
-					nextObservers := observers[:0]
-					location := metabase.SegmentLocation{
-						ProjectID:  entry.ProjectID,
-						BucketName: entry.BucketName,
-						ObjectKey:  entry.ObjectKey,
-						Position:   segment.Position,
-					}
-					for _, observer := range observers {
-						keepObserver := handleSegment(ctx, observer, location, segment, entry.ExpiresAt)
-						if keepObserver {
-							nextObservers = append(nextObservers, observer)
-						}
-					}
-
-					observers = nextObservers
-					if len(observers) == 0 {
-						return nil
-					}
-
-					// if context has been canceled exit. Otherwise, continue
-					if err := ctx.Err(); err != nil {
-						return err
-					}
-				}
-
-				more = segments.More
-				if more {
-					lastSegment := segments.Segments[len(segments.Segments)-1]
-					cursor = lastSegment.Position
-				}
-			}
-		}
-		return nil
+			objectsMap[entry.StreamID] = entry
+			ids = append(ids, entry.StreamID)
+
+			if len(objectsMap) == limit {
+				err := processBatch()
+				if err != nil {
+					if errors.Is(err, noObserversErr) {
+						return nil
+					}
+					return err
+				}
+
+				if len(objectsMap) > 0 {
+					return errs.New("objects map is not empty")
+				}
+
+				ids = ids[:0]
+			}
+		}
+		err = processBatch()
+		if errors.Is(err, noObserversErr) {
+			return nil
+		}
+		return err
 	})
 
 	return observers, err
 }
 
+func withObservers(observers []*observerContext, handleObserver func(observer *observerContext) bool) []*observerContext {
+	nextObservers := observers[:0]
+	for _, observer := range observers {
+		keepObserver := handleObserver(observer)
+		if keepObserver {
+			nextObservers = append(nextObservers, observer)
+		}
+	}
+	return nextObservers
+}
+
 func handleObject(ctx context.Context, observer *observerContext, object metabase.FullObjectEntry) bool {
 	expirationDate := time.Time{}
 	if object.ExpiresAt != nil {
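Note: withObservers extracts the in-place slice filter that the old code repeated inline. Reusing observers[:0] avoids an allocation per object and per segment, with the usual caveat that the result aliases the original backing array, so only the returned slice may be used afterwards. The idiom by itself (runnable sketch):

	package main

	import "fmt"

	func main() {
		nums := []int{1, 2, 3, 4, 5}
		kept := nums[:0]
		for _, n := range nums {
			if n%2 == 0 {
				kept = append(kept, n) // writes into nums' backing array
			}
		}
		fmt.Println(kept) // [2 4]; nums itself must not be read again
	}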
@@ -25,6 +25,7 @@ import (
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/metainfo"
 	"storj.io/storj/satellite/metainfo/metabase"
+	"storj.io/uplink/private/multipart"
 )
 
 // TestLoop does the following
@@ -143,6 +144,59 @@ func TestLoop_AllData(t *testing.T) {
 	})
 }
 
+func TestLoop_ObjectNoSegments(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 4,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
+				config.Metainfo.Loop.CoalesceDuration = 1 * time.Second
+				config.Metainfo.Loop.ListLimit = 2
+			},
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "abcd")
+		require.NoError(t, err)
+
+		project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
+		require.NoError(t, err)
+		defer ctx.Check(project.Close)
+
+		expectedNumberOfObjects := 5
+		for i := 0; i < expectedNumberOfObjects; i++ {
+			info, err := multipart.NewMultipartUpload(ctx, project, "abcd", "t"+strconv.Itoa(i), nil)
+			require.NoError(t, err)
+
+			_, err = multipart.CompleteMultipartUpload(ctx, project, "abcd", "t"+strconv.Itoa(i), info.StreamID, nil)
+			require.NoError(t, err)
+		}
+
+		metaLoop := planet.Satellites[0].Metainfo.Loop
+
+		obs := newTestObserver(nil)
+		err = metaLoop.Join(ctx, obs)
+		require.NoError(t, err)
+
+		require.Equal(t, expectedNumberOfObjects, obs.objectCount)
+		require.Zero(t, obs.inlineSegCount)
+		require.Zero(t, obs.remoteSegCount)
+
+		// add object with single segment
+		data := testrand.Bytes(8 * memory.KiB)
+		err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "dcba", "1", data)
+		require.NoError(t, err)
+
+		obs = newTestObserver(nil)
+		err = metaLoop.Join(ctx, obs)
+		require.NoError(t, err)
+
+		require.Equal(t, expectedNumberOfObjects+1, obs.objectCount)
+		require.Zero(t, obs.inlineSegCount)
+		require.Equal(t, 1, obs.remoteSegCount)
+	})
+}
+
 // TestLoopObserverCancel does the following:
 // * upload 3 remote segments
 // * hook three observers up to metainfo loop
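Note: ListLimit = 2 with 5 zero-segment objects exercises both flush paths: two full batches flushed inside the iterator callback, plus the final partial batch flushed by the trailing processBatch call. The arithmetic (illustrative sketch):

	package main

	import "fmt"

	func main() {
		objects, limit := 5, 2
		fullBatches := objects / limit // 2, flushed when len(objectsMap) == limit
		lastBatch := objects % limit   // 1, flushed by the trailing processBatch
		fmt.Println(fullBatches, lastBatch)
	}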
@@ -4,11 +4,14 @@
 package metabase
 
 import (
+	"bytes"
 	"context"
 	"database/sql"
 	"errors"
+	"sort"
 
 	"storj.io/common/uuid"
+	"storj.io/storj/private/dbutil/pgutil"
 	"storj.io/storj/private/tagsql"
 )
 
@@ -93,3 +96,82 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
 
 	return result, nil
 }
+
+// ListObjectsSegments contains arguments necessary for listing multiple streams segments.
+type ListObjectsSegments struct {
+	StreamIDs []uuid.UUID
+}
+
+// ListObjectsSegmentsResult result of listing segments.
+type ListObjectsSegmentsResult struct {
+	Segments []Segment
+}
+
+// ListObjectsSegments lists multiple streams segments.
+func (db *DB) ListObjectsSegments(ctx context.Context, opts ListObjectsSegments) (result ListObjectsSegmentsResult, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	if len(opts.StreamIDs) == 0 {
+		return ListObjectsSegmentsResult{}, ErrInvalidRequest.New("StreamIDs list is empty")
+	}
+
+	// TODO do something like pgutil.UUIDArray()
+	ids := make([][]byte, len(opts.StreamIDs))
+	for i, streamID := range opts.StreamIDs {
+		if streamID.IsZero() {
+			return ListObjectsSegmentsResult{}, ErrInvalidRequest.New("StreamID missing: index %d", i)
+		}
+
+		id := streamID
+		ids[i] = id[:]
+	}
+
+	sort.Slice(ids, func(i, j int) bool {
+		return bytes.Compare(ids[i], ids[j]) < 0
+	})
+
+	err = withRows(db.db.Query(ctx, `
+		SELECT
+			stream_id, position,
+			root_piece_id, encrypted_key_nonce, encrypted_key,
+			encrypted_size, plain_offset, plain_size,
+			redundancy,
+			inline_data, remote_alias_pieces
+		FROM segments
+		WHERE
+			-- this turns out to be a little bit faster than stream_id IN (SELECT unnest($1::BYTEA[]))
+			stream_id = ANY ($1::BYTEA[])
+		ORDER BY stream_id ASC, position ASC
+	`, pgutil.ByteaArray(ids)))(func(rows tagsql.Rows) error {
+		for rows.Next() {
+			var segment Segment
+			var aliasPieces AliasPieces
+			err = rows.Scan(
+				&segment.StreamID, &segment.Position,
+				&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
+				&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
+				redundancyScheme{&segment.Redundancy},
+				&segment.InlineData, &aliasPieces,
+			)
+			if err != nil {
+				return Error.New("failed to scan segments: %w", err)
+			}
+
+			segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
+			if err != nil {
+				return Error.New("failed to convert aliases to pieces: %w", err)
+			}
+
+			result.Segments = append(result.Segments, segment)
+		}
+		return nil
+	})
+	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return ListObjectsSegmentsResult{}, nil
+		}
+		return ListObjectsSegmentsResult{}, Error.New("unable to fetch object segments: %w", err)
+	}
+
+	return result, nil
+}
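Note: the whole batch travels as a single BYTEA[] bind parameter, and the inline SQL comment records why stream_id = ANY was preferred over IN (SELECT unnest(...)). Sorting the IDs client-side plausibly improves index-scan locality; the result order itself is guaranteed by the ORDER BY. A hedged caller sketch using only the API added above (db, ctx, objectA, and objectB are assumed to be in scope):

	result, err := db.ListObjectsSegments(ctx, metabase.ListObjectsSegments{
		StreamIDs: []uuid.UUID{objectA.StreamID, objectB.StreamID},
	})
	if err != nil {
		return err
	}
	for _, segment := range result.Segments {
		_ = segment // rows arrive ordered by (stream_id, position)
	}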
@@ -4,11 +4,14 @@
 package metabase_test
 
 import (
+	"bytes"
+	"sort"
 	"testing"
 
 	"storj.io/common/storj"
 	"storj.io/common/testcontext"
 	"storj.io/common/testrand"
+	"storj.io/common/uuid"
 	"storj.io/storj/satellite/metainfo/metabase"
 )
 
@@ -259,3 +262,96 @@ func TestListSegments(t *testing.T) {
 		})
 	})
 }
+
+func TestListObjectsSegments(t *testing.T) {
+	All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
+		t.Run("StreamIDs list is empty", func(t *testing.T) {
+			defer DeleteAll{}.Check(ctx, t, db)
+
+			ListObjectsSegments{
+				Opts:     metabase.ListObjectsSegments{},
+				ErrClass: &metabase.ErrInvalidRequest,
+				ErrText:  "StreamIDs list is empty",
+			}.Check(ctx, t, db)
+
+			Verify{}.Check(ctx, t, db)
+		})
+
+		t.Run("StreamIDs list contains empty ID", func(t *testing.T) {
+			defer DeleteAll{}.Check(ctx, t, db)
+
+			ListObjectsSegments{
+				Opts: metabase.ListObjectsSegments{
+					StreamIDs: []uuid.UUID{{}},
+				},
+				ErrClass: &metabase.ErrInvalidRequest,
+				ErrText:  "StreamID missing: index 0",
+			}.Check(ctx, t, db)
+
+			Verify{}.Check(ctx, t, db)
+		})
+
+		t.Run("List objects segments", func(t *testing.T) {
+			defer DeleteAll{}.Check(ctx, t, db)
+
+			expectedObject01 := createObject(ctx, t, db, randObjectStream(), 1)
+			expectedObject02 := createObject(ctx, t, db, randObjectStream(), 5)
+			expectedObject03 := createObject(ctx, t, db, randObjectStream(), 3)
+
+			expectedSegments := []metabase.Segment{}
+			expectedRawSegments := []metabase.RawSegment{}
+
+			objects := []metabase.Object{expectedObject01, expectedObject02, expectedObject03}
+
+			sort.Slice(objects, func(i, j int) bool {
+				return bytes.Compare(objects[i].StreamID[:], objects[j].StreamID[:]) < 0
+			})
+
+			addSegments := func(object metabase.Object) {
+				for i := 0; i < int(object.SegmentCount); i++ {
+					segment := metabase.Segment{
+						StreamID: object.StreamID,
+						Position: metabase.SegmentPosition{
+							Index: uint32(i),
+						},
+						RootPieceID:       storj.PieceID{1},
+						EncryptedKey:      []byte{3},
+						EncryptedKeyNonce: []byte{4},
+						EncryptedSize:     1024,
+						PlainSize:         512,
+						Pieces:            metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
+						Redundancy:        defaultTestRedundancy,
+					}
+					expectedSegments = append(expectedSegments, segment)
+					expectedRawSegments = append(expectedRawSegments, metabase.RawSegment(segment))
+				}
+			}
+
+			for _, object := range objects {
+				addSegments(object)
+			}
+
+			ListObjectsSegments{
+				Opts: metabase.ListObjectsSegments{
+					StreamIDs: []uuid.UUID{
+						expectedObject01.StreamID,
+						expectedObject02.StreamID,
+						expectedObject03.StreamID,
+					},
+				},
+				Result: metabase.ListObjectsSegmentsResult{
+					Segments: expectedSegments,
+				},
+			}.Check(ctx, t, db)
+
+			Verify{
+				Objects: []metabase.RawObject{
+					metabase.RawObject(expectedObject01),
+					metabase.RawObject(expectedObject02),
+					metabase.RawObject(expectedObject03),
+				},
+				Segments: expectedRawSegments,
+			}.Check(ctx, t, db)
+		})
+	})
+}
@@ -274,6 +274,21 @@ func (step ListSegments) Check(ctx *testcontext.Context, t testing.TB, db *metab
 	require.Zero(t, diff)
 }
 
+type ListObjectsSegments struct {
+	Opts metabase.ListObjectsSegments
+	Result metabase.ListObjectsSegmentsResult
+	ErrClass *errs.Class
+	ErrText string
+}
+
+func (step ListObjectsSegments) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
+	result, err := db.ListObjectsSegments(ctx, step.Opts)
+	checkError(t, err, step.ErrClass, step.ErrText)
+
+	diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second))
+	require.Zero(t, diff)
+}
+
 type DeleteObjectExactVersion struct {
 	Opts metabase.DeleteObjectExactVersion
 
 	Result metabase.DeleteObjectResult