From 46839b992a21afb8cac991eb012c23c492adfcfa Mon Sep 17 00:00:00 2001 From: Kaloyan Raev Date: Mon, 30 Nov 2020 14:33:06 +0200 Subject: [PATCH] satellite/metainfo: expired deletion service to use Metabase It also sets the Expires time correctly in the Metabase when uploading objects. Change-Id: Iec1b3ec8d11346a91dfc2ba23a1b08edec4a84d3 --- satellite/core.go | 8 +- satellite/metainfo/config.go | 2 + satellite/metainfo/expireddeletion/chore.go | 60 ++++----- .../expireddeletion/expireddeleter.go | 64 ---------- .../expireddeletion/expireddeletion_test.go | 119 +++++++----------- satellite/metainfo/metainfo.go | 8 ++ 6 files changed, 92 insertions(+), 169 deletions(-) delete mode 100644 satellite/metainfo/expireddeletion/expireddeleter.go diff --git a/satellite/core.go b/satellite/core.go index 37bc5a788..74b5e5103 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -385,12 +385,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, peer.ExpiredDeletion.Chore = expireddeletion.NewChore( peer.Log.Named("core-expired-deletion"), config.ExpiredDeletion, - peer.Metainfo.Service, - peer.Metainfo.Loop, + peer.Metainfo.Metabase, ) peer.Services.Add(lifecycle.Item{ - Name: "expireddeletion:chore", - Run: peer.ExpiredDeletion.Chore.Run, + Name: "expireddeletion:chore", + Run: peer.ExpiredDeletion.Chore.Run, + Close: peer.ExpiredDeletion.Chore.Close, }) peer.Debug.Server.Panel.Add( debug.Cycle("Expired Segments Chore", peer.ExpiredDeletion.Chore.Loop)) diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go index 099678683..8783abce2 100644 --- a/satellite/metainfo/config.go +++ b/satellite/metainfo/config.go @@ -170,6 +170,8 @@ type MetabaseDB interface { MigrateToLatest(ctx context.Context) error // DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket. DeleteObjectsAllVersions(ctx context.Context, opts metabase.DeleteObjectsAllVersions) (result metabase.DeleteObjectResult, err error) + // DeleteExpiredObjects deletes all objects that expired before expiredBefore. + DeleteExpiredObjects(ctx context.Context, expiredBefore time.Time) error // BeginObjectExactVersion adds a pending object to the database, with specific version. BeginObjectExactVersion(ctx context.Context, opts metabase.BeginObjectExactVersion) (committed metabase.Version, err error) // CommitObject adds a pending object to the database. diff --git a/satellite/metainfo/expireddeletion/chore.go b/satellite/metainfo/expireddeletion/chore.go index 46df1194c..64587d215 100644 --- a/satellite/metainfo/expireddeletion/chore.go +++ b/satellite/metainfo/expireddeletion/chore.go @@ -23,7 +23,7 @@ var ( // Config contains configurable values for expired segment cleanup. type Config struct { - Interval time.Duration `help:"the time between each attempt to go through the db and clean up expired segments" releaseDefault:"120h" devDefault:"10m"` + Interval time.Duration `help:"the time between each attempt to go through the db and clean up expired segments" releaseDefault:"120h" devDefault:"10s"` Enabled bool `help:"set if expired segment cleanup is enabled or not" releaseDefault:"true" devDefault:"true"` } @@ -31,22 +31,23 @@ type Config struct { // // architecture: Chore type Chore struct { - log *zap.Logger - config Config - Loop *sync2.Cycle + log *zap.Logger + config Config + metabase metainfo.MetabaseDB - metainfo *metainfo.Service - metainfoLoop *metainfo.Loop + nowFn func() time.Time + Loop *sync2.Cycle } // NewChore creates a new instance of the expireddeletion chore. -func NewChore(log *zap.Logger, config Config, meta *metainfo.Service, loop *metainfo.Loop) *Chore { +func NewChore(log *zap.Logger, config Config, metabase metainfo.MetabaseDB) *Chore { return &Chore{ - log: log, - config: config, - Loop: sync2.NewCycle(config.Interval), - metainfo: meta, - metainfoLoop: loop, + log: log, + config: config, + metabase: metabase, + + nowFn: time.Now, + Loop: sync2.NewCycle(config.Interval), } } @@ -58,20 +59,23 @@ func (chore *Chore) Run(ctx context.Context) (err error) { return nil } - return chore.Loop.Run(ctx, func(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - - deleter := &expiredDeleter{ - log: chore.log.Named("expired deleter observer"), - metainfo: chore.metainfo, - } - - // delete expired segments - err = chore.metainfoLoop.Join(ctx, deleter) - if err != nil { - chore.log.Error("error joining metainfoloop", zap.Error(err)) - return nil - } - return nil - }) + return chore.Loop.Run(ctx, chore.deleteExpiredObjects) +} + +// Close stops the expireddeletion chore. +func (chore *Chore) Close() error { + chore.Loop.Close() + return nil +} + +// SetNow allows tests to have the server act as if the current time is whatever they want. +func (chore *Chore) SetNow(nowFn func() time.Time) { + chore.nowFn = nowFn +} + +func (chore *Chore) deleteExpiredObjects(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + chore.log.Debug("deleting expired objects") + + return chore.metabase.DeleteExpiredObjects(ctx, chore.nowFn()) } diff --git a/satellite/metainfo/expireddeletion/expireddeleter.go b/satellite/metainfo/expireddeletion/expireddeleter.go deleted file mode 100644 index 2d10d2b53..000000000 --- a/satellite/metainfo/expireddeletion/expireddeleter.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (C) 2020 Storj Labs, Inc. -// See LICENSE for copying information. - -package expireddeletion - -import ( - "context" - "time" - - "go.uber.org/zap" - - "storj.io/common/pb" - "storj.io/common/storj" - "storj.io/storj/satellite/metainfo" - "storj.io/storj/storage" -) - -var _ metainfo.Observer = (*expiredDeleter)(nil) - -// expiredDeleter implements the metainfo loop observer interface for expired segment cleanup -// -// architecture: Observer -type expiredDeleter struct { - log *zap.Logger - metainfo *metainfo.Service -} - -// RemoteSegment deletes the segment if it is expired. -func (ed *expiredDeleter) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) { - defer mon.Task()(&ctx)(&err) - - return ed.deleteSegmentIfExpired(ctx, segment) -} - -// InlineSegment deletes the segment if it is expired. -func (ed *expiredDeleter) InlineSegment(ctx context.Context, segment *metainfo.Segment) (err error) { - defer mon.Task()(&ctx)(&err) - - return ed.deleteSegmentIfExpired(ctx, segment) -} - -// Object returns nil because the expired deleter only cares about segments. -func (ed *expiredDeleter) Object(ctx context.Context, object *metainfo.Object) (err error) { - return nil -} - -func (ed *expiredDeleter) deleteSegmentIfExpired(ctx context.Context, segment *metainfo.Segment) error { - if segment.Expired(time.Now()) { - pointerBytes, err := pb.Marshal(segment.Pointer) - if err != nil { - return err - } - err = ed.metainfo.Delete(ctx, segment.Location.Encode(), pointerBytes) - if storj.ErrObjectNotFound.Has(err) { - // segment already deleted - return nil - } else if storage.ErrValueChanged.Has(err) { - // segment was replaced - return nil - } - return err - } - return nil -} diff --git a/satellite/metainfo/expireddeletion/expireddeletion_test.go b/satellite/metainfo/expireddeletion/expireddeletion_test.go index e373b6fb4..eaa40c27b 100644 --- a/satellite/metainfo/expireddeletion/expireddeletion_test.go +++ b/satellite/metainfo/expireddeletion/expireddeletion_test.go @@ -5,7 +5,6 @@ package expireddeletion_test import ( "context" - "strconv" "testing" "time" @@ -13,22 +12,13 @@ import ( "go.uber.org/zap" "storj.io/common/memory" - "storj.io/common/pb" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" "storj.io/storj/satellite" - "storj.io/storj/storage" + "storj.io/storj/satellite/metainfo/metabase" ) -// TestExpiredDeletion does the following: -// * Set up a network with one storagenode -// * Upload three segments -// * Run the expired segment chore -// * Verify that all three segments still exist -// * Expire one of the segments -// * Run the expired segment chore -// * Verify that two segments still exist and the expired one has been deleted. func TestExpiredDeletion(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, @@ -42,78 +32,61 @@ func TestExpiredDeletion(t *testing.T) { upl := planet.Uplinks[0] expiredChore := satellite.Core.ExpiredDeletion.Chore - // Upload three objects - for i := 0; i < 4; i++ { - // 0 and 1 inline, 2 and 3 remote - testData := testrand.Bytes(8 * memory.KiB) - if i <= 1 { - testData = testrand.Bytes(1 * memory.KiB) + expiredChore.Loop.Pause() - } - err := upl.Upload(ctx, satellite, "testbucket", "test/path/"+strconv.Itoa(i), testData) - require.NoError(t, err) - } + // Upload four objects: + // 1. Inline object without expiraton date + err := upl.Upload(ctx, satellite, "testbucket", "inline_no_expire", testrand.Bytes(1*memory.KiB)) + require.NoError(t, err) + // 2. Remote object without expiraton date + err = upl.Upload(ctx, satellite, "testbucket", "remote_no_expire", testrand.Bytes(8*memory.KiB)) + require.NoError(t, err) + // 3. Inline object with expiraton date + err = upl.UploadWithExpiration(ctx, satellite, "testbucket", "inline_expire", testrand.Bytes(1*memory.KiB), time.Now().Add(1*time.Hour)) + require.NoError(t, err) + // 4. Remote object with expiraton date + err = upl.UploadWithExpiration(ctx, satellite, "testbucket", "remote_expire", testrand.Bytes(8*memory.KiB), time.Now().Add(1*time.Hour)) + require.NoError(t, err) - // Wait for next iteration of expired cleanup to finish - expiredChore.Loop.Restart() - expiredChore.Loop.TriggerWait() - - // Verify that all four objects exist in metainfo - var expiredInline storage.ListItem - var expiredRemote storage.ListItem - i := 0 - err := satellite.Metainfo.Database.Iterate(ctx, storage.IterateOptions{Recurse: true}, - func(ctx context.Context, it storage.Iterator) error { - var item storage.ListItem - for it.Next(ctx, &item) { - if i == 1 { - expiredInline = item - } else if i == 3 { - expiredRemote = item - } - i++ + // Verify that all four objects are in the metabase + count := 0 + err = satellite.Metainfo.Metabase.IterateObjectsAllVersions(ctx, + metabase.IterateObjects{ + ProjectID: upl.Projects[0].ID, + BucketName: "testbucket", + Status: metabase.Committed, + }, func(ctx context.Context, it metabase.ObjectsIterator) error { + for it.Next(ctx, &metabase.ObjectEntry{}) { + count++ } return nil - }) + }, + ) require.NoError(t, err) - require.EqualValues(t, i, 4) + require.EqualValues(t, 4, count) - // Expire one inline segment and one remote segment - pointer := &pb.Pointer{} - err = pb.Unmarshal(expiredInline.Value, pointer) - require.NoError(t, err) - pointer.ExpirationDate = time.Now().Add(-24 * time.Hour) - newPointerBytes, err := pb.Marshal(pointer) - require.NoError(t, err) - err = satellite.Metainfo.Database.CompareAndSwap(ctx, expiredInline.Key, expiredInline.Value, newPointerBytes) - require.NoError(t, err) - - err = pb.Unmarshal(expiredRemote.Value, pointer) - require.NoError(t, err) - pointer.ExpirationDate = time.Now().Add(-24 * time.Hour) - newPointerBytes, err = pb.Marshal(pointer) - require.NoError(t, err) - err = satellite.Metainfo.Database.CompareAndSwap(ctx, expiredRemote.Key, expiredRemote.Value, newPointerBytes) - require.NoError(t, err) - - // Wait for next iteration of expired cleanup to finish - expiredChore.Loop.Restart() + // Trigger the next iteration of expired cleanup and wait to finish + expiredChore.SetNow(func() time.Time { + // Set the Now function to return time after the objects expiration time + return time.Now().Add(2 * time.Hour) + }) expiredChore.Loop.TriggerWait() - // Verify that only two objects exist in metainfo - // And that the expired ones do not exist - i = 0 - err = satellite.Metainfo.Database.Iterate(ctx, storage.IterateOptions{Recurse: true}, - func(ctx context.Context, it storage.Iterator) error { - var item storage.ListItem - for it.Next(ctx, &item) { - require.False(t, item.Key.Equal(expiredInline.Key)) - require.False(t, item.Key.Equal(expiredRemote.Key)) - i++ + // Verify that only two objects remain in the metabase + count = 0 + err = satellite.Metainfo.Metabase.IterateObjectsAllVersions(ctx, + metabase.IterateObjects{ + ProjectID: upl.Projects[0].ID, + BucketName: "testbucket", + Status: metabase.Committed, + }, func(ctx context.Context, it metabase.ObjectsIterator) error { + for it.Next(ctx, &metabase.ObjectEntry{}) { + count++ } return nil - }) + }, + ) require.NoError(t, err) - require.EqualValues(t, i, 2) + require.EqualValues(t, 2, count) }) } diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go index 2c4c97bfc..502a00140 100644 --- a/satellite/metainfo/metainfo.go +++ b/satellite/metainfo/metainfo.go @@ -697,6 +697,13 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe BlockSize: int32(req.EncryptionParameters.BlockSize), // TODO check conversion } + var expiresAt *time.Time + if req.ExpiresAt.IsZero() { + expiresAt = nil + } else { + expiresAt = &req.ExpiresAt + } + _, err = endpoint.metainfo.metabaseDB.BeginObjectExactVersion(ctx, metabase.BeginObjectExactVersion{ ObjectStream: metabase.ObjectStream{ ProjectID: keyInfo.ProjectID, @@ -705,6 +712,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe StreamID: streamID, Version: metabase.Version(1), }, + ExpiresAt: expiresAt, Encryption: encryptionParameters, }) if err != nil {