storj/satellite/metabase/get.go
Qweder93 2abe709b6e satellite/metabase: add metabase.GetObjectLatestVersion back
Restored GetObjectLatestVersion and renamed it to GetObjectLastCommitted
Add test cases to cover server-side copy

Closes https://github.com/storj/storj/issues/4866

Change-Id: I343b339a60152b8fb92fda97baf80bd8fe60d631
2022-08-11 22:14:38 +00:00

522 lines
14 KiB
Go

// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase
import (
"context"
"database/sql"
"errors"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/private/tagsql"
)
// ErrSegmentNotFound is an error class for non-existing segment.
var ErrSegmentNotFound = errs.Class("segment not found")
// Object object metadata.
// TODO define separated struct.
type Object RawObject
// IsMigrated returns whether the object comes from PointerDB.
// Pointer objects are special that they are missing some information.
//
// - TotalPlainSize = 0 and FixedSegmentSize = 0.
// - Segment.PlainOffset = 0, Segment.PlainSize = 0
func (obj *Object) IsMigrated() bool {
return obj.TotalPlainSize <= 0
}
// Segment segment metadata.
// TODO define separated struct.
type Segment RawSegment
// Inline returns true if segment is inline.
func (s Segment) Inline() bool {
return s.Redundancy.IsZero() && len(s.Pieces) == 0
}
// PiecesInAncestorSegment returns true if remote alias pieces are to be found in an ancestor segment.
func (s Segment) PiecesInAncestorSegment() bool {
return s.EncryptedSize != 0 && len(s.InlineData) == 0 && len(s.Pieces) == 0
}
// Expired checks if segment is expired relative to now.
func (s Segment) Expired(now time.Time) bool {
return s.ExpiresAt != nil && s.ExpiresAt.Before(now)
}
// GetObjectExactVersion contains arguments necessary for fetching an information
// about exact object version.
type GetObjectExactVersion struct {
Version Version
ObjectLocation
}
// Verify verifies get object request fields.
func (obj *GetObjectExactVersion) Verify() error {
if err := obj.ObjectLocation.Verify(); err != nil {
return err
}
if obj.Version <= 0 {
return ErrInvalidRequest.New("Version invalid: %v", obj.Version)
}
return nil
}
// GetObjectExactVersion returns object information for exact version.
func (db *DB) GetObjectExactVersion(ctx context.Context, opts GetObjectExactVersion) (_ Object, err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.Verify(); err != nil {
return Object{}, err
}
object := Object{}
err = db.db.QueryRowContext(ctx, `
SELECT
stream_id,
created_at, expires_at,
segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
FROM objects
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
version = $4 AND
status = `+committedStatus+` AND
(expires_at IS NULL OR expires_at > now())`,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version).
Scan(
&object.StreamID,
&object.CreatedAt, &object.ExpiresAt,
&object.SegmentCount,
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
encryptionParameters{&object.Encryption},
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return Object{}, storj.ErrObjectNotFound.Wrap(Error.Wrap(err))
}
return Object{}, Error.New("unable to query object status: %w", err)
}
object.ProjectID = opts.ProjectID
object.BucketName = opts.BucketName
object.ObjectKey = opts.ObjectKey
object.Version = opts.Version
object.Status = Committed
return object, nil
}
// GetObjectLastCommitted contains arguments necessary for fetching
// an object information for last committed version.
type GetObjectLastCommitted struct {
ObjectLocation
}
// GetObjectLastCommitted returns object information for last committed version.
func (db *DB) GetObjectLastCommitted(ctx context.Context, opts GetObjectLastCommitted) (_ Object, err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.Verify(); err != nil {
return Object{}, err
}
var object Object
err = withRows(db.db.QueryContext(ctx, `
SELECT
stream_id, version,
created_at, expires_at,
segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
FROM objects
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
status = `+committedStatus+` AND
(expires_at IS NULL OR expires_at > now())
ORDER BY version desc
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey))(func(rows tagsql.Rows) error {
objectFound := false
for rows.Next() {
var scannedObject Object
if err = rows.Scan(
&scannedObject.StreamID, &scannedObject.Version,
&scannedObject.CreatedAt, &scannedObject.ExpiresAt,
&scannedObject.SegmentCount,
&scannedObject.EncryptedMetadataNonce, &scannedObject.EncryptedMetadata, &scannedObject.EncryptedMetadataEncryptedKey,
&scannedObject.TotalPlainSize, &scannedObject.TotalEncryptedSize, &scannedObject.FixedSegmentSize,
encryptionParameters{&scannedObject.Encryption},
); err != nil {
return Error.New("unable to query object status: %w", err)
}
if objectFound {
db.log.Warn("object with multiple committed versions were found!",
zap.Stringer("Project ID", scannedObject.ProjectID), zap.String("Bucket Name", scannedObject.BucketName),
zap.String("Object Key", string(scannedObject.ObjectKey)), zap.Int("Version", int(scannedObject.Version)),
zap.Stringer("Stream ID", scannedObject.StreamID))
mon.Meter("multiple_committed_versions").Mark(1)
continue
}
object = scannedObject
objectFound = true
}
if !objectFound {
return sql.ErrNoRows
}
return nil
})
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return Object{}, storj.ErrObjectNotFound.Wrap(Error.Wrap(err))
}
return Object{}, Error.New("unable to query object status: %w", err)
}
object.ProjectID = opts.ProjectID
object.BucketName = opts.BucketName
object.ObjectKey = opts.ObjectKey
object.Status = Committed
return object, nil
}
// GetSegmentByPosition contains arguments necessary for fetching a segment on specific position.
type GetSegmentByPosition struct {
StreamID uuid.UUID
Position SegmentPosition
}
// Verify verifies get segment request fields.
func (seg *GetSegmentByPosition) Verify() error {
if seg.StreamID.IsZero() {
return ErrInvalidRequest.New("StreamID missing")
}
return nil
}
// GetSegmentByPosition returns information about segment on the specified position.
func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPosition) (segment Segment, err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.Verify(); err != nil {
return Segment{}, err
}
var aliasPieces AliasPieces
err = db.db.QueryRowContext(ctx, `
SELECT
created_at, expires_at, repaired_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
encrypted_etag,
redundancy,
inline_data, remote_alias_pieces,
placement
FROM segments
WHERE
stream_id = $1 AND
position = $2
`, opts.StreamID, opts.Position.Encode()).
Scan(
&segment.CreatedAt, &segment.ExpiresAt, &segment.RepairedAt,
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
&segment.EncryptedETag,
redundancyScheme{&segment.Redundancy},
&segment.InlineData, &aliasPieces,
&segment.Placement,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return Segment{}, ErrSegmentNotFound.New("segment missing")
}
return Segment{}, Error.New("unable to query segment: %w", err)
}
if len(aliasPieces) > 0 {
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Segment{}, Error.New("unable to convert aliases to pieces: %w", err)
}
}
segment.StreamID = opts.StreamID
segment.Position = opts.Position
if db.config.ServerSideCopy {
err = db.updateWithAncestorSegment(ctx, &segment)
if err != nil {
return Segment{}, err
}
}
return segment, nil
}
// GetLatestObjectLastSegment contains arguments necessary for fetching a last segment information.
type GetLatestObjectLastSegment struct {
ObjectLocation
}
// GetLatestObjectLastSegment returns an object last segment information.
func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObjectLastSegment) (segment Segment, err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.Verify(); err != nil {
return Segment{}, err
}
var aliasPieces AliasPieces
err = db.db.QueryRowContext(ctx, `
SELECT
stream_id, position,
created_at, repaired_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
encrypted_etag,
redundancy,
inline_data, remote_alias_pieces,
placement
FROM segments
WHERE
stream_id IN (SELECT stream_id FROM objects WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
status = `+committedStatus+`
ORDER BY version DESC
LIMIT 1
)
ORDER BY position DESC
LIMIT 1
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey).
Scan(
&segment.StreamID, &segment.Position,
&segment.CreatedAt, &segment.RepairedAt,
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
&segment.EncryptedETag,
redundancyScheme{&segment.Redundancy},
&segment.InlineData, &aliasPieces,
&segment.Placement,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return Segment{}, storj.ErrObjectNotFound.Wrap(Error.New("object or segment missing"))
}
return Segment{}, Error.New("unable to query segment: %w", err)
}
if len(aliasPieces) > 0 {
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Segment{}, Error.New("unable to convert aliases to pieces: %w", err)
}
}
if db.config.ServerSideCopy {
err = db.updateWithAncestorSegment(ctx, &segment)
if err != nil {
return Segment{}, err
}
}
return segment, nil
}
func (db *DB) updateWithAncestorSegment(ctx context.Context, segment *Segment) (err error) {
if !segment.PiecesInAncestorSegment() {
return nil
}
var aliasPieces AliasPieces
err = db.db.QueryRowContext(ctx, `
SELECT
root_piece_id,
repaired_at,
remote_alias_pieces
FROM segments
WHERE
stream_id IN (SELECT ancestor_stream_id FROM segment_copies WHERE stream_id = $1)
AND position = $2
`, segment.StreamID, segment.Position.Encode()).Scan(
&segment.RootPieceID,
&segment.RepairedAt,
&aliasPieces,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrSegmentNotFound.New("segment missing")
}
return Error.New("unable to query segment: %w", err)
}
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Error.New("unable to convert aliases to pieces: %w", err)
}
return nil
}
// BucketEmpty contains arguments necessary for checking if bucket is empty.
type BucketEmpty struct {
ProjectID uuid.UUID
BucketName string
}
// BucketEmpty returns true if bucket does not contain objects (pending or committed).
// This method doesn't check bucket existence.
func (db *DB) BucketEmpty(ctx context.Context, opts BucketEmpty) (empty bool, err error) {
defer mon.Task()(&ctx)(&err)
switch {
case opts.ProjectID.IsZero():
return false, ErrInvalidRequest.New("ProjectID missing")
case opts.BucketName == "":
return false, ErrInvalidRequest.New("BucketName missing")
}
var value int
err = db.db.QueryRowContext(ctx, `
SELECT
1
FROM objects
WHERE
project_id = $1 AND
bucket_name = $2
LIMIT 1
`, opts.ProjectID, []byte(opts.BucketName)).Scan(&value)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return true, nil
}
return false, Error.New("unable to query objects: %w", err)
}
return false, nil
}
// TestingAllCommittedObjects gets all objects from bucket.
// Use only for testing purposes.
func (db *DB) TestingAllCommittedObjects(ctx context.Context, projectID uuid.UUID, bucketName string) (objects []ObjectEntry, err error) {
defer mon.Task()(&ctx)(&err)
return db.testingAllObjectsByStatus(ctx, projectID, bucketName, Committed)
}
// TestingAllPendingObjects gets all objects from bucket.
// Use only for testing purposes.
func (db *DB) TestingAllPendingObjects(ctx context.Context, projectID uuid.UUID, bucketName string) (objects []ObjectEntry, err error) {
defer mon.Task()(&ctx)(&err)
return db.testingAllObjectsByStatus(ctx, projectID, bucketName, Pending)
}
func (db *DB) testingAllObjectsByStatus(ctx context.Context, projectID uuid.UUID, bucketName string, status ObjectStatus) (objects []ObjectEntry, err error) {
defer mon.Task()(&ctx)(&err)
err = db.IterateObjectsAllVersionsWithStatus(ctx,
IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: status,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
}, func(ctx context.Context, it ObjectsIterator) error {
entry := ObjectEntry{}
for it.Next(ctx, &entry) {
objects = append(objects, entry)
}
return nil
},
)
if err != nil {
return nil, Error.Wrap(err)
}
return objects, nil
}
// TestingAllObjectSegments gets all segments for given object.
// Use only for testing purposes.
func (db *DB) TestingAllObjectSegments(ctx context.Context, objectLocation ObjectLocation) (segments []Segment, err error) {
defer mon.Task()(&ctx)(&err)
object, err := db.GetObjectExactVersion(ctx, GetObjectExactVersion{
ObjectLocation: objectLocation,
Version: DefaultVersion,
})
if err != nil {
return nil, Error.Wrap(err)
}
response, err := db.ListSegments(ctx, ListSegments{
StreamID: object.StreamID,
})
if err != nil {
return nil, Error.Wrap(err)
}
return response.Segments, nil
}
// TestingAllObjects gets all objects.
// Use only for testing purposes.
func (db *DB) TestingAllObjects(ctx context.Context) (objects []Object, err error) {
defer mon.Task()(&ctx)(&err)
rawObjects, err := db.testingGetAllObjects(ctx)
if err != nil {
return nil, Error.Wrap(err)
}
for _, o := range rawObjects {
objects = append(objects, Object(o))
}
return objects, nil
}
// TestingAllSegments gets all segments.
// Use only for testing purposes.
func (db *DB) TestingAllSegments(ctx context.Context) (segments []Segment, err error) {
defer mon.Task()(&ctx)(&err)
rawSegments, err := db.testingGetAllSegments(ctx)
if err != nil {
return nil, Error.Wrap(err)
}
for _, s := range rawSegments {
segments = append(segments, Segment(s))
}
return segments, nil
}