satellite/repair: enable TestRemoveExpiredSegmentFromQueue test

This change adds the ability to set the `now` time during tests for the repair service.

Change-Id: Idb8826b7b58b8789b0abc65817b888ecdc752a3f
This commit is contained in:
Michal Niewrzal 2020-12-18 09:49:31 +01:00
parent b3aa28cc02
commit f7a31308db
3 changed files with 30 additions and 19 deletions

View File

@ -397,8 +397,6 @@ func testCorruptDataRepairSucceed(t *testing.T, inMemoryRepair bool) {
// - Run the repairer // - Run the repairer
// - Verify segment is no longer in the repair queue. // - Verify segment is no longer in the repair queue.
func TestRemoveExpiredSegmentFromQueue(t *testing.T) { func TestRemoveExpiredSegmentFromQueue(t *testing.T) {
t.Skip("skipped until we will figure out how handle expiration date for segments")
testplanet.Run(t, testplanet.Config{ testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, SatelliteCount: 1,
StorageNodeCount: 10, StorageNodeCount: 10,
@ -419,7 +417,7 @@ func TestRemoveExpiredSegmentFromQueue(t *testing.T) {
testData := testrand.Bytes(8 * memory.KiB) testData := testrand.Bytes(8 * memory.KiB)
err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData) err := uplinkPeer.UploadWithExpiration(ctx, satellite, "testbucket", "test/path", testData, time.Now().Add(1*time.Hour))
require.NoError(t, err) require.NoError(t, err)
segment, _ := getRemoteSegment(t, ctx, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket") segment, _ := getRemoteSegment(t, ctx, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
@ -454,21 +452,15 @@ func TestRemoveExpiredSegmentFromQueue(t *testing.T) {
satellite.Audit.Chore.Loop.TriggerWait() satellite.Audit.Chore.Loop.TriggerWait()
queue := satellite.Audit.Queues.Fetch() queue := satellite.Audit.Queues.Fetch()
require.EqualValues(t, queue.Size(), 1) require.EqualValues(t, queue.Size(), 1)
queueSegment, err := queue.Next()
require.NoError(t, err)
// replace pointer with one that is already expired
pointer := &pb.Pointer{}
pointer.ExpirationDate = time.Now().Add(-time.Hour)
err = satellite.Metainfo.Service.UnsynchronizedDelete(ctx, queueSegment.Encode())
require.NoError(t, err)
err = satellite.Metainfo.Service.UnsynchronizedPut(ctx, queueSegment.Encode(), pointer)
require.NoError(t, err)
// Verify that the segment is on the repair queue // Verify that the segment is on the repair queue
count, err := satellite.DB.RepairQueue().Count(ctx) count, err := satellite.DB.RepairQueue().Count(ctx)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, count, 1) require.Equal(t, 1, count)
satellite.Repair.Repairer.SetNow(func() time.Time {
return time.Now().Add(2 * time.Hour)
})
// Run the repairer // Run the repairer
satellite.Repair.Repairer.Loop.Restart() satellite.Repair.Repairer.Loop.Restart()
@ -479,7 +471,7 @@ func TestRemoveExpiredSegmentFromQueue(t *testing.T) {
// Verify that the segment was removed // Verify that the segment was removed
count, err = satellite.DB.RepairQueue().Count(ctx) count, err = satellite.DB.RepairQueue().Count(ctx)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, count, 0) require.Equal(t, 0, count)
}) })
} }

View File

