Enable checker in captplanet and staging (#643)
* enable checker * add option to use mock overlay in checker * adds logs to checker * appease linter
This commit is contained in:
parent
eb07715d53
commit
93c5f385a8
@ -139,17 +139,15 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
if runCfg.Satellite.Web.SatelliteAddr == "" {
|
||||
runCfg.Satellite.Web.SatelliteAddr = runCfg.Satellite.Identity.Address
|
||||
}
|
||||
|
||||
// Run satellite
|
||||
errch <- runCfg.Satellite.Identity.Run(ctx,
|
||||
grpcauth.NewAPIKeyInterceptor(),
|
||||
runCfg.Satellite.PointerDB,
|
||||
runCfg.Satellite.Kademlia,
|
||||
runCfg.Satellite.Audit,
|
||||
runCfg.Satellite.StatDB,
|
||||
o,
|
||||
// TODO(coyle): re-enable the checker after we determine why it is panicing
|
||||
// runCfg.Satellite.Checker,
|
||||
runCfg.Satellite.PointerDB,
|
||||
runCfg.Satellite.Checker,
|
||||
runCfg.Satellite.Repairer,
|
||||
runCfg.Satellite.BwAgreement,
|
||||
runCfg.Satellite.Web,
|
||||
|
@ -20,7 +20,9 @@ import (
|
||||
"storj.io/storj/pkg/bwagreement"
|
||||
dbmanager "storj.io/storj/pkg/bwagreement/database-manager"
|
||||
"storj.io/storj/pkg/cfgstruct"
|
||||
"storj.io/storj/pkg/datarepair/checker"
|
||||
"storj.io/storj/pkg/datarepair/queue"
|
||||
"storj.io/storj/pkg/datarepair/repairer"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
mockOverlay "storj.io/storj/pkg/overlay/mocks"
|
||||
@ -65,9 +67,8 @@ var (
|
||||
Overlay overlay.Config
|
||||
MockOverlay mockOverlay.Config
|
||||
StatDB statdb.Config
|
||||
// RepairQueue queue.Config
|
||||
// RepairChecker checker.Config
|
||||
// Repairer repairer.Config
|
||||
Checker checker.Config
|
||||
Repairer repairer.Config
|
||||
// Audit audit.Config
|
||||
BwAgreement bwagreement.Config
|
||||
}
|
||||
@ -111,6 +112,8 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
runCfg.PointerDB,
|
||||
o,
|
||||
runCfg.StatDB,
|
||||
runCfg.Checker,
|
||||
runCfg.Repairer,
|
||||
// runCfg.Audit,
|
||||
runCfg.BwAgreement,
|
||||
)
|
||||
|
2
go.mod
2
go.mod
@ -115,4 +115,4 @@ require (
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
|
||||
)
|
||||
|
||||
exclude gopkg.in/olivere/elastic.v5 v5.0.72 // buggy import, see https://github.com/olivere/elastic/pull/869
|
||||
exclude gopkg.in/olivere/elastic.v5 v5.0.72 // buggy import, see https://github.com/olivere/elastic/pull/869
|
@ -19,8 +19,7 @@ type rollup struct {
|
||||
logger *zap.Logger
|
||||
ticker *time.Ticker
|
||||
//TODO:
|
||||
//rollupDB
|
||||
//rawDB
|
||||
//accountingDBServer
|
||||
}
|
||||
|
||||
func newRollup(logger *zap.Logger, interval time.Duration) *rollup {
|
||||
@ -28,8 +27,7 @@ func newRollup(logger *zap.Logger, interval time.Duration) *rollup {
|
||||
logger: logger,
|
||||
ticker: time.NewTicker(interval),
|
||||
//TODO:
|
||||
//rollupDB
|
||||
//rawDB
|
||||
//accountingDBServer
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -33,7 +33,7 @@ type tally struct {
|
||||
logger *zap.Logger
|
||||
ticker *time.Ticker
|
||||
//TODO:
|
||||
//rawDB
|
||||
//accountingDBServer
|
||||
}
|
||||
|
||||
func newTally(pointerdb *pointerdb.Server, overlay pb.OverlayServer, kademlia *kademlia.Kademlia, limit int, logger *zap.Logger, interval time.Duration) *tally {
|
||||
@ -45,7 +45,7 @@ func newTally(pointerdb *pointerdb.Server, overlay pb.OverlayServer, kademlia *k
|
||||
logger: logger,
|
||||
ticker: time.NewTicker(interval),
|
||||
//TODO:
|
||||
//rawDB
|
||||
//accountingDBServer
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -82,7 +82,16 @@ func (c *checker) identifyInjuredSegments(ctx context.Context) (err error) {
|
||||
if err != nil {
|
||||
return Error.New("error unmarshalling pointer %s", err)
|
||||
}
|
||||
pieces := pointer.Remote.RemotePieces
|
||||
remote := pointer.GetRemote()
|
||||
if remote == nil {
|
||||
c.logger.Debug("no remote segment on pointer")
|
||||
continue
|
||||
}
|
||||
pieces := remote.GetRemotePieces()
|
||||
if pieces == nil {
|
||||
c.logger.Debug("no pieces on remote segment")
|
||||
continue
|
||||
}
|
||||
var nodeIDs []dht.NodeID
|
||||
for _, p := range pieces {
|
||||
nodeIDs = append(nodeIDs, node.IDFromString(p.NodeId))
|
||||
|
@ -8,8 +8,11 @@ import (
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/datarepair/queue"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
mock "storj.io/storj/pkg/overlay/mocks"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/pointerdb"
|
||||
"storj.io/storj/pkg/provider"
|
||||
"storj.io/storj/storage/redis"
|
||||
@ -23,14 +26,20 @@ type Config struct {
|
||||
|
||||
// Initialize a Checker struct
|
||||
func (c Config) initialize(ctx context.Context) (Checker, error) {
|
||||
pointerdb := pointerdb.LoadFromContext(ctx)
|
||||
overlay := overlay.LoadServerFromContext(ctx)
|
||||
pdb := pointerdb.LoadFromContext(ctx)
|
||||
var o pb.OverlayServer
|
||||
x := overlay.LoadServerFromContext(ctx)
|
||||
if x == nil {
|
||||
o = mock.LoadServerFromContext(ctx)
|
||||
} else {
|
||||
o = x
|
||||
}
|
||||
redisQ, err := redis.NewQueueFrom(c.QueueAddress)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
repairQueue := queue.NewQueue(redisQ)
|
||||
return newChecker(pointerdb, repairQueue, overlay, 0, zap.L(), c.Interval), nil
|
||||
return newChecker(pdb, repairQueue, o, 0, zap.L(), c.Interval), nil
|
||||
}
|
||||
|
||||
// Run runs the checker with configured values
|
||||
|
@ -5,6 +5,7 @@ package queue
|
||||
|
||||
import (
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
@ -23,6 +24,7 @@ type Queue struct {
|
||||
|
||||
// NewQueue returns a pointer to a new Queue instance with an initialized connection to Redis
|
||||
func NewQueue(client storage.Queue) *Queue {
|
||||
zap.L().Info("Initializing new data repair queue")
|
||||
return &Queue{db: client}
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,7 @@ import (
|
||||
|
||||
// Repairer is the interface for the data repairer
|
||||
type Repairer interface {
|
||||
//do we need this method? It doesn't look implemented
|
||||
Repair(ctx context.Context, seg *pb.InjuredSegment) error
|
||||
Run(ctx context.Context) error
|
||||
}
|
||||
|
@ -33,7 +33,7 @@ type Config struct {
|
||||
RefreshInterval time.Duration `help:"the interval at which the cache refreshes itself in seconds" default:"30s"`
|
||||
}
|
||||
|
||||
// CtxKey used for assigning cache
|
||||
// CtxKey used for assigning cache and server
|
||||
type CtxKey int
|
||||
|
||||
const (
|
||||
|
@ -29,6 +29,13 @@ func NewOverlay(nodes []*pb.Node) *Overlay {
|
||||
|
||||
}
|
||||
|
||||
//CtxKey Used as kademlia key
|
||||
type CtxKey int
|
||||
|
||||
const (
|
||||
ctxKeyMockOverlay CtxKey = iota
|
||||
)
|
||||
|
||||
// FindStorageNodes is the mock implementation
|
||||
func (mo *Overlay) FindStorageNodes(ctx context.Context, req *pb.FindStorageNodesRequest) (resp *pb.FindStorageNodesResponse, err error) {
|
||||
nodes := make([]*pb.Node, 0, len(mo.nodes))
|
||||
@ -82,7 +89,16 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) error {
|
||||
Address: addr,
|
||||
}})
|
||||
}
|
||||
|
||||
pb.RegisterOverlayServer(server.GRPC(), NewOverlay(nodes))
|
||||
srv := NewOverlay(nodes)
|
||||
pb.RegisterOverlayServer(server.GRPC(), srv)
|
||||
ctx = context.WithValue(ctx, ctxKeyMockOverlay, srv)
|
||||
return server.Run(ctx)
|
||||
}
|
||||
|
||||
// LoadServerFromContext gives access to the overlay server from the context, or returns nil
|
||||
func LoadServerFromContext(ctx context.Context) *Overlay {
|
||||
if v, ok := ctx.Value(ctxKeyMockOverlay).(*Overlay); ok {
|
||||
return v
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -57,7 +57,6 @@ func (o *Server) Lookup(ctx context.Context, req *pb.LookupRequest) (*pb.LookupR
|
||||
//BulkLookup finds the addresses of nodes in our overlay network
|
||||
func (o *Server) BulkLookup(ctx context.Context, reqs *pb.LookupRequests) (*pb.LookupResponses, error) {
|
||||
ns, err := o.cache.GetAll(ctx, lookupRequestsToNodeIDs(reqs))
|
||||
|
||||
if err != nil {
|
||||
return nil, ServerError.New("could not get nodes requested %s\n", err)
|
||||
}
|
||||
|
@ -277,7 +277,6 @@ func (ic IdentityConfig) Run(ctx context.Context, interceptor grpc.UnaryServerIn
|
||||
}
|
||||
defer func() { _ = s.Close() }()
|
||||
zap.S().Infof("Node %s started", s.Identity().ID)
|
||||
|
||||
return s.Run(ctx)
|
||||
}
|
||||
|
||||
|
@ -3,5 +3,5 @@
|
||||
|
||||
package statdb
|
||||
|
||||
//go:generate dbx.v1 schema -d postgres -d sqlite3 statdb.dbx .
|
||||
//go:generate dbx.v1 golang -d postgres -d sqlite3 statdb.dbx .
|
||||
// go:generate dbx.v1 schema -d postgres -d sqlite3 statdb.dbx .
|
||||
// go:generate dbx.v1 golang -d postgres -d sqlite3 statdb.dbx .
|
||||
|
Loading…
Reference in New Issue
Block a user