storj/satellite/metainfo/metabase/delete.go
Egon Elbre 8182d8a726 satellite/metainfo/metabase: use txutil.WithTx in delete
Change-Id: I75965c4dcf57479ace2367f5d3069d785628e86a
2020-11-12 14:38:36 +02:00

473 lines
13 KiB
Go

// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase
import (
"bytes"
"context"
"database/sql"
"errors"
"sort"
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/storj/private/dbutil/pgutil"
"storj.io/storj/private/dbutil/txutil"
"storj.io/storj/private/tagsql"
)
// DeleteObjectExactVersion contains arguments necessary for deleting an exact version of object.
type DeleteObjectExactVersion struct {
Version Version
ObjectLocation
}
// Verify delete object fields.
func (obj *DeleteObjectExactVersion) Verify() error {
if err := obj.ObjectLocation.Verify(); err != nil {
return err
}
if obj.Version <= 0 {
return ErrInvalidRequest.New("Version invalid: %v", obj.Version)
}
return nil
}
// DeleteObjectResult result of deleting object.
type DeleteObjectResult struct {
Objects []Object
Segments []DeletedSegmentInfo
}
// DeletedSegmentInfo info about deleted segment.
type DeletedSegmentInfo struct {
RootPieceID storj.PieceID
Pieces Pieces
}
// DeleteObjectAllVersions contains arguments necessary for deleting all object versions.
type DeleteObjectAllVersions struct {
ObjectLocation
}
// DeleteObjectsAllVersions contains arguments necessary for deleting all versions of multiple objects from the same bucket.
type DeleteObjectsAllVersions struct {
Locations []ObjectLocation
}
// Verify delete objects fields.
func (delete *DeleteObjectsAllVersions) Verify() error {
if len(delete.Locations) == 0 {
return nil
}
// TODO: make this limit configurable
if len(delete.Locations) > 1000 {
return ErrInvalidRequest.New("cannot delete more than 1000 objects in a single request")
}
var errGroup errs.Group
for _, location := range delete.Locations {
errGroup.Add(location.Verify())
}
err := errGroup.Err()
if err != nil {
return err
}
// Verify if all locations are in the same bucket
first := delete.Locations[0]
for _, item := range delete.Locations[1:] {
if first.ProjectID != item.ProjectID || first.BucketName != item.BucketName {
return ErrInvalidRequest.New("all objects must be in the same bucket")
}
}
return nil
}
// DeleteObjectLatestVersion contains arguments necessary for deleting latest object version.
type DeleteObjectLatestVersion struct {
ObjectLocation
}
// DeleteObjectExactVersion deletes an exact object version.
func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion) (result DeleteObjectResult, err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.Verify(); err != nil {
return DeleteObjectResult{}, err
}
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
rows, err := tx.Query(ctx, `
DELETE FROM objects
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
version = $4 AND
status = 1
RETURNING
version, stream_id,
created_at, expires_at,
status, segment_count,
encrypted_metadata_nonce, encrypted_metadata,
total_encrypted_size, fixed_segment_size,
encryption;
`, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return storj.ErrObjectNotFound.Wrap(Error.Wrap(err))
}
return Error.New("unable to delete object: %w", err)
}
result.Objects, err = scanObjectDeletion(opts.ObjectLocation, rows)
if err != nil {
return err
}
if len(result.Objects) == 0 {
return storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted"))
}
segmentInfos, err := deleteSegments(ctx, tx, result.Objects)
if err != nil {
return err
}
if len(segmentInfos) != 0 {
result.Segments = segmentInfos
}
return nil
})
if err != nil {
return DeleteObjectResult{}, err
}
return result, nil
}
// DeleteObjectLatestVersion deletes latest object version.
func (db *DB) DeleteObjectLatestVersion(ctx context.Context, opts DeleteObjectLatestVersion) (result DeleteObjectResult, err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.Verify(); err != nil {
return DeleteObjectResult{}, err
}
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
// TODO different sql for Postgres and CockroachDB
// version ONLY for cockroachdb
// Postgres doesn't support ORDER BY and LIMIT in DELETE
// rows, err = tx.Query(ctx, `
// DELETE FROM objects
// WHERE
// project_id = $1 AND
// bucket_name = $2 AND
// object_key = $3 AND
// status = 1
// ORDER BY version DESC
// LIMIT 1
// RETURNING stream_id;
// `, opts.ProjectID, opts.BucketName, opts.ObjectKey)
// version for Postgres and Cockroachdb (but slow for Cockroachdb)
rows, err := tx.Query(ctx, `
DELETE FROM objects
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
version = (SELECT version FROM objects WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
status = 1
ORDER BY version DESC LIMIT 1
) AND
status = 1
RETURNING
version, stream_id,
created_at, expires_at,
status, segment_count,
encrypted_metadata_nonce, encrypted_metadata,
total_encrypted_size, fixed_segment_size,
encryption;
`, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey))
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return storj.ErrObjectNotFound.Wrap(Error.Wrap(err))
}
return Error.New("unable to delete object: %w", err)
}
result.Objects, err = scanObjectDeletion(opts.ObjectLocation, rows)
if err != nil {
return err
}
if len(result.Objects) == 0 {
return storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted"))
}
segmentInfos, err := deleteSegments(ctx, tx, result.Objects)
if err != nil {
return err
}
if len(segmentInfos) != 0 {
result.Segments = segmentInfos
}
return nil
})
if err != nil {
return DeleteObjectResult{}, err
}
return result, nil
}
// DeleteObjectAllVersions deletes all object versions.
func (db *DB) DeleteObjectAllVersions(ctx context.Context, opts DeleteObjectAllVersions) (result DeleteObjectResult, err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.Verify(); err != nil {
return DeleteObjectResult{}, err
}
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
rows, err := tx.Query(ctx, `
DELETE FROM objects
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
status = 1
RETURNING
version, stream_id,
created_at, expires_at,
status, segment_count,
encrypted_metadata_nonce, encrypted_metadata,
total_encrypted_size, fixed_segment_size,
encryption;
`, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey))
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return storj.ErrObjectNotFound.Wrap(Error.Wrap(err))
}
return Error.New("unable to delete object: %w", err)
}
result.Objects, err = scanObjectDeletion(opts.ObjectLocation, rows)
if err != nil {
return err
}
if len(result.Objects) == 0 {
return storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted"))
}
segmentInfos, err := deleteSegments(ctx, tx, result.Objects)
if err != nil {
return err
}
if len(segmentInfos) != 0 {
result.Segments = segmentInfos
}
return nil
})
if err != nil {
return DeleteObjectResult{}, err
}
return result, nil
}
// DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket.
func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAllVersions) (result DeleteObjectResult, err error) {
defer mon.Task()(&ctx)(&err)
if len(opts.Locations) == 0 {
// nothing to delete, no error
return DeleteObjectResult{}, nil
}
if err := opts.Verify(); err != nil {
return DeleteObjectResult{}, err
}
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
// It is aleady verified that all object locations are in the same bucket
projectID := opts.Locations[0].ProjectID
bucketName := opts.Locations[0].BucketName
objectKeys := make([][]byte, len(opts.Locations))
for i := range opts.Locations {
objectKeys[i] = []byte(opts.Locations[i].ObjectKey)
}
// Sorting the object keys just in case.
// TODO: Check if this is really necessary for the SQL query.
sort.Slice(objectKeys, func(i, j int) bool {
return bytes.Compare(objectKeys[i], objectKeys[j]) < 0
})
rows, err := tx.Query(ctx, `
DELETE FROM objects
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = ANY ($3) AND
status = 1
RETURNING
project_id, bucket_name,
object_key, version, stream_id,
created_at, expires_at,
status, segment_count,
encrypted_metadata_nonce, encrypted_metadata,
total_encrypted_size, fixed_segment_size,
encryption;
`, projectID, bucketName, pgutil.ByteaArray(objectKeys))
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return storj.ErrObjectNotFound.Wrap(Error.Wrap(err))
}
return Error.New("unable to delete object: %w", err)
}
result.Objects, err = scanMultipleObjectsDeletion(rows)
if err != nil {
return err
}
if len(result.Objects) == 0 {
// nothing was delete, no error
return nil
}
segmentInfos, err := deleteSegments(ctx, tx, result.Objects)
if err != nil {
return err
}
if len(segmentInfos) != 0 {
result.Segments = segmentInfos
}
return nil
})
if err != nil {
return DeleteObjectResult{}, err
}
return result, nil
}
func scanObjectDeletion(location ObjectLocation, rows tagsql.Rows) (objects []Object, err error) {
defer func() { err = errs.Combine(err, rows.Close()) }()
objects = make([]Object, 0, 10)
for rows.Next() {
var object Object
object.ProjectID = location.ProjectID
object.BucketName = location.BucketName
object.ObjectKey = location.ObjectKey
err = rows.Scan(&object.Version, &object.StreamID,
&object.CreatedAt, &object.ExpiresAt,
&object.Status, &object.SegmentCount,
&object.EncryptedMetadataNonce, &object.EncryptedMetadata,
&object.TotalEncryptedSize, &object.FixedSegmentSize,
encryptionParameters{&object.Encryption})
if err != nil {
return nil, Error.New("unable to delete object: %w", err)
}
objects = append(objects, object)
}
if err := rows.Err(); err != nil {
return nil, Error.New("unable to delete object: %w", err)
}
return objects, nil
}
func scanMultipleObjectsDeletion(rows tagsql.Rows) (objects []Object, err error) {
defer func() { err = errs.Combine(err, rows.Close()) }()
objects = make([]Object, 0, 10)
for rows.Next() {
var object Object
err = rows.Scan(&object.ProjectID, &object.BucketName,
&object.ObjectKey, &object.Version, &object.StreamID,
&object.CreatedAt, &object.ExpiresAt,
&object.Status, &object.SegmentCount,
&object.EncryptedMetadataNonce, &object.EncryptedMetadata,
&object.TotalEncryptedSize, &object.FixedSegmentSize,
encryptionParameters{&object.Encryption})
if err != nil {
return nil, Error.New("unable to delete object: %w", err)
}
objects = append(objects, object)
}
if err := rows.Err(); err != nil {
return nil, Error.New("unable to delete object: %w", err)
}
if len(objects) == 0 {
objects = nil
}
return objects, nil
}
func deleteSegments(ctx context.Context, tx tagsql.Tx, objects []Object) (_ []DeletedSegmentInfo, err error) {
defer mon.Task()(&ctx)(&err)
// TODO we need to figure out how integrate this with piece deletion code
// one issue is that with this approach we need to return all pieces SN ids at once
streamIDs := make([][]byte, len(objects))
for i := range objects {
streamIDs[i] = objects[i].StreamID[:]
}
// Sorting the stream IDs just in case.
// TODO: Check if this is really necessary for the SQL query.
sort.Slice(streamIDs, func(i, j int) bool {
return bytes.Compare(streamIDs[i], streamIDs[j]) < 0
})
segmentsRows, err := tx.Query(ctx, `
DELETE FROM segments
WHERE stream_id = ANY ($1)
RETURNING root_piece_id, remote_pieces;
`, pgutil.ByteaArray(streamIDs))
if err != nil {
return []DeletedSegmentInfo{}, Error.New("unable to delete object: %w", err)
}
defer func() { err = errs.Combine(err, segmentsRows.Close()) }()
infos := make([]DeletedSegmentInfo, 0, len(objects))
for segmentsRows.Next() {
var segmentInfo DeletedSegmentInfo
err = segmentsRows.Scan(&segmentInfo.RootPieceID, &segmentInfo.Pieces)
if err != nil {
return []DeletedSegmentInfo{}, Error.New("unable to delete object: %w", err)
}
if len(segmentInfo.Pieces) != 0 {
infos = append(infos, segmentInfo)
}
}
if err := segmentsRows.Err(); err != nil {
return []DeletedSegmentInfo{}, Error.New("unable to delete object: %w", err)
}
return infos, nil
}