satellite/metainfo: add as of system time to object iteration

Change-Id: Idc1d5c7d983f331c60f5b75c5a977a25e197faf9
This commit is contained in:
Egon Elbre 2021-03-03 13:20:41 +02:00
parent b0b7b81105
commit ec67413776
3 changed files with 109 additions and 34 deletions

View File

@ -373,7 +373,8 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
segmentsInBatch := int32(0)
err = metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{
BatchSize: limit,
BatchSize: limit,
AsOfSystemTime: startingTime,
}, func(ctx context.Context, it metabase.LoopObjectsIterator) error {
var entry metabase.LoopObjectEntry
for it.Next(ctx, &entry) {

View File

@ -21,22 +21,11 @@ import (
const loopIteratorBatchSizeLimit = 2500
// LoopObjectEntry contains information about object needed by metainfo loop.
type LoopObjectEntry struct {
ObjectStream // metrics, repair, tally
ExpiresAt *time.Time // tally
SegmentCount int32 // metrics
EncryptedMetadataSize int // tally
}
// LoopObjectsIterator iterates over a sequence of LoopObjectEntry items.
type LoopObjectsIterator interface {
Next(ctx context.Context, item *LoopObjectEntry) bool
}
// IterateLoopObjects contains arguments necessary for listing objects in metabase.
type IterateLoopObjects struct {
BatchSize int
AsOfSystemTime time.Time
}
// Verify verifies get object request fields.
@ -47,6 +36,19 @@ func (opts *IterateLoopObjects) Verify() error {
return nil
}
// LoopObjectsIterator iterates over a sequence of LoopObjectEntry items.
type LoopObjectsIterator interface {
Next(ctx context.Context, item *LoopObjectEntry) bool
}
// LoopObjectEntry contains information about object needed by metainfo loop.
type LoopObjectEntry struct {
ObjectStream // metrics, repair, tally
ExpiresAt *time.Time // tally
SegmentCount int32 // metrics
EncryptedMetadataSize int // tally
}
// IterateLoopObjects iterates through all objects in metabase.
func (db *DB) IterateLoopObjects(ctx context.Context, opts IterateLoopObjects, fn func(context.Context, LoopObjectsIterator) error) (err error) {
defer mon.Task()(&ctx)(&err)
@ -88,7 +90,8 @@ func (db *DB) IterateLoopObjects(ctx context.Context, opts IterateLoopObjects, f
type loopIterator struct {
db *DB
batchSize int
batchSize int
asOfSystemTime time.Time
curIndex int
curRows tagsql.Rows
@ -148,6 +151,11 @@ func (it *loopIterator) Next(ctx context.Context, item *LoopObjectEntry) bool {
func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
defer mon.Task()(&ctx)(&err)
var asOfSystemTime string
if !it.asOfSystemTime.IsZero() && it.db.implementation == dbutil.Cockroach {
asOfSystemTime = fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, it.asOfSystemTime.UnixNano())
}
return it.db.db.Query(ctx, `
SELECT
project_id, bucket_name,
@ -156,6 +164,7 @@ func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err err
segment_count,
LENGTH(COALESCE(encrypted_metadata,''))
FROM objects
`+asOfSystemTime+`
WHERE (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
ORDER BY project_id ASC, bucket_name ASC, object_key ASC, version ASC
LIMIT $5
@ -303,5 +312,5 @@ func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, h
}
}
return Error.Wrap(err)
return nil
}

View File

@ -48,6 +48,14 @@ func TestIterateLoopObjects(t *testing.T) {
Result: nil,
}.Check(ctx, t, db)
IterateLoopObjects{
Opts: metabase.IterateLoopObjects{
BatchSize: 10,
AsOfSystemTime: time.Now(),
},
Result: nil,
}.Check(ctx, t, db)
Verify{}.Check(ctx, t, db)
})
@ -78,6 +86,7 @@ func TestIterateLoopObjects(t *testing.T) {
},
Version: 1,
}.Check(ctx, t, db)
CommitObject{
Opts: metabase.CommitObject{
ObjectStream: committed,
@ -87,19 +96,29 @@ func TestIterateLoopObjects(t *testing.T) {
},
}.Check(ctx, t, db)
expected := []metabase.LoopObjectEntry{
{
ObjectStream: pending,
},
{
ObjectStream: committed,
EncryptedMetadataSize: len(encryptedMetadata),
},
}
IterateLoopObjects{
Opts: metabase.IterateLoopObjects{
BatchSize: 1,
},
Result: []metabase.LoopObjectEntry{
{
ObjectStream: pending,
},
{
ObjectStream: committed,
EncryptedMetadataSize: len(encryptedMetadata),
},
Result: expected,
}.Check(ctx, t, db)
IterateLoopObjects{
Opts: metabase.IterateLoopObjects{
BatchSize: 1,
AsOfSystemTime: time.Now(),
},
Result: expected,
}.Check(ctx, t, db)
})
@ -112,12 +131,22 @@ func TestIterateLoopObjects(t *testing.T) {
for i, obj := range objects {
expected[i] = loopObjectEntryFromRaw(obj)
}
IterateLoopObjects{
Opts: metabase.IterateLoopObjects{
BatchSize: limit,
},
Result: expected,
}.Check(ctx, t, db)
IterateLoopObjects{
Opts: metabase.IterateLoopObjects{
BatchSize: limit,
AsOfSystemTime: time.Now(),
},
Result: expected,
}.Check(ctx, t, db)
Verify{Objects: objects}.Check(ctx, t, db)
})
@ -130,12 +159,22 @@ func TestIterateLoopObjects(t *testing.T) {
for i, obj := range objects {
expected[i] = loopObjectEntryFromRaw(obj)
}
IterateLoopObjects{
Opts: metabase.IterateLoopObjects{
BatchSize: limit,
},
Result: expected,
}.Check(ctx, t, db)
IterateLoopObjects{
Opts: metabase.IterateLoopObjects{
BatchSize: limit,
AsOfSystemTime: time.Now(),
},
Result: expected,
}.Check(ctx, t, db)
Verify{Objects: objects}.Check(ctx, t, db)
})
@ -155,21 +194,31 @@ func TestIterateLoopObjects(t *testing.T) {
"g",
})
expected := []metabase.LoopObjectEntry{
objects["a"],
objects["b/1"],
objects["b/2"],
objects["b/3"],
objects["c"],
objects["c/"],
objects["c//"],
objects["c/1"],
objects["g"],
}
IterateLoopObjects{
Opts: metabase.IterateLoopObjects{
BatchSize: 3,
},
Result: []metabase.LoopObjectEntry{
objects["a"],
objects["b/1"],
objects["b/2"],
objects["b/3"],
objects["c"],
objects["c/"],
objects["c//"],
objects["c/1"],
objects["g"],
Result: expected,
}.Check(ctx, t, db)
IterateLoopObjects{
Opts: metabase.IterateLoopObjects{
BatchSize: 3,
AsOfSystemTime: time.Now(),
},
Result: expected,
}.Check(ctx, t, db)
})
@ -200,6 +249,14 @@ func TestIterateLoopObjects(t *testing.T) {
},
Result: expected,
}.Check(ctx, t, db)
IterateLoopObjects{
Opts: metabase.IterateLoopObjects{
BatchSize: 3,
AsOfSystemTime: time.Now(),
},
Result: expected,
}.Check(ctx, t, db)
})
t.Run("multiple projects", func(t *testing.T) {
@ -233,6 +290,14 @@ func TestIterateLoopObjects(t *testing.T) {
},
Result: expected,
}.Check(ctx, t, db)
IterateLoopObjects{
Opts: metabase.IterateLoopObjects{
BatchSize: 2,
AsOfSystemTime: time.Now(),
},
Result: expected,
}.Check(ctx, t, db)
})
})
}