storj/pkg/datarepair/repairer/repairer.go
Alexander Leitner dc8bea2cd1
Repairer points to redis server (#427)
* Let's do it right this time

* Oh travis...

* Handle redis URL

* Travis... why u gotta be like this?

* Handle when address does not use redis scheme

* Start repairer

* Match provider.Responsibility interface

* Simplify if statement

* Config doesn't need to be a pointer

* Initialize doesn't need to be exported

* Don't run checker or repairer on startup

* Fix travis complaints
2018-10-05 11:58:07 -04:00

144 lines
2.9 KiB
Go

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package repairer
import (
"context"
"fmt"
"sync"
"time"
"github.com/zeebo/errs"
"gopkg.in/spacemonkeygo/monkit.v2"
q "storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/provider"
"storj.io/storj/storage/redis"
)
var (
// mon is the monkit scope used to instrument this package
mon = monkit.Package()
// repairerError is the error class used to wrap errors produced by the repairer
repairerError = errs.Class("repairer error")
)
// Repairer is the interface implemented by the data repairer
type Repairer interface {
// Repair repairs a single injured segment
Repair(seg *pb.InjuredSegment) error
// Run runs the repair loop and returns once the loop has stopped
Run() error
// Stop asks a running repair loop to stop
Stop() error
}
// Config contains configurable values for repairer
type Config struct {
// QueueAddress is the address of the repair queue; it is passed to redis.NewClientFrom
QueueAddress string `help:"data repair queue address" default:"redis://localhost:6379?db=5&password=123"`
// MaxRepair is the limit the repair loop compares the in-progress repair count against
MaxRepair int `help:"maximum segments that can be repaired concurrently" default:"100"`
// Interval is the period of the ticker that drives the repair loop.
// NOTE(review): the help text talks about the checker; presumably copied
// from the checker config — confirm and reword.
Interval time.Duration `help:"how frequently checker should audit segments" default:"3600s"`
}
// initialize builds a repairer from the configuration.
//
// It creates a redis client for c.QueueAddress, wraps it in a repair queue,
// and derives a cancelable context from ctx. A failure to create the client
// is returned wrapped in repairerError.
func (c Config) initialize(ctx context.Context) (Repairer, error) {
	// Create the client before deriving the context, so the error path
	// cannot leak a cancel function that nobody will ever call.
	client, err := redis.NewClientFrom(c.QueueAddress)
	if err != nil {
		return nil, repairerError.Wrap(err)
	}

	var r repairer
	r.ctx, r.cancel = context.WithCancel(ctx)
	r.queue = q.NewQueue(client)
	// The condition variable shares the repairer's mutex.
	r.cond.L = &r.mu
	r.maxRepair = c.MaxRepair
	r.interval = c.Interval
	return &r, nil
}
// Run initializes a repairer from the configured values and runs its loop,
// returning either the initialization error or the loop's result.
func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) {
	rep, initErr := c.initialize(ctx)
	if initErr != nil {
		return initErr
	}
	return rep.Run()
}
// repairer holds important values for data repair.
// It contains a sync.Mutex and a sync.Cond, so it must not be copied after use.
type repairer struct {
// ctx and cancel come from context.WithCancel in initialize; Stop calls cancel
ctx context.Context
cancel context.CancelFunc
// queue is where Run dequeues injured segments from
queue q.RepairQueue
// errs collects the errors hit while dequeuing or repairing
errs []error
// mu is the lock behind cond (initialize sets cond.L = &mu)
mu sync.Mutex
// cond is signaled by Repair when it finishes and waited on by Run
cond sync.Cond
// maxRepair is the limit Run compares inProgress against before dequeuing
maxRepair int
// inProgress counts the Repair calls currently running
inProgress int
// interval is the period of the ticker that drives Run
interval time.Duration
}
// Run the repairer loop.
//
// On every tick of r.interval a background goroutine waits until fewer than
// r.maxRepair repairs are in progress, dequeues one injured segment, and
// hands it to the main loop, which repairs it in its own goroutine. The
// first dequeue or repair error cancels the repairer's context. Run returns
// once that context is done, reporting the recorded error (if any). A
// non-positive interval is rejected with an error.
func (r *repairer) Run() (err error) {
	if r.interval <= 0 {
		// time.NewTicker panics on a non-positive duration.
		return repairerError.New("invalid repair interval: %v", r.interval)
	}

	// Read the context once so both loops watch the same Done channel.
	ctx := r.ctx

	// fail records the error under the lock and stops both loops.
	fail := func(failure error) {
		r.mu.Lock()
		r.errs = append(r.errs, failure)
		r.mu.Unlock()
		r.cancel()
	}

	c := make(chan *pb.InjuredSegment)
	ticker := time.NewTicker(r.interval)
	defer ticker.Stop()

	go func() {
		for {
			// ticker.Stop does not close ticker.C, so watch the context too;
			// otherwise this goroutine would outlive Run.
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}

			// sync.Cond.Wait must be called with its lock held.
			r.mu.Lock()
			for r.inProgress >= r.maxRepair {
				r.cond.Wait()
			}
			r.mu.Unlock()

			// Dequeue should block until there is an actual next item in the queue
			seg, dequeueErr := r.queue.Dequeue()
			if dequeueErr != nil {
				// Do not hand the zero-value segment on for repair.
				fail(dequeueErr)
				return
			}

			// Give up on the hand-off if the main loop has already exited.
			select {
			case c <- &seg:
			case <-ctx.Done():
				return
			}
		}
	}()

	for {
		select {
		case <-ctx.Done():
			// Repair goroutines may still be recording errors.
			r.mu.Lock()
			defer r.mu.Unlock()
			return r.combinedError()
		case seg := <-c:
			go func() {
				if repairErr := r.Repair(seg); repairErr != nil {
					fail(repairErr)
				}
			}()
		}
	}
}
// Repair starts repair of the segment.
//
// It counts itself in r.inProgress for the duration of the call and signals
// r.cond when done, so a waiter blocked on the repair limit can re-check it.
// The repair itself is not implemented yet: the segment is only printed.
func (r *repairer) Repair(seg *pb.InjuredSegment) (err error) {
	// Instrument a local copy of the context: the task helper is handed a
	// pointer to the context, and giving it &r.ctx lets concurrent repairs
	// overwrite the shared field.
	ctx := r.ctx
	defer mon.Task()(&ctx)(&err)

	// inProgress is the condition guarded by r.cond, so it is only changed
	// while holding the condition's lock.
	r.mu.Lock()
	r.inProgress++
	r.mu.Unlock()

	defer func() {
		r.mu.Lock()
		r.inProgress--
		r.mu.Unlock()
		r.cond.Signal()
	}()

	fmt.Println(seg)
	return err
}
// Stop cancels the repairer's context; the Run loop returns once that
// context is done. It always reports a nil error.
func (r *repairer) Stop() (err error) {
	r.cancel()
	return err
}
// combinedError reports the first error recorded in r.errs, or nil when
// nothing has been recorded.
func (r *repairer) combinedError() error {
	if len(r.errs) > 0 {
		// TODO: combine errors
		return r.errs[0]
	}
	return nil
}