satellite/metainfo: use deterministic signing for satStreamID

So we can have stable UploadID for multipart uploads.

Change-Id: Iac6780394c8cc0f96c0b9c4b850b92ed3627a9b0
This commit is contained in:
Kaloyan Raev 2021-01-12 13:29:13 +02:00
parent d54ae9f10f
commit ea48322dd3
8 changed files with 107 additions and 40 deletions

4
go.mod
View File

@ -47,9 +47,9 @@ require (
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
google.golang.org/api v0.20.0 // indirect
google.golang.org/protobuf v1.25.0 // indirect
storj.io/common v0.0.0-20210113135631-07a5dc68dc1c
storj.io/common v0.0.0-20210115161819-ee11aaf35a7f
storj.io/drpc v0.0.16
storj.io/monkit-jaeger v0.0.0-20200518165323-80778fc3f91b
storj.io/private v0.0.0-20210108233641-2ba1ef686d1f
storj.io/uplink v1.4.5-0.20210114104337-ce4ca047ab1f
storj.io/uplink v1.4.6-0.20210115090500-10cfa3d1c277
)

12
go.sum
View File

@ -59,8 +59,6 @@ github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13P
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/btcutil v1.0.1/go.mod h1:j9HUFwoQRsZL3V4n+qG+CUnEGHOarIxfC3Le2Yhbcts=
github.com/btcsuite/btcutil v1.0.3-0.20201124182144-4031bdc69ded h1:WcPFZzCIqGt/TdFJHsOiX5dIlB/MUzrftltMhpjzfA8=
github.com/btcsuite/btcutil v1.0.3-0.20201124182144-4031bdc69ded/go.mod h1:0DVlHczLPewLcPGEIeUEzfOJhqGPQ0mJJRDBtD307+o=
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce h1:YtWJF7RHm2pYCvA5t0RPmAaLUhREsKuKd+SLhxFbFeQ=
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce/go.mod h1:0DVlHczLPewLcPGEIeUEzfOJhqGPQ0mJJRDBtD307+o=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
@ -512,7 +510,6 @@ github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox0hTHlnpkcOTuFIDQpZ1IN8rKKhX0=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
github.com/zeebo/admission/v2 v2.0.0/go.mod h1:gSeHGelDHW7Vq6UyJo2boeSt/6Dsnqpisv0i4YZSOyM=
github.com/zeebo/admission/v3 v3.0.1 h1:/IWg2jLhfjBOUhhdKcbweSzcY3QlbbE57sqvU72EpqA=
github.com/zeebo/admission/v3 v3.0.1/go.mod h1:BP3isIv9qa2A7ugEratNq1dnl2oZRXaQUGdU7WXKtbw=
github.com/zeebo/admission/v3 v3.0.2 h1:nI9rBKR97NS42JZ1o0Ki2NsF5DRq+7udnbVXYt3tRPI=
github.com/zeebo/admission/v3 v3.0.2/go.mod h1:BP3isIv9qa2A7ugEratNq1dnl2oZRXaQUGdU7WXKtbw=
@ -763,6 +760,7 @@ google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ij
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk=
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
@ -812,9 +810,9 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0=
storj.io/common v0.0.0-20201026135900-1aaeec90670b/go.mod h1:GqdmNf3fLm2UZX/7Zr0BLFCJ4gFjgm6eHrk/fnmr5jQ=
storj.io/common v0.0.0-20201210184814-6206aefd1d48/go.mod h1:6sepaQTRLuygvA+GNPzdgRPOB1+wFfjde76KBWofbMY=
storj.io/common v0.0.0-20210113135631-07a5dc68dc1c h1:07A5QJMYYYQrOQv51j6RiOTstzMh7OnbqTZGZljp9/M=
storj.io/common v0.0.0-20210113135631-07a5dc68dc1c/go.mod h1:KhVByBTvjV2rsaUQsft0pKgBRRMvCcY1JsDqt6BWr3I=
storj.io/common v0.0.0-20210115161819-ee11aaf35a7f h1:VmLstwTDGwrbn/jrdQ33fotRiOI4q8Swjl7W9Mt7qdY=
storj.io/common v0.0.0-20210115161819-ee11aaf35a7f/go.mod h1:KhVByBTvjV2rsaUQsft0pKgBRRMvCcY1JsDqt6BWr3I=
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
storj.io/drpc v0.0.14/go.mod h1:82nfl+6YwRwF6UG31cEWWUqv/FaKvP5SGqUvoqTxCMA=
storj.io/drpc v0.0.16 h1:9sxypc5lKi/0D69cR21BR0S21+IvXfON8L5nXMVNTwQ=
@ -823,5 +821,5 @@ storj.io/monkit-jaeger v0.0.0-20200518165323-80778fc3f91b h1:Bbg9JCtY6l3HrDxs3BX
storj.io/monkit-jaeger v0.0.0-20200518165323-80778fc3f91b/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80=
storj.io/private v0.0.0-20210108233641-2ba1ef686d1f h1:ctEwD9AsWR8MGv+hKxATjsu114lOPuL2wL7fqO2qusg=
storj.io/private v0.0.0-20210108233641-2ba1ef686d1f/go.mod h1:3KcGiA7phL3a0HUCe5ar90SlIU3iFb8hKInaEZQ5P7o=
storj.io/uplink v1.4.5-0.20210114104337-ce4ca047ab1f h1:jp17GoEKBmo/JvLUQHAbO8E9AjMEfwuabYoUrYI33us=
storj.io/uplink v1.4.5-0.20210114104337-ce4ca047ab1f/go.mod h1:raBVCBf1/DwfkFNzKqjSKLPysk9+o8Ubt/LJIO9TVBw=
storj.io/uplink v1.4.6-0.20210115090500-10cfa3d1c277 h1:H+YVCCYBgk3xRda52MwQO3svpQvqE6P/bjccVLyvROs=
storj.io/uplink v1.4.6-0.20210115090500-10cfa3d1c277/go.mod h1:lpQO2Smf6gpOl7hkB/IEKdrMMdbIHeRF1QKhyZCoD5w=

