satellite/metabase: don't return expired objects
Resolves: https://github.com/storj/team-metainfo/issues/112 Change-Id: If48f865826c22764fc1ff83745c34524cd396aac
This commit is contained in:
parent
d162788489
commit
417df4d73c
@ -35,8 +35,8 @@ const (
|
||||
// ListLimit is the maximum number of items the client can request for listing.
|
||||
const ListLimit = intLimitRange(1000)
|
||||
|
||||
// MoveLimit is the maximum number of segments that can be moved.
|
||||
const MoveLimit = int64(10000)
|
||||
// MoveSegmentLimit is the maximum number of segments that can be moved.
|
||||
const MoveSegmentLimit = int64(10000)
|
||||
|
||||
// CopySegmentLimit is the maximum number of segments that can be copied.
|
||||
const CopySegmentLimit = int64(10000)
|
||||
|
@ -57,7 +57,7 @@ type GetObjectExactVersion struct {
|
||||
ObjectLocation
|
||||
}
|
||||
|
||||
// Verify verifies get object reqest fields.
|
||||
// Verify verifies get object request fields.
|
||||
func (obj *GetObjectExactVersion) Verify() error {
|
||||
if err := obj.ObjectLocation.Verify(); err != nil {
|
||||
return err
|
||||
@ -91,7 +91,8 @@ func (db *DB) GetObjectExactVersion(ctx context.Context, opts GetObjectExactVers
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
version = $4 AND
|
||||
status = `+committedStatus,
|
||||
status = `+committedStatus+` AND
|
||||
(expires_at IS NULL OR expires_at > now())`,
|
||||
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version).
|
||||
Scan(
|
||||
&object.StreamID,
|
||||
|
@ -133,6 +133,28 @@ func TestGetObjectExactVersion(t *testing.T) {
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Get expired object", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
expiresAt := now.Add(-2 * time.Hour)
|
||||
metabasetest.CreateExpiredObject(ctx, t, db, obj, 0, expiresAt)
|
||||
metabasetest.GetObjectExactVersion{
|
||||
Opts: metabase.GetObjectExactVersion{
|
||||
ObjectLocation: location,
|
||||
Version: 1,
|
||||
},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
ExpiresAt: &expiresAt,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
}}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Get object", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
|
@ -138,7 +138,6 @@ func (it *objectsIterator) Next(ctx context.Context, item *ObjectEntry) bool {
|
||||
}
|
||||
|
||||
// TODO: implement this on the database side
|
||||
|
||||
ok := it.next(ctx, item)
|
||||
if !ok {
|
||||
return false
|
||||
@ -263,6 +262,7 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
|
||||
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $4, $5)
|
||||
AND (project_id, bucket_name) < ($1, $7)
|
||||
AND status = $3
|
||||
AND (expires_at IS NULL OR expires_at > now())
|
||||
ORDER BY (project_id, bucket_name, object_key, version) ASC
|
||||
LIMIT $6
|
||||
`, it.projectID, it.bucketName,
|
||||
@ -283,6 +283,7 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
|
||||
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $4, $5)
|
||||
AND (project_id, bucket_name, object_key) < ($1, $2, $6)
|
||||
AND status = $3
|
||||
AND (expires_at IS NULL OR expires_at > now())
|
||||
ORDER BY (project_id, bucket_name, object_key, version) ASC
|
||||
LIMIT $7
|
||||
`, it.projectID, it.bucketName,
|
||||
|
@ -5,6 +5,7 @@ package metabase_test
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -814,6 +815,100 @@ func TestIterateObjectsWithStatus(t *testing.T) {
|
||||
require.NotNil(t, entry.EncryptedMetadataEncryptedKey)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("verify-cursor-continuation", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
projectID, bucketName := uuid.UUID{1}, "bucky"
|
||||
createObjectsWithKeys(ctx, t, db, projectID, bucketName, []metabase.ObjectKey{
|
||||
"1",
|
||||
"a/a",
|
||||
"a/0",
|
||||
})
|
||||
var collector metabasetest.IterateCollector
|
||||
err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
Prefix: metabase.ObjectKey("a/"),
|
||||
Status: metabase.Committed,
|
||||
BatchSize: 1,
|
||||
}, collector.Add)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(collector))
|
||||
})
|
||||
t.Run("skip-expired-objects", func(t *testing.T) {
|
||||
now := time.Now()
|
||||
type test struct {
|
||||
notExpired []metabase.ObjectKey
|
||||
expired []metabase.ObjectKey
|
||||
}
|
||||
testCases := []test{
|
||||
{
|
||||
notExpired: []metabase.ObjectKey{"1"},
|
||||
expired: []metabase.ObjectKey{"2"},
|
||||
},
|
||||
{
|
||||
notExpired: []metabase.ObjectKey{"2"},
|
||||
expired: []metabase.ObjectKey{"1"},
|
||||
},
|
||||
{
|
||||
notExpired: []metabase.ObjectKey{"2"},
|
||||
expired: []metabase.ObjectKey{"1", "3"},
|
||||
},
|
||||
{
|
||||
notExpired: []metabase.ObjectKey{"2", "4"},
|
||||
expired: []metabase.ObjectKey{"1", "3"},
|
||||
},
|
||||
{
|
||||
expired: []metabase.ObjectKey{"1", "2", "3", "4"},
|
||||
},
|
||||
}
|
||||
stream := metabase.ObjectStream{
|
||||
ProjectID: uuid.UUID{1},
|
||||
BucketName: "bucket",
|
||||
Version: 1,
|
||||
StreamID: testrand.UUID(),
|
||||
}
|
||||
for i, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(strconv.Itoa(i), func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
expectedResult := []metabase.ObjectEntry{}
|
||||
if len(tc.notExpired) == 0 {
|
||||
expectedResult = nil
|
||||
}
|
||||
for _, key := range tc.notExpired {
|
||||
stream.ObjectKey = key
|
||||
object := metabasetest.CreateObject(ctx, t, db, stream, 0)
|
||||
expectedResult = append(expectedResult, objectEntryFromRaw(metabase.RawObject(object)))
|
||||
}
|
||||
for _, key := range tc.expired {
|
||||
stream.ObjectKey = key
|
||||
metabasetest.CreateExpiredObject(ctx, t, db, stream, 0, now.Add(-2*time.Hour))
|
||||
}
|
||||
for _, batchSize := range []int{1, 2, 3} {
|
||||
opts := metabase.IterateObjectsWithStatus{
|
||||
ProjectID: stream.ProjectID,
|
||||
BucketName: stream.BucketName,
|
||||
BatchSize: batchSize,
|
||||
Status: 3,
|
||||
IncludeSystemMetadata: true,
|
||||
}
|
||||
metabasetest.IterateObjectsWithStatus{
|
||||
Opts: opts,
|
||||
Result: expectedResult,
|
||||
}.Check(ctx, t, db)
|
||||
{
|
||||
opts := opts
|
||||
opts.Recursive = true
|
||||
metabasetest.IterateObjectsWithStatus{
|
||||
Opts: opts,
|
||||
Result: expectedResult,
|
||||
}.Check(ctx, t, db)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -81,7 +81,7 @@ func (db *DB) BeginMoveObject(ctx context.Context, opts BeginMoveObject) (result
|
||||
return BeginMoveObjectResult{}, Error.New("unable to query object status: %w", err)
|
||||
}
|
||||
|
||||
if segmentCount > MoveLimit {
|
||||
if segmentCount > MoveSegmentLimit {
|
||||
return BeginMoveObjectResult{}, Error.New("segment count of chosen object is beyond limit")
|
||||
}
|
||||
|
||||
|
@ -165,7 +165,7 @@ func TestInlineSegment(t *testing.T) {
|
||||
require.Len(t, objects, 1)
|
||||
|
||||
require.Equal(t, params.EncryptedObjectKey, objects[0].EncryptedObjectKey)
|
||||
// TODO find better way to compare (one ExpiresAt contains time zone informations)
|
||||
// TODO find better way to compare (one ExpiresAt contains time zone information)
|
||||
require.Equal(t, params.ExpiresAt.Unix(), objects[0].ExpiresAt.Unix())
|
||||
|
||||
object, err := metainfoClient.GetObject(ctx, metaclient.GetObjectParams{
|
||||
|
Loading…
Reference in New Issue
Block a user