satellite/{metabase,metainfo}: require StreamID for UpdateObjectMetadata
This avoids corrupting objects if reuploads and metadata updates happen concurrently.
This commit is contained in:
parent
cbbbfca439
commit
a767aed591
2
go.mod
2
go.mod
@ -54,7 +54,7 @@ require (
|
|||||||
google.golang.org/api v0.20.0 // indirect
|
google.golang.org/api v0.20.0 // indirect
|
||||||
gopkg.in/segmentio/analytics-go.v3 v3.1.0
|
gopkg.in/segmentio/analytics-go.v3 v3.1.0
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
|
||||||
storj.io/common v0.0.0-20210702123130-0f973652e4bb
|
storj.io/common v0.0.0-20210708125041-4882a3ae3eda
|
||||||
storj.io/drpc v0.0.23
|
storj.io/drpc v0.0.23
|
||||||
storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7
|
storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7
|
||||||
storj.io/private v0.0.0-20210625132526-af46b647eda5
|
storj.io/private v0.0.0-20210625132526-af46b647eda5
|
||||||
|
4
go.sum
4
go.sum
@ -871,8 +871,8 @@ sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3
|
|||||||
storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0=
|
storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0=
|
||||||
storj.io/common v0.0.0-20201026135900-1aaeec90670b/go.mod h1:GqdmNf3fLm2UZX/7Zr0BLFCJ4gFjgm6eHrk/fnmr5jQ=
|
storj.io/common v0.0.0-20201026135900-1aaeec90670b/go.mod h1:GqdmNf3fLm2UZX/7Zr0BLFCJ4gFjgm6eHrk/fnmr5jQ=
|
||||||
storj.io/common v0.0.0-20210504141454-bcb03a80052f/go.mod h1:PdP3eTld9RqSV3E4K44JSlw7Z/zNsymj9rnKuHFKhJE=
|
storj.io/common v0.0.0-20210504141454-bcb03a80052f/go.mod h1:PdP3eTld9RqSV3E4K44JSlw7Z/zNsymj9rnKuHFKhJE=
|
||||||
storj.io/common v0.0.0-20210702123130-0f973652e4bb h1:p6cKlUcTJAgOnsW5ILRJ/WsxylRq6ys74ZDkxI+PHXE=
|
storj.io/common v0.0.0-20210708125041-4882a3ae3eda h1:UtmPvs92TUEwCWal1LYfXS7lg+X34tQI5giMqwu0dVo=
|
||||||
storj.io/common v0.0.0-20210702123130-0f973652e4bb/go.mod h1:SN7WwEhNC2PTSuIJDj8184eeBmq+VEDwGqJzrlPera4=
|
storj.io/common v0.0.0-20210708125041-4882a3ae3eda/go.mod h1:SN7WwEhNC2PTSuIJDj8184eeBmq+VEDwGqJzrlPera4=
|
||||||
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
|
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
|
||||||
storj.io/drpc v0.0.14/go.mod h1:82nfl+6YwRwF6UG31cEWWUqv/FaKvP5SGqUvoqTxCMA=
|
storj.io/drpc v0.0.14/go.mod h1:82nfl+6YwRwF6UG31cEWWUqv/FaKvP5SGqUvoqTxCMA=
|
||||||
storj.io/drpc v0.0.20/go.mod h1:eAxUDk8HWvGl9iqznpuphtZ+WIjIGPJFqNXuKHgRiMM=
|
storj.io/drpc v0.0.20/go.mod h1:eAxUDk8HWvGl9iqznpuphtZ+WIjIGPJFqNXuKHgRiMM=
|
||||||
|
@ -155,16 +155,16 @@ func (step DeleteBucketObjects) Check(ctx *testcontext.Context, t testing.TB, db
|
|||||||
checkError(t, err, step.ErrClass, step.ErrText)
|
checkError(t, err, step.ErrClass, step.ErrText)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetObjectMetadataLatestVersion is for testing metabase.SetObjectMetadataLatestVersion.
|
// UpdateObjectMetadata is for testing metabase.UpdateObjectMetadata.
|
||||||
type SetObjectMetadataLatestVersion struct {
|
type UpdateObjectMetadata struct {
|
||||||
Opts metabase.SetObjectMetadataLatestVersion
|
Opts metabase.UpdateObjectMetadata
|
||||||
ErrClass *errs.Class
|
ErrClass *errs.Class
|
||||||
ErrText string
|
ErrText string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check runs the test.
|
// Check runs the test.
|
||||||
func (step SetObjectMetadataLatestVersion) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
|
func (step UpdateObjectMetadata) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
|
||||||
err := db.SetObjectMetadataLatestVersion(ctx, step.Opts)
|
err := db.UpdateObjectMetadata(ctx, step.Opts)
|
||||||
checkError(t, err, step.ErrClass, step.ErrText)
|
checkError(t, err, step.ErrClass, step.ErrText)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -9,23 +9,27 @@ import (
|
|||||||
"storj.io/common/storj"
|
"storj.io/common/storj"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SetObjectMetadataLatestVersion contains arguments necessary for replacing an object metadata.
|
// UpdateObjectMetadata contains arguments necessary for replacing an object metadata.
|
||||||
type SetObjectMetadataLatestVersion struct {
|
type UpdateObjectMetadata struct {
|
||||||
ObjectLocation
|
ObjectStream
|
||||||
|
|
||||||
EncryptedMetadata []byte
|
EncryptedMetadata []byte
|
||||||
EncryptedMetadataNonce []byte
|
EncryptedMetadataNonce []byte
|
||||||
EncryptedMetadataEncryptedKey []byte
|
EncryptedMetadataEncryptedKey []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetObjectMetadataLatestVersion replaces an object metadata.
|
// UpdateObjectMetadata updates an object metadata.
|
||||||
func (db *DB) SetObjectMetadataLatestVersion(ctx context.Context, opts SetObjectMetadataLatestVersion) (err error) {
|
func (db *DB) UpdateObjectMetadata(ctx context.Context, opts UpdateObjectMetadata) (err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
if err := opts.ObjectLocation.Verify(); err != nil {
|
if err := opts.ObjectStream.Verify(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if opts.ObjectStream.Version <= 0 {
|
||||||
|
return ErrInvalidRequest.New("Version invalid: %v", opts.Version)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO So the issue is that during a multipart upload of an object,
|
// TODO So the issue is that during a multipart upload of an object,
|
||||||
// uplink can update object metadata. If we add the arguments EncryptedMetadata
|
// uplink can update object metadata. If we add the arguments EncryptedMetadata
|
||||||
// to CommitObject, they will need to account for them being optional.
|
// to CommitObject, they will need to account for them being optional.
|
||||||
@ -33,26 +37,17 @@ func (db *DB) SetObjectMetadataLatestVersion(ctx context.Context, opts SetObject
|
|||||||
// during commit object.
|
// during commit object.
|
||||||
result, err := db.db.ExecContext(ctx, `
|
result, err := db.db.ExecContext(ctx, `
|
||||||
UPDATE objects SET
|
UPDATE objects SET
|
||||||
encrypted_metadata_nonce = $4,
|
encrypted_metadata_nonce = $6,
|
||||||
encrypted_metadata = $5,
|
encrypted_metadata = $7,
|
||||||
encrypted_metadata_encrypted_key = $6
|
encrypted_metadata_encrypted_key = $8
|
||||||
FROM (
|
|
||||||
SELECT version, stream_id FROM objects WHERE
|
|
||||||
project_id = $1 AND
|
|
||||||
bucket_name = $2 AND
|
|
||||||
object_key = $3 AND
|
|
||||||
status = `+committedStatus+`
|
|
||||||
ORDER BY version DESC
|
|
||||||
LIMIT 1
|
|
||||||
) AS latest_object
|
|
||||||
WHERE
|
WHERE
|
||||||
project_id = $1 AND
|
project_id = $1 AND
|
||||||
bucket_name = $2 AND
|
bucket_name = $2 AND
|
||||||
object_key = $3 AND
|
object_key = $3 AND
|
||||||
objects.version = latest_object.version AND
|
version = $4 AND
|
||||||
objects.stream_id = latest_object.stream_id AND
|
stream_id = $5 AND
|
||||||
status = `+committedStatus,
|
status = `+committedStatus,
|
||||||
opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey),
|
opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.Version, opts.StreamID,
|
||||||
opts.EncryptedMetadataNonce, opts.EncryptedMetadata, opts.EncryptedMetadataEncryptedKey)
|
opts.EncryptedMetadataNonce, opts.EncryptedMetadata, opts.EncryptedMetadataEncryptedKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Error.New("unable to update object metadata: %w", err)
|
return Error.New("unable to update object metadata: %w", err)
|
||||||
@ -65,7 +60,7 @@ func (db *DB) SetObjectMetadataLatestVersion(ctx context.Context, opts SetObject
|
|||||||
|
|
||||||
if affected == 0 {
|
if affected == 0 {
|
||||||
return storj.ErrObjectNotFound.Wrap(
|
return storj.ErrObjectNotFound.Wrap(
|
||||||
Error.New("object with specified committed status is missing"),
|
Error.New("object with specified version and committed status is missing"),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -14,20 +14,18 @@ import (
|
|||||||
"storj.io/storj/satellite/metabase/metabasetest"
|
"storj.io/storj/satellite/metabase/metabasetest"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSetObjectMetadataLatestVersion(t *testing.T) {
|
func TestUpdateObjectMetadata(t *testing.T) {
|
||||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||||
obj := metabasetest.RandObjectStream()
|
obj := metabasetest.RandObjectStream()
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
location := obj.Location()
|
for _, test := range metabasetest.InvalidObjectStreams(obj) {
|
||||||
|
|
||||||
for _, test := range metabasetest.InvalidObjectLocations(location) {
|
|
||||||
test := test
|
test := test
|
||||||
t.Run(test.Name, func(t *testing.T) {
|
t.Run(test.Name, func(t *testing.T) {
|
||||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
metabasetest.SetObjectMetadataLatestVersion{
|
metabasetest.UpdateObjectMetadata{
|
||||||
Opts: metabase.SetObjectMetadataLatestVersion{
|
Opts: metabase.UpdateObjectMetadata{
|
||||||
ObjectLocation: test.ObjectLocation,
|
ObjectStream: test.ObjectStream,
|
||||||
},
|
},
|
||||||
ErrClass: test.ErrClass,
|
ErrClass: test.ErrClass,
|
||||||
ErrText: test.ErrText,
|
ErrText: test.ErrText,
|
||||||
@ -36,15 +34,34 @@ func TestSetObjectMetadataLatestVersion(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.Run("Version invalid", func(t *testing.T) {
|
||||||
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.UpdateObjectMetadata{
|
||||||
|
Opts: metabase.UpdateObjectMetadata{
|
||||||
|
ObjectStream: metabase.ObjectStream{
|
||||||
|
ProjectID: obj.ProjectID,
|
||||||
|
BucketName: obj.BucketName,
|
||||||
|
ObjectKey: obj.ObjectKey,
|
||||||
|
Version: 0,
|
||||||
|
StreamID: obj.StreamID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ErrClass: &metabase.ErrInvalidRequest,
|
||||||
|
ErrText: "Version invalid: 0",
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
metabasetest.Verify{}.Check(ctx, t, db)
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("Object missing", func(t *testing.T) {
|
t.Run("Object missing", func(t *testing.T) {
|
||||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
|
|
||||||
metabasetest.SetObjectMetadataLatestVersion{
|
metabasetest.UpdateObjectMetadata{
|
||||||
Opts: metabase.SetObjectMetadataLatestVersion{
|
Opts: metabase.UpdateObjectMetadata{
|
||||||
ObjectLocation: location,
|
ObjectStream: obj,
|
||||||
},
|
},
|
||||||
ErrClass: &storj.ErrObjectNotFound,
|
ErrClass: &storj.ErrObjectNotFound,
|
||||||
ErrText: "metabase: object with specified committed status is missing",
|
ErrText: "metabase: object with specified version and committed status is missing",
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
metabasetest.Verify{}.Check(ctx, t, db)
|
metabasetest.Verify{}.Check(ctx, t, db)
|
||||||
})
|
})
|
||||||
@ -69,9 +86,9 @@ func TestSetObjectMetadataLatestVersion(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
metabasetest.SetObjectMetadataLatestVersion{
|
metabasetest.UpdateObjectMetadata{
|
||||||
Opts: metabase.SetObjectMetadataLatestVersion{
|
Opts: metabase.UpdateObjectMetadata{
|
||||||
ObjectLocation: location,
|
ObjectStream: obj,
|
||||||
EncryptedMetadata: encryptedMetadata,
|
EncryptedMetadata: encryptedMetadata,
|
||||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||||
|
@ -585,11 +585,15 @@ func TestEndpoint_UpdateObjectMetadata(t *testing.T) {
|
|||||||
objects, err := satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
objects, err := satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Len(t, objects, 1)
|
require.Len(t, objects, 1)
|
||||||
// TODO: we expect no encrypted metadata at this point, but for some reason there are 111 bytes of metadata.
|
|
||||||
// assert.Nil(t, objects[0].EncryptedMetadata, len(objects[0].EncryptedMetadata))
|
getResp, err := satelliteSys.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{
|
||||||
assert.Nil(t, objects[0].EncryptedMetadataEncryptedKey)
|
Header: &pb.RequestHeader{
|
||||||
zeroNonce := storj.Nonce{}
|
ApiKey: apiKey.SerializeRaw(),
|
||||||
assert.Equal(t, zeroNonce[:], objects[0].EncryptedMetadataNonce)
|
},
|
||||||
|
Bucket: []byte("testbucket"),
|
||||||
|
EncryptedPath: []byte(objects[0].ObjectKey),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
testEncryptedMetadata := testrand.BytesInt(64)
|
testEncryptedMetadata := testrand.BytesInt(64)
|
||||||
testEncryptedMetadataEncryptedKey := testrand.BytesInt(32)
|
testEncryptedMetadataEncryptedKey := testrand.BytesInt(32)
|
||||||
@ -600,8 +604,10 @@ func TestEndpoint_UpdateObjectMetadata(t *testing.T) {
|
|||||||
Header: &pb.RequestHeader{
|
Header: &pb.RequestHeader{
|
||||||
ApiKey: apiKey.SerializeRaw(),
|
ApiKey: apiKey.SerializeRaw(),
|
||||||
},
|
},
|
||||||
Bucket: []byte("testbucket"),
|
Bucket: getResp.Object.Bucket,
|
||||||
EncryptedObjectKey: []byte(objects[0].ObjectKey),
|
EncryptedObjectKey: getResp.Object.EncryptedPath,
|
||||||
|
Version: getResp.Object.Version,
|
||||||
|
StreamId: getResp.Object.StreamId,
|
||||||
EncryptedMetadataNonce: testEncryptedMetadataNonce,
|
EncryptedMetadataNonce: testEncryptedMetadataNonce,
|
||||||
EncryptedMetadata: testEncryptedMetadata,
|
EncryptedMetadata: testEncryptedMetadata,
|
||||||
EncryptedMetadataEncryptedKey: testEncryptedMetadataEncryptedKey,
|
EncryptedMetadataEncryptedKey: testEncryptedMetadataEncryptedKey,
|
||||||
|
@ -1580,11 +1580,23 @@ func (endpoint *Endpoint) UpdateObjectMetadata(ctx context.Context, req *pb.Obje
|
|||||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
err = endpoint.metainfo.metabaseDB.SetObjectMetadataLatestVersion(ctx, metabase.SetObjectMetadataLatestVersion{
|
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
|
||||||
ObjectLocation: metabase.ObjectLocation{
|
if err != nil {
|
||||||
|
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
id, err := uuid.FromBytes(streamID.StreamId)
|
||||||
|
if err != nil {
|
||||||
|
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
err = endpoint.metainfo.metabaseDB.UpdateObjectMetadata(ctx, metabase.UpdateObjectMetadata{
|
||||||
|
ObjectStream: metabase.ObjectStream{
|
||||||
ProjectID: keyInfo.ProjectID,
|
ProjectID: keyInfo.ProjectID,
|
||||||
BucketName: string(req.Bucket),
|
BucketName: string(req.Bucket),
|
||||||
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
|
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
|
||||||
|
Version: metabase.Version(req.Version),
|
||||||
|
StreamID: id,
|
||||||
},
|
},
|
||||||
EncryptedMetadata: req.EncryptedMetadata,
|
EncryptedMetadata: req.EncryptedMetadata,
|
||||||
EncryptedMetadataNonce: req.EncryptedMetadataNonce[:],
|
EncryptedMetadataNonce: req.EncryptedMetadataNonce[:],
|
||||||
|
Loading…
Reference in New Issue
Block a user