satellite/metainfo: delete pieces from nodes on object commit
We have a new flow where an existing object is deleted on commit object
rather than on begin object. Deletion on commit object was still missing
the deletion of pieces from storage nodes. This change adds that part.

Fixes https://github.com/storj/storj/issues/5222

Change-Id: Ibfd34665b2a055ec6c0d6e260c1a57e8a4c62b0e
parent dd60318147
commit e5ac8430c3
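At its core the change is a commit-then-callback pattern: collect the segments of any overwritten object while still inside the commit transaction, and fire the piece-deletion callback only once the transaction has committed. A minimal, self-contained sketch of that pattern (illustrative names, not the storj code itself):

package main

import "fmt"

// deletedSegmentInfo stands in for metabase.DeletedSegmentInfo: enough
// information to later delete a segment's pieces from storage nodes.
type deletedSegmentInfo struct {
	rootPieceID string
	pieces      []string
}

// commitObject collects deletions inside the transaction and fires the
// callback only after the transaction has committed successfully. An
// error inside onDelete cannot roll back the committed object.
func commitObject(runTx func() ([]deletedSegmentInfo, error), onDelete func([]deletedSegmentInfo)) error {
	deleted, err := runTx()
	if err != nil {
		return err // commit failed; the callback is never fired
	}
	if len(deleted) > 0 && onDelete != nil {
		onDelete(deleted)
	}
	return nil
}

func main() {
	err := commitObject(
		func() ([]deletedSegmentInfo, error) {
			// pretend the commit overwrote one object with a single segment
			return []deletedSegmentInfo{{rootPieceID: "root", pieces: []string{"node-1", "node-2"}}}, nil
		},
		func(segments []deletedSegmentInfo) {
			fmt.Printf("would delete pieces for %d segment(s) from storage nodes\n", len(segments))
		},
	)
	if err != nil {
		fmt.Println("commit failed:", err)
	}
}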
@@ -485,6 +485,10 @@ type CommitObject struct {
 	EncryptedMetadataEncryptedKey []byte // optional
 
 	DisallowDelete bool
+	// OnDelete will be triggered when/if an existing object is overwritten on commit.
+	// It will be executed only after a successful commit + delete DB operation.
+	// An error from this function won't revert the committed object.
+	OnDelete func(segments []DeletedSegmentInfo)
 }
 
 // Verify verifies request fields.
@@ -507,7 +511,8 @@ func (c *CommitObject) Verify() error {
 	return nil
 }
 
-// CommitObject adds a pending object to the database.
+// CommitObject adds a pending object to the database. If another committed object is under the target location,
+// it will be deleted.
 func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Object, err error) {
 	defer mon.Task()(&ctx)(&err)
 
@@ -515,6 +520,8 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Object, err error) {
 		return Object{}, err
 	}
 
+	deletedSegments := []DeletedSegmentInfo{}
+
 	err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
 		segments, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID)
 		if err != nil {
@@ -657,8 +664,7 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Object, err error) {
 		}
 
 		for _, version := range versionsToDelete {
-			// TODO delete pieces from stoage nodes
-			_, err := db.deleteObjectExactVersion(ctx, DeleteObjectExactVersion{
+			deleteResult, err := db.deleteObjectExactVersion(ctx, DeleteObjectExactVersion{
 				ObjectLocation: ObjectLocation{
 					ProjectID:  opts.ProjectID,
 					BucketName: opts.BucketName,
@@ -669,6 +675,8 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Object, err error) {
 			if err != nil {
 				return Error.New("failed to delete existing object: %w", err)
 			}
+
+			deletedSegments = append(deletedSegments, deleteResult.Segments...)
 		}
 
 		object.StreamID = opts.StreamID
@@ -687,6 +695,11 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Object, err error) {
 		return Object{}, err
 	}
 
+	// We can execute this only when the whole transaction is committed without any error.
+	if len(deletedSegments) > 0 && opts.OnDelete != nil {
+		opts.OnDelete(deletedSegments)
+	}
+
 	mon.Meter("object_commit").Mark(1)
 	mon.IntVal("object_commit_segments").Observe(int64(object.SegmentCount))
 	mon.IntVal("object_commit_encrypted_size").Observe(object.TotalEncryptedSize)
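Note the ordering in the hunk above: opts.OnDelete runs outside the transaction and only after it has committed. A failure while deleting pieces from storage nodes therefore cannot roll back the newly committed object; at worst the old pieces linger on the nodes until some out-of-band cleanup (e.g. garbage collection) removes them.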
@@ -8,6 +8,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/require"
+
 	"storj.io/common/memory"
 	"storj.io/common/storj"
 	"storj.io/common/testcontext"
@@ -2935,6 +2937,55 @@ func TestCommitObject(t *testing.T) {
 	})
 }
 
+func TestCommitObject_MultipleVersions(t *testing.T) {
+	metabasetest.RunWithConfig(t, metabase.Config{
+		ApplicationName:  "satellite-test",
+		MinPartSize:      5 * memory.MiB,
+		MaxNumberOfParts: 1000,
+		MultipleVersions: true,
+	}, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
+		t.Run("OnDelete", func(t *testing.T) {
+			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+			// check deleted segments
+			obj := metabasetest.RandObjectStream()
+
+			_, expectedSegments := metabasetest.CreateTestObject{}.Run(ctx, t, db, obj, 3)
+
+			expectedDeletedSegments := []metabase.DeletedSegmentInfo{}
+			for _, segment := range expectedSegments {
+				expectedDeletedSegments = append(expectedDeletedSegments, metabase.DeletedSegmentInfo{
+					RootPieceID: segment.RootPieceID,
+					Pieces:      segment.Pieces,
+				})
+			}
+
+			obj.Version++
+
+			metabasetest.BeginObjectExactVersion{
+				Opts: metabase.BeginObjectExactVersion{
+					ObjectStream: obj,
+					Encryption:   metabasetest.DefaultEncryption,
+				},
+				Version: obj.Version,
+			}.Check(ctx, t, db)
+
+			deletedSegments := []metabase.DeletedSegmentInfo{}
+			metabasetest.CommitObject{
+				Opts: metabase.CommitObject{
+					ObjectStream: obj,
+					Encryption:   metabasetest.DefaultEncryption,
+					OnDelete: func(segments []metabase.DeletedSegmentInfo) {
+						deletedSegments = append(deletedSegments, segments...)
+					},
+				},
+			}.Check(ctx, t, db)
+
+			require.Equal(t, expectedDeletedSegments, deletedSegments)
+		})
+	})
+}
+
 func TestCommitObjectWithIncorrectPartSize(t *testing.T) {
 	metabasetest.RunWithConfig(t, metabase.Config{
 		ApplicationName: "satellite-test",
@@ -270,6 +270,9 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommitRequest) (resp *pb.ObjectCommitResponse, err error) {
 		Encryption: encryption,
 
 		DisallowDelete: !allowDelete,
+		OnDelete: func(segments []metabase.DeletedSegmentInfo) {
+			endpoint.deleteSegmentPieces(ctx, segments)
+		},
 	}
 	// uplink can send empty metadata with not empty key/nonce
 	// we need to fix it on uplink side but that part will be
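The endpoint hands the deleted segments to deleteSegmentPieces, whose implementation is not shown in this diff. Whatever it does internally, it has to derive each piece ID from the segment's root piece ID (the same RootPieceID.Derive call the test below uses) and fan the deletes out per storage node. A hypothetical sketch of that grouping step, using local mirror types rather than storj's actual definitions:

package main

import (
	"fmt"

	"storj.io/common/storj"
)

// piece and deletedSegmentInfo loosely mirror the metabase types;
// they are illustrative, not the real definitions.
type piece struct {
	Number      uint16
	StorageNode storj.NodeID
}

type deletedSegmentInfo struct {
	RootPieceID storj.PieceID
	Pieces      []piece
}

// groupPiecesByNode derives per-node piece IDs for every deleted segment.
// A real deleter would then contact each node and request deletion.
func groupPiecesByNode(segments []deletedSegmentInfo) map[storj.NodeID][]storj.PieceID {
	byNode := map[storj.NodeID][]storj.PieceID{}
	for _, segment := range segments {
		for _, p := range segment.Pieces {
			id := segment.RootPieceID.Derive(p.StorageNode, int32(p.Number))
			byNode[p.StorageNode] = append(byNode[p.StorageNode], id)
		}
	}
	return byNode
}

func main() {
	fmt.Println(len(groupPiecesByNode(nil))) // 0: nothing to delete
}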
@@ -35,6 +35,7 @@ import (
 	"storj.io/storj/satellite/internalpb"
 	"storj.io/storj/satellite/metabase"
 	"storj.io/storj/satellite/metainfo"
+	"storj.io/storj/storage"
 	"storj.io/uplink"
 	"storj.io/uplink/private/metaclient"
 	"storj.io/uplink/private/object"
@@ -1952,6 +1953,8 @@ func TestEndpoint_Object_MultipleVersions(t *testing.T) {
 		Reconfigure: testplanet.Reconfigure{
 			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
 				config.Metainfo.MultipleVersions = true
+
+				testplanet.ReconfigureRS(2, 3, 4, 4)(log, index, config)
 			},
 		},
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
@@ -1959,6 +1962,16 @@ func TestEndpoint_Object_MultipleVersions(t *testing.T) {
 		require.NoError(t, err)
 		defer ctx.Check(project.Close)
 
+		deleteBucket := func(bucketName string) func() error {
+			return func() error {
+				_, err := project.DeleteBucketWithObjects(ctx, bucketName)
+				return err
+			}
+		}
+
+		t.Run("multiple versions", func(t *testing.T) {
+			defer ctx.Check(deleteBucket("multipleversions"))
+
 			err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", testrand.Bytes(10*memory.MiB))
 			require.NoError(t, err)
 
@@ -2067,6 +2080,60 @@ func TestEndpoint_Object_MultipleVersions(t *testing.T) {
 			require.Equal(t, pendingUploadsLeft, count)
 		})
 
+		t.Run("override object", func(t *testing.T) {
+			defer ctx.Check(deleteBucket("bucket"))
+
+			bucketName := "bucket"
+			objectName := "file1"
+
+			err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, testrand.Bytes(5*memory.KiB))
+			require.NoError(t, err)
+
+			segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
+			require.NoError(t, err)
+			require.Len(t, segments, 1)
+
+			pieceIDs := map[storj.NodeID]storj.PieceID{}
+			for _, piece := range segments[0].Pieces {
+				pieceIDs[piece.StorageNode] = segments[0].RootPieceID.Derive(piece.StorageNode, int32(piece.Number))
+			}
+
+			for _, node := range planet.StorageNodes {
+				pieceID, ok := pieceIDs[node.ID()]
+				require.True(t, ok)
+				piece, err := node.DB.Pieces().Stat(ctx, storage.BlobRef{
+					Namespace: planet.Satellites[0].ID().Bytes(),
+					Key:       pieceID.Bytes(),
+				})
+				require.NoError(t, err)
+				require.NotNil(t, piece)
+			}
+
+			expectedData := testrand.Bytes(5 * memory.KiB)
+			err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, expectedData)
+			require.NoError(t, err)
+
+			planet.WaitForStorageNodeDeleters(ctx)
+
+			// verify that old object pieces are not stored on storage nodes anymore
+			for _, node := range planet.StorageNodes {
+				pieceID, ok := pieceIDs[node.ID()]
+				require.True(t, ok)
+
+				piece, err := node.DB.Pieces().Stat(ctx, storage.BlobRef{
+					Namespace: planet.Satellites[0].ID().Bytes(),
+					Key:       pieceID.Bytes(),
+				})
+				require.Error(t, err)
+				require.Nil(t, piece)
+			}
+
+			data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], bucketName, objectName)
+			require.NoError(t, err)
+			require.Equal(t, expectedData, data)
+		})
+	})
+
 }
 
 func TestEndpoint_Object_CopyObject_MultipleVersions(t *testing.T) {
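One detail the "override object" test depends on: the satellite only enqueues piece deletion and the storage nodes process it asynchronously, which is why the test calls planet.WaitForStorageNodeDeleters(ctx) before asserting that the old pieces are gone.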