satellite/metabase: adjust some Get queries

Change-Id: I758ac42ce0a388c5a71a1b45502286800e3b07b8
This commit is contained in:
Egon Elbre 2023-10-16 17:27:22 +03:00 committed by Storj Robot
parent aed664a78d
commit ff9013b8ab
3 changed files with 156 additions and 76 deletions

View File

@ -412,6 +412,11 @@ var (
_ = statusesDeleteMarker
)
// IsDeleteMarker return whether the status is a delete marker.
func (status ObjectStatus) IsDeleteMarker() bool {
return status == DeleteMarkerUnversioned || status == DeleteMarkerVersioned
}
// Pieces defines information for pieces.
type Pieces []Piece

View File

@ -10,10 +10,8 @@ import (
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/uuid"
"storj.io/private/tagsql"
)
// ErrSegmentNotFound is an error class for non-existing segment.
@ -83,7 +81,7 @@ func (db *DB) GetObjectExactVersion(ctx context.Context, opts GetObjectExactVers
object := Object{}
err = db.db.QueryRowContext(ctx, `
SELECT
stream_id,
stream_id, status,
created_at, expires_at,
segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
@ -91,15 +89,12 @@ func (db *DB) GetObjectExactVersion(ctx context.Context, opts GetObjectExactVers
encryption
FROM objects
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
version = $4 AND
status = `+statusCommittedUnversioned+` AND
(project_id, bucket_name, object_key, version) = ($1, $2, $3, $4) AND
status <> `+statusPending+` AND
(expires_at IS NULL OR expires_at > now())`,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version).
Scan(
&object.StreamID,
&object.StreamID, &object.Status,
&object.CreatedAt, &object.ExpiresAt,
&object.SegmentCount,
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
@ -118,8 +113,6 @@ func (db *DB) GetObjectExactVersion(ctx context.Context, opts GetObjectExactVers
object.ObjectKey = opts.ObjectKey
object.Version = opts.Version
object.Status = CommittedUnversioned
return object, nil
}
@ -138,10 +131,13 @@ func (db *DB) GetObjectLastCommitted(ctx context.Context, opts GetObjectLastComm
}
var object Object
object.ProjectID = opts.ProjectID
object.BucketName = opts.BucketName
object.ObjectKey = opts.ObjectKey
err = withRows(db.db.QueryContext(ctx, `
row := db.db.QueryRowContext(ctx, `
SELECT
stream_id, version,
stream_id, version, status,
created_at, expires_at,
segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
@ -149,59 +145,29 @@ func (db *DB) GetObjectLastCommitted(ctx context.Context, opts GetObjectLastComm
encryption
FROM objects
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
status = `+statusCommittedUnversioned+` AND
(project_id, bucket_name, object_key) = ($1, $2, $3) AND
status <> `+statusPending+` AND
(expires_at IS NULL OR expires_at > now())
ORDER BY version desc
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey))(func(rows tagsql.Rows) error {
objectFound := false
for rows.Next() {
var scannedObject Object
if err = rows.Scan(
&scannedObject.StreamID, &scannedObject.Version,
&scannedObject.CreatedAt, &scannedObject.ExpiresAt,
&scannedObject.SegmentCount,
&scannedObject.EncryptedMetadataNonce, &scannedObject.EncryptedMetadata, &scannedObject.EncryptedMetadataEncryptedKey,
&scannedObject.TotalPlainSize, &scannedObject.TotalEncryptedSize, &scannedObject.FixedSegmentSize,
encryptionParameters{&scannedObject.Encryption},
); err != nil {
return Error.New("unable to query object status: %w", err)
}
ORDER BY version DESC
LIMIT 1`,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey)
if objectFound {
db.log.Warn("object with multiple committed versions were found!",
zap.Stringer("Project ID", opts.ProjectID), zap.String("Bucket Name", opts.BucketName),
zap.ByteString("Object Key", []byte(opts.ObjectKey)), zap.Int("Version", int(scannedObject.Version)),
zap.Stringer("Stream ID", scannedObject.StreamID), zap.Stack("stacktrace"))
mon.Meter("multiple_committed_versions").Mark(1)
continue
}
object = scannedObject
err = row.Scan(
&object.StreamID, &object.Version, &object.Status,
&object.CreatedAt, &object.ExpiresAt,
&object.SegmentCount,
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
encryptionParameters{&object.Encryption},
)
objectFound = true
}
if !objectFound {
return sql.ErrNoRows
}
return nil
})
if errors.Is(err, sql.ErrNoRows) || object.Status.IsDeleteMarker() {
return Object{}, ErrObjectNotFound.Wrap(Error.Wrap(sql.ErrNoRows))
}
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return Object{}, ErrObjectNotFound.Wrap(Error.Wrap(err))
}
return Object{}, Error.New("unable to query object status: %w", err)
}
object.ProjectID = opts.ProjectID
object.BucketName = opts.BucketName
object.ObjectKey = opts.ObjectKey
object.Status = CommittedUnversioned
return object, nil
}

View File

@ -12,6 +12,7 @@ import (
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metabasetest"
)
@ -154,34 +155,87 @@ func TestGetObjectExactVersion(t *testing.T) {
}}.Check(ctx, t, db)
})
t.Run("Get object", func(t *testing.T) {
t.Run("get committed/deletemarker unversioned/versioned", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
metabasetest.CreateObject(ctx, t, db, obj, 0)
unversionedLocation := obj
unversioned := metabasetest.CreateObject(ctx, t, db, unversionedLocation, 0)
metabasetest.GetObjectExactVersion{
Opts: metabase.GetObjectExactVersion{
ObjectLocation: location,
Version: obj.Version,
ObjectLocation: unversioned.Location(),
Version: unversioned.Version,
},
Result: metabase.Object{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.CommittedUnversioned,
Result: unversioned,
}.Check(ctx, t, db)
Encryption: metabasetest.DefaultEncryption,
versionedLocation := obj
versionedLocation.Version++
versioned := metabasetest.CreateObjectVersioned(ctx, t, db, versionedLocation, 0)
metabasetest.GetObjectExactVersion{
Opts: metabase.GetObjectExactVersion{
ObjectLocation: versioned.Location(),
Version: versioned.Version,
},
Result: versioned,
}.Check(ctx, t, db)
markerLocation := obj
markerLocation.StreamID = uuid.UUID{}
markerLocation.Version = versioned.Version + 1
versionedMarker := metabase.Object{
ObjectStream: markerLocation,
CreatedAt: time.Now(),
Status: metabase.DeleteMarkerVersioned,
}
// this creates a versioned delete marker
metabasetest.DeleteObjectLastCommitted{
Opts: metabase.DeleteObjectLastCommitted{
ObjectLocation: location,
Versioned: true,
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{versionedMarker},
},
}.Check(ctx, t, db)
metabasetest.Verify{Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.CommittedUnversioned,
Encryption: metabasetest.DefaultEncryption,
metabasetest.GetObjectExactVersion{
Opts: metabase.GetObjectExactVersion{
ObjectLocation: versionedMarker.Location(),
Version: versionedMarker.Version,
},
}}.Check(ctx, t, db)
Result: versionedMarker,
}.Check(ctx, t, db)
unversionedMarkerLocation := obj
unversionedMarkerLocation.StreamID = uuid.UUID{}
unversionedMarkerLocation.Version = versionedMarker.Version + 1
unversionedMarker := metabase.Object{
ObjectStream: unversionedMarkerLocation,
CreatedAt: time.Now(),
Status: metabase.DeleteMarkerUnversioned,
}
// this creates an unversioned delete marker
metabasetest.DeleteObjectLastCommitted{
Opts: metabase.DeleteObjectLastCommitted{
ObjectLocation: location,
Suspended: true,
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{unversionedMarker},
},
}.Check(ctx, t, db)
metabasetest.GetObjectExactVersion{
Opts: metabase.GetObjectExactVersion{
ObjectLocation: unversionedMarker.Location(),
Version: unversionedMarker.Version,
},
Result: unversionedMarker,
}.Check(ctx, t, db)
})
})
}
@ -335,6 +389,61 @@ func TestGetObjectLastCommitted(t *testing.T) {
}}.Check(ctx, t, db)
})
t.Run("Get object last committed version, multiple versions", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
first := obj
first.Version = metabase.Version(10)
firstObject := metabasetest.CreateObjectVersioned(ctx, t, db, first, 0)
second := obj
second.Version = metabase.Version(11)
secondObject := metabasetest.CreateObjectVersioned(ctx, t, db, second, 0)
metabasetest.GetObjectLastCommitted{
Opts: metabase.GetObjectLastCommitted{
ObjectLocation: location,
},
Result: secondObject,
}.Check(ctx, t, db)
metabasetest.Verify{Objects: []metabase.RawObject{
metabase.RawObject(firstObject),
metabase.RawObject(secondObject),
}}.Check(ctx, t, db)
})
t.Run("Get object delete marker, multiple versions", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
first := obj
first.Version = metabase.Version(5)
firstObject := metabasetest.CreateObjectVersioned(ctx, t, db, first, 0)
second := obj
second.Version = metabase.Version(8)
secondObject := metabasetest.CreateObjectVersioned(ctx, t, db, second, 0)
result, err := db.DeleteObjectLastCommitted(ctx, metabase.DeleteObjectLastCommitted{
ObjectLocation: second.Location(),
Versioned: true,
})
require.NoError(t, err)
metabasetest.GetObjectLastCommitted{
Opts: metabase.GetObjectLastCommitted{
ObjectLocation: second.Location(),
},
ErrClass: &metabase.ErrObjectNotFound,
}.Check(ctx, t, db)
metabasetest.Verify{Objects: []metabase.RawObject{
metabase.RawObject(result.Objects[0]),
metabase.RawObject(firstObject),
metabase.RawObject(secondObject),
}}.Check(ctx, t, db)
})
t.Run("Get latest copied object version with duplicate metadata", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
copyObjStream := metabasetest.RandObjectStream()