Closing context doesn't stop storage node (#1084)
* Print when cancelled * Close properly * Don't log nil * Don't print error when closing dashboard
This commit is contained in:
parent
3bf7c7d43b
commit
944daeab02
@ -37,9 +37,7 @@ func newService(log *zap.Logger, interval time.Duration, rt *kademlia.RoutingTab
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run runs the bucket refresher service
|
// Run runs the bucket refresher service
|
||||||
func (service *refreshService) Run(ctx context.Context) (err error) {
|
func (service *refreshService) Run(ctx context.Context) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
err := service.process(ctx)
|
err := service.process(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -49,7 +47,7 @@ func (service *refreshService) Run(ctx context.Context) (err error) {
|
|||||||
select {
|
select {
|
||||||
case <-service.ticker.C: // wait for the next interval to happen
|
case <-service.ticker.C: // wait for the next interval to happen
|
||||||
case <-ctx.Done(): // or the bucket refresher service is canceled via context
|
case <-ctx.Done(): // or the bucket refresher service is canceled via context
|
||||||
return ctx.Err()
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -38,9 +38,8 @@ type Config struct {
|
|||||||
// Run implements provider.Responsibility
|
// Run implements provider.Responsibility
|
||||||
func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) {
|
func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
|
||||||
|
|
||||||
//piecestore
|
// piecestore Storage Driver
|
||||||
storage := pstore.NewStorage(filepath.Join(c.Path, "piece-store-data"))
|
storage := pstore.NewStorage(filepath.Join(c.Path, "piece-store-data"))
|
||||||
|
|
||||||
db, err := psdb.Open(ctx, storage, filepath.Join(c.Path, "piecestore.db"))
|
db, err := psdb.Open(ctx, storage, filepath.Join(c.Path, "piecestore.db"))
|
||||||
@ -48,38 +47,43 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error)
|
|||||||
return ServerError.Wrap(err)
|
return ServerError.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
//kademlia
|
// Load kademlia from context
|
||||||
kad := kademlia.LoadFromContext(ctx)
|
kad := kademlia.LoadFromContext(ctx)
|
||||||
if kad == nil {
|
if kad == nil {
|
||||||
return ServerError.New("Failed to load Kademlia from context")
|
return ServerError.New("Failed to load Kademlia from context")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initialize piecestore server struct
|
||||||
s, err := NewEndpoint(zap.L(), c, storage, db, server.Identity().Key, kad)
|
s, err := NewEndpoint(zap.L(), c, storage, db, server.Identity().Key, kad)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer func() {
|
||||||
|
if stopErr := s.Stop(ctx); stopErr != nil {
|
||||||
|
log.Fatal(stopErr)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
pb.RegisterPieceStoreRoutesServer(server.GRPC(), s)
|
pb.RegisterPieceStoreRoutesServer(server.GRPC(), s)
|
||||||
|
|
||||||
rt, err := kad.GetRoutingTable(ctx)
|
rt, err := kad.GetRoutingTable(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ServerError.Wrap(err)
|
return ServerError.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
krt, ok := rt.(*kademlia.RoutingTable)
|
krt, ok := rt.(*kademlia.RoutingTable)
|
||||||
if !ok {
|
if !ok {
|
||||||
return ServerError.New("Could not convert dht.RoutingTable to *kademlia.RoutingTable")
|
return ServerError.New("Could not convert dht.RoutingTable to *kademlia.RoutingTable")
|
||||||
}
|
}
|
||||||
refreshProcess := newService(zap.L(), c.KBucketRefreshInterval, krt, s)
|
|
||||||
go func() {
|
|
||||||
if err := refreshProcess.Run(ctx); err != nil {
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
//agreementsender
|
// Initialize Refresh process for updating storage node meta in kademlia
|
||||||
|
refreshProcess := newService(zap.L(), c.KBucketRefreshInterval, krt, s)
|
||||||
|
go refreshProcess.Run(ctx)
|
||||||
|
|
||||||
|
// Initialize agreementsender process for sending received bandwidth agreements to satellites
|
||||||
agreementSender := agreementsender.New(zap.L(), s.DB, server.Identity(), kad, c.AgreementSenderCheckInterval)
|
agreementSender := agreementsender.New(zap.L(), s.DB, server.Identity(), kad, c.AgreementSenderCheckInterval)
|
||||||
go agreementSender.Run(ctx)
|
go agreementSender.Run(ctx)
|
||||||
|
|
||||||
defer func() { log.Fatal(s.Stop(ctx)) }()
|
|
||||||
s.log.Info("Started Node", zap.String("ID", fmt.Sprint(server.Identity().ID)))
|
s.log.Info("Started Node", zap.String("ID", fmt.Sprint(server.Identity().ID)))
|
||||||
return server.Run(ctx)
|
return server.Run(ctx)
|
||||||
}
|
}
|
||||||
|
@ -228,6 +228,10 @@ func (s *Server) Dashboard(in *pb.DashboardReq, stream pb.PieceStoreRoutes_Dashb
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
if ctx.Err() == context.Canceled {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
data, err := s.getDashboardData(ctx)
|
data, err := s.getDashboardData(ctx)
|
||||||
|
@ -52,7 +52,13 @@ func (sc Config) Run(ctx context.Context,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer func() { _ = s.Close() }()
|
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
if closeErr := s.Close(); closeErr != nil {
|
||||||
|
zap.S().Errorf("Failed to close server: %s", closeErr)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
zap.S().Infof("Node %s started on %s", s.Identity().ID, sc.Address)
|
zap.S().Infof("Node %s started on %s", s.Identity().ID, sc.Address)
|
||||||
return s.Run(ctx)
|
return s.Run(ctx)
|
||||||
|
Loading…
Reference in New Issue
Block a user