satellite/{metainfo,repair}: Delete expired segments from metainfo

* Delete expired segments in expired segments service using metainfo
loop
* Add test to verify expired segments service deletes expired segments
* Ignore expired segments in checker observer
* Modify checker tests to verify that expired segments are ignored
* Ignore expired segments in segment repairer and drop from repair queue
* Add repair test to verify that a segment that expires after being
added to the repair queue is ignored and dropped from the repair queue

Change-Id: Ib2b0934db525fef58325583d2a7ca859b88ea60d
This commit is contained in:
Moby von Briesen 2020-04-15 15:20:16 -04:00 committed by Yingrong Zhao
parent e34937317c
commit 178aa8b5e0
13 changed files with 434 additions and 6 deletions

View File

@ -73,6 +73,7 @@ storj.io/storj/satellite/repair/repairer."download_failed_not_enough_pieces_repa
storj.io/storj/satellite/repair/repairer."healthy_ratio_after_repair" FloatVal
storj.io/storj/satellite/repair/repairer."healthy_ratio_before_repair" FloatVal
storj.io/storj/satellite/repair/repairer."repair_attempts" Meter
storj.io/storj/satellite/repair/repairer."repair_expired" Meter
storj.io/storj/satellite/repair/repairer."repair_failed" Meter
storj.io/storj/satellite/repair/repairer."repair_nodes_unavailable" Meter
storj.io/storj/satellite/repair/repairer."repair_partial" Meter

View File

