Fix repairing run (#523)
* Fix repairing run
* Fix concurrency bugs
* Add sync2.Limiter concurrency primitive
parent
842ebc9546
commit
8efb4f0e89
internal/sync2/limiter.go (50 lines, normal file)
@@ -0,0 +1,50 @@
+// Copyright (C) 2018 Storj Labs, Inc.
+// See LICENSE for copying information
+
+package sync2
+
+import (
+	"context"
+	"sync"
+)
+
+// Limiter implements concurrent goroutine limiting
+type Limiter struct {
+	limit   chan struct{}
+	working sync.WaitGroup
+}
+
+// NewLimiter creates a new limiter with limit set to n
+func NewLimiter(n int) *Limiter {
+	limiter := &Limiter{}
+	limiter.limit = make(chan struct{}, n)
+	return limiter
+}
+
+// Go tries to start fn as a goroutine.
+// When the limit is reached it will wait until it can run it
+// or the context is canceled.
+func (limiter *Limiter) Go(ctx context.Context, fn func()) bool {
+	select {
+	case limiter.limit <- struct{}{}:
+	case <-ctx.Done():
+		return false
+	}
+
+	limiter.working.Add(1)
+	go func() {
+		defer func() {
+			<-limiter.limit
+			limiter.working.Done()
+		}()
+
+		fn()
+	}()
+
+	return true
+}
+
+// Wait waits for all running goroutines to finish
+func (limiter *Limiter) Wait() {
+	limiter.working.Wait()
+}
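For reference, a minimal usage sketch of the Limiter introduced above. The item count, the limit of 2, and the printed message are illustrative only, and the snippet assumes it is built inside the storj module (internal/sync2 is not importable from outside it).

package main

import (
	"context"
	"fmt"

	"storj.io/storj/internal/sync2"
)

func main() {
	ctx := context.Background()
	limiter := sync2.NewLimiter(2) // at most 2 goroutines run at once

	for i := 0; i < 5; i++ {
		i := i // capture the loop variable (pre-Go 1.22 semantics)
		started := limiter.Go(ctx, func() {
			fmt.Println("processing item", i)
		})
		if !started {
			break // ctx was canceled before a slot became free
		}
	}

	// Wait blocks until every goroutine started via Go has returned.
	limiter.Wait()
}

Go returning false on a canceled context lets a caller stop submitting work instead of blocking forever on a full limiter.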
internal/sync2/limiter_test.go (67 lines, normal file)
@@ -0,0 +1,67 @@
+// Copyright (C) 2018 Storj Labs, Inc.
+// See LICENSE for copying information
+
+package sync2
+
+import (
+	"context"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func TestLimiterLimiting(t *testing.T) {
+	const N, Limit = 1000, 10
+	ctx := context.Background()
+	limiter := NewLimiter(Limit)
+	counter := int32(0)
+	for i := 0; i < N; i++ {
+		limiter.Go(ctx, func() {
+			if atomic.AddInt32(&counter, 1) > Limit {
+				panic("limit exceeded")
+			}
+			time.Sleep(time.Millisecond)
+			atomic.AddInt32(&counter, -1)
+		})
+	}
+	limiter.Wait()
+}
+
+func TestLimiterCancelling(t *testing.T) {
+	const N, Limit = 1000, 10
+	limiter := NewLimiter(Limit)
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	counter := int32(0)
+
+	waitForCancel := make(chan struct{}, N)
+	block := make(chan struct{})
+	allreturned := make(chan struct{})
+
+	go func() {
+		for i := 0; i < N; i++ {
+			limiter.Go(ctx, func() {
+				if atomic.AddInt32(&counter, 1) > Limit {
+					panic("limit exceeded")
+				}
+
+				waitForCancel <- struct{}{}
+				<-block
+			})
+		}
+		close(allreturned)
+	}()
+
+	for i := 0; i < Limit; i++ {
+		<-waitForCancel
+	}
+	cancel()
+	<-allreturned
+	close(block)
+
+	limiter.Wait()
+	if counter > Limit {
+		t.Fatal("too many times run")
+	}
+}
@@ -19,29 +19,15 @@ type Config struct {
 	Interval time.Duration `help:"how frequently checker should audit segments" default:"3600s"`
 }

-// Initialize a repairer struct
-func (c Config) initialize(ctx context.Context) (Repairer, error) {
-	var r repairer
-	r.ctx, r.cancel = context.WithCancel(ctx)
-
-	client, err := redis.NewClientFrom(c.QueueAddress)
-	if err != nil {
-		return nil, Error.Wrap(err)
-	}
-	r.queue = q.NewQueue(client)
-
-	r.cond.L = &r.mu
-	r.maxRepair = c.MaxRepair
-	r.interval = c.Interval
-	return &r, nil
-}
-
 // Run runs the repairer with configured values
 func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) {
-	r, err := c.initialize(ctx)
+	client, err := redis.NewClientFrom(c.QueueAddress)
 	if err != nil {
-		return err
+		return Error.Wrap(err)
 	}

-	return r.Run()
+	queue := q.NewQueue(client)
+
+	repairer := newRepairer(ctx, queue, c.Interval, c.MaxRepair)
+	return repairer.Run()
 }
@@ -5,90 +5,72 @@ package repairer

 import (
 	"context"
-	"fmt"
-	"sync"
 	"time"

 	"go.uber.org/zap"

+	"storj.io/storj/internal/sync2"
 	q "storj.io/storj/pkg/datarepair/queue"
 	"storj.io/storj/pkg/pb"
-	"storj.io/storj/pkg/utils"
 )

 // Repairer is the interface for the data repair queue
 type Repairer interface {
 	Repair(seg *pb.InjuredSegment) error
 	Run() error
 	Stop() error
 }

 // repairer holds important values for data repair
 type repairer struct {
 	ctx        context.Context
 	cancel     context.CancelFunc
 	queue      q.RepairQueue
-	errs       []error
-	mu         sync.Mutex
-	cond       sync.Cond
-	maxRepair  int
-	inProgress int
-	interval   time.Duration
+	limiter    *sync2.Limiter
+	ticker     *time.Ticker
 }

+func newRepairer(ctx context.Context, queue q.RepairQueue, interval time.Duration, concurrency int) *repairer {
+	return &repairer{
+		ctx:     ctx,
+		queue:   queue,
+		limiter: sync2.NewLimiter(concurrency),
+		ticker:  time.NewTicker(interval),
+	}
+}
+
 // Run the repairer loop
 func (r *repairer) Run() (err error) {
 	zap.S().Info("Repairer is starting up")
 	defer mon.Task()(&r.ctx)(&err)

-	c := make(chan *pb.InjuredSegment)
-
-	ticker := time.NewTicker(r.interval)
-	defer ticker.Stop()
-	go func() {
-		for range ticker.C {
-			for r.inProgress >= r.maxRepair {
-				r.cond.Wait()
-			}
-
-			// GetNext should lock until there is an actual next item in the queue
-			seg, err := r.queue.Dequeue()
-			if err != nil {
-				r.errs = append(r.errs, err)
-				r.cancel()
-			}
-			c <- &seg
-		}
-	}()
+	// wait for all repairs to complete
+	defer r.limiter.Wait()

 	for {
 		select {
-		case <-r.ctx.Done():
-			return utils.CombineErrors(r.errs...)
-		case seg := <-c:
-			go func() {
-				err := r.Repair(seg)
-				if err != nil {
-					r.errs = append(r.errs, err)
-					r.cancel()
-				}
-			}()
+		case <-r.ticker.C: // wait for the next interval to happen
+		case <-r.ctx.Done(): // or the repairer is canceled via context
+			return r.ctx.Err()
 		}
+
+		seg, err := r.queue.Dequeue()
+		if err != nil {
+			// TODO: only log when err != ErrQueueEmpty
+			zap.L().Error("dequeue", zap.Error(err))
+			continue
+		}
+
+		r.limiter.Go(r.ctx, func() {
+			err := r.Repair(&seg)
+			if err != nil {
+				zap.L().Error("Repair failed", zap.Error(err))
+			}
+		})
 	}
 }

 // Repair starts repair of the segment
 func (r *repairer) Repair(seg *pb.InjuredSegment) (err error) {
 	defer mon.Task()(&r.ctx)(&err)
-	r.inProgress++
-	fmt.Println(seg)
-
-	r.inProgress--
-	r.cond.Signal()
+	// TODO:
+	zap.L().Debug("Repairing", zap.Any("segment", seg))
 	return err
 }

 // Stop the repairer loop
 func (r *repairer) Stop() (err error) {
 	r.cancel()
 	return nil
 }
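To summarize the control flow above, here is a hedged sketch of the ticker-plus-limiter polling loop that the new Run follows. pollLoop, fetchJob, and handle are hypothetical stand-ins for Run, queue.Dequeue, and Repair; they are not names from the commit.

package example

import (
	"context"
	"time"

	"storj.io/storj/internal/sync2"
)

// pollLoop wakes on a ticker, fetches one job, and hands it to a bounded
// pool of goroutines via sync2.Limiter.
func pollLoop(ctx context.Context, interval time.Duration, concurrency int,
	fetchJob func() (string, error), handle func(string)) error {

	limiter := sync2.NewLimiter(concurrency)
	defer limiter.Wait() // wait for in-flight handlers before returning

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C: // wait for the next interval
		case <-ctx.Done(): // or stop when the context is canceled
			return ctx.Err()
		}

		job, err := fetchJob()
		if err != nil {
			continue // e.g. nothing queued; try again next tick
		}

		// Go blocks while `concurrency` handlers are already running,
		// so at most that many jobs are processed at once.
		limiter.Go(ctx, func() { handle(job) })
	}
}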