Run repairer and checker early (#565)

* Run the repairer, checker, and auditor loops once immediately when they start, so potential setup problems are detected early.
* Fix error handling in audit.Service
Egon Elbre 2018-11-01 16:03:45 +02:00 committed by GitHub
parent 1d3367bb09
commit 2a8b681c4d
4 changed files with 93 additions and 86 deletions
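
Across all four files the change follows one shape: do a unit of work immediately, then wait for the ticker or for context cancellation, so a broken setup surfaces on the first iteration instead of after a full interval. A minimal Go sketch of that loop (runLoop and work are illustrative names, not part of this commit):

```go
package example

import (
	"context"
	"time"

	"go.uber.org/zap"
)

// runLoop runs work once immediately, then again on every tick until the
// context is canceled. runLoop and work are illustrative names only.
func runLoop(ctx context.Context, interval time.Duration, work func(context.Context) error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if err := work(ctx); err != nil {
			zap.L().Error("work failed", zap.Error(err)) // log, but keep looping
		}
		select {
		case <-ticker.C: // wait for the next interval
		case <-ctx.Done(): // or stop when the service is canceled
			return ctx.Err()
		}
	}
}
```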

View File

@@ -13,7 +13,6 @@ import (
"storj.io/storj/pkg/pointerdb/pdbclient"
"storj.io/storj/pkg/provider"
"storj.io/storj/pkg/transport"
"storj.io/storj/pkg/utils"
)
// Service helps coordinate Cursor and Verifier to run the audit process continuously
@@ -21,7 +20,7 @@ type Service struct {
Cursor *Cursor
Verifier *Verifier
Reporter reporter
errs []error
ticker *time.Ticker
}
// Config contains configurable values for audit service
@@ -37,15 +36,15 @@ type Config struct {
// Run runs the audit service with the configured values
func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) {
service, err := NewService(ctx, c.StatDBPort, c.MaxRetriesStatDB, c.Pointers, c.Transport, c.Overlay, c.ID)
service, err := NewService(ctx, c.StatDBPort, c.Interval, c.MaxRetriesStatDB, c.Pointers, c.Transport, c.Overlay, c.ID)
if err != nil {
return err
}
return service.Run(ctx, c.Interval)
return service.Run(ctx)
}
// NewService instantiates a Service with access to a Cursor and Verifier
func NewService(ctx context.Context, statDBPort string, maxRetries int, pointers pdbclient.Client, transport transport.Client, overlay overlay.Client,
func NewService(ctx context.Context, statDBPort string, interval time.Duration, maxRetries int, pointers pdbclient.Client, transport transport.Client, overlay overlay.Client,
id provider.FullIdentity) (service *Service, err error) {
cursor := NewCursor(pointers)
verifier := NewVerifier(transport, overlay, id)
@@ -54,57 +53,56 @@ func NewService(ctx context.Context, statDBPort string, maxRetries int, pointers
return nil, err
}
return &Service{Cursor: cursor,
return &Service{
Cursor: cursor,
Verifier: verifier,
Reporter: reporter,
errs: []error{},
ticker: time.NewTicker(interval),
}, nil
}
// Run calls Cursor and Verifier to continuously request random pointers, then verify data correctness at
// a random stripe within a segment
func (service *Service) Run(ctx context.Context, interval time.Duration) (err error) {
// Run runs auditing service
func (service *Service) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
zap.S().Info("Audit cron is starting up")
ticker := time.NewTicker(interval)
defer ticker.Stop()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
for {
err := service.process(ctx)
if err != nil {
zap.L().Error("process", zap.Error(err))
}
select {
case <-ticker.C:
case <-service.ticker.C:
case <-ctx.Done():
return ctx.Err()
}
}
}
// process picks a random stripe and verifies correctness
func (service *Service) process(ctx context.Context) error {
stripe, err := service.Cursor.NextStripe(ctx)
if err != nil {
service.errs = append(service.errs, err)
cancel()
return err
}
authorization, err := service.Cursor.pointers.SignedMessage()
if err != nil {
service.errs = append(service.errs, err)
cancel()
return err
}
verifiedNodes, err := service.Verifier.verify(ctx, stripe.Index, stripe.Segment, authorization)
if err != nil {
service.errs = append(service.errs, err)
cancel()
return err
}
err = service.Reporter.RecordAudits(ctx, verifiedNodes)
// TODO: if Error.Has(err) then log the error because it means not all node stats updated
if err != nil {
service.errs = append(service.errs, err)
cancel()
return err
}
case <-ctx.Done():
return
}
}
}()
return utils.CombineErrors(service.errs...)
return nil
}
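
After this refactor the audit service owns its ticker: callers pass the interval to NewService and Run needs only a context. A hedged usage sketch; the wrapper function, the 30-second interval, the retry count, and the overlay import path are assumptions, while the NewService and Run signatures come from the diff above:

```go
package example

import (
	"context"
	"time"

	"storj.io/storj/pkg/audit"
	"storj.io/storj/pkg/overlay"
	"storj.io/storj/pkg/pointerdb/pdbclient"
	"storj.io/storj/pkg/provider"
	"storj.io/storj/pkg/transport"
)

// runAudit wires up the reworked audit service: the interval now goes to
// NewService, which owns the ticker, and Run takes only a context.
// The 30s interval and retry count of 3 are placeholder values.
func runAudit(ctx context.Context, statDBPort string,
	pointers pdbclient.Client, tc transport.Client, oc overlay.Client,
	identity provider.FullIdentity) error {
	service, err := audit.NewService(ctx, statDBPort, 30*time.Second, 3,
		pointers, tc, oc, identity)
	if err != nil {
		return err
	}
	return service.Run(ctx) // blocks until ctx is canceled
}
```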

View File

@@ -46,6 +46,24 @@ func newChecker(pointerdb *pointerdb.Server, repairQueue *queue.Queue, overlay p
}
}
// Run the checker loop
func (c *checker) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
for {
err = c.IdentifyInjuredSegments(ctx)
if err != nil {
zap.L().Error("Checker failed", zap.Error(err))
}
select {
case <-c.ticker.C: // wait for the next interval to happen
case <-ctx.Done(): // or the checker is canceled via context
return ctx.Err()
}
}
}
// IdentifyInjuredSegments checks for missing pieces off of the pointerdb and overlay cache
func (c *checker) IdentifyInjuredSegments(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
@@ -122,21 +140,3 @@ func lookupResponsesToNodes(responses *pb.LookupResponses) []*pb.Node {
}
return nodes
}
// Run the checker loop
func (c *checker) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
for {
select {
case <-c.ticker.C: // wait for the next interval to happen
case <-ctx.Done(): // or the checker is canceled via context
return ctx.Err()
}
err = c.IdentifyInjuredSegments(ctx)
if err != nil {
zap.L().Error("Checker failed", zap.Error(err))
}
}
}
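
The functional difference in the checker is purely ordering: the old loop blocked on the ticker before its first IdentifyInjuredSegments call, so a misconfigured checker stayed silent for a whole interval. A side-by-side sketch of the two orderings (error logging elided for brevity):

```go
// Old ordering: block on the ticker first, so the first check runs
// only after one full interval has elapsed.
for {
	select {
	case <-c.ticker.C:
	case <-ctx.Done():
		return ctx.Err()
	}
	_ = c.IdentifyInjuredSegments(ctx) // error logging elided
}

// New ordering: check immediately, then once per interval, so setup
// problems surface on the first iteration.
for {
	_ = c.IdentifyInjuredSegments(ctx) // error logging elided
	select {
	case <-c.ticker.C:
	case <-ctx.Done():
		return ctx.Err()
	}
}
```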

View File

@@ -8,7 +8,8 @@ import (
"time"
"go.uber.org/zap"
q "storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/provider"
"storj.io/storj/storage/redis"
)
@@ -27,8 +28,7 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error)
return Error.Wrap(err)
}
queue := q.NewQueue(client)
queue := queue.NewQueue(client)
repairer := newRepairer(queue, c.Interval, c.MaxRepair)
// TODO(coyle): we need to figure out how to propagate the error up to cancel the service

View File

@@ -10,7 +10,7 @@ import (
"go.uber.org/zap"
"storj.io/storj/internal/sync2"
q "storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/pb"
)
@@ -22,12 +22,12 @@ type Repairer interface {
// repairer holds important values for data repair
type repairer struct {
queue q.RepairQueue
queue queue.RepairQueue
limiter *sync2.Limiter
ticker *time.Ticker
}
func newRepairer(queue q.RepairQueue, interval time.Duration, concurrency int) *repairer {
func newRepairer(queue queue.RepairQueue, interval time.Duration, concurrency int) *repairer {
return &repairer{
queue: queue,
limiter: sync2.NewLimiter(concurrency),
@@ -35,7 +35,7 @@ func newRepairer(queue q.RepairQueue, interval time.Duration, concurrency int) *
}
}
// Run the repairer loop
// Run runs the repairer service
func (r *repairer) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
@@ -43,17 +43,25 @@ func (r *repairer) Run(ctx context.Context) (err error) {
defer r.limiter.Wait()
for {
err := r.process(ctx)
if err != nil {
zap.L().Error("process", zap.Error(err))
}
select {
case <-r.ticker.C: // wait for the next interval to happen
case <-ctx.Done(): // or the repairer is canceled via context
return ctx.Err()
}
}
}
// process picks an item from repair queue and spawns a repairer
func (r *repairer) process(ctx context.Context) error {
seg, err := r.queue.Dequeue()
if err != nil {
// TODO: only log when err != ErrQueueEmpty
zap.L().Error("dequeue", zap.Error(err))
continue
return err
}
r.limiter.Go(ctx, func() {
@@ -62,7 +70,8 @@ func (r *repairer) Run(ctx context.Context) (err error) {
zap.L().Error("Repair failed", zap.Error(err))
}
})
}
return nil
}
// Repair starts repair of the segment
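
The new process method keeps Dequeue on the loop goroutine while the repair itself runs under the limiter, which bounds how many repairs are in flight at once. A minimal sketch of that pattern, assuming only the NewLimiter, Go, and Wait calls visible in this diff:

```go
package main

import (
	"context"
	"fmt"

	"storj.io/storj/internal/sync2"
)

// Demonstrates the bounded-concurrency pattern from the repairer: at most
// two workers run at once, and Wait blocks until in-flight work finishes.
func main() {
	ctx := context.Background()
	limiter := sync2.NewLimiter(2)
	defer limiter.Wait()

	for i := 0; i < 5; i++ {
		i := i // capture the loop variable for the closure
		limiter.Go(ctx, func() {
			fmt.Println("repairing segment", i) // stand-in for Repair(ctx, seg)
		})
	}
}
```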