satellite/{metabase,metainfo}: use ObjectStream as argument
Change-Id: I5a7f096002b4d7a6162b24d7a64346b058c4c89c
This commit is contained in:
parent
d06206488f
commit
5269596c7d
@ -216,8 +216,6 @@ func (seg SegmentLocation) Verify() error {
|
||||
}
|
||||
|
||||
// ObjectStream uniquely defines an object and stream.
|
||||
//
|
||||
// TODO: figure out whether ther's a better name.
|
||||
type ObjectStream struct {
|
||||
ProjectID uuid.UUID
|
||||
BucketName string
|
||||
|
@ -11,7 +11,6 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/private/dbutil"
|
||||
"storj.io/private/dbutil/pgutil"
|
||||
"storj.io/private/tagsql"
|
||||
@ -147,24 +146,14 @@ func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa
|
||||
|
||||
// DeletePendingObject contains arguments necessary for deleting a pending object.
|
||||
type DeletePendingObject struct {
|
||||
ObjectLocation
|
||||
Version
|
||||
StreamID uuid.UUID
|
||||
ObjectStream
|
||||
}
|
||||
|
||||
// Verify verifies delete pending object fields validity.
|
||||
func (opts *DeletePendingObject) Verify() error {
|
||||
if err := opts.ObjectLocation.Verify(); err != nil {
|
||||
if err := opts.ObjectStream.Verify(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if opts.Version <= 0 {
|
||||
return ErrInvalidRequest.New("Version invalid: %v", opts.Version)
|
||||
}
|
||||
|
||||
if opts.StreamID.IsZero() {
|
||||
return ErrInvalidRequest.New("StreamID missing")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -209,7 +198,7 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject)
|
||||
FROM deleted_objects
|
||||
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id
|
||||
`, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.Version, opts.StreamID))(func(rows tagsql.Rows) error {
|
||||
result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows)
|
||||
result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.Location(), rows)
|
||||
return err
|
||||
})
|
||||
|
||||
|
@ -18,20 +18,15 @@ import (
|
||||
func TestDeletePendingObject(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := metabasetest.RandObjectStream()
|
||||
|
||||
location := obj.Location()
|
||||
|
||||
now := time.Now()
|
||||
|
||||
for _, test := range metabasetest.InvalidObjectLocations(location) {
|
||||
for _, test := range metabasetest.InvalidObjectStreams(obj) {
|
||||
test := test
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
metabasetest.DeletePendingObject{
|
||||
Opts: metabase.DeletePendingObject{
|
||||
StreamID: obj.StreamID,
|
||||
Version: 1,
|
||||
ObjectLocation: test.ObjectLocation,
|
||||
ObjectStream: test.ObjectStream,
|
||||
},
|
||||
ErrClass: test.ErrClass,
|
||||
ErrText: test.ErrText,
|
||||
@ -40,44 +35,12 @@ func TestDeletePendingObject(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("Version invalid", func(t *testing.T) {
|
||||
t.Run("object missing", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.DeletePendingObject{
|
||||
Opts: metabase.DeletePendingObject{
|
||||
StreamID: obj.StreamID,
|
||||
Version: 0,
|
||||
ObjectLocation: location,
|
||||
},
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "Version invalid: 0",
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("StreamID missing", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.DeletePendingObject{
|
||||
Opts: metabase.DeletePendingObject{
|
||||
StreamID: uuid.UUID{},
|
||||
Version: 1,
|
||||
ObjectLocation: location,
|
||||
},
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "StreamID missing",
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Object missing", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.DeletePendingObject{
|
||||
Opts: metabase.DeletePendingObject{
|
||||
StreamID: obj.StreamID,
|
||||
Version: 1,
|
||||
ObjectLocation: location,
|
||||
ObjectStream: obj,
|
||||
},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: no rows deleted",
|
||||
@ -85,7 +48,7 @@ func TestDeletePendingObject(t *testing.T) {
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Delete non existing object version", func(t *testing.T) {
|
||||
t.Run("non existing object version", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
@ -98,9 +61,13 @@ func TestDeletePendingObject(t *testing.T) {
|
||||
|
||||
metabasetest.DeletePendingObject{
|
||||
Opts: metabase.DeletePendingObject{
|
||||
StreamID: obj.StreamID,
|
||||
Version: 33,
|
||||
ObjectLocation: location,
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 33,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: no rows deleted",
|
||||
@ -118,16 +85,14 @@ func TestDeletePendingObject(t *testing.T) {
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Delete committed object", func(t *testing.T) {
|
||||
t.Run("delete committed object", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
object := metabasetest.CreateObject(ctx, t, db, obj, 0)
|
||||
|
||||
metabasetest.DeletePendingObject{
|
||||
Opts: metabase.DeletePendingObject{
|
||||
StreamID: object.StreamID,
|
||||
Version: 1,
|
||||
ObjectLocation: object.Location(),
|
||||
ObjectStream: object.ObjectStream,
|
||||
},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: no rows deleted",
|
||||
@ -146,7 +111,7 @@ func TestDeletePendingObject(t *testing.T) {
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Delete pending object without segments with wrong StreamID", func(t *testing.T) {
|
||||
t.Run("without segments with wrong StreamID", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
@ -159,9 +124,13 @@ func TestDeletePendingObject(t *testing.T) {
|
||||
|
||||
metabasetest.DeletePendingObject{
|
||||
Opts: metabase.DeletePendingObject{
|
||||
StreamID: uuid.UUID{33},
|
||||
Version: 1,
|
||||
ObjectLocation: location,
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: obj.Version,
|
||||
StreamID: uuid.UUID{33},
|
||||
},
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
@ -181,7 +150,7 @@ func TestDeletePendingObject(t *testing.T) {
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Delete pending object without segments", func(t *testing.T) {
|
||||
t.Run("without segments", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
@ -200,9 +169,7 @@ func TestDeletePendingObject(t *testing.T) {
|
||||
}
|
||||
metabasetest.DeletePendingObject{
|
||||
Opts: metabase.DeletePendingObject{
|
||||
StreamID: obj.StreamID,
|
||||
Version: 1,
|
||||
ObjectLocation: location,
|
||||
ObjectStream: obj,
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{metabase.Object(object)},
|
||||
@ -212,7 +179,7 @@ func TestDeletePendingObject(t *testing.T) {
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Delete pending object with segments", func(t *testing.T) {
|
||||
t.Run("with segments", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CreatePendingObject(ctx, t, db, obj, 2)
|
||||
@ -224,9 +191,7 @@ func TestDeletePendingObject(t *testing.T) {
|
||||
|
||||
metabasetest.DeletePendingObject{
|
||||
Opts: metabase.DeletePendingObject{
|
||||
StreamID: obj.StreamID,
|
||||
Version: 1,
|
||||
ObjectLocation: location,
|
||||
ObjectStream: obj,
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{
|
||||
@ -244,7 +209,7 @@ func TestDeletePendingObject(t *testing.T) {
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Delete pending object with inline segment", func(t *testing.T) {
|
||||
t.Run("with inline segment", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
@ -272,9 +237,7 @@ func TestDeletePendingObject(t *testing.T) {
|
||||
|
||||
metabasetest.DeletePendingObject{
|
||||
Opts: metabase.DeletePendingObject{
|
||||
StreamID: obj.StreamID,
|
||||
Version: 1,
|
||||
ObjectLocation: location,
|
||||
ObjectStream: obj,
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{
|
||||
|
@ -71,7 +71,14 @@ func TestEndpoint_DeletePendingObject(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Len(t, items, 1)
|
||||
|
||||
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint2.DeletePendingObject(ctx, projectID, bucketName, items[0].ObjectKey, 1, items[0].StreamID)
|
||||
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint2.DeletePendingObject(ctx,
|
||||
metabase.ObjectStream{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
ObjectKey: items[0].ObjectKey,
|
||||
Version: 1,
|
||||
StreamID: items[0].StreamID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, deletedObjects, 1)
|
||||
|
||||
|
@ -1350,7 +1350,14 @@ func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectB
|
||||
var streamID uuid.UUID
|
||||
streamID, err = uuid.FromBytes(pbStreamID.StreamId)
|
||||
if err == nil {
|
||||
deletedObjects, err = endpoint.DeletePendingObject(ctx, keyInfo.ProjectID, string(req.Bucket), metabase.ObjectKey(req.EncryptedPath), req.GetVersion(), streamID)
|
||||
deletedObjects, err = endpoint.DeletePendingObject(ctx,
|
||||
metabase.ObjectStream{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
ObjectKey: metabase.ObjectKey(req.EncryptedPath),
|
||||
Version: metabase.Version(req.GetVersion()),
|
||||
StreamID: streamID,
|
||||
})
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -2279,15 +2286,9 @@ func (endpoint *Endpoint) DeleteObjectAnyStatus(ctx context.Context, location me
|
||||
//
|
||||
// NOTE: this method is exported for being able to individually test it without
|
||||
// having import cycles.
|
||||
func (endpoint *Endpoint) DeletePendingObject(ctx context.Context, projectID uuid.UUID, bucket string, objectKey metabase.ObjectKey, version int32, streamID uuid.UUID) (deletedObjects []*pb.Object, err error) {
|
||||
func (endpoint *Endpoint) DeletePendingObject(ctx context.Context, stream metabase.ObjectStream) (deletedObjects []*pb.Object, err error) {
|
||||
req := metabase.DeletePendingObject{
|
||||
ObjectLocation: metabase.ObjectLocation{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucket,
|
||||
ObjectKey: objectKey,
|
||||
},
|
||||
Version: metabase.Version(version),
|
||||
StreamID: streamID,
|
||||
ObjectStream: stream,
|
||||
}
|
||||
result, err := endpoint.metainfo.metabaseDB.DeletePendingObject(ctx, req)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user