@ -46,6 +46,7 @@ import (
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/marketingweb"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/expireddeletion"
"storj.io/storj/satellite/metainfo/piecedeletion"
"storj.io/storj/satellite/metrics"
"storj.io/storj/satellite/nodestats"
@ -125,6 +126,10 @@ type Satellite struct {
Service *gc.Service
}
ExpiredDeletion struct {
Chore *expireddeletion.Chore
}
DBCleanup struct {
Chore *dbcleanup.Chore
}
@ -412,6 +417,10 @@ func (planet *Planet) newSatellites(count int, satelliteDatabases satellitedbtes
ConcurrentSends: 1,
RunInCore: false,
},
ExpiredDeletion: expireddeletion.Config{
Interval: defaultInterval,
Enabled: true,
},
DBCleanup: dbcleanup.Config{
SerialsInterval: defaultInterval,
},
@ -590,6 +599,8 @@ func createNewSystem(log *zap.Logger, config satellite.Config, peer *satellite.C
system.GarbageCollection.Service = gcPeer.GarbageCollection.Service
system.ExpiredDeletion.Chore = peer.ExpiredDeletion.Chore
system.DBCleanup.Chore = peer.DBCleanup.Chore
system.Accounting.Tally = peer.Accounting.Tally

View File

@ -35,6 +35,7 @@ import (
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/expireddeletion"
"storj.io/storj/satellite/metrics"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
@ -105,6 +106,10 @@ type Core struct {
Service *gc.Service
}
ExpiredDeletion struct {
Chore *expireddeletion.Chore
}
DBCleanup struct {
Chore *dbcleanup.Chore
}
@ -379,6 +384,21 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
}
}
{ // setup expired segment cleanup
peer.ExpiredDeletion.Chore = expireddeletion.NewChore(
peer.Log.Named("core-expired-deletion"),
config.ExpiredDeletion,
peer.Metainfo.Service,
peer.Metainfo.Loop,
)
peer.Services.Add(lifecycle.Item{
Name: "expireddeletion:chore",
Run: peer.ExpiredDeletion.Chore.Run,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Expired Segments Chore", peer.ExpiredDeletion.Chore.Loop))
}
{ // setup db cleanup
peer.DBCleanup.Chore = dbcleanup.NewChore(peer.Log.Named("dbcleanup"), peer.DB.Orders(), config.DBCleanup)
peer.Services.Add(lifecycle.Item{

View File

@ -0,0 +1,77 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package expireddeletion
import (
"context"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/sync2"
"storj.io/storj/satellite/metainfo"
)
var (
// Error defines the expireddeletion chore errors class.
Error = errs.Class("expireddeletion chore error")
// mon is the monkit scope for this package; it backs the mon.Task
// instrumentation used by the chore and by the observer methods.
mon = monkit.Package()
)
// Config contains configurable values for expired segment cleanup.
type Config struct {
// Interval is the pause between two cleanup passes; it is the period the
// chore's cycle is created with (120h in release, 10m in dev by default).
Interval time.Duration `help:"the time between each attempt to go through the db and clean up expired segments" releaseDefault:"120h" devDefault:"10m"`
// Enabled switches the chore on or off; when false, Chore.Run returns
// immediately without starting the cycle.
Enabled bool `help:"set if expired segment cleanup is enabled or not" releaseDefault:"true" devDefault:"true"`
}
// Chore implements the expired segment cleanup chore.
//
// On every tick of Loop it joins a fresh expiredDeleter observer to the
// metainfo loop so that expired segments get deleted from metainfo.
//
// architecture: Chore
type Chore struct {
log *zap.Logger
config Config
// Loop is the cycle that schedules cleanup passes. It is exported so that
// callers (tests, the debug panel) can restart or trigger it.
Loop *sync2.Cycle
// metainfo is handed to the observer, which uses it to delete segments.
metainfo *metainfo.Service
// metainfoLoop is the segment loop the observer is joined to on each pass.
metainfoLoop *metainfo.Loop
}
// NewChore builds an expired segment deletion chore.
//
// The returned chore owns a new cycle that is created with config.Interval as
// its period; meta and loop are stored for use by Run.
func NewChore(log *zap.Logger, config Config, meta *metainfo.Service, loop *metainfo.Loop) *Chore {
	chore := new(Chore)
	chore.log = log
	chore.config = config
	chore.Loop = sync2.NewCycle(config.Interval)
	chore.metainfo = meta
	chore.metainfoLoop = loop
	return chore
}
// Run starts the expireddeletion loop service.
//
// When the chore is disabled in its config, Run returns nil right away.
// Otherwise it runs the chore's cycle; on every tick a new expiredDeleter
// observer is joined to the metainfo loop. A failure to join is logged and
// is not reported back to the cycle (the callback always returns nil).
func (chore *Chore) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	if !chore.config.Enabled {
		return nil
	}

	pass := func(ctx context.Context) (err error) {
		defer mon.Task()(&ctx)(&err)

		observer := &expiredDeleter{
			log:      chore.log.Named("expired deleter observer"),
			metainfo: chore.metainfo,
		}

		// delete expired segments
		if joinErr := chore.metainfoLoop.Join(ctx, observer); joinErr != nil {
			chore.log.Error("error joining metainfoloop", zap.Error(joinErr))
		}
		return nil
	}

	return chore.Loop.Run(ctx, pass)
}

View File

@ -0,0 +1,14 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
/*
Package expireddeletion contains the functions needed to run expired segment deletion.
The expireddeletion.expiredDeleter implements the metainfo loop Observer interface
allowing us to subscribe to the loop to get information for every segment
in the metainfo database.
The expireddeletion chore will subscribe the deleter to the metainfo loop
and delete any expired segments from metainfo.
*/
package expireddeletion

View File

@ -0,0 +1,65 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package expireddeletion
import (
"context"
"time"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/storage"
)
// Compile-time check that expiredDeleter satisfies metainfo.Observer.
var _ metainfo.Observer = (*expiredDeleter)(nil)
// expiredDeleter implements the metainfo loop observer interface for expired segment cleanup.
//
// For every remote and inline segment it is shown, it deletes the segment
// from metainfo if the pointer's expiration date has passed.
//
// architecture: Observer
type expiredDeleter struct {
// log is the logger supplied by the chore that creates the observer.
log *zap.Logger
// metainfo is the service used to delete expired pointers.
metainfo *metainfo.Service
}
// RemoteSegment is invoked for a remote segment and removes it from metainfo
// when its expiration date has passed.
func (ed *expiredDeleter) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
	defer mon.Task()(&ctx, path.Raw)(&err)

	err = ed.deleteSegmentIfExpired(ctx, path, pointer)
	return err
}
// InlineSegment is invoked for an inline segment and removes it from metainfo
// when its expiration date has passed.
func (ed *expiredDeleter) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
	defer mon.Task()(&ctx, path.Raw)(&err)

	err = ed.deleteSegmentIfExpired(ctx, path, pointer)
	return err
}
// Object is a no-op: expiration is handled per segment, so the expired
// deleter has nothing to do at the object level.
func (ed *expiredDeleter) Object(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
	// Nothing to inspect or delete here; always succeed.
	return nil
}
// deleteSegmentIfExpired removes the segment at path from metainfo when its
// pointer carries a non-zero expiration date that lies before the current
// time. Pointers without an expiration date, or not yet expired, are left
// untouched.
//
// The pointer is marshaled and passed along to Delete. An
// ErrObjectNotFound or ErrValueChanged result is treated as success; any
// other error is returned unchanged.
func (ed *expiredDeleter) deleteSegmentIfExpired(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) error {
	expiresAt := pointer.ExpirationDate
	if expiresAt.IsZero() || !expiresAt.Before(time.Now().UTC()) {
		// no expiration set, or not expired yet
		return nil
	}

	encoded, err := pb.Marshal(pointer)
	if err != nil {
		return err
	}

	err = ed.metainfo.Delete(ctx, path.Raw, encoded)
	switch {
	case storj.ErrObjectNotFound.Has(err):
		// segment already deleted
		return nil
	case storage.ErrValueChanged.Has(err):
		// segment was replaced
		return nil
	default:
		return err
	}
}

View File

@ -0,0 +1,119 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package expireddeletion_test
import (
"context"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/storage"
)
// TestExpiredDeletion does the following:
// * Set up a network with one storagenode
// * Upload four objects (two of 1 KiB and two of 8 KiB)
// * Run the expired segment chore
// * Verify that all four segments still exist
// * Expire two of the segments
// * Run the expired segment chore
// * Verify that two segments still exist and the expired ones have been deleted
func TestExpiredDeletion(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
				config.ExpiredDeletion.Interval = 500 * time.Millisecond
			},
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		// named sat (not satellite) so the imported satellite package is not shadowed
		sat := planet.Satellites[0]
		upl := planet.Uplinks[0]
		expiredChore := sat.Core.ExpiredDeletion.Chore

		// Upload four objects
		for i := 0; i < 4; i++ {
			// 0 and 1 inline, 2 and 3 remote
			testData := testrand.Bytes(8 * memory.KiB)
			if i <= 1 {
				testData = testrand.Bytes(1 * memory.KiB)
			}
			err := upl.Upload(ctx, sat, "testbucket", "test/path/"+strconv.Itoa(i), testData)
			require.NoError(t, err)
		}

		// Wait for next iteration of expired cleanup to finish
		expiredChore.Loop.Restart()
		expiredChore.Loop.TriggerWait()

		// Verify that all four objects exist in metainfo, and remember the
		// second and fourth listed items as the ones to expire.
		// NOTE(review): the picks follow database iteration order, which
		// presumably is not upload order, so "inline"/"remote" in the names
		// below may not match the actual segment kind -- confirm.
		var expiredInline storage.ListItem
		var expiredRemote storage.ListItem
		i := 0
		err := sat.Metainfo.Database.Iterate(ctx, storage.IterateOptions{Recurse: true},
			func(ctx context.Context, it storage.Iterator) error {
				var item storage.ListItem
				for it.Next(ctx, &item) {
					if i == 1 {
						expiredInline = item
					} else if i == 3 {
						expiredRemote = item
					}
					i++
				}
				return nil
			})
		require.NoError(t, err)
		require.EqualValues(t, 4, i)

		// expire rewrites the stored pointer of item with an expiration date
		// that lies 24 hours in the past.
		expire := func(item storage.ListItem) {
			pointer := &pb.Pointer{}
			err := pb.Unmarshal(item.Value, pointer)
			require.NoError(t, err)
			pointer.ExpirationDate = time.Now().Add(-24 * time.Hour)
			newPointerBytes, err := pb.Marshal(pointer)
			require.NoError(t, err)
			err = sat.Metainfo.Database.CompareAndSwap(ctx, item.Key, item.Value, newPointerBytes)
			require.NoError(t, err)
		}

		// Expire the two selected segments
		expire(expiredInline)
		expire(expiredRemote)

		// Wait for next iteration of expired cleanup to finish
		expiredChore.Loop.Restart()
		expiredChore.Loop.TriggerWait()

		// Verify that only two objects exist in metainfo
		// And that the expired ones do not exist
		i = 0
		err = sat.Metainfo.Database.Iterate(ctx, storage.IterateOptions{Recurse: true},
			func(ctx context.Context, it storage.Iterator) error {
				var item storage.ListItem
				for it.Next(ctx, &item) {
					require.False(t, item.Key.Equal(expiredInline.Key))
					require.False(t, item.Key.Equal(expiredRemote.Key))
					i++
				}
				return nil
			})
		require.NoError(t, err)
		require.EqualValues(t, 2, i)
	})
}

