diff --git a/pkg/datarepair/checker/checker.go b/pkg/datarepair/checker/checker.go
index 49adfd00f..bca56d38d 100644
--- a/pkg/datarepair/checker/checker.go
+++ b/pkg/datarepair/checker/checker.go
@@ -12,6 +12,7 @@ import (
 	"go.uber.org/zap"
 	monkit "gopkg.in/spacemonkeygo/monkit.v2"
 
+	"storj.io/storj/internal/sync2"
 	"storj.io/storj/pkg/datarepair/irreparable"
 	"storj.io/storj/pkg/datarepair/queue"
 	"storj.io/storj/pkg/pb"
@@ -32,17 +33,8 @@ type Config struct {
 	Interval time.Duration `help:"how frequently checker should audit segments" default:"30s"`
 }
 
-// Checker is the interface for data repair checker
-type Checker interface {
-	// TODO: remove interface
-	Run(ctx context.Context) error
-	IdentifyInjuredSegments(ctx context.Context) (err error)
-	OfflineNodes(ctx context.Context, nodeIDs storj.NodeIDList) (offline []int32, err error)
-	Close() error
-}
-
 // Checker contains the information needed to do checks for missing pieces
-type checker struct {
+type Checker struct {
 	statdb      statdb.DB
 	pointerdb   *pointerdb.Service
 	repairQueue queue.RepairQueue
@@ -50,13 +42,13 @@ type checker struct {
 	irrdb       irreparable.DB
 	limit       int
 	logger      *zap.Logger
-	ticker      *time.Ticker
+	Interval    sync2.Cycle
 }
 
 // NewChecker creates a new instance of checker
-func NewChecker(pointerdb *pointerdb.Service, sdb statdb.DB, repairQueue queue.RepairQueue, overlay pb.OverlayServer, irrdb irreparable.DB, limit int, logger *zap.Logger, interval time.Duration) Checker {
+func NewChecker(pointerdb *pointerdb.Service, sdb statdb.DB, repairQueue queue.RepairQueue, overlay pb.OverlayServer, irrdb irreparable.DB, limit int, logger *zap.Logger, interval time.Duration) *Checker {
 	// TODO: reorder arguments
-	return &checker{
+	checker := &Checker{
 		statdb:      sdb,
 		pointerdb:   pointerdb,
 		repairQueue: repairQueue,
@@ -64,39 +56,37 @@ func NewChecker(pointerdb *pointerdb.Service, sdb statdb.DB, repairQueue queue.R
 		irrdb:       irrdb,
 		limit:       limit,
 		logger:      logger,
-		ticker:      time.NewTicker(interval),
 	}
+
+	checker.Interval.SetInterval(interval)
+
+	return checker
 }
 
 // Run the checker loop
-func (c *checker) Run(ctx context.Context) (err error) {
+func (checker *Checker) Run(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	for {
-		err = c.IdentifyInjuredSegments(ctx)
+	return checker.Interval.Run(ctx, func(ctx context.Context) error {
+		err := checker.IdentifyInjuredSegments(ctx)
 		if err != nil {
-			c.logger.Error("Checker failed", zap.Error(err))
+			checker.logger.Error("error with injured segments identification: ", zap.Error(err))
 		}
-
-		select {
-		case <-c.ticker.C: // wait for the next interval to happen
-		case <-ctx.Done(): // or the checker is canceled via context
-			return ctx.Err()
-		}
-	}
+		return nil
+	})
 }
 
 // Close closes resources
-func (c *checker) Close() error { return nil }
+func (checker *Checker) Close() error { return nil }
 
 // IdentifyInjuredSegments checks for missing pieces off of the pointerdb and overlay cache
-func (c *checker) IdentifyInjuredSegments(ctx context.Context) (err error) {
+func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	err = c.pointerdb.Iterate("", "", true, false,
+	err = checker.pointerdb.Iterate("", "", true, false,
 		func(it storage.Iterator) error {
 			var item storage.ListItem
-			lim := c.limit
+			lim := checker.limit
 			if lim <= 0 || lim > storage.LookupLimit {
 				lim = storage.LookupLimit
 			}
@@ -115,7 +105,7 @@ func (c *checker) IdentifyInjuredSegments(ctx context.Context) (err error) {
 
 				pieces := remote.GetRemotePieces()
 				if pieces == nil {
-					c.logger.Debug("no pieces on remote segment")
+					checker.logger.Debug("no pieces on remote segment")
 					continue
 				}
 
@@ -125,12 +115,12 @@ func (c *checker) IdentifyInjuredSegments(ctx context.Context) (err error) {
 				}
 
 				// Find all offline nodes
-				offlineNodes, err := c.OfflineNodes(ctx, nodeIDs)
+				offlineNodes, err := checker.OfflineNodes(ctx, nodeIDs)
 				if err != nil {
 					return Error.New("error getting offline nodes %s", err)
 				}
 
-				invalidNodes, err := c.invalidNodes(ctx, nodeIDs)
+				invalidNodes, err := checker.invalidNodes(ctx, nodeIDs)
 				if err != nil {
 					return Error.New("error getting invalid nodes %s", err)
 				}
@@ -139,7 +129,7 @@ func (c *checker) IdentifyInjuredSegments(ctx context.Context) (err error) {
 
 				numHealthy := len(nodeIDs) - len(missingPieces)
 				if (int32(numHealthy) >= pointer.Remote.Redundancy.MinReq) && (int32(numHealthy) < pointer.Remote.Redundancy.RepairThreshold) {
-					err = c.repairQueue.Enqueue(ctx, &pb.InjuredSegment{
+					err = checker.repairQueue.Enqueue(ctx, &pb.InjuredSegment{
 						Path:       string(item.Key),
 						LostPieces: missingPieces,
 					})
@@ -157,7 +147,7 @@ func (c *checker) IdentifyInjuredSegments(ctx context.Context) (err error) {
 					}
 
 					//add the entry if new or update attempt count if already exists
-					err := c.irrdb.IncrementRepairAttempts(ctx, segmentInfo)
+					err := checker.irrdb.IncrementRepairAttempts(ctx, segmentInfo)
 					if err != nil {
 						return Error.New("error handling irreparable segment to queue %s", err)
 					}
@@ -170,8 +160,8 @@ func (c *checker) IdentifyInjuredSegments(ctx context.Context) (err error) {
 }
 
 // OfflineNodes returns the indices of offline nodes
-func (c *checker) OfflineNodes(ctx context.Context, nodeIDs storj.NodeIDList) (offline []int32, err error) {
-	responses, err := c.overlay.BulkLookup(ctx, pb.NodeIDsToLookupRequests(nodeIDs))
+func (checker *Checker) OfflineNodes(ctx context.Context, nodeIDs storj.NodeIDList) (offline []int32, err error) {
+	responses, err := checker.overlay.BulkLookup(ctx, pb.NodeIDsToLookupRequests(nodeIDs))
 	if err != nil {
 		return []int32{}, err
 	}
@@ -185,14 +175,14 @@ func (c *checker) OfflineNodes(ctx context.Context, nodeIDs storj.NodeIDList) (o
 }
 
 // Find invalidNodes by checking the audit results that are place in statdb
-func (c *checker) invalidNodes(ctx context.Context, nodeIDs storj.NodeIDList) (invalidNodes []int32, err error) {
+func (checker *Checker) invalidNodes(ctx context.Context, nodeIDs storj.NodeIDList) (invalidNodes []int32, err error) {
 	// filter if nodeIDs have invalid pieces from auditing results
 	maxStats := &statdb.NodeStats{
 		AuditSuccessRatio: 0, // TODO: update when we have stats added to statdb
 		UptimeRatio:       0, // TODO: update when we have stats added to statdb
 	}
 
-	invalidIDs, err := c.statdb.FindInvalidNodes(ctx, nodeIDs, maxStats)
+	invalidIDs, err := checker.statdb.FindInvalidNodes(ctx, nodeIDs, maxStats)
 	if err != nil {
 		return nil, Error.New("error getting valid nodes from statdb %s", err)
 	}
diff --git a/pkg/datarepair/checker/checker_test.go b/pkg/datarepair/checker/checker_test.go
index b7a156669..02c68e401 100644
--- a/pkg/datarepair/checker/checker_test.go
+++ b/pkg/datarepair/checker/checker_test.go
@@ -17,13 +17,13 @@ import (
 )
 
 func TestIdentifyInjuredSegments(t *testing.T) {
-	// TODO note satellite's: own sub-systems need to be disabled
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 0,
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
-		time.Sleep(2 * time.Second)
-		const numberOfNodes = 10
+		checker := planet.Satellites[0].Repair.Checker
+		checker.Interval.Stop()
 
+		const numberOfNodes = 10
 		pieces := make([]*pb.RemotePiece, 0, numberOfNodes)
 		// use online nodes
 		for i, storagenode := range planet.StorageNodes {
@@ -58,7 +58,6 @@ func TestIdentifyInjuredSegments(t *testing.T) {
 		err := pointerdb.Put(pointer.Remote.PieceId, pointer)
 		assert.NoError(t, err)
 
-		checker := planet.Satellites[0].Repair.Checker
 		err = checker.IdentifyInjuredSegments(ctx)
 		assert.NoError(t, err)
 
@@ -78,11 +77,11 @@ func TestIdentifyInjuredSegments(t *testing.T) {
 }
 
 func TestOfflineNodes(t *testing.T) {
-	// TODO note satellite's: own sub-systems need to be disabled
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 0,
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
-		time.Sleep(2 * time.Second)
+		checker := planet.Satellites[0].Repair.Checker
+		checker.Interval.Stop()
 
 		const numberOfNodes = 10
 		nodeIDs := storj.NodeIDList{}
@@ -99,7 +98,6 @@ func TestOfflineNodes(t *testing.T) {
 			expectedOffline = append(expectedOffline, int32(i))
 		}
 
-		checker := planet.Satellites[0].Repair.Checker
 		offline, err := checker.OfflineNodes(ctx, nodeIDs)
 		assert.NoError(t, err)
 		assert.Equal(t, expectedOffline, offline)
@@ -107,11 +105,11 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
 }
 
 func TestIdentifyIrreparableSegments(t *testing.T) {
-	// TODO note satellite's: own sub-systems need to be disabled
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 0,
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
-		time.Sleep(2 * time.Second)
+		checker := planet.Satellites[0].Repair.Checker
+		checker.Interval.Stop()
 
 		const numberOfNodes = 10
 		pieces := make([]*pb.RemotePiece, 0, numberOfNodes)
@@ -148,7 +146,6 @@ func TestIdentifyIrreparableSegments(t *testing.T) {
 		err := pointerdb.Put(pointer.Remote.PieceId, pointer)
 		assert.NoError(t, err)
 
-		checker := planet.Satellites[0].Repair.Checker
 		err = checker.IdentifyInjuredSegments(ctx)
 		assert.NoError(t, err)
 
diff --git a/satellite/peer.go b/satellite/peer.go
index 897e3e03d..63a0defdc 100644
--- a/satellite/peer.go
+++ b/satellite/peer.go
@@ -151,7 +151,7 @@ type Peer struct {
 	}
 
 	Repair struct {
-		Checker  checker.Checker // TODO: convert to actual struct
+		Checker  *checker.Checker
 		Repairer *repairer.Service
 	}
 	Audit struct {
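Note on the pattern: the change above swaps the checker's hand-rolled time.Ticker/select loop for a sync2.Cycle field. NewChecker configures it with SetInterval, Run delegates to Interval.Run with a callback, and the tests call Interval.Stop so they can invoke IdentifyInjuredSegments directly instead of racing the background loop. The sketch below is not part of the diff; it illustrates the same pattern on a hypothetical worker type, assuming only the sync2.Cycle methods that actually appear above (SetInterval, Run, Stop).

```go
package main

import (
	"context"
	"fmt"
	"time"

	"storj.io/storj/internal/sync2"
)

// worker is a hypothetical stand-in for a service like the checker above.
type worker struct {
	Loop sync2.Cycle // replaces a hand-rolled time.Ticker + select loop
}

func newWorker(interval time.Duration) *worker {
	w := &worker{}
	w.Loop.SetInterval(interval) // same call NewChecker makes on checker.Interval
	return w
}

// Run blocks until ctx is canceled, invoking the callback once per interval.
func (w *worker) Run(ctx context.Context) error {
	return w.Loop.Run(ctx, func(ctx context.Context) error {
		// one unit of periodic work; the checker returns nil here even when a
		// pass fails, so a single bad pass does not end the loop
		fmt.Println("tick")
		return nil
	})
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	w := newWorker(250 * time.Millisecond)

	// In tests, calling w.Loop.Stop() halts the automatic loop so the work
	// function can be driven manually, the way checker.Interval.Stop() is
	// used in checker_test.go above.
	if err := w.Run(ctx); err != nil {
		fmt.Println("run finished:", err)
	}
}
```

The payoff is visible in the tests: instead of sleeping for two seconds and hoping the background loop has not interfered, they stop the cycle and call the checker's methods directly.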