diff --git a/monkit.lock b/monkit.lock
index a097d9810..102bf2d36 100644
--- a/monkit.lock
+++ b/monkit.lock
@@ -73,6 +73,7 @@ storj.io/storj/satellite/repair/repairer."download_failed_not_enough_pieces_repa
 storj.io/storj/satellite/repair/repairer."healthy_ratio_after_repair" FloatVal
 storj.io/storj/satellite/repair/repairer."healthy_ratio_before_repair" FloatVal
 storj.io/storj/satellite/repair/repairer."repair_attempts" Meter
+storj.io/storj/satellite/repair/repairer."repair_expired" Meter
 storj.io/storj/satellite/repair/repairer."repair_failed" Meter
 storj.io/storj/satellite/repair/repairer."repair_nodes_unavailable" Meter
 storj.io/storj/satellite/repair/repairer."repair_partial" Meter
diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go
index 75daa34b4..2a7d3251f 100644
--- a/private/testplanet/satellite.go
+++ b/private/testplanet/satellite.go
@@ -46,6 +46,7 @@ import (
 	"storj.io/storj/satellite/mailservice"
 	"storj.io/storj/satellite/marketingweb"
 	"storj.io/storj/satellite/metainfo"
+	"storj.io/storj/satellite/metainfo/expireddeletion"
 	"storj.io/storj/satellite/metainfo/piecedeletion"
 	"storj.io/storj/satellite/metrics"
 	"storj.io/storj/satellite/nodestats"
@@ -125,6 +126,10 @@ type Satellite struct {
 		Service *gc.Service
 	}
 
+	ExpiredDeletion struct {
+		Chore *expireddeletion.Chore
+	}
+
 	DBCleanup struct {
 		Chore *dbcleanup.Chore
 	}
@@ -412,6 +417,10 @@ func (planet *Planet) newSatellites(count int, satelliteDatabases satellitedbtes
 				ConcurrentSends: 1,
 				RunInCore:       false,
 			},
+			ExpiredDeletion: expireddeletion.Config{
+				Interval: defaultInterval,
+				Enabled:  true,
+			},
 			DBCleanup: dbcleanup.Config{
 				SerialsInterval: defaultInterval,
 			},
@@ -590,6 +599,8 @@ func createNewSystem(log *zap.Logger, config satellite.Config, peer *satellite.C
 
 	system.GarbageCollection.Service = gcPeer.GarbageCollection.Service
 
+	system.ExpiredDeletion.Chore = peer.ExpiredDeletion.Chore
+
 	system.DBCleanup.Chore = peer.DBCleanup.Chore
 
 	system.Accounting.Tally = peer.Accounting.Tally
diff --git a/satellite/core.go b/satellite/core.go
index df5f715f2..bc923246a 100644
--- a/satellite/core.go
+++ b/satellite/core.go
@@ -35,6 +35,7 @@ import (
 	"storj.io/storj/satellite/gc"
 	"storj.io/storj/satellite/gracefulexit"
 	"storj.io/storj/satellite/metainfo"
+	"storj.io/storj/satellite/metainfo/expireddeletion"
 	"storj.io/storj/satellite/metrics"
 	"storj.io/storj/satellite/orders"
 	"storj.io/storj/satellite/overlay"
@@ -105,6 +106,10 @@ type Core struct {
 		Service *gc.Service
 	}
 
+	ExpiredDeletion struct {
+		Chore *expireddeletion.Chore
+	}
+
 	DBCleanup struct {
 		Chore *dbcleanup.Chore
 	}
@@ -379,6 +384,21 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 		}
 	}
 
+	{ // setup expired segment cleanup
+		peer.ExpiredDeletion.Chore = expireddeletion.NewChore(
+			peer.Log.Named("core-expired-deletion"),
+			config.ExpiredDeletion,
+			peer.Metainfo.Service,
+			peer.Metainfo.Loop,
+		)
+		peer.Services.Add(lifecycle.Item{
+			Name: "expireddeletion:chore",
+			Run:  peer.ExpiredDeletion.Chore.Run,
+		})
+		peer.Debug.Server.Panel.Add(
+			debug.Cycle("Expired Segments Chore", peer.ExpiredDeletion.Chore.Loop))
+	}
+
 	{ // setup db cleanup
 		peer.DBCleanup.Chore = dbcleanup.NewChore(peer.Log.Named("dbcleanup"), peer.DB.Orders(), config.DBCleanup)
 		peer.Services.Add(lifecycle.Item{
diff --git a/satellite/metainfo/expireddeletion/chore.go b/satellite/metainfo/expireddeletion/chore.go
new file mode 100644
index 000000000..747e9d70f
--- /dev/null
+++ b/satellite/metainfo/expireddeletion/chore.go
@@ -0,0 +1,77 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package expireddeletion
+
+import (
+	"context"
+	"time"
+
+	"github.com/spacemonkeygo/monkit/v3"
+	"github.com/zeebo/errs"
+	"go.uber.org/zap"
+
+	"storj.io/common/sync2"
+	"storj.io/storj/satellite/metainfo"
+)
+
+var (
+	// Error defines the expireddeletion chore errors class
+	Error = errs.Class("expireddeletion chore error")
+	mon   = monkit.Package()
+)
+
+// Config contains configurable values for expired segment cleanup
+type Config struct {
+	Interval time.Duration `help:"the time between each attempt to go through the db and clean up expired segments" releaseDefault:"120h" devDefault:"10m"`
+	Enabled  bool          `help:"set if expired segment cleanup is enabled or not" releaseDefault:"true" devDefault:"true"`
+}
+
+// Chore implements the expired segment cleanup chore
+//
+// architecture: Chore
+type Chore struct {
+	log    *zap.Logger
+	config Config
+	Loop   *sync2.Cycle
+
+	metainfo     *metainfo.Service
+	metainfoLoop *metainfo.Loop
+}
+
+// NewChore creates a new instance of the expireddeletion chore
+func NewChore(log *zap.Logger, config Config, meta *metainfo.Service, loop *metainfo.Loop) *Chore {
+	return &Chore{
+		log:          log,
+		config:       config,
+		Loop:         sync2.NewCycle(config.Interval),
+		metainfo:     meta,
+		metainfoLoop: loop,
+	}
+}
+
+// Run starts the expireddeletion loop service
+func (chore *Chore) Run(ctx context.Context) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	if !chore.config.Enabled {
+		return nil
+	}
+
+	return chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
+		defer mon.Task()(&ctx)(&err)
+
+		deleter := &expiredDeleter{
+			log:      chore.log.Named("expired deleter observer"),
+			metainfo: chore.metainfo,
+		}
+
+		// delete expired segments
+		err = chore.metainfoLoop.Join(ctx, deleter)
+		if err != nil {
+			chore.log.Error("error joining metainfoloop", zap.Error(err))
+			return nil
+		}
+		return nil
+	})
+}
diff --git a/satellite/metainfo/expireddeletion/doc.go b/satellite/metainfo/expireddeletion/doc.go
new file mode 100644
index 000000000..0d19081ed
--- /dev/null
+++ b/satellite/metainfo/expireddeletion/doc.go
@@ -0,0 +1,14 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+/*
+Package expireddeletion contains the functions needed to run expired segment deletion
+
+The expireddeletion.expiredDeleter implements the metainfo loop Observer interface
+allowing us to subscribe to the loop to get information for every segment
+in the metainfo database.
+
+The expireddeletion chore will subscribe the deleter to the metainfo loop
+and delete any expired segments from metainfo.
+*/
+package expireddeletion
diff --git a/satellite/metainfo/expireddeletion/expireddeleter.go b/satellite/metainfo/expireddeletion/expireddeleter.go
new file mode 100644
index 000000000..2c280a99b
--- /dev/null
+++ b/satellite/metainfo/expireddeletion/expireddeleter.go
@@ -0,0 +1,65 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package expireddeletion
+
+import (
+	"context"
+	"time"
+
+	"go.uber.org/zap"
+
+	"storj.io/common/pb"
+	"storj.io/common/storj"
+	"storj.io/storj/satellite/metainfo"
+	"storj.io/storj/storage"
+)
+
+var _ metainfo.Observer = (*expiredDeleter)(nil)
+
+// expiredDeleter implements the metainfo loop observer interface for expired segment cleanup
+//
+// architecture: Observer
+type expiredDeleter struct {
+	log      *zap.Logger
+	metainfo *metainfo.Service
+}
+
+// RemoteSegment deletes the segment if it is expired
+func (ed *expiredDeleter) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
+	defer mon.Task()(&ctx, path.Raw)(&err)
+
+	return ed.deleteSegmentIfExpired(ctx, path, pointer)
+}
+
+// InlineSegment deletes the segment if it is expired
+func (ed *expiredDeleter) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
+	defer mon.Task()(&ctx, path.Raw)(&err)
+
+	return ed.deleteSegmentIfExpired(ctx, path, pointer)
+}
+
+// Object returns nil because the expired deleter only cares about segments
+func (ed *expiredDeleter) Object(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
+	return nil
+}
+
+func (ed *expiredDeleter) deleteSegmentIfExpired(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) error {
+	// delete segment if expired
+	if !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(time.Now().UTC()) {
+		pointerBytes, err := pb.Marshal(pointer)
+		if err != nil {
+			return err
+		}
+		err = ed.metainfo.Delete(ctx, path.Raw, pointerBytes)
+		if storj.ErrObjectNotFound.Has(err) {
+			// segment already deleted
+			return nil
+		} else if storage.ErrValueChanged.Has(err) {
+			// segment was replaced
+			return nil
+		}
+		return err
+	}
+	return nil
+}
diff --git a/satellite/metainfo/expireddeletion/expireddeletion_test.go b/satellite/metainfo/expireddeletion/expireddeletion_test.go
new file mode 100644
index 000000000..1d21eea22
--- /dev/null
+++ b/satellite/metainfo/expireddeletion/expireddeletion_test.go
@@ -0,0 +1,119 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package expireddeletion_test
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+
+	"storj.io/common/memory"
+	"storj.io/common/pb"
+	"storj.io/common/testcontext"
+	"storj.io/common/testrand"
+	"storj.io/storj/private/testplanet"
+	"storj.io/storj/satellite"
+	"storj.io/storj/storage"
+)
+
+// TestExpiredDeletion does the following:
+// * Set up a network with one storagenode
+// * Upload four segments (two inline, two remote)
+// * Run the expired segment chore
+// * Verify that all four segments still exist
+// * Expire one inline segment and one remote segment
+// * Run the expired segment chore
+// * Verify that the two expired segments are deleted and the other two still exist
+func TestExpiredDeletion(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
+				config.ExpiredDeletion.Interval = 500 * time.Millisecond
+			},
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		satellite := planet.Satellites[0]
+		upl := planet.Uplinks[0]
+		expiredChore := satellite.Core.ExpiredDeletion.Chore
+
+		// Upload four objects
+		for i := 0; i < 4; i++ {
+			// 0 and 1 inline, 2 and 3 remote
+			testData := testrand.Bytes(8 * memory.KiB)
+			if i <= 1 {
+				testData = testrand.Bytes(1 * memory.KiB)
+
+			}
+			err := upl.Upload(ctx, satellite, "testbucket", "test/path/"+strconv.Itoa(i), testData)
+			require.NoError(t, err)
+		}
+
+		// Wait for next iteration of expired cleanup to finish
+		expiredChore.Loop.Restart()
+		expiredChore.Loop.TriggerWait()
+
+		// Verify that all four objects exist in metainfo
+		var expiredInline storage.ListItem
+		var expiredRemote storage.ListItem
+		i := 0
+		err := satellite.Metainfo.Database.Iterate(ctx, storage.IterateOptions{Recurse: true},
+			func(ctx context.Context, it storage.Iterator) error {
+				var item storage.ListItem
+				for it.Next(ctx, &item) {
+					if i == 1 {
+						expiredInline = item
+					} else if i == 3 {
+						expiredRemote = item
+					}
+					i++
+				}
+				return nil
+			})
+		require.NoError(t, err)
+		require.EqualValues(t, i, 4)
+
+		// Expire one inline segment and one remote segment
+		pointer := &pb.Pointer{}
+		err = pb.Unmarshal(expiredInline.Value, pointer)
+		require.NoError(t, err)
+		pointer.ExpirationDate = time.Now().Add(-24 * time.Hour)
+		newPointerBytes, err := pb.Marshal(pointer)
+		require.NoError(t, err)
+		err = satellite.Metainfo.Database.CompareAndSwap(ctx, expiredInline.Key, expiredInline.Value, newPointerBytes)
+		require.NoError(t, err)
+
+		err = pb.Unmarshal(expiredRemote.Value, pointer)
+		require.NoError(t, err)
+		pointer.ExpirationDate = time.Now().Add(-24 * time.Hour)
+		newPointerBytes, err = pb.Marshal(pointer)
+		require.NoError(t, err)
+		err = satellite.Metainfo.Database.CompareAndSwap(ctx, expiredRemote.Key, expiredRemote.Value, newPointerBytes)
+		require.NoError(t, err)
+
+		// Wait for next iteration of expired cleanup to finish
+		expiredChore.Loop.Restart()
+		expiredChore.Loop.TriggerWait()
+
+		// Verify that only two objects exist in metainfo
+		// And that the expired ones do not exist
+		i = 0
+		err = satellite.Metainfo.Database.Iterate(ctx, storage.IterateOptions{Recurse: true},
+			func(ctx context.Context, it storage.Iterator) error {
+				var item storage.ListItem
+				for it.Next(ctx, &item) {
+					require.False(t, item.Key.Equal(expiredInline.Key))
+					require.False(t, item.Key.Equal(expiredRemote.Key))
+					i++
+				}
+				return nil
+			})
+		require.NoError(t, err)
+		require.EqualValues(t, i, 2)
+	})
+}
diff --git a/satellite/peer.go b/satellite/peer.go
index 8ebdb1c78..0ff6a63c2 100644
--- a/satellite/peer.go
+++ b/satellite/peer.go
@@ -32,6 +32,7 @@ import (
 	"storj.io/storj/satellite/mailservice"
 	"storj.io/storj/satellite/marketingweb"
 	"storj.io/storj/satellite/metainfo"
+	"storj.io/storj/satellite/metainfo/expireddeletion"
 	"storj.io/storj/satellite/metrics"
 	"storj.io/storj/satellite/orders"
 	"storj.io/storj/satellite/overlay"
@@ -117,6 +118,8 @@ type Config struct {
 
 	GarbageCollection gc.Config
 
+	ExpiredDeletion expireddeletion.Config
+
 	DBCleanup dbcleanup.Config
 
 	Tally tally.Config
diff --git a/satellite/repair/checker/checker.go b/satellite/repair/checker/checker.go
index b23004fe7..87b1f8b1e 100644
--- a/satellite/repair/checker/checker.go
+++ b/satellite/repair/checker/checker.go
@@ -241,6 +241,11 @@ type checkerObserver struct {
 func (obs *checkerObserver) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
+	// ignore pointer if expired
+	if !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(time.Now().UTC()) {
+		return nil
+	}
+
 	obs.monStats.remoteSegmentsChecked++
 
 	remote := pointer.GetRemote()
diff --git a/satellite/repair/checker/checker_test.go b/satellite/repair/checker/checker_test.go
index 256b0a234..857c4d73e 100644
--- a/satellite/repair/checker/checker_test.go
+++ b/satellite/repair/checker/checker_test.go
@@ -42,26 +42,30 @@ func TestIdentifyInjuredSegments(t *testing.T) {
 		// add some valid pointers
 		for x := 0; x < 10; x++ {
-			insertPointer(ctx, t, planet, rs, pointerPathPrefix+fmt.Sprintf("a-%d", x), false)
+			insertPointer(ctx, t, planet, rs, pointerPathPrefix+fmt.Sprintf("a-%d", x), false, time.Time{})
 		}
 
 		// add pointer that needs repair
-		insertPointer(ctx, t, planet, rs, pointerPathPrefix+"b", true)
+		insertPointer(ctx, t, planet, rs, pointerPathPrefix+"b-0", true, time.Time{})
+
+		// add pointer that is unhealthy, but is expired
+		insertPointer(ctx, t, planet, rs, pointerPathPrefix+"b-1", true, time.Now().Add(-time.Hour))
 
 		// add some valid pointers
 		for x := 0; x < 10; x++ {
-			insertPointer(ctx, t, planet, rs, pointerPathPrefix+fmt.Sprintf("c-%d", x), false)
+			insertPointer(ctx, t, planet, rs, pointerPathPrefix+fmt.Sprintf("c-%d", x), false, time.Time{})
 		}
 
 		checker.Loop.TriggerWait()
 
-		//check if the expected segments were added to the queue
+		// check that the unhealthy, non-expired segment was added to the queue
+		// and that the expired segment was ignored
 		injuredSegment, err := repairQueue.Select(ctx)
 		require.NoError(t, err)
 		err = repairQueue.Delete(ctx, injuredSegment)
 		require.NoError(t, err)
-		require.Equal(t, []byte(pointerPathPrefix+"b"), injuredSegment.Path)
+		require.Equal(t, []byte(pointerPathPrefix+"b-0"), injuredSegment.Path)
 		require.Equal(t, int(rs.SuccessThreshold-rs.MinReq), len(injuredSegment.LostPieces))
 		for _, lostPiece := range injuredSegment.LostPieces {
 			require.True(t, rs.MinReq <= lostPiece && lostPiece < rs.SuccessThreshold, fmt.Sprintf("%v", lostPiece))
@@ -126,6 +130,10 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
 		metainfo := planet.Satellites[0].Metainfo.Service
 		err := metainfo.Put(ctx, pointerPath, pointer)
 		require.NoError(t, err)
+		// modify pointer to make it expired and put to db
+		pointer.ExpirationDate = time.Now().Add(-time.Hour)
+		err = metainfo.Put(ctx, pointerPath+"-expired", pointer)
+		require.NoError(t, err)
 
 		err = checker.IdentifyInjuredSegments(ctx)
 		require.NoError(t, err)
@@ -139,6 +147,9 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
 		irreparable := planet.Satellites[0].DB.Irreparable()
 		remoteSegmentInfo, err := irreparable.Get(ctx, []byte(pointerPath))
 		require.NoError(t, err)
+		// check that the expired segment was not added to the irreparable DB
+		_, err = irreparable.Get(ctx, []byte(pointerPath+"-expired"))
+		require.Error(t, err)
 
 		require.Equal(t, len(expectedLostPieces), int(remoteSegmentInfo.LostPieces))
 		require.Equal(t, 1, int(remoteSegmentInfo.RepairAttemptCount))
@@ -187,7 +198,7 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
 	})
 }
 
-func insertPointer(ctx context.Context, t *testing.T, planet *testplanet.Planet, rs *pb.RedundancyScheme, pointerPath string, createLost bool) {
+func insertPointer(ctx context.Context, t *testing.T, planet *testplanet.Planet, rs *pb.RedundancyScheme, pointerPath string, createLost bool, expire time.Time) {
 	pieces := make([]*pb.RemotePiece, rs.SuccessThreshold)
 	if !createLost {
 		for i := range pieces {
@@ -220,6 +231,9 @@ func insertPointer(ctx context.Context, t *testing.T, planet *testplanet.Planet,
 			RemotePieces: pieces,
 		},
 	}
+	if !expire.IsZero() {
+		pointer.ExpirationDate = expire
+	}
 
 	// put test pointer to db
 	pointerdb := planet.Satellites[0].Metainfo.Service
diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go
index f8cec7bb4..bca38e22e 100644
--- a/satellite/repair/repair_test.go
+++ b/satellite/repair/repair_test.go
@@ -405,6 +405,94 @@ func testCorruptDataRepairSucceed(t *testing.T, inMemoryRepair bool) {
 	})
 }
 
+// TestRemoveExpiredSegmentFromQueue
+// - Upload test data to 7 nodes
+// - Kill nodes so that repair threshold > online nodes > minimum threshold
+// - Call checker to add segment to the repair queue
+// - Modify segment to be expired
+// - Run the repairer
+// - Verify segment is no longer in the repair queue
+func TestRemoveExpiredSegmentFromQueue(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 10,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.ReconfigureRS(3, 5, 7, 7),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		// first, upload some remote data
+		uplinkPeer := planet.Uplinks[0]
+		satellite := planet.Satellites[0]
+		// stop audit to prevent possible interactions, i.e. repair timeout problems
+		satellite.Audit.Worker.Loop.Stop()
+
+		satellite.Repair.Checker.Loop.Pause()
+		satellite.Repair.Repairer.Loop.Pause()
+
+		testData := testrand.Bytes(8 * memory.KiB)
+
+		err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData)
+		require.NoError(t, err)
+
+		pointer, _ := getRemoteSegment(t, ctx, satellite)
+
+		// kill nodes and track lost pieces
+		nodesToDQ := make(map[storj.NodeID]bool)
+
+		// Kill 3 nodes so that pointer has 4 left (less than repair threshold)
+		toKill := 3
+
+		remotePieces := pointer.GetRemote().GetRemotePieces()
+
+		for i, piece := range remotePieces {
+			if i >= toKill {
+				continue
+			}
+			nodesToDQ[piece.NodeId] = true
+		}
+
+		for nodeID := range nodesToDQ {
+			err := satellite.DB.OverlayCache().DisqualifyNode(ctx, nodeID)
+			require.NoError(t, err)
+
+		}
+
+		// trigger checker to add segment to repair queue
+		satellite.Repair.Checker.Loop.Restart()
+		satellite.Repair.Checker.Loop.TriggerWait()
+		satellite.Repair.Checker.Loop.Pause()
+
+		// get encrypted path of segment with audit service
+		satellite.Audit.Chore.Loop.TriggerWait()
+		require.EqualValues(t, satellite.Audit.Queue.Size(), 1)
+		encryptedPath, err := satellite.Audit.Queue.Next()
+		require.NoError(t, err)
+		// replace pointer with one that is already expired
+		pointer.ExpirationDate = time.Now().Add(-time.Hour)
+		err = satellite.Metainfo.Service.UnsynchronizedDelete(ctx, encryptedPath)
+		require.NoError(t, err)
+		err = satellite.Metainfo.Service.UnsynchronizedPut(ctx, encryptedPath, pointer)
+		require.NoError(t, err)
+
+		// Verify that the segment is on the repair queue
+		count, err := satellite.DB.RepairQueue().Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, count, 1)
+
+		// Run the repairer
+		satellite.Repair.Repairer.Loop.Restart()
+		satellite.Repair.Repairer.Loop.TriggerWait()
+		satellite.Repair.Repairer.Loop.Pause()
+		satellite.Repair.Repairer.WaitForPendingRepairs()
+
+		// Verify that the segment was removed
+		count, err = satellite.DB.RepairQueue().Count(ctx)
+		require.NoError(t, err)
+		require.Equal(t, count, 0)
+	})
+}
+
 // TestRemoveDeletedSegmentFromQueue
 // - Upload tests data to 7 nodes
 // - Kill nodes so that repair threshold > online nodes > minimum threshold
diff --git a/satellite/repair/repairer/segments.go b/satellite/repair/repairer/segments.go
index 359b33afd..e2eb7a68c 100644
--- a/satellite/repair/repairer/segments.go
+++ b/satellite/repair/repairer/segments.go
@@ -115,6 +115,11 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
 		return true, invalidRepairError.New("cannot repair inline segment")
 	}
 
+	if !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(time.Now().UTC()) {
+		mon.Meter("repair_expired").Mark(1) //locked
+		return true, nil
+	}
+
 	mon.Meter("repair_attempts").Mark(1) //locked
 	mon.IntVal("repair_segment_size").Observe(pointer.GetSegmentSize()) //locked
diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock
index eca85ec74..e089165c2 100644
--- a/scripts/testdata/satellite-config.yaml.lock
+++ b/scripts/testdata/satellite-config.yaml.lock
@@ -160,6 +160,12 @@ contact.external-address: ""
 # how often to run the downtime estimation chore
 # downtime.estimation-interval: 1h0m0s
 
+# set if expired segment cleanup is enabled or not
+# expired-deletion.enabled: true
+
+# the time between each attempt to go through the db and clean up expired segments
+# expired-deletion.interval: 120h0m0s
+
 # the number of nodes to concurrently send garbage collection bloom filters to
 # garbage-collection.concurrent-sends: 1
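
Note: the expiration check (!pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(time.Now().UTC())) is now inlined in three places: the expired deleter, the checker observer, and the segment repairer. Below is a minimal sketch of what a shared helper could look like; the behavior is taken straight from the patch, but the helper name and where it would live are hypothetical and not part of this change.

    // isExpired reports whether a pointer carries an expiration date in the past.
    // Hypothetical helper; the patch above repeats this check in expireddeleter.go,
    // checker.go, and segments.go rather than sharing it.
    func isExpired(pointer *pb.Pointer, now time.Time) bool {
        return !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(now.UTC())
    }

Passing now in explicitly would also make the check easy to unit test without sleeping or rewriting stored pointers.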