uplink: Suppress one metainfo call on delete (#3511)
Change signature of metainfo DeleteObject to get rid of an extra call to kvmetainfo GetBucket method and eliminate one round trip to the satellite when deleting objects.
This commit is contained in:
parent
53a060cdfa
commit
e4a220347a
@ -111,7 +111,7 @@ func (b *Bucket) UploadObject(ctx context.Context, path storj.Path, data io.Read
|
||||
// DeleteObject removes an object, if authorized.
|
||||
func (b *Bucket) DeleteObject(ctx context.Context, path storj.Path) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return b.metainfo.DeleteObject(ctx, b.bucket.Name, path)
|
||||
return b.metainfo.DeleteObject(ctx, b.bucket, path)
|
||||
}
|
||||
|
||||
// ListOptions controls options for the ListObjects() call.
|
||||
|
@ -101,7 +101,7 @@ func TestDeleteBucket(t *testing.T) {
|
||||
assert.Equal(t, minio.BucketNotFound{Bucket: TestBucket}, err)
|
||||
|
||||
// Create a bucket with a file using the Metainfo API
|
||||
_, err = m.CreateBucket(ctx, TestBucket, nil)
|
||||
bucket, err := m.CreateBucket(ctx, TestBucket, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = createFile(ctx, m, strms, TestBucket, TestFile, nil, nil)
|
||||
@ -112,7 +112,7 @@ func TestDeleteBucket(t *testing.T) {
|
||||
assert.Equal(t, minio.BucketNotEmpty{Bucket: TestBucket}, err)
|
||||
|
||||
// Delete the file using the Metainfo API, so the bucket becomes empty
|
||||
err = m.DeleteObject(ctx, TestBucket, TestFile)
|
||||
err = m.DeleteObject(ctx, bucket, TestFile)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Delete the bucket info using the Minio API
|
||||
|
@ -33,7 +33,7 @@ type Metainfo interface {
|
||||
// ModifyObject creates a mutable object for updating a partially uploaded object
|
||||
ModifyObject(ctx context.Context, bucket string, path Path) (MutableObject, error)
|
||||
// DeleteObject deletes an object from database
|
||||
DeleteObject(ctx context.Context, bucket string, path Path) error
|
||||
DeleteObject(ctx context.Context, bucket Bucket, path Path) error
|
||||
// ListObjects lists objects in bucket based on the ListOptions
|
||||
ListObjects(ctx context.Context, bucket string, options ListOptions) (ObjectList, error)
|
||||
|
||||
|
@ -14,7 +14,6 @@ import (
|
||||
"storj.io/storj/pkg/paths"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/storage"
|
||||
"storj.io/storj/uplink/metainfo"
|
||||
"storj.io/storj/uplink/storage/meta"
|
||||
"storj.io/storj/uplink/storage/objects"
|
||||
@ -130,19 +129,16 @@ func (db *DB) ModifyObject(ctx context.Context, bucket string, path storj.Path)
|
||||
}
|
||||
|
||||
// DeleteObject deletes an object from database
|
||||
func (db *DB) DeleteObject(ctx context.Context, bucket string, path storj.Path) (err error) {
|
||||
func (db *DB) DeleteObject(ctx context.Context, bucket storj.Bucket, path storj.Path) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
bucketInfo, err := db.GetBucket(ctx, bucket)
|
||||
if err != nil {
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
err = storj.ErrBucketNotFound.Wrap(err)
|
||||
}
|
||||
return err
|
||||
if bucket.Name == "" {
|
||||
return storj.ErrNoBucket.New("")
|
||||
}
|
||||
|
||||
prefixed := prefixedObjStore{
|
||||
store: objects.NewStore(db.streams, bucketInfo.PathCipher),
|
||||
prefix: bucket,
|
||||
store: objects.NewStore(db.streams, bucket.PathCipher),
|
||||
prefix: bucket.Name,
|
||||
}
|
||||
return prefixed.Delete(ctx, path)
|
||||
}
|
||||
|
@ -249,19 +249,34 @@ func TestDeleteObject(t *testing.T) {
|
||||
|
||||
upload(ctx, t, db, streams, bucket, TestFile, nil)
|
||||
|
||||
err = db.DeleteObject(ctx, "", "")
|
||||
err = db.DeleteObject(ctx, storj.Bucket{}, "")
|
||||
assert.True(t, storj.ErrNoBucket.Has(err))
|
||||
|
||||
err = db.DeleteObject(ctx, bucket.Name, "")
|
||||
err = db.DeleteObject(ctx, bucket, "")
|
||||
assert.True(t, storj.ErrNoPath.Has(err))
|
||||
|
||||
err = db.DeleteObject(ctx, "non-existing-bucket", TestFile)
|
||||
assert.True(t, storj.ErrBucketNotFound.Has(err))
|
||||
{
|
||||
unexistingBucket := storj.Bucket{
|
||||
Name: bucket.Name + "-not-exist",
|
||||
PathCipher: bucket.PathCipher,
|
||||
}
|
||||
err = db.DeleteObject(ctx, unexistingBucket, TestFile)
|
||||
assert.True(t, storj.ErrObjectNotFound.Has(err))
|
||||
}
|
||||
|
||||
err = db.DeleteObject(ctx, bucket.Name, "non-existing-file")
|
||||
err = db.DeleteObject(ctx, bucket, "non-existing-file")
|
||||
assert.True(t, storj.ErrObjectNotFound.Has(err))
|
||||
|
||||
err = db.DeleteObject(ctx, bucket.Name, TestFile)
|
||||
{
|
||||
invalidPathCipherBucket := storj.Bucket{
|
||||
Name: bucket.Name,
|
||||
PathCipher: bucket.PathCipher + 1,
|
||||
}
|
||||
err = db.DeleteObject(ctx, invalidPathCipherBucket, TestFile)
|
||||
assert.True(t, storj.ErrObjectNotFound.Has(err))
|
||||
}
|
||||
|
||||
err = db.DeleteObject(ctx, bucket, TestFile)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user