// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.

package metabase

import (
	"context"
	"database/sql"
	"errors"
	"time"

	"github.com/zeebo/errs"

	"storj.io/common/storj"
	"storj.io/common/uuid"
)

// ErrSegmentNotFound is an error class for non-existing segment.
var ErrSegmentNotFound = errs.Class("segment not found")

// Object contains object metadata.
// It currently shares its layout with RawObject.
// TODO define separated struct.
type Object RawObject

// IsMigrated returns whether the object comes from PointerDB.
// Pointer objects are special that they are missing some information.
//
//   * TotalPlainSize = 0 and FixedSegmentSize = 0.
//   * Segment.PlainOffset = 0, Segment.PlainSize = 0
//
// Only TotalPlainSize is inspected here: any value that is zero or negative
// marks the object as migrated.
func (obj *Object) IsMigrated() bool {
	return obj.TotalPlainSize <= 0
}

// Segment contains segment metadata.
// It currently shares its layout with RawSegment.
// TODO define separated struct.
type Segment RawSegment

// Inline returns true if segment is inline, i.e. it has a zero redundancy
// scheme and no pieces.
func (s Segment) Inline() bool {
	return s.Redundancy.IsZero() && len(s.Pieces) == 0
}

// Expired checks if segment is expired relative to now.
// A segment without an expiration time (nil ExpiresAt) is never reported as expired.
func (s Segment) Expired(now time.Time) bool {
	return s.ExpiresAt != nil && s.ExpiresAt.Before(now)
}

// GetObjectExactVersion contains arguments necessary for fetching an information
// about exact object version.
type GetObjectExactVersion struct {
	Version Version
	ObjectLocation
}

// Verify verifies get object request fields.
// It returns the error from ObjectLocation.Verify, if any, and otherwise an
// ErrInvalidRequest error when Version is zero or negative.
func (obj *GetObjectExactVersion) Verify() error {
	if err := obj.ObjectLocation.Verify(); err != nil {
		return err
	}
	if obj.Version <= 0 {
		return ErrInvalidRequest.New("Version invalid: %v", obj.Version)
	}
	return nil
}

