Reorganize repair (#419)

* Reorganize repair

* Don't run the repair code yet

* Pass max repair from config to repairer initialize

* Add repairer Interface

* fix comment
This commit is contained in:
Alexander Leitner 2018-10-03 14:35:56 -04:00 committed by GitHub
parent 3c985a553e
commit f80ec62e9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 78 additions and 64 deletions

View File

@ -10,6 +10,9 @@ import (
"github.com/spf13/cobra"
"storj.io/storj/pkg/cfgstruct"
// "storj.io/storj/pkg/datarepair/checker"
// "storj.io/storj/pkg/datarepair/queue"
// "storj.io/storj/pkg/datarepair/repairer"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pointerdb"
@ -39,7 +42,9 @@ var (
PointerDB pointerdb.Config
Overlay overlay.Config
MockOverlay overlay.MockConfig
// Repair datarepair.Config
// RepairQueue queue.Config
// RepairChecker checker.Config
// Repairer repairer.Config
}
setupCfg struct {
BasePath string `default:"$CONFDIR" help:"base path for setup"`

View File

@ -2,3 +2,20 @@
// See LICENSE for copying information.
package checker
import (
"context"
)
// Config contains configurable values for checker
type Config struct {
// queueAddress string `help:"data repair queue address" default:"localhost:7777"`
}
// Run runs the checker with configured values
func (c *Config) Run(ctx context.Context) (err error) {
// TODO: start checker server
return err
}

View File

@ -4,6 +4,7 @@
package queue
import (
"context"
"encoding/binary"
"math/rand"
"sync"
@ -22,6 +23,19 @@ type RepairQueue interface {
Dequeue() (pb.InjuredSegment, error)
}
// Config contains configurable values for checker
type Config struct {
// address string `help:"data repair queue address" default:"localhost:7777"`
}
// Run runs the checker with configured values
func (c *Config) Run(ctx context.Context) (err error) {
// TODO: Start queue server
return err
}
// Queue implements the RepairQueue interface
type Queue struct {
mu sync.Mutex

View File

@ -1,47 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package datarepair
import (
"context"
"gopkg.in/spacemonkeygo/monkit.v2"
q "storj.io/storj/pkg/datarepair/queue"
// "storj.io/storj/pkg/datarepair/checker"
"storj.io/storj/pkg/datarepair/repairer"
)
var (
mon = monkit.Package()
)
// Config contains configurable values for repairer
type Config struct {
maxRepair int
//TODO: Add things for checker
//TODO: Add things for repairer
}
// Run runs the repairer with configured values
func (c *Config) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
var queue q.RepairQueue
// TODO: Initialize Checker with queue
// Initialize Repairer with queue
_, err = repairer.Initialize(ctx, queue, c.maxRepair)
if err != nil {
return err
}
// TODO: Run the Checker in goroutine
// TODO: Run the Repairer in goroutine
// TODO: defer stop of checker and repairer
return err
}

View File

@ -18,8 +18,43 @@ var (
mon = monkit.Package()
)
// Repairer holds important values for data repair
type Repairer struct {
// Repairer is the interface for the data repair queue
type Repairer interface {
Repair(seg *pb.InjuredSegment) error
Run() error
Stop() error
}
// Config contains configurable values for repairer
type Config struct {
// queueAddress string `help:"data repair queue address" default:"localhost:7779"`
maxRepair int `help:"maximum segments that can be repaired concurrently" default:"100"`
}
// Initialize a repairer struct
func (c *Config) Initialize(ctx context.Context) (Repairer, error) {
var r repairer
r.ctx, r.cancel = context.WithCancel(ctx)
// TODO: Setup queue with c.queueAddress r.queue = queue
r.cond.L = &r.mu
r.maxRepair = c.maxRepair
return &r, nil
}
// Run runs the checker with configured values
func (c *Config) Run(ctx context.Context) (err error) {
r, err := c.Initialize(ctx)
if err != nil {
return err
}
return r.Run()
}
// repairer holds important values for data repair
type repairer struct {
ctx context.Context
cancel context.CancelFunc
queue q.RepairQueue
@ -30,18 +65,8 @@ type Repairer struct {
inProgress int
}
// Initialize a repairer struct
func Initialize(ctx context.Context, queue q.RepairQueue, max int) (*Repairer, error) {
var r Repairer
r.ctx, r.cancel = context.WithCancel(ctx)
r.queue = queue
r.cond.L = &r.mu
r.maxRepair = max
return &r, nil
}
// Run the repairer loop
func (r *Repairer) Run() (err error) {
func (r *repairer) Run() (err error) {
c := make(chan *pb.InjuredSegment)
go func() {
for {
@ -76,7 +101,7 @@ func (r *Repairer) Run() (err error) {
}
// Repair starts repair of the segment
func (r *Repairer) Repair(seg *pb.InjuredSegment) (err error) {
func (r *repairer) Repair(seg *pb.InjuredSegment) (err error) {
defer mon.Task()(&r.ctx)(&err)
r.inProgress++
fmt.Println(seg)
@ -87,12 +112,12 @@ func (r *Repairer) Repair(seg *pb.InjuredSegment) (err error) {
}
// Stop the repairer loop
func (r *Repairer) Stop() (err error) {
func (r *repairer) Stop() (err error) {
r.cancel()
return nil
}
func (r *Repairer) combinedError() error {
func (r *repairer) combinedError() error {
if len(r.errs) == 0 {
return nil
}