satellite/metabase: don't use empty stream id for delete markers
Using a an empty stream id makes it more difficult to target a specific delete marker. Similarly, we don't want to confuse actual stream id-s with normal ones. So, we'll create stream id-s where the first few bytes are 0xFF, but the rest is random. Change-Id: Ia7fffb0da9a071be2935df99c0846027ee2e03c3
This commit is contained in:
parent
aeec4bd213
commit
55bddb6ce1
@ -586,6 +586,11 @@ func (db *DB) DeleteObjectLastCommitted(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if opts.Suspended {
|
if opts.Suspended {
|
||||||
|
deleterMarkerStreamID, err := generateDeleteMarkerStreamID()
|
||||||
|
if err != nil {
|
||||||
|
return DeleteObjectResult{}, Error.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
|
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
|
||||||
// TODO(ver) fold deleteObjectUnversionedCommitted into query below using ON CONFLICT
|
// TODO(ver) fold deleteObjectUnversionedCommitted into query below using ON CONFLICT
|
||||||
deleted, err := db.deleteObjectUnversionedCommitted(ctx, opts.ObjectLocation, tx)
|
deleted, err := db.deleteObjectUnversionedCommitted(ctx, opts.ObjectLocation, tx)
|
||||||
@ -609,13 +614,14 @@ func (db *DB) DeleteObjectLastCommitted(
|
|||||||
RETURNING
|
RETURNING
|
||||||
version,
|
version,
|
||||||
created_at
|
created_at
|
||||||
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, deleted.MaxVersion+1, uuid.UUID{})
|
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, deleted.MaxVersion+1, deleterMarkerStreamID)
|
||||||
|
|
||||||
var marker Object
|
var marker Object
|
||||||
marker.ProjectID = opts.ProjectID
|
marker.ProjectID = opts.ProjectID
|
||||||
marker.BucketName = opts.BucketName
|
marker.BucketName = opts.BucketName
|
||||||
marker.ObjectKey = opts.ObjectKey
|
marker.ObjectKey = opts.ObjectKey
|
||||||
marker.Status = DeleteMarkerUnversioned
|
marker.Status = DeleteMarkerUnversioned
|
||||||
|
marker.StreamID = deleterMarkerStreamID
|
||||||
|
|
||||||
err = row.Scan(&marker.Version, &marker.CreatedAt)
|
err = row.Scan(&marker.Version, &marker.CreatedAt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -630,7 +636,11 @@ func (db *DB) DeleteObjectLastCommitted(
|
|||||||
}
|
}
|
||||||
if opts.Versioned {
|
if opts.Versioned {
|
||||||
// Instead of deleting we insert a deletion marker.
|
// Instead of deleting we insert a deletion marker.
|
||||||
streamID := uuid.UUID{}
|
deleterMarkerStreamID, err := generateDeleteMarkerStreamID()
|
||||||
|
if err != nil {
|
||||||
|
return DeleteObjectResult{}, Error.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
row := db.db.QueryRowContext(ctx, `
|
row := db.db.QueryRowContext(ctx, `
|
||||||
WITH check_existing_object AS (
|
WITH check_existing_object AS (
|
||||||
SELECT status
|
SELECT status
|
||||||
@ -663,13 +673,13 @@ func (db *DB) DeleteObjectLastCommitted(
|
|||||||
RETURNING *
|
RETURNING *
|
||||||
)
|
)
|
||||||
SELECT version, created_at FROM added_object
|
SELECT version, created_at FROM added_object
|
||||||
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, streamID)
|
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, deleterMarkerStreamID)
|
||||||
|
|
||||||
var deleted Object
|
var deleted Object
|
||||||
deleted.ProjectID = opts.ProjectID
|
deleted.ProjectID = opts.ProjectID
|
||||||
deleted.BucketName = opts.BucketName
|
deleted.BucketName = opts.BucketName
|
||||||
deleted.ObjectKey = opts.ObjectKey
|
deleted.ObjectKey = opts.ObjectKey
|
||||||
deleted.StreamID = streamID
|
deleted.StreamID = deleterMarkerStreamID
|
||||||
deleted.Status = DeleteMarkerVersioned
|
deleted.Status = DeleteMarkerVersioned
|
||||||
|
|
||||||
err = row.Scan(&deleted.Version, &deleted.CreatedAt)
|
err = row.Scan(&deleted.Version, &deleted.CreatedAt)
|
||||||
@ -725,3 +735,17 @@ func (db *DB) DeleteObjectLastCommitted(
|
|||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// generateDeleteMarkerStreamID returns a uuid that has the first 6 bytes as 0xff.
|
||||||
|
// Creating a stream id, where the first bytes are 0xff makes it easily recognizable as a delete marker.
|
||||||
|
func generateDeleteMarkerStreamID() (uuid.UUID, error) {
|
||||||
|
v, err := uuid.New()
|
||||||
|
if err != nil {
|
||||||
|
return v, Error.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range v[:6] {
|
||||||
|
v[i] = 0xFF
|
||||||
|
}
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
@ -782,7 +782,6 @@ func TestDeleteObjectVersioning(t *testing.T) {
|
|||||||
}.Run(ctx, t, db, obj, 0)
|
}.Run(ctx, t, db, obj, 0)
|
||||||
|
|
||||||
marker := committed.ObjectStream
|
marker := committed.ObjectStream
|
||||||
marker.StreamID = uuid.UUID{}
|
|
||||||
marker.Version = committed.Version + 1
|
marker.Version = committed.Version + 1
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
@ -800,6 +799,7 @@ func TestDeleteObjectVersioning(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
OutputMarkerStreamID: &marker.StreamID,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
metabasetest.Verify{
|
metabasetest.Verify{
|
||||||
@ -829,7 +829,6 @@ func TestDeleteObjectVersioning(t *testing.T) {
|
|||||||
}.Run(ctx, t, db, obj, 0)
|
}.Run(ctx, t, db, obj, 0)
|
||||||
|
|
||||||
marker := committed.ObjectStream
|
marker := committed.ObjectStream
|
||||||
marker.StreamID = uuid.UUID{}
|
|
||||||
marker.Version = committed.Version + 1
|
marker.Version = committed.Version + 1
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
@ -847,6 +846,7 @@ func TestDeleteObjectVersioning(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
OutputMarkerStreamID: &marker.StreamID,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
marker2 := marker
|
marker2 := marker
|
||||||
@ -865,6 +865,7 @@ func TestDeleteObjectVersioning(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
OutputMarkerStreamID: &marker2.StreamID,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
metabasetest.Verify{
|
metabasetest.Verify{
|
||||||
@ -1165,6 +1166,7 @@ func TestDeleteObjectsAllVersions(t *testing.T) {
|
|||||||
metabasetest.Verify{}.Check(ctx, t, db)
|
metabasetest.Verify{}.Check(ctx, t, db)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// TODO(ver): these tests look like they are in the wrong location
|
||||||
t.Run("delete last committed unversioned with suspended", func(t *testing.T) {
|
t.Run("delete last committed unversioned with suspended", func(t *testing.T) {
|
||||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
|
|
||||||
@ -1200,6 +1202,7 @@ func TestDeleteObjectsAllVersions(t *testing.T) {
|
|||||||
object,
|
object,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
OutputMarkerStreamID: &marker.StreamID,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
metabasetest.Verify{
|
metabasetest.Verify{
|
||||||
@ -1241,6 +1244,7 @@ func TestDeleteObjectsAllVersions(t *testing.T) {
|
|||||||
Result: metabase.DeleteObjectResult{
|
Result: metabase.DeleteObjectResult{
|
||||||
Markers: []metabase.Object{marker},
|
Markers: []metabase.Object{marker},
|
||||||
},
|
},
|
||||||
|
OutputMarkerStreamID: &marker.StreamID,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
metabasetest.Verify{
|
metabasetest.Verify{
|
||||||
|
@ -200,6 +200,7 @@ func TestGetObjectExactVersion(t *testing.T) {
|
|||||||
Result: metabase.DeleteObjectResult{
|
Result: metabase.DeleteObjectResult{
|
||||||
Markers: []metabase.Object{versionedMarker},
|
Markers: []metabase.Object{versionedMarker},
|
||||||
},
|
},
|
||||||
|
OutputMarkerStreamID: &versionedMarker.StreamID,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
metabasetest.GetObjectExactVersion{
|
metabasetest.GetObjectExactVersion{
|
||||||
@ -229,6 +230,7 @@ func TestGetObjectExactVersion(t *testing.T) {
|
|||||||
Removed: []metabase.Object{unversioned},
|
Removed: []metabase.Object{unversioned},
|
||||||
Markers: []metabase.Object{unversionedMarker},
|
Markers: []metabase.Object{unversionedMarker},
|
||||||
},
|
},
|
||||||
|
OutputMarkerStreamID: &unversionedMarker.StreamID,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
metabasetest.GetObjectExactVersion{
|
metabasetest.GetObjectExactVersion{
|
||||||
@ -1678,6 +1680,7 @@ func TestGetLatestObjectLastSegment(t *testing.T) {
|
|||||||
Result: metabase.DeleteObjectResult{
|
Result: metabase.DeleteObjectResult{
|
||||||
Markers: []metabase.Object{marker},
|
Markers: []metabase.Object{marker},
|
||||||
},
|
},
|
||||||
|
OutputMarkerStreamID: &marker.StreamID,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
metabasetest.GetLatestObjectLastSegment{
|
metabasetest.GetLatestObjectLastSegment{
|
||||||
@ -1737,6 +1740,7 @@ func TestGetLatestObjectLastSegment(t *testing.T) {
|
|||||||
Markers: []metabase.Object{marker},
|
Markers: []metabase.Object{marker},
|
||||||
Removed: []metabase.Object{unversioned},
|
Removed: []metabase.Object{unversioned},
|
||||||
},
|
},
|
||||||
|
OutputMarkerStreamID: &marker.StreamID,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
metabasetest.GetLatestObjectLastSegment{
|
metabasetest.GetLatestObjectLastSegment{
|
||||||
|
@ -17,6 +17,7 @@ import (
|
|||||||
|
|
||||||
"storj.io/common/storj"
|
"storj.io/common/storj"
|
||||||
"storj.io/common/testcontext"
|
"storj.io/common/testcontext"
|
||||||
|
"storj.io/common/uuid"
|
||||||
"storj.io/storj/satellite/metabase"
|
"storj.io/storj/satellite/metabase"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -430,6 +431,13 @@ func compareDeleteObjectResult(t testing.TB, got, exp metabase.DeleteObjectResul
|
|||||||
|
|
||||||
sortObjects(got.Markers)
|
sortObjects(got.Markers)
|
||||||
sortObjects(exp.Markers)
|
sortObjects(exp.Markers)
|
||||||
|
if len(got.Markers) == len(exp.Markers) {
|
||||||
|
// marker stream ID-s are internally generated, so we cannot upfront figure out what
|
||||||
|
// the values are.
|
||||||
|
for i := range got.Markers {
|
||||||
|
exp.Markers[i].StreamID = got.Markers[i].StreamID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
sortObjects(got.Removed)
|
sortObjects(got.Removed)
|
||||||
sortObjects(exp.Removed)
|
sortObjects(exp.Removed)
|
||||||
@ -788,10 +796,13 @@ func (step FinishCopyObject) Check(ctx *testcontext.Context, t testing.TB, db *m
|
|||||||
|
|
||||||
// DeleteObjectLastCommitted is for testing metabase.DeleteObjectLastCommitted.
|
// DeleteObjectLastCommitted is for testing metabase.DeleteObjectLastCommitted.
|
||||||
type DeleteObjectLastCommitted struct {
|
type DeleteObjectLastCommitted struct {
|
||||||
Opts metabase.DeleteObjectLastCommitted
|
Opts metabase.DeleteObjectLastCommitted
|
||||||
Result metabase.DeleteObjectResult
|
Result metabase.DeleteObjectResult
|
||||||
|
|
||||||
ErrClass *errs.Class
|
ErrClass *errs.Class
|
||||||
ErrText string
|
ErrText string
|
||||||
|
|
||||||
|
OutputMarkerStreamID *uuid.UUID
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check runs the test.
|
// Check runs the test.
|
||||||
@ -799,6 +810,11 @@ func (step DeleteObjectLastCommitted) Check(ctx *testcontext.Context, t testing.
|
|||||||
result, err := db.DeleteObjectLastCommitted(ctx, step.Opts)
|
result, err := db.DeleteObjectLastCommitted(ctx, step.Opts)
|
||||||
checkError(t, err, step.ErrClass, step.ErrText)
|
checkError(t, err, step.ErrClass, step.ErrText)
|
||||||
compareDeleteObjectResult(t, result, step.Result)
|
compareDeleteObjectResult(t, result, step.Result)
|
||||||
|
|
||||||
|
if step.OutputMarkerStreamID != nil && len(result.Markers) > 0 {
|
||||||
|
*step.OutputMarkerStreamID = result.Markers[0].StreamID
|
||||||
|
}
|
||||||
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -187,7 +187,6 @@ func TestUpdateObjectLastCommittedMetadata(t *testing.T) {
|
|||||||
Status: metabase.DeleteMarkerVersioned,
|
Status: metabase.DeleteMarkerVersioned,
|
||||||
CreatedAt: time.Now(),
|
CreatedAt: time.Now(),
|
||||||
}
|
}
|
||||||
marker.StreamID = uuid.UUID{}
|
|
||||||
marker.Version++
|
marker.Version++
|
||||||
|
|
||||||
metabasetest.DeleteObjectLastCommitted{
|
metabasetest.DeleteObjectLastCommitted{
|
||||||
@ -198,6 +197,7 @@ func TestUpdateObjectLastCommittedMetadata(t *testing.T) {
|
|||||||
Result: metabase.DeleteObjectResult{
|
Result: metabase.DeleteObjectResult{
|
||||||
Markers: []metabase.Object{marker},
|
Markers: []metabase.Object{marker},
|
||||||
},
|
},
|
||||||
|
OutputMarkerStreamID: &marker.StreamID,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
// verify we cannot update the metadata of a deleted object
|
// verify we cannot update the metadata of a deleted object
|
||||||
@ -226,8 +226,8 @@ func TestUpdateObjectLastCommittedMetadata(t *testing.T) {
|
|||||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||||
},
|
},
|
||||||
ErrClass: &metabase.ErrInvalidRequest,
|
ErrClass: &metabase.ErrObjectNotFound,
|
||||||
ErrText: "StreamID missing",
|
ErrText: "object with specified version and committed status is missing",
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
metabasetest.Verify{
|
metabasetest.Verify{
|
||||||
@ -255,7 +255,6 @@ func TestUpdateObjectLastCommittedMetadata(t *testing.T) {
|
|||||||
CreatedAt: time.Now(),
|
CreatedAt: time.Now(),
|
||||||
}
|
}
|
||||||
marker.Version++
|
marker.Version++
|
||||||
marker.StreamID = uuid.UUID{}
|
|
||||||
|
|
||||||
metabasetest.DeleteObjectLastCommitted{
|
metabasetest.DeleteObjectLastCommitted{
|
||||||
Opts: metabase.DeleteObjectLastCommitted{
|
Opts: metabase.DeleteObjectLastCommitted{
|
||||||
@ -267,6 +266,7 @@ func TestUpdateObjectLastCommittedMetadata(t *testing.T) {
|
|||||||
Markers: []metabase.Object{marker},
|
Markers: []metabase.Object{marker},
|
||||||
Removed: []metabase.Object{object2},
|
Removed: []metabase.Object{object2},
|
||||||
},
|
},
|
||||||
|
OutputMarkerStreamID: &marker.StreamID,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
encryptedMetadata := testrand.Bytes(1024)
|
encryptedMetadata := testrand.Bytes(1024)
|
||||||
@ -299,8 +299,8 @@ func TestUpdateObjectLastCommittedMetadata(t *testing.T) {
|
|||||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||||
},
|
},
|
||||||
ErrClass: &metabase.ErrInvalidRequest,
|
ErrClass: &metabase.ErrObjectNotFound,
|
||||||
ErrText: "StreamID missing",
|
ErrText: "object with specified version and committed status is missing",
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
metabasetest.Verify{
|
metabasetest.Verify{
|
||||||
@ -322,7 +322,6 @@ func TestUpdateObjectLastCommittedMetadata(t *testing.T) {
|
|||||||
Status: metabase.DeleteMarkerVersioned,
|
Status: metabase.DeleteMarkerVersioned,
|
||||||
CreatedAt: time.Now(),
|
CreatedAt: time.Now(),
|
||||||
}
|
}
|
||||||
marker.StreamID = uuid.UUID{}
|
|
||||||
marker.Version++
|
marker.Version++
|
||||||
|
|
||||||
metabasetest.DeleteObjectLastCommitted{
|
metabasetest.DeleteObjectLastCommitted{
|
||||||
@ -333,6 +332,7 @@ func TestUpdateObjectLastCommittedMetadata(t *testing.T) {
|
|||||||
Result: metabase.DeleteObjectResult{
|
Result: metabase.DeleteObjectResult{
|
||||||
Markers: []metabase.Object{marker},
|
Markers: []metabase.Object{marker},
|
||||||
},
|
},
|
||||||
|
OutputMarkerStreamID: &marker.StreamID,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
obj2 := obj
|
obj2 := obj
|
||||||
|
Loading…
Reference in New Issue
Block a user