satellite/metainfo: Override pending object on upload
On upload we need to override pending and committed object. This change is adjusting DeleteObjectAllVersions to delete both. Change-Id: Ib66c2af207c618119f7bf0de7fa9d3e5145d8641
This commit is contained in:
parent
95320912b4
commit
38beecc7ad
@ -169,6 +169,8 @@ type MetabaseDB interface {
|
||||
io.Closer
|
||||
// MigrateToLatest migrates to latest schema version.
|
||||
MigrateToLatest(ctx context.Context) error
|
||||
// DeleteObjectAnyStatusAllVersions deletes all object versions.
|
||||
DeleteObjectAnyStatusAllVersions(ctx context.Context, opts metabase.DeleteObjectAnyStatusAllVersions) (result metabase.DeleteObjectResult, err error)
|
||||
// DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket.
|
||||
DeleteObjectsAllVersions(ctx context.Context, opts metabase.DeleteObjectsAllVersions) (result metabase.DeleteObjectResult, err error)
|
||||
// DeletePendingObject deletes a pending object.
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite/metainfo/metabase"
|
||||
"storj.io/uplink"
|
||||
)
|
||||
|
||||
@ -74,6 +75,69 @@ func TestEndpoint_DeletePendingObject(t *testing.T) {
|
||||
testDeleteObject(t, createObject, deleteObject)
|
||||
}
|
||||
|
||||
func TestEndpoint_DeleteObjectAnyStatus(t *testing.T) {
|
||||
bucketName := "a-bucket"
|
||||
createCommittedObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, data []byte) {
|
||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "object-filename", data)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
deleteCommittedObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet) {
|
||||
projectID := planet.Uplinks[0].Projects[0].ID
|
||||
items, err := planet.Satellites[0].Metainfo.Metabase.TestingAllCommittedObjects(ctx, projectID, bucketName)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, items, 1)
|
||||
|
||||
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint2.DeleteObjectAnyStatus(ctx, metabase.ObjectLocation{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
ObjectKey: items[0].ObjectKey,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, deletedObjects, 1)
|
||||
|
||||
items, err = planet.Satellites[0].Metainfo.Metabase.TestingAllPendingObjects(ctx, projectID, bucketName)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, items, 0)
|
||||
}
|
||||
testDeleteObject(t, createCommittedObject, deleteCommittedObject)
|
||||
|
||||
createPendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, data []byte) {
|
||||
// TODO This should be replaced by a call to testplanet.Uplink.MultipartUpload when available.
|
||||
project, err := planet.Uplinks[0].GetProject(ctx, planet.Satellites[0])
|
||||
require.NoError(t, err, "failed to retrieve project")
|
||||
|
||||
_, err = project.CreateBucket(ctx, bucketName)
|
||||
require.NoError(t, err, "failed to create bucket")
|
||||
|
||||
info, err := project.NewMultipartUpload(ctx, bucketName, "object-filename", &uplink.MultipartUploadOptions{})
|
||||
require.NoError(t, err, "failed to start multipart upload")
|
||||
|
||||
_, err = project.PutObjectPart(ctx, bucketName, bucketName, info.StreamID, 1, bytes.NewReader(data))
|
||||
require.NoError(t, err, "failed to put object part")
|
||||
}
|
||||
|
||||
deletePendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet) {
|
||||
projectID := planet.Uplinks[0].Projects[0].ID
|
||||
items, err := planet.Satellites[0].Metainfo.Metabase.TestingAllPendingObjects(ctx, projectID, bucketName)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, items, 1)
|
||||
|
||||
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint2.DeleteObjectAnyStatus(ctx, metabase.ObjectLocation{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketName,
|
||||
ObjectKey: items[0].ObjectKey,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, deletedObjects, 1)
|
||||
|
||||
items, err = planet.Satellites[0].Metainfo.Metabase.TestingAllPendingObjects(ctx, projectID, bucketName)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, items, 0)
|
||||
}
|
||||
|
||||
testDeleteObject(t, createPendingObject, deletePendingObject)
|
||||
}
|
||||
|
||||
func testDeleteObject(t *testing.T, createObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet,
|
||||
data []byte), deleteObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet)) {
|
||||
t.Run("all nodes up", func(t *testing.T) {
|
||||
|
@ -48,8 +48,8 @@ type DeletedSegmentInfo struct {
|
||||
Pieces Pieces
|
||||
}
|
||||
|
||||
// DeleteObjectAllVersions contains arguments necessary for deleting all object versions.
|
||||
type DeleteObjectAllVersions struct {
|
||||
// DeleteObjectAnyStatusAllVersions contains arguments necessary for deleting all object versions.
|
||||
type DeleteObjectAnyStatusAllVersions struct {
|
||||
ObjectLocation
|
||||
}
|
||||
|
||||
@ -311,8 +311,8 @@ func (db *DB) DeleteObjectLatestVersion(ctx context.Context, opts DeleteObjectLa
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// DeleteObjectAllVersions deletes all object versions.
|
||||
func (db *DB) DeleteObjectAllVersions(ctx context.Context, opts DeleteObjectAllVersions) (result DeleteObjectResult, err error) {
|
||||
// DeleteObjectAnyStatusAllVersions deletes all object versions.
|
||||
func (db *DB) DeleteObjectAnyStatusAllVersions(ctx context.Context, opts DeleteObjectAnyStatusAllVersions) (result DeleteObjectResult, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if err := opts.Verify(); err != nil {
|
||||
@ -324,8 +324,7 @@ func (db *DB) DeleteObjectAllVersions(ctx context.Context, opts DeleteObjectAllV
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
status = `+committedStatus+`
|
||||
object_key = $3
|
||||
RETURNING
|
||||
version, stream_id,
|
||||
created_at, expires_at,
|
||||
|
@ -776,7 +776,7 @@ func TestDeleteObjectLatestVersion(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestDeleteObjectAllVersions(t *testing.T) {
|
||||
func TestDeleteObjectAnyStatusAllVersions(t *testing.T) {
|
||||
All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := randObjectStream()
|
||||
|
||||
@ -788,8 +788,8 @@ func TestDeleteObjectAllVersions(t *testing.T) {
|
||||
test := test
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
DeleteObjectAllVersions{
|
||||
Opts: metabase.DeleteObjectAllVersions{ObjectLocation: test.ObjectLocation},
|
||||
DeleteObjectAnyStatusAllVersions{
|
||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{ObjectLocation: test.ObjectLocation},
|
||||
ErrClass: test.ErrClass,
|
||||
ErrText: test.ErrText,
|
||||
}.Check(ctx, t, db)
|
||||
@ -800,8 +800,8 @@ func TestDeleteObjectAllVersions(t *testing.T) {
|
||||
t.Run("Object missing", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
DeleteObjectAllVersions{
|
||||
Opts: metabase.DeleteObjectAllVersions{ObjectLocation: obj.Location()},
|
||||
DeleteObjectAnyStatusAllVersions{
|
||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{ObjectLocation: obj.Location()},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: no rows deleted",
|
||||
}.Check(ctx, t, db)
|
||||
@ -811,8 +811,8 @@ func TestDeleteObjectAllVersions(t *testing.T) {
|
||||
t.Run("Delete non existing object version", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
DeleteObjectAllVersions{
|
||||
Opts: metabase.DeleteObjectAllVersions{ObjectLocation: obj.Location()},
|
||||
DeleteObjectAnyStatusAllVersions{
|
||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{ObjectLocation: obj.Location()},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: no rows deleted",
|
||||
}.Check(ctx, t, db)
|
||||
@ -830,23 +830,20 @@ func TestDeleteObjectAllVersions(t *testing.T) {
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
DeleteObjectAllVersions{
|
||||
Opts: metabase.DeleteObjectAllVersions{ObjectLocation: obj.Location()},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: no rows deleted",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
DeleteObjectAnyStatusAllVersions{
|
||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{ObjectLocation: obj.Location()},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Pending,
|
||||
|
||||
Encryption: defaultTestEncryption,
|
||||
},
|
||||
}},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Delete object without segments", func(t *testing.T) {
|
||||
@ -865,8 +862,8 @@ func TestDeleteObjectAllVersions(t *testing.T) {
|
||||
},
|
||||
}.Run(ctx, t, db, obj, 0)
|
||||
|
||||
DeleteObjectAllVersions{
|
||||
Opts: metabase.DeleteObjectAllVersions{ObjectLocation: obj.Location()},
|
||||
DeleteObjectAnyStatusAllVersions{
|
||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{ObjectLocation: obj.Location()},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{object},
|
||||
},
|
||||
@ -885,8 +882,8 @@ func TestDeleteObjectAllVersions(t *testing.T) {
|
||||
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
|
||||
}
|
||||
|
||||
DeleteObjectAllVersions{
|
||||
Opts: metabase.DeleteObjectAllVersions{
|
||||
DeleteObjectAnyStatusAllVersions{
|
||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{
|
||||
ObjectLocation: location,
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
@ -930,8 +927,8 @@ func TestDeleteObjectAllVersions(t *testing.T) {
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
DeleteObjectAllVersions{
|
||||
Opts: metabase.DeleteObjectAllVersions{ObjectLocation: obj.Location()},
|
||||
DeleteObjectAnyStatusAllVersions{
|
||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{ObjectLocation: obj.Location()},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{object},
|
||||
},
|
||||
@ -956,8 +953,8 @@ func TestDeleteObjectAllVersions(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
DeleteObjectAllVersions{
|
||||
Opts: metabase.DeleteObjectAllVersions{ObjectLocation: obj.Location()},
|
||||
DeleteObjectAnyStatusAllVersions{
|
||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{ObjectLocation: obj.Location()},
|
||||
Result: expected,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
|
@ -296,15 +296,15 @@ func (step DeleteObjectLatestVersion) Check(ctx *testcontext.Context, t *testing
|
||||
require.Zero(t, diff)
|
||||
}
|
||||
|
||||
type DeleteObjectAllVersions struct {
|
||||
Opts metabase.DeleteObjectAllVersions
|
||||
type DeleteObjectAnyStatusAllVersions struct {
|
||||
Opts metabase.DeleteObjectAnyStatusAllVersions
|
||||
Result metabase.DeleteObjectResult
|
||||
ErrClass *errs.Class
|
||||
ErrText string
|
||||
}
|
||||
|
||||
func (step DeleteObjectAllVersions) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
result, err := db.DeleteObjectAllVersions(ctx, step.Opts)
|
||||
func (step DeleteObjectAnyStatusAllVersions) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
result, err := db.DeleteObjectAnyStatusAllVersions(ctx, step.Opts)
|
||||
checkError(t, err, step.ErrClass, step.ErrText)
|
||||
|
||||
diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second))
|
||||
|
@ -630,8 +630,12 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
|
||||
canDelete := err == nil
|
||||
|
||||
if canDelete {
|
||||
_, err = endpoint.DeleteCommittedObject(ctx, keyInfo.ProjectID, string(req.Bucket), metabase.ObjectKey(req.EncryptedPath))
|
||||
if err != nil {
|
||||
_, err = endpoint.DeleteObjectAnyStatus(ctx, metabase.ObjectLocation{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
ObjectKey: metabase.ObjectKey(req.EncryptedPath),
|
||||
})
|
||||
if err != nil && !storj.ErrObjectNotFound.Has(err) {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
@ -1807,6 +1811,36 @@ func (endpoint *Endpoint) DeleteCommittedObject(
|
||||
return deletedObjects, nil
|
||||
}
|
||||
|
||||
// DeleteObjectAnyStatus deletes all the pieces of the storage nodes that belongs
|
||||
// to the specified object.
|
||||
//
|
||||
// NOTE: this method is exported for being able to individually test it without
|
||||
// having import cycles.
|
||||
func (endpoint *Endpoint) DeleteObjectAnyStatus(ctx context.Context, location metabase.ObjectLocation,
|
||||
) (deletedObjects []*pb.Object, err error) {
|
||||
defer mon.Task()(&ctx, location.ProjectID.String(), location.BucketName, location.ObjectKey)(&err)
|
||||
|
||||
result, err := endpoint.metainfo.metabaseDB.DeleteObjectAnyStatusAllVersions(ctx, metabase.DeleteObjectAnyStatusAllVersions{
|
||||
ObjectLocation: location,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
deletedObjects, err = endpoint.deleteObjectsPieces(ctx, result)
|
||||
if err != nil {
|
||||
endpoint.log.Error("failed to delete pointers",
|
||||
zap.Stringer("project", location.ProjectID),
|
||||
zap.String("bucket", location.BucketName),
|
||||
zap.Binary("object", []byte(location.ObjectKey)),
|
||||
zap.Error(err),
|
||||
)
|
||||
return deletedObjects, err
|
||||
}
|
||||
|
||||
return deletedObjects, nil
|
||||
}
|
||||
|
||||
// DeletePendingObject deletes all the pieces of the storage nodes that belongs
|
||||
// to the specified pending object.
|
||||
//
|
||||
|
@ -1780,3 +1780,47 @@ func TestMultipartObjectDownloadRejection(t *testing.T) {
|
||||
require.EqualError(t, err, "metainfo error: Used uplink version cannot download multipart objects.")
|
||||
})
|
||||
}
|
||||
|
||||
func TestObjectOverrideOnUpload(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
|
||||
initialData := testrand.Bytes(20 * memory.KB)
|
||||
overrideData := testrand.Bytes(25 * memory.KB)
|
||||
|
||||
{ // committed object
|
||||
|
||||
// upload committed object
|
||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "pip-first", "committed-object", initialData)
|
||||
require.NoError(t, err)
|
||||
|
||||
// upload once again to override
|
||||
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "pip-first", "committed-object", overrideData)
|
||||
require.NoError(t, err)
|
||||
|
||||
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "pip-first", "committed-object")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, overrideData, data)
|
||||
}
|
||||
|
||||
{ // pending object
|
||||
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
||||
require.NoError(t, err)
|
||||
|
||||
// upload pending object
|
||||
info, err := project.NewMultipartUpload(ctx, "pip-first", "pending-object", nil)
|
||||
require.NoError(t, err)
|
||||
_, err = project.PutObjectPart(ctx, "pip-first", "pending-object", info.StreamID, 1, bytes.NewReader(initialData))
|
||||
require.NoError(t, err)
|
||||
|
||||
// upload once again to override
|
||||
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "pip-first", "pending-object", overrideData)
|
||||
require.NoError(t, err)
|
||||
|
||||
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "pip-first", "pending-object")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, overrideData, data)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user