Repairer points to redis server (#427)
* Let's do it right this time * Oh travis... * Handle redis URL * Travis... why u gotta be like this? * Handle when address does not use redis scheme * Start repairer * Match provider.Responsibility interface * Simplify if statement * Config doesn't need to be a pointer * Initialize doesn't need to be exported * Don't run checker or repairer on startup * Fix travis complaints
This commit is contained in:
parent
7812f1bbc0
commit
dc8bea2cd1
@ -15,6 +15,7 @@ import (
|
||||
"storj.io/storj/pkg/miniogw"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/datarepair/checker"
|
||||
"storj.io/storj/pkg/datarepair/repairer"
|
||||
psserver "storj.io/storj/pkg/piecestore/rpc/server"
|
||||
"storj.io/storj/pkg/pointerdb"
|
||||
"storj.io/storj/pkg/process"
|
||||
@ -31,7 +32,8 @@ type Satellite struct {
|
||||
Kademlia kademlia.Config
|
||||
PointerDB pointerdb.Config
|
||||
Overlay overlay.Config
|
||||
Checker checker.Config
|
||||
Checker checker.Config
|
||||
Repairer repairer.Config
|
||||
MockOverlay struct {
|
||||
Enabled bool `default:"true" help:"if false, use real overlay"`
|
||||
Host string `default:"" help:"if set, the mock overlay will return storage nodes with this host"`
|
||||
@ -108,7 +110,8 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
errch <- runCfg.Satellite.Identity.Run(ctx,
|
||||
runCfg.Satellite.Kademlia,
|
||||
runCfg.Satellite.PointerDB,
|
||||
runCfg.Satellite.Checker,
|
||||
// runCfg.Satellite.Checker,
|
||||
// runCfg.Satellite.Repairer,
|
||||
o)
|
||||
}()
|
||||
|
||||
|
@ -11,9 +11,8 @@ import (
|
||||
"github.com/spf13/cobra"
|
||||
"storj.io/storj/pkg/cfgstruct"
|
||||
|
||||
// "storj.io/storj/pkg/datarepair/queue"
|
||||
// "storj.io/storj/pkg/datarepair/repairer"
|
||||
"storj.io/storj/pkg/datarepair/checker"
|
||||
// "storj.io/storj/pkg/datarepair/checker"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pointerdb"
|
||||
@ -41,12 +40,10 @@ var (
|
||||
Identity provider.IdentityConfig
|
||||
Kademlia kademlia.Config
|
||||
PointerDB pointerdb.Config
|
||||
Checker checker.Config
|
||||
// Checker checker.Config
|
||||
// Repairer repairer.Config
|
||||
Overlay overlay.Config
|
||||
MockOverlay overlay.MockConfig
|
||||
// RepairQueue queue.Config
|
||||
// RepairChecker checker.Config
|
||||
// Repairer repairer.Config
|
||||
}
|
||||
setupCfg struct {
|
||||
BasePath string `default:"$CONFDIR" help:"base path for setup"`
|
||||
@ -71,7 +68,11 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
o = runCfg.MockOverlay
|
||||
}
|
||||
return runCfg.Identity.Run(process.Ctx(cmd),
|
||||
runCfg.Kademlia, o, runCfg.PointerDB, runCfg.Checker)
|
||||
runCfg.Kademlia,
|
||||
o,
|
||||
runCfg.PointerDB,
|
||||
// runCfg.Checker, runCfg.Repairer
|
||||
)
|
||||
}
|
||||
|
||||
func cmdSetup(cmd *cobra.Command, args []string) (err error) {
|
||||
|
@ -22,6 +22,7 @@ var (
|
||||
|
||||
// Config contains configurable values for checker
|
||||
type Config struct {
|
||||
// QueueAddress string `help:"data repair queue address" default:"redis://localhost:6379?db=5&password=123"`
|
||||
Interval time.Duration `help:"how frequently checker should audit segments" default:"30s"`
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,6 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"math/rand"
|
||||
"sync"
|
||||
@ -23,19 +22,6 @@ type RepairQueue interface {
|
||||
Dequeue() (pb.InjuredSegment, error)
|
||||
}
|
||||
|
||||
// Config contains configurable values for checker
|
||||
type Config struct {
|
||||
// address string `help:"data repair queue address" default:"localhost:7777"`
|
||||
}
|
||||
|
||||
// Run runs the checker with configured values
|
||||
func (c *Config) Run(ctx context.Context) (err error) {
|
||||
|
||||
// TODO: Start queue server
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Queue implements the RepairQueue interface
|
||||
type Queue struct {
|
||||
mu sync.Mutex
|
||||
|
@ -7,15 +7,22 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
q "storj.io/storj/pkg/datarepair/queue"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/provider"
|
||||
"storj.io/storj/storage/redis"
|
||||
)
|
||||
|
||||
var (
|
||||
mon = monkit.Package()
|
||||
|
||||
// Error is a redis error
|
||||
repairerError = errs.Class("repairer error")
|
||||
)
|
||||
|
||||
// Repairer is the interface for the data repair queue
|
||||
@ -27,25 +34,31 @@ type Repairer interface {
|
||||
|
||||
// Config contains configurable values for repairer
|
||||
type Config struct {
|
||||
// queueAddress string `help:"data repair queue address" default:"localhost:7779"`
|
||||
maxRepair int `help:"maximum segments that can be repaired concurrently" default:"100"`
|
||||
QueueAddress string `help:"data repair queue address" default:"redis://localhost:6379?db=5&password=123"`
|
||||
MaxRepair int `help:"maximum segments that can be repaired concurrently" default:"100"`
|
||||
Interval time.Duration `help:"how frequently checker should audit segments" default:"3600s"`
|
||||
}
|
||||
|
||||
// Initialize a repairer struct
|
||||
func (c *Config) Initialize(ctx context.Context) (Repairer, error) {
|
||||
func (c Config) initialize(ctx context.Context) (Repairer, error) {
|
||||
var r repairer
|
||||
r.ctx, r.cancel = context.WithCancel(ctx)
|
||||
|
||||
// TODO: Setup queue with c.queueAddress r.queue = queue
|
||||
client, err := redis.NewClientFrom(c.QueueAddress)
|
||||
if err != nil {
|
||||
return nil, repairerError.Wrap(err)
|
||||
}
|
||||
r.queue = q.NewQueue(client)
|
||||
|
||||
r.cond.L = &r.mu
|
||||
r.maxRepair = c.maxRepair
|
||||
r.maxRepair = c.MaxRepair
|
||||
r.interval = c.Interval
|
||||
return &r, nil
|
||||
}
|
||||
|
||||
// Run runs the checker with configured values
|
||||
func (c *Config) Run(ctx context.Context) (err error) {
|
||||
r, err := c.Initialize(ctx)
|
||||
// Run runs the repairer with configured values
|
||||
func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) {
|
||||
r, err := c.initialize(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -63,13 +76,17 @@ type repairer struct {
|
||||
cond sync.Cond
|
||||
maxRepair int
|
||||
inProgress int
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
// Run the repairer loop
|
||||
func (r *repairer) Run() (err error) {
|
||||
c := make(chan *pb.InjuredSegment)
|
||||
|
||||
ticker := time.NewTicker(r.interval)
|
||||
defer ticker.Stop()
|
||||
go func() {
|
||||
for {
|
||||
for range ticker.C {
|
||||
for r.inProgress >= r.maxRepair {
|
||||
r.cond.Wait()
|
||||
}
|
||||
|
@ -5,10 +5,13 @@ package redis
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis"
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/utils"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
@ -44,6 +47,27 @@ func NewClient(address, password string, db int) (*Client, error) {
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// NewClientFrom returns a configured Client instance from a redis address, verifying a successful connection to redis
|
||||
func NewClientFrom(address string) (*Client, error) {
|
||||
redisurl, err := utils.ParseURL(address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if redisurl.Scheme != "redis" {
|
||||
return nil, Error.New("not a redis:// formatted address")
|
||||
}
|
||||
|
||||
q := redisurl.Query()
|
||||
|
||||
db, err := strconv.Atoi(q.Get("db"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewClient(redisurl.Host, q.Get("password"), db)
|
||||
}
|
||||
|
||||
// Get looks up the provided key from redis returning either an error or the result.
|
||||
func (client *Client) Get(key storage.Key) (storage.Value, error) {
|
||||
value, err := client.db.Get(string(key)).Bytes()
|
||||
|
Loading…
Reference in New Issue
Block a user