// GetObjectExactVersion returns object information for exact version.
func (db *DB) GetObjectExactVersion(ctx context.Context, opts GetObjectExactVersion) (_ Object, err error) { defer mon.Task()(&ctx)(&err) if err := opts.Verify(); err != nil { return Object{}, err } object := Object{} err = db.db.QueryRowContext(ctx, ` SELECT stream_id, created_at, expires_at, segment_count, encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, fixed_segment_size, encryption FROM objects WHERE project_id = $1 AND bucket_name = $2 AND object_key = $3 AND version = $4 AND status = `+committedStatus, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.Version). Scan( &object.StreamID, &object.CreatedAt, &object.ExpiresAt, &object.SegmentCount, &object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey, &object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize, encryptionParameters{&object.Encryption}, ) if err != nil { if errors.Is(err, sql.ErrNoRows) { return Object{}, storj.ErrObjectNotFound.Wrap(Error.Wrap(err)) } return Object{}, Error.New("unable to query object status: %w", err) } object.ProjectID = opts.ProjectID object.BucketName = opts.BucketName object.ObjectKey = opts.ObjectKey object.Version = opts.Version object.Status = Committed return object, nil } // GetObjectLatestVersion contains arguments necessary for fetching // an object information for latest version. type GetObjectLatestVersion struct { ObjectLocation } // GetObjectLatestVersion returns object information for latest version. 
func (db *DB) GetObjectLatestVersion(ctx context.Context, opts GetObjectLatestVersion) (_ Object, err error) { defer mon.Task()(&ctx)(&err) if err := opts.Verify(); err != nil { return Object{}, err } object := Object{} err = db.db.QueryRowContext(ctx, ` SELECT stream_id, version, created_at, expires_at, segment_count, encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, fixed_segment_size, encryption FROM objects WHERE project_id = $1 AND bucket_name = $2 AND object_key = $3 AND status = `+committedStatus+` ORDER BY version desc LIMIT 1 `, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey)). Scan( &object.StreamID, &object.Version, &object.CreatedAt, &object.ExpiresAt, &object.SegmentCount, &object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey, &object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize, encryptionParameters{&object.Encryption}, ) if err != nil { if errors.Is(err, sql.ErrNoRows) { return Object{}, storj.ErrObjectNotFound.Wrap(Error.Wrap(err)) } return Object{}, Error.New("unable to query object status: %w", err) } object.ProjectID = opts.ProjectID object.BucketName = opts.BucketName object.ObjectKey = opts.ObjectKey object.Status = Committed return object, nil } // GetSegmentByLocation contains arguments necessary for fetching a segment on specific segment location. type GetSegmentByLocation struct { SegmentLocation } // GetSegmentByLocation returns information about segment on the specified location. 
func (db *DB) GetSegmentByLocation(ctx context.Context, opts GetSegmentByLocation) (segment Segment, err error) { defer mon.Task()(&ctx)(&err) if err := opts.Verify(); err != nil { return Segment{}, err } var aliasPieces AliasPieces err = db.db.QueryRowContext(ctx, ` SELECT stream_id, created_at, expires_at, repaired_at, root_piece_id, encrypted_key_nonce, encrypted_key, encrypted_size, plain_offset, plain_size, encrypted_etag, redundancy, inline_data, remote_alias_pieces FROM segments WHERE stream_id IN (SELECT stream_id FROM objects WHERE project_id = $1 AND bucket_name = $2 AND object_key = $3 ORDER BY version DESC LIMIT 1 ) AND position = $4 `, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.Position.Encode()). Scan( &segment.StreamID, &segment.CreatedAt, &segment.ExpiresAt, &segment.RepairedAt, &segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey, &segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize, &segment.EncryptedETag, redundancyScheme{&segment.Redundancy}, &segment.InlineData, &aliasPieces, ) if err != nil { if errors.Is(err, sql.ErrNoRows) { return Segment{}, storj.ErrObjectNotFound.Wrap(Error.New("object or segment missing")) } return Segment{}, Error.New("unable to query segment: %w", err) } segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) if err != nil { return Segment{}, Error.New("unable to convert aliases to pieces: %w", err) } segment.Position = opts.Position return segment, nil } // GetSegmentByPosition contains arguments necessary for fetching a segment on specific position. type GetSegmentByPosition struct { StreamID uuid.UUID Position SegmentPosition } // Verify verifies get segment request fields. func (seg *GetSegmentByPosition) Verify() error { if seg.StreamID.IsZero() { return ErrInvalidRequest.New("StreamID missing") } return nil } // GetSegmentByPosition returns information about segment on the specified position. 
func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPosition) (segment Segment, err error) { defer mon.Task()(&ctx)(&err) if err := opts.Verify(); err != nil { return Segment{}, err } var aliasPieces AliasPieces err = db.db.QueryRowContext(ctx, ` SELECT created_at, expires_at, repaired_at, root_piece_id, encrypted_key_nonce, encrypted_key, encrypted_size, plain_offset, plain_size, encrypted_etag, redundancy, inline_data, remote_alias_pieces FROM segments WHERE stream_id = $1 AND position = $2 `, opts.StreamID, opts.Position.Encode()). Scan( &segment.CreatedAt, &segment.ExpiresAt, &segment.RepairedAt, &segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey, &segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize, &segment.EncryptedETag, redundancyScheme{&segment.Redundancy}, &segment.InlineData, &aliasPieces, ) if err != nil { if errors.Is(err, sql.ErrNoRows) { return Segment{}, ErrSegmentNotFound.New("segment missing") } return Segment{}, Error.New("unable to query segment: %w", err) } segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) if err != nil { return Segment{}, Error.New("unable to convert aliases to pieces: %w", err) } segment.StreamID = opts.StreamID segment.Position = opts.Position return segment, nil } // GetLatestObjectLastSegment contains arguments necessary for fetching a last segment information. type GetLatestObjectLastSegment struct { ObjectLocation } // GetLatestObjectLastSegment returns an object last segment information. 
func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObjectLastSegment) (segment Segment, err error) { defer mon.Task()(&ctx)(&err) if err := opts.Verify(); err != nil { return Segment{}, err } var aliasPieces AliasPieces err = db.db.QueryRowContext(ctx, ` SELECT stream_id, position, created_at, repaired_at, root_piece_id, encrypted_key_nonce, encrypted_key, encrypted_size, plain_offset, plain_size, encrypted_etag, redundancy, inline_data, remote_alias_pieces FROM segments WHERE stream_id IN (SELECT stream_id FROM objects WHERE project_id = $1 AND bucket_name = $2 AND object_key = $3 AND status = `+committedStatus+` ORDER BY version DESC LIMIT 1 ) ORDER BY position DESC LIMIT 1 `, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey)). Scan( &segment.StreamID, &segment.Position, &segment.CreatedAt, &segment.RepairedAt, &segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey, &segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize, &segment.EncryptedETag, redundancyScheme{&segment.Redundancy}, &segment.InlineData, &aliasPieces, ) if err != nil { if errors.Is(err, sql.ErrNoRows) { return Segment{}, storj.ErrObjectNotFound.Wrap(Error.New("object or segment missing")) } return Segment{}, Error.New("unable to query segment: %w", err) } segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) if err != nil { return Segment{}, Error.New("unable to convert aliases to pieces: %w", err) } return segment, nil } // GetSegmentByOffset contains arguments necessary for fetching a segment information. type GetSegmentByOffset struct { ObjectLocation PlainOffset int64 } // GetSegmentByOffset returns an object segment information. 
func (db *DB) GetSegmentByOffset(ctx context.Context, opts GetSegmentByOffset) (segment Segment, err error) { defer mon.Task()(&ctx)(&err) if err := opts.Verify(); err != nil { return Segment{}, err } if opts.PlainOffset < 0 { return Segment{}, ErrInvalidRequest.New("Invalid PlainOffset: %d", opts.PlainOffset) } var aliasPieces AliasPieces err = db.db.QueryRowContext(ctx, ` SELECT stream_id, position, created_at, expires_at, repaired_at, root_piece_id, encrypted_key_nonce, encrypted_key, encrypted_size, plain_offset, plain_size, encrypted_etag, redundancy, inline_data, remote_alias_pieces FROM segments WHERE stream_id IN (SELECT stream_id FROM objects WHERE project_id = $1 AND bucket_name = $2 AND object_key = $3 AND status = `+committedStatus+` ORDER BY version DESC LIMIT 1 ) AND plain_offset <= $4 AND (plain_size + plain_offset) > $4 ORDER BY plain_offset ASC LIMIT 1 `, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.PlainOffset). Scan( &segment.StreamID, &segment.Position, &segment.CreatedAt, &segment.ExpiresAt, &segment.RepairedAt, &segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey, &segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize, &segment.EncryptedETag, redundancyScheme{&segment.Redundancy}, &segment.InlineData, &aliasPieces, ) if err != nil { if errors.Is(err, sql.ErrNoRows) { return Segment{}, storj.ErrObjectNotFound.Wrap(Error.New("object or segment missing")) } return Segment{}, Error.New("unable to query segment: %w", err) } segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) if err != nil { return Segment{}, Error.New("unable to convert aliases to pieces: %w", err) } return segment, nil } // BucketEmpty contains arguments necessary for checking if bucket is empty. type BucketEmpty struct { ProjectID uuid.UUID BucketName string } // BucketEmpty returns true if bucket does not contain objects (pending or committed). // This method doesn't check bucket existence. 
func (db *DB) BucketEmpty(ctx context.Context, opts BucketEmpty) (empty bool, err error) { defer mon.Task()(&ctx)(&err) switch { case opts.ProjectID.IsZero(): return false, ErrInvalidRequest.New("ProjectID missing") case opts.BucketName == "": return false, ErrInvalidRequest.New("BucketName missing") } var value int err = db.db.QueryRowContext(ctx, ` SELECT 1 FROM objects WHERE project_id = $1 AND bucket_name = $2 LIMIT 1 `, opts.ProjectID, []byte(opts.BucketName)).Scan(&value) if err != nil { if errors.Is(err, sql.ErrNoRows) { return true, nil } return false, Error.New("unable to query objects: %w", err) } return false, nil } // TestingAllCommittedObjects gets all objects from bucket. // Use only for testing purposes. func (db *DB) TestingAllCommittedObjects(ctx context.Context, projectID uuid.UUID, bucketName string) (objects []ObjectEntry, err error) { defer mon.Task()(&ctx)(&err) return db.testingAllObjectsByStatus(ctx, projectID, bucketName, Committed) } // TestingAllPendingObjects gets all objects from bucket. // Use only for testing purposes. func (db *DB) TestingAllPendingObjects(ctx context.Context, projectID uuid.UUID, bucketName string) (objects []ObjectEntry, err error) { defer mon.Task()(&ctx)(&err) return db.testingAllObjectsByStatus(ctx, projectID, bucketName, Pending) } func (db *DB) testingAllObjectsByStatus(ctx context.Context, projectID uuid.UUID, bucketName string, status ObjectStatus) (objects []ObjectEntry, err error) { defer mon.Task()(&ctx)(&err) err = db.IterateObjectsAllVersionsWithStatus(ctx, IterateObjectsWithStatus{ ProjectID: projectID, BucketName: bucketName, Recursive: true, Status: status, IncludeMetadata: true, }, func(ctx context.Context, it ObjectsIterator) error { entry := ObjectEntry{} for it.Next(ctx, &entry) { objects = append(objects, entry) } return nil }, ) if err != nil { return nil, Error.Wrap(err) } return objects, nil } // TestingAllObjectSegments gets all segments for given object. // Use only for testing purposes. 
func (db *DB) TestingAllObjectSegments(ctx context.Context, objectLocation ObjectLocation) (segments []Segment, err error) { defer mon.Task()(&ctx)(&err) object, err := db.GetObjectLatestVersion(ctx, GetObjectLatestVersion{ ObjectLocation: objectLocation, }) if err != nil { return nil, Error.Wrap(err) } response, err := db.ListSegments(ctx, ListSegments{ StreamID: object.StreamID, }) if err != nil { return nil, Error.Wrap(err) } return response.Segments, nil } // TestingAllObjects gets all objects. // Use only for testing purposes. func (db *DB) TestingAllObjects(ctx context.Context) (objects []Object, err error) { defer mon.Task()(&ctx)(&err) rawObjects, err := db.testingGetAllObjects(ctx) if err != nil { return nil, Error.Wrap(err) } for _, o := range rawObjects { objects = append(objects, Object(o)) } return objects, nil } // TestingAllSegments gets all segments. // Use only for testing purposes. func (db *DB) TestingAllSegments(ctx context.Context) (segments []Segment, err error) { defer mon.Task()(&ctx)(&err) rawSegments, err := db.testingGetAllSegments(ctx) if err != nil { return nil, Error.Wrap(err) } for _, s := range rawSegments { segments = append(segments, Segment(s)) } return segments, nil }