View File

@ -32,6 +32,7 @@ import (
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/marketingweb"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/expireddeletion"
"storj.io/storj/satellite/metrics"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
@ -117,6 +118,8 @@ type Config struct {
GarbageCollection gc.Config
ExpiredDeletion expireddeletion.Config
DBCleanup dbcleanup.Config
Tally tally.Config

View File

@ -241,6 +241,11 @@ type checkerObserver struct {
func (obs *checkerObserver) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
defer mon.Task()(&ctx)(&err)
// ignore pointer if expired
if !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(time.Now().UTC()) {
return nil
}
obs.monStats.remoteSegmentsChecked++
remote := pointer.GetRemote()

View File

@ -42,26 +42,30 @@ func TestIdentifyInjuredSegments(t *testing.T) {
// add some valid pointers
for x := 0; x < 10; x++ {
insertPointer(ctx, t, planet, rs, pointerPathPrefix+fmt.Sprintf("a-%d", x), false)
insertPointer(ctx, t, planet, rs, pointerPathPrefix+fmt.Sprintf("a-%d", x), false, time.Time{})
}
// add pointer that needs repair
insertPointer(ctx, t, planet, rs, pointerPathPrefix+"b", true)
insertPointer(ctx, t, planet, rs, pointerPathPrefix+"b-0", true, time.Time{})
// add pointer that is unhealthy, but is expired
insertPointer(ctx, t, planet, rs, pointerPathPrefix+"b-1", true, time.Now().Add(-time.Hour))
// add some valid pointers
for x := 0; x < 10; x++ {
insertPointer(ctx, t, planet, rs, pointerPathPrefix+fmt.Sprintf("c-%d", x), false)
insertPointer(ctx, t, planet, rs, pointerPathPrefix+fmt.Sprintf("c-%d", x), false, time.Time{})
}
checker.Loop.TriggerWait()
//check if the expected segments were added to the queue
// check that the unhealthy, non-expired segment was added to the queue
// and that the expired segment was ignored
injuredSegment, err := repairQueue.Select(ctx)
require.NoError(t, err)
err = repairQueue.Delete(ctx, injuredSegment)
require.NoError(t, err)
require.Equal(t, []byte(pointerPathPrefix+"b"), injuredSegment.Path)
require.Equal(t, []byte(pointerPathPrefix+"b-0"), injuredSegment.Path)
require.Equal(t, int(rs.SuccessThreshold-rs.MinReq), len(injuredSegment.LostPieces))
for _, lostPiece := range injuredSegment.LostPieces {
require.True(t, rs.MinReq <= lostPiece && lostPiece < rs.SuccessThreshold, fmt.Sprintf("%v", lostPiece))
@ -126,6 +130,10 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
metainfo := planet.Satellites[0].Metainfo.Service
err := metainfo.Put(ctx, pointerPath, pointer)
require.NoError(t, err)
// modify pointer to make it expired and put to db
pointer.ExpirationDate = time.Now().Add(-time.Hour)
err = metainfo.Put(ctx, pointerPath+"-expired", pointer)
require.NoError(t, err)
err = checker.IdentifyInjuredSegments(ctx)
require.NoError(t, err)
@ -139,6 +147,9 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
irreparable := planet.Satellites[0].DB.Irreparable()
remoteSegmentInfo, err := irreparable.Get(ctx, []byte(pointerPath))
require.NoError(t, err)
// check that the expired segment was not added to the irreparable DB
_, err = irreparable.Get(ctx, []byte(pointerPath+"-expired"))
require.Error(t, err)
require.Equal(t, len(expectedLostPieces), int(remoteSegmentInfo.LostPieces))
require.Equal(t, 1, int(remoteSegmentInfo.RepairAttemptCount))
@ -187,7 +198,7 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
})
}
func insertPointer(ctx context.Context, t *testing.T, planet *testplanet.Planet, rs *pb.RedundancyScheme, pointerPath string, createLost bool) {
func insertPointer(ctx context.Context, t *testing.T, planet *testplanet.Planet, rs *pb.RedundancyScheme, pointerPath string, createLost bool, expire time.Time) {
pieces := make([]*pb.RemotePiece, rs.SuccessThreshold)
if !createLost {
for i := range pieces {
@ -220,6 +231,9 @@ func insertPointer(ctx context.Context, t *testing.T, planet *testplanet.Planet,
RemotePieces: pieces,
},
}
if !expire.IsZero() {
pointer.ExpirationDate = expire
}
// put test pointer to db
pointerdb := planet.Satellites[0].Metainfo.Service

View File

@ -405,6 +405,94 @@ func testCorruptDataRepairSucceed(t *testing.T, inMemoryRepair bool) {
})
}
// TestRemoveExpiredSegmentFromQueue
// - Upload test data to 7 nodes
// - Kill nodes so that repair threshold > online nodes > minimum threshold
// - Call checker to add segment to the repair queue
// - Modify segment to be expired
// - Run the repairer
// - Verify segment is no longer in the repair queue
func TestRemoveExpiredSegmentFromQueue(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: 10,
		UplinkCount:      1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.ReconfigureRS(3, 5, 7, 7),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		// first, upload some remote data
		uplinkPeer := planet.Uplinks[0]
		satellite := planet.Satellites[0]
		// stop audit to prevent possible interactions i.e. repair timeout problems
		satellite.Audit.Worker.Loop.Stop()

		satellite.Repair.Checker.Loop.Pause()
		satellite.Repair.Repairer.Loop.Pause()

		testData := testrand.Bytes(8 * memory.KiB)

		err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData)
		require.NoError(t, err)

		pointer, _ := getRemoteSegment(t, ctx, satellite)

		// kill nodes and track lost pieces
		nodesToDQ := make(map[storj.NodeID]bool)

		// Kill 3 nodes so that pointer has 4 left (less than repair threshold)
		toKill := 3

		remotePieces := pointer.GetRemote().GetRemotePieces()

		for i, piece := range remotePieces {
			if i >= toKill {
				// only the first toKill pieces are selected
				break
			}
			nodesToDQ[piece.NodeId] = true
		}

		for nodeID := range nodesToDQ {
			err := satellite.DB.OverlayCache().DisqualifyNode(ctx, nodeID)
			require.NoError(t, err)
		}

		// trigger checker to add segment to repair queue
		satellite.Repair.Checker.Loop.Restart()
		satellite.Repair.Checker.Loop.TriggerWait()
		satellite.Repair.Checker.Loop.Pause()

		// get encrypted path of segment with audit service
		satellite.Audit.Chore.Loop.TriggerWait()
		require.EqualValues(t, 1, satellite.Audit.Queue.Size())
		encryptedPath, err := satellite.Audit.Queue.Next()
		require.NoError(t, err)

		// replace pointer with one that is already expired
		pointer.ExpirationDate = time.Now().Add(-time.Hour)
		err = satellite.Metainfo.Service.UnsynchronizedDelete(ctx, encryptedPath)
		require.NoError(t, err)
		err = satellite.Metainfo.Service.UnsynchronizedPut(ctx, encryptedPath, pointer)
		require.NoError(t, err)

		// Verify that the segment is on the repair queue
		count, err := satellite.DB.RepairQueue().Count(ctx)
		require.NoError(t, err)
		require.Equal(t, 1, count)

		// Run the repairer
		satellite.Repair.Repairer.Loop.Restart()
		satellite.Repair.Repairer.Loop.TriggerWait()
		satellite.Repair.Repairer.Loop.Pause()
		satellite.Repair.Repairer.WaitForPendingRepairs()

		// Verify that the segment was removed
		count, err = satellite.DB.RepairQueue().Count(ctx)
		require.NoError(t, err)
		require.Equal(t, 0, count)
	})
}
// TestRemoveDeletedSegmentFromQueue
// - Upload tests data to 7 nodes
// - Kill nodes so that repair threshold > online nodes > minimum threshold

View File

@ -115,6 +115,11 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
return true, invalidRepairError.New("cannot repair inline segment")
}
if !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(time.Now().UTC()) {
mon.Meter("repair_expired").Mark(1) //locked
return true, nil
}
mon.Meter("repair_attempts").Mark(1) //locked
mon.IntVal("repair_segment_size").Observe(pointer.GetSegmentSize()) //locked

View File

@ -160,6 +160,12 @@ contact.external-address: ""
# how often to run the downtime estimation chore
# downtime.estimation-interval: 1h0m0s
# set if expired segment cleanup is enabled or not
# expired-deletion.enabled: true
# the time between each attempt to go through the db and clean up expired segments
# expired-deletion.interval: 120h0m0s
# the number of nodes to concurrently send garbage collection bloom filters to
# garbage-collection.concurrent-sends: 1