View File

@ -182,7 +182,7 @@ type MetabaseDB interface {
// DeleteObjectLatestVersion deletes latest object version.
DeleteObjectLatestVersion(ctx context.Context, opts metabase.DeleteObjectLatestVersion) (result metabase.DeleteObjectResult, err error)
// BeginObjectExactVersion adds a pending object to the database, with specific version.
BeginObjectExactVersion(ctx context.Context, opts metabase.BeginObjectExactVersion) (committed metabase.Version, err error)
BeginObjectExactVersion(ctx context.Context, opts metabase.BeginObjectExactVersion) (committed metabase.Object, err error)
// CommitObject adds a pending object to the database.
CommitObject(ctx context.Context, opts metabase.CommitObject) (object metabase.Object, err error)
// BeginSegment verifies whether a new segment upload can be started.

View File

@ -95,23 +95,36 @@ type BeginObjectExactVersion struct {
}
// BeginObjectExactVersion adds a pending object to the database, with specific version.
func (db *DB) BeginObjectExactVersion(ctx context.Context, opts BeginObjectExactVersion) (committed Version, err error) {
func (db *DB) BeginObjectExactVersion(ctx context.Context, opts BeginObjectExactVersion) (committed Object, err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.ObjectStream.Verify(); err != nil {
return -1, err
return Object{}, err
}
switch {
case opts.Encryption.IsZero() || opts.Encryption.CipherSuite == storj.EncUnspecified:
return -1, ErrInvalidRequest.New("Encryption is missing")
return Object{}, ErrInvalidRequest.New("Encryption is missing")
case opts.Encryption.BlockSize <= 0:
return -1, ErrInvalidRequest.New("Encryption.BlockSize is negative or zero")
return Object{}, ErrInvalidRequest.New("Encryption.BlockSize is negative or zero")
case opts.Version == NextVersion:
return -1, ErrInvalidRequest.New("Version should not be metabase.NextVersion")
return Object{}, ErrInvalidRequest.New("Version should not be metabase.NextVersion")
}
_, err = db.db.ExecContext(ctx, `
object := Object{
ObjectStream: ObjectStream{
ProjectID: opts.ProjectID,
BucketName: opts.BucketName,
ObjectKey: opts.ObjectKey,
Version: opts.Version,
StreamID: opts.StreamID,
},
ExpiresAt: opts.ExpiresAt,
Encryption: opts.Encryption,
ZombieDeletionDeadline: opts.ZombieDeletionDeadline,
}
err = db.db.QueryRow(ctx, `
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
expires_at, encryption,
@ -121,18 +134,21 @@ func (db *DB) BeginObjectExactVersion(ctx context.Context, opts BeginObjectExact
$6, $7,
$8
)
`,
opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version, opts.StreamID,
RETURNING status, created_at
`, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version, opts.StreamID,
opts.ExpiresAt, encryptionParameters{&opts.Encryption},
opts.ZombieDeletionDeadline)
opts.ZombieDeletionDeadline).
Scan(
&object.Status, &object.CreatedAt,
)
if err != nil {
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
return -1, ErrConflict.New("object already exists")
return Object{}, ErrConflict.New("object already exists")
}
return -1, Error.New("unable to insert object: %w", err)
return Object{}, Error.New("unable to insert object: %w", err)
}
return opts.Version, nil
return object, nil
}
// BeginSegment contains options to verify, whether a new segment upload can be started.

View File

