storj/pkg/datarepair/repairer/repairer.go
Cameron 027e4045c6
setup repairer loop (#378)
* setup repairer loop

* added read from queue

* Refactor to make things easier to import

* add more control flow to repairer

* add comment

* basic interval structure for running check/repair

* change function name GetNext to Dequeue

* better increment/decrement syntax

* export Repairer struct

* delete 'unreachable code'

* add mon.Task() to Repairer.Repair

* remove 24 hour interval

* set maxRepair on Config as well as Repairer

* add comment for Repairer struct, check err

* comment out runCfg.Repair in cmd/satellite/main.go because it is NI yet
2018-10-02 15:46:29 -04:00

102 lines
1.8 KiB
Go

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package repairer
import (
"context"
"fmt"
"sync"
"gopkg.in/spacemonkeygo/monkit.v2"
q "storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/pb"
)
var (
mon = monkit.Package()
)
// Repairer holds important values for data repair
type Repairer struct {
ctx context.Context
cancel context.CancelFunc
queue q.RepairQueue
errs []error
mu sync.Mutex
cond sync.Cond
maxRepair int
inProgress int
}
// Initialize a repairer struct
func Initialize(ctx context.Context, queue q.RepairQueue, max int) (*Repairer, error) {
var r Repairer
r.ctx, r.cancel = context.WithCancel(ctx)
r.queue = queue
r.cond.L = &r.mu
r.maxRepair = max
return &r, nil
}
// Run the repairer loop
func (r *Repairer) Run() (err error) {
c := make(chan *pb.InjuredSegment)
go func() {
for {
for r.inProgress >= r.maxRepair {
r.cond.Wait()
}
// GetNext should lock until there is an actual next item in the queue
seg, err := r.queue.Dequeue()
if err != nil {
r.errs = append(r.errs, err)
r.cancel()
}
c <- &seg
}
}()
for {
select {
case <-r.ctx.Done():
return r.combinedError()
case seg := <-c:
go func() {
err := r.Repair(seg)
if err != nil {
r.errs = append(r.errs, err)
r.cancel()
}
}()
}
}
}
// Repair starts repair of the segment
func (r *Repairer) Repair(seg *pb.InjuredSegment) (err error) {
defer mon.Task()(&r.ctx)(&err)
r.inProgress++
fmt.Println(seg)
r.inProgress--
r.cond.Signal()
return err
}
// Stop the repairer loop
func (r *Repairer) Stop() (err error) {
r.cancel()
return nil
}
func (r *Repairer) combinedError() error {
if len(r.errs) == 0 {
return nil
}
// TODO: combine errors
return r.errs[0]
}