@ -49,6 +49,8 @@ type Service struct {
Loop *sync2.Cycle Loop *sync2.Cycle
repairer *SegmentRepairer repairer *SegmentRepairer
irrDB irreparable.DB irrDB irreparable.DB
nowFn func() time.Time
} }
// NewService creates repairing service. // NewService creates repairing service.
@ -61,6 +63,8 @@ func NewService(log *zap.Logger, queue queue.RepairQueue, config *Config, repair
Loop: sync2.NewCycle(config.Interval), Loop: sync2.NewCycle(config.Interval),
repairer: repairer, repairer: repairer,
irrDB: irrDB, irrDB: irrDB,
nowFn: time.Now,
} }
} }
@ -152,7 +156,7 @@ func (service *Service) process(ctx context.Context) (err error) {
func (service *Service) worker(ctx context.Context, seg *internalpb.InjuredSegment) (err error) { func (service *Service) worker(ctx context.Context, seg *internalpb.InjuredSegment) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
workerStartTime := time.Now().UTC() workerStartTime := service.nowFn().UTC()
service.log.Debug("Limiter running repair on segment") service.log.Debug("Limiter running repair on segment")
// note that shouldDelete is used even in the case where err is not null // note that shouldDelete is used even in the case where err is not null
@ -164,7 +168,7 @@ func (service *Service) worker(ctx context.Context, seg *internalpb.InjuredSegme
segmentInfo := &internalpb.IrreparableSegment{ segmentInfo := &internalpb.IrreparableSegment{
Path: seg.GetPath(), Path: seg.GetPath(),
LostPieces: irreparableErr.piecesRequired - irreparableErr.piecesAvailable, LostPieces: irreparableErr.piecesRequired - irreparableErr.piecesAvailable,
LastRepairAttempt: time.Now().Unix(), LastRepairAttempt: service.nowFn().Unix(),
RepairAttemptCount: int64(1), RepairAttemptCount: int64(1),
} }
if err := service.irrDB.IncrementRepairAttempts(ctx, segmentInfo); err != nil { if err := service.irrDB.IncrementRepairAttempts(ctx, segmentInfo); err != nil {
@ -188,7 +192,7 @@ func (service *Service) worker(ctx context.Context, seg *internalpb.InjuredSegme
return Error.Wrap(err) return Error.Wrap(err)
} }
repairedTime := time.Now().UTC() repairedTime := service.nowFn().UTC()
timeForRepair := repairedTime.Sub(workerStartTime) timeForRepair := repairedTime.Sub(workerStartTime)
mon.FloatVal("time_for_repair").Observe(timeForRepair.Seconds()) //mon:locked mon.FloatVal("time_for_repair").Observe(timeForRepair.Seconds()) //mon:locked
@ -201,3 +205,9 @@ func (service *Service) worker(ctx context.Context, seg *internalpb.InjuredSegme
return nil return nil
} }
// SetNow allows tests to have the server act as if the current time is whatever they want.
func (service *Service) SetNow(nowFn func() time.Time) {
service.nowFn = nowFn
service.repairer.SetNow(nowFn)
}

View File

@ -63,6 +63,8 @@ type SegmentRepairer struct {
// repairOverrides is the set of values configured by the checker to override the repair threshold for various RS schemes. // repairOverrides is the set of values configured by the checker to override the repair threshold for various RS schemes.
repairOverrides checker.RepairOverridesMap repairOverrides checker.RepairOverridesMap
nowFn func() time.Time
} }
// NewSegmentRepairer creates a new instance of SegmentRepairer. // NewSegmentRepairer creates a new instance of SegmentRepairer.
@ -91,6 +93,8 @@ func NewSegmentRepairer(
timeout: timeout, timeout: timeout,
multiplierOptimalThreshold: 1 + excessOptimalThreshold, multiplierOptimalThreshold: 1 + excessOptimalThreshold,
repairOverrides: repairOverrides.GetMap(), repairOverrides: repairOverrides.GetMap(),
nowFn: time.Now,
} }
} }
@ -138,7 +142,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
} }
// TODO how to deal with expiration date for segment // TODO how to deal with expiration date for segment
if object.ExpiresAt != nil && object.ExpiresAt.Before(time.Now().UTC()) { if object.ExpiresAt != nil && object.ExpiresAt.Before(repairer.nowFn().UTC()) {
mon.Meter("repair_expired").Mark(1) //mon:locked mon.Meter("repair_expired").Mark(1) //mon:locked
return true, nil return true, nil
} }
@ -449,6 +453,11 @@ func (repairer *SegmentRepairer) updateAuditFailStatus(ctx context.Context, fail
return 0, nil return 0, nil
} }
// SetNow allows tests to have the server act as if the current time is whatever they want.
func (repairer *SegmentRepairer) SetNow(nowFn func() time.Time) {
repairer.nowFn = nowFn
}
// sliceToSet converts the given slice to a set. // sliceToSet converts the given slice to a set.
func sliceToSet(slice []uint16) map[uint16]bool { func sliceToSet(slice []uint16) map[uint16]bool {
set := make(map[uint16]bool, len(slice)) set := make(map[uint16]bool, len(slice))