Add repairer & checker to Satelite (#561)

* Added repairer & checker to Satellite

* fixed repairer and checker configs
This commit is contained in:
Dennis Coyle 2018-10-31 12:22:35 -04:00 committed by GitHub
parent 40b210f604
commit a3becb8a7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 35 additions and 23 deletions

View File

@ -105,6 +105,16 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
}(i, storagenode)
}
// start mini redis
m := miniredis.NewMiniRedis()
m.RequireAuth("abc123")
if err = m.StartAddr(":6378"); err != nil {
errch <- err
} else {
defer m.Close()
}
// start satellite
go func() {
_, _ = fmt.Printf("starting satellite on %s\n",
@ -119,28 +129,11 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
runCfg.Satellite.PointerDB,
runCfg.Satellite.Kademlia,
o,
runCfg.Satellite.Checker,
runCfg.Satellite.Repairer,
)
}()
// start Repair
m := miniredis.NewMiniRedis()
m.RequireAuth("abc123")
if err = m.StartAddr(":6378"); err != nil {
errch <- err
} else {
defer m.Close()
go func() {
errch <- runCfg.Satellite.Checker.Run(ctx, nil)
}()
go func() {
errch <- runCfg.Satellite.Repairer.Run(ctx, nil)
}()
}
// start s3 uplink
go func() {
_, _ = fmt.Printf("Starting s3-gateway on %s\nAccess key: %s\nSecret key: %s\n",

View File

@ -40,5 +40,13 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error)
if err != nil {
return err
}
return check.Run(ctx)
// TODO(coyle): we need to figure out how to propagate the error up to cancel the service
go func() {
if err := check.Run(ctx); err != nil {
zap.L().Error("Error running checker", zap.Error(err))
}
}()
return server.Run(ctx)
}

View File

@ -7,6 +7,7 @@ import (
"context"
"time"
"go.uber.org/zap"
q "storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/provider"
"storj.io/storj/storage/redis"
@ -29,5 +30,13 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error)
queue := q.NewQueue(client)
repairer := newRepairer(queue, c.Interval, c.MaxRepair)
return repairer.Run(ctx)
// TODO(coyle): we need to figure out how to propagate the error up to cancel the service
go func() {
if err := repairer.Run(ctx); err != nil {
zap.L().Error("Error running repairer", zap.Error(err))
}
}()
return server.Run(ctx)
}

View File

@ -61,8 +61,10 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) error {
cache := overlay.LoadFromContext(ctx)
dblogged := storelogger.New(zap.L(), db)
pb.RegisterPointerDBServer(server.GRPC(), NewServer(dblogged, cache, zap.L(), c, server.Identity()))
s := NewServer(dblogged, cache, zap.L(), c, server.Identity())
pb.RegisterPointerDBServer(server.GRPC(), s)
// add the server to the context
ctx = context.WithValue(ctx, ctxKey, s)
return server.Run(ctx)
}