@ -44,7 +44,14 @@ type BeginObjectExactVersion struct {
func (step BeginObjectExactVersion) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
got, err := db.BeginObjectExactVersion(ctx, step.Opts)
checkError(t, err, step.ErrClass, step.ErrText)
require.Equal(t, step.Version, got)
if step.ErrClass == nil {
require.Equal(t, step.Version, got.Version)
require.WithinDuration(t, time.Now(), got.CreatedAt, 5*time.Second)
require.Equal(t, step.Opts.ObjectStream, got.ObjectStream)
require.Equal(t, step.Opts.ExpiresAt, got.ExpiresAt)
require.Equal(t, step.Opts.ZombieDeletionDeadline, got.ZombieDeletionDeadline)
require.Equal(t, step.Opts.Encryption, got.Encryption)
}
}
type CommitObject struct {

View File

@ -676,7 +676,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
expiresAt = &req.ExpiresAt
}
_, err = endpoint.metainfo.metabaseDB.BeginObjectExactVersion(ctx, metabase.BeginObjectExactVersion{
object, err := endpoint.metainfo.metabaseDB.BeginObjectExactVersion(ctx, metabase.BeginObjectExactVersion{
ObjectStream: metabase.ObjectStream{
ProjectID: keyInfo.ProjectID,
BucketName: string(req.Bucket),
@ -695,11 +695,12 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
satStreamID, err := endpoint.packStreamID(ctx, &internalpb.StreamID{
Bucket: req.Bucket,
EncryptedPath: req.EncryptedPath,
Version: req.Version,
Version: int32(object.Version),
Redundancy: pbRS,
CreationDate: time.Now(),
CreationDate: object.CreatedAt,
ExpirationDate: req.ExpiresAt,
StreamId: streamID[:],
MultipartObject: object.FixedSegmentSize <= 0,
})
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
@ -1912,7 +1913,8 @@ func (endpoint *Endpoint) objectToProto(ctx context.Context, object metabase.Obj
Bucket: []byte(object.BucketName),
EncryptedPath: []byte(object.ObjectKey),
Version: int32(object.Version), // TODO incomatible types
CreationDate: time.Now(),
CreationDate: object.CreatedAt,
ExpirationDate: expires,
StreamId: object.StreamID[:],
MultipartObject: object.FixedSegmentSize <= 0,
// TODO: defaultRS may change while the upload is pending.
@ -2046,7 +2048,9 @@ func (endpoint *Endpoint) objectEntryToProtoListItem(ctx context.Context, bucket
EncryptedPath: item.EncryptedPath,
Version: item.Version,
CreationDate: item.CreatedAt,
ExpirationDate: item.ExpiresAt,
StreamId: entry.StreamID[:],
MultipartObject: entry.FixedSegmentSize <= 0,
// TODO: defaultRS may change while the upload is pending.
// Ideally, we should remove redundancy from satStreamID.
Redundancy: endpoint.defaultRS,

View File

@ -1825,3 +1825,45 @@ func TestObjectOverrideOnUpload(t *testing.T) {
}
})
}
func TestStableUploadID(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
client, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(client.Close)
err = planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket")
require.NoError(t, err)
beginResp, err := client.BeginObject(ctx, metainfo.BeginObjectParams{
Bucket: []byte("testbucket"),
EncryptedPath: []byte("testobject"),
EncryptionParameters: storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 256,
},
})
require.NoError(t, err)
listResp, _, err := client.ListObjects(ctx, metainfo.ListObjectsParams{
Bucket: []byte("testbucket"),
Status: int32(metabase.Pending),
})
require.NoError(t, err)
require.Len(t, listResp, 1)
// check that BeginObject and ListObjects return the same StreamID.
assert.Equal(t, beginResp.StreamID, listResp[0].StreamID)
listResp2, _, err := client.ListObjects(ctx, metainfo.ListObjectsParams{
Bucket: []byte("testbucket"),
Status: int32(metabase.Pending),
})
require.NoError(t, err)
require.Len(t, listResp2, 1)
// check that the two list results return the same StreamID.
assert.Equal(t, listResp[0].StreamID, listResp2[0].StreamID)
})
}

View File

@ -21,7 +21,7 @@ func SignStreamID(ctx context.Context, signer signing.Signer, unsigned *internal
}
signed := *unsigned
signed.SatelliteSignature, err = signer.HashAndSign(ctx, bytes)
signed.SatelliteSignature, err = signer.SignHMACSHA256(ctx, bytes)
if err != nil {
return nil, Error.Wrap(err)
}
@ -68,14 +68,14 @@ func EncodeSegmentID(ctx context.Context, segmentID *internalpb.SegmentID) (_ []
}
// VerifyStreamID verifies that the signature inside stream ID belongs to the satellite.
func VerifyStreamID(ctx context.Context, satellite signing.Signee, signed *internalpb.StreamID) (err error) {
func VerifyStreamID(ctx context.Context, satellite signing.Signer, signed *internalpb.StreamID) (err error) {
defer mon.Task()(&ctx)(&err)
bytes, err := EncodeStreamID(ctx, signed)
if err != nil {
return Error.Wrap(err)
}
return satellite.HashAndVerifySignature(ctx, bytes, signed.SatelliteSignature)
return satellite.VerifyHMACSHA256(ctx, bytes, signed.SatelliteSignature)
}
// VerifySegmentID verifies that the signature inside segment ID belongs to the satellite.