2018-10-02 20:46:29 +01:00
|
|
|
// Copyright (C) 2018 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package repairer
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"sync"
|
2018-10-05 16:58:07 +01:00
|
|
|
"time"
|
2018-10-02 20:46:29 +01:00
|
|
|
|
|
|
|
q "storj.io/storj/pkg/datarepair/queue"
|
|
|
|
"storj.io/storj/pkg/pb"
|
2018-10-05 16:58:07 +01:00
|
|
|
"storj.io/storj/pkg/provider"
|
|
|
|
"storj.io/storj/storage/redis"
|
2018-10-02 20:46:29 +01:00
|
|
|
)
|
|
|
|
|
2018-10-03 19:35:56 +01:00
|
|
|
// Repairer is the interface for the data repair queue
|
|
|
|
type Repairer interface {
|
|
|
|
Repair(seg *pb.InjuredSegment) error
|
|
|
|
Run() error
|
|
|
|
Stop() error
|
|
|
|
}
|
|
|
|
|
|
|
|
// Config contains configurable values for the repairer.
//
// The struct tags carry the help text and default value of each setting.
type Config struct {
	// QueueAddress is the address of the repair queue; it is the string
	// passed to redis.NewClientFrom when the repairer is initialized.
	QueueAddress string `help:"data repair queue address" default:"redis://localhost:6379?db=5&password=123"`

	// MaxRepair is the limit on how many segment repairs may be in progress
	// at the same time.
	MaxRepair int `help:"maximum segments that can be repaired concurrently" default:"100"`

	// Interval is the period of the ticker that drives the repair loop: one
	// segment is taken from the queue per tick.
	Interval time.Duration `help:"how frequently repairer should try and repair more data" default:"3600s"`
}
|
|
|
|
|
|
|
|
// Initialize a repairer struct
|
2018-10-05 16:58:07 +01:00
|
|
|
func (c Config) initialize(ctx context.Context) (Repairer, error) {
|
2018-10-03 19:35:56 +01:00
|
|
|
var r repairer
|
|
|
|
r.ctx, r.cancel = context.WithCancel(ctx)
|
|
|
|
|
2018-10-05 16:58:07 +01:00
|
|
|
client, err := redis.NewClientFrom(c.QueueAddress)
|
|
|
|
if err != nil {
|
2018-10-09 17:09:33 +01:00
|
|
|
return nil, Error.Wrap(err)
|
2018-10-05 16:58:07 +01:00
|
|
|
}
|
|
|
|
r.queue = q.NewQueue(client)
|
2018-10-03 19:35:56 +01:00
|
|
|
|
|
|
|
r.cond.L = &r.mu
|
2018-10-05 16:58:07 +01:00
|
|
|
r.maxRepair = c.MaxRepair
|
|
|
|
r.interval = c.Interval
|
2018-10-03 19:35:56 +01:00
|
|
|
return &r, nil
|
|
|
|
}
|
|
|
|
|
2018-10-05 16:58:07 +01:00
|
|
|
// Run runs the repairer with configured values
|
|
|
|
func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) {
|
|
|
|
r, err := c.initialize(ctx)
|
2018-10-03 19:35:56 +01:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return r.Run()
|
|
|
|
}
|
|
|
|
|
|
|
|
// repairer holds important values for data repair
|
|
|
|
type repairer struct {
|
2018-10-02 20:46:29 +01:00
|
|
|
ctx context.Context
|
|
|
|
cancel context.CancelFunc
|
|
|
|
queue q.RepairQueue
|
|
|
|
errs []error
|
|
|
|
mu sync.Mutex
|
|
|
|
cond sync.Cond
|
|
|
|
maxRepair int
|
|
|
|
inProgress int
|
2018-10-05 16:58:07 +01:00
|
|
|
interval time.Duration
|
2018-10-02 20:46:29 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// Run the repairer loop
|
2018-10-03 19:35:56 +01:00
|
|
|
func (r *repairer) Run() (err error) {
|
2018-10-02 20:46:29 +01:00
|
|
|
c := make(chan *pb.InjuredSegment)
|
2018-10-05 16:58:07 +01:00
|
|
|
|
|
|
|
ticker := time.NewTicker(r.interval)
|
|
|
|
defer ticker.Stop()
|
2018-10-02 20:46:29 +01:00
|
|
|
go func() {
|
2018-10-05 16:58:07 +01:00
|
|
|
for range ticker.C {
|
2018-10-02 20:46:29 +01:00
|
|
|
for r.inProgress >= r.maxRepair {
|
|
|
|
r.cond.Wait()
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetNext should lock until there is an actual next item in the queue
|
|
|
|
seg, err := r.queue.Dequeue()
|
|
|
|
if err != nil {
|
|
|
|
r.errs = append(r.errs, err)
|
|
|
|
r.cancel()
|
|
|
|
}
|
|
|
|
c <- &seg
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-r.ctx.Done():
|
|
|
|
return r.combinedError()
|
|
|
|
case seg := <-c:
|
|
|
|
go func() {
|
|
|
|
err := r.Repair(seg)
|
|
|
|
if err != nil {
|
|
|
|
r.errs = append(r.errs, err)
|
|
|
|
r.cancel()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Repair starts repair of the segment
|
2018-10-03 19:35:56 +01:00
|
|
|
func (r *repairer) Repair(seg *pb.InjuredSegment) (err error) {
|
2018-10-02 20:46:29 +01:00
|
|
|
defer mon.Task()(&r.ctx)(&err)
|
|
|
|
r.inProgress++
|
|
|
|
fmt.Println(seg)
|
|
|
|
|
|
|
|
r.inProgress--
|
|
|
|
r.cond.Signal()
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Stop the repairer loop
|
2018-10-03 19:35:56 +01:00
|
|
|
func (r *repairer) Stop() (err error) {
|
2018-10-02 20:46:29 +01:00
|
|
|
r.cancel()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-10-03 19:35:56 +01:00
|
|
|
func (r *repairer) combinedError() error {
|
2018-10-02 20:46:29 +01:00
|
|
|
if len(r.errs) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
// TODO: combine errors
|
|
|
|
return r.errs[0]
|
|
|
|
}
|