diff --git a/pkg/piecestore/psserver/bucket_refresher.go b/pkg/piecestore/psserver/bucket_refresher.go index 035bb5262..a6581860c 100644 --- a/pkg/piecestore/psserver/bucket_refresher.go +++ b/pkg/piecestore/psserver/bucket_refresher.go @@ -37,9 +37,7 @@ func newService(log *zap.Logger, interval time.Duration, rt *kademlia.RoutingTab } // Run runs the bucket refresher service -func (service *refreshService) Run(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - +func (service *refreshService) Run(ctx context.Context) { for { err := service.process(ctx) if err != nil { @@ -49,7 +47,7 @@ func (service *refreshService) Run(ctx context.Context) (err error) { select { case <-service.ticker.C: // wait for the next interval to happen case <-ctx.Done(): // or the bucket refresher service is canceled via context - return ctx.Err() + return } } } diff --git a/pkg/piecestore/psserver/config.go b/pkg/piecestore/psserver/config.go index e97f84acf..3d1943edf 100644 --- a/pkg/piecestore/psserver/config.go +++ b/pkg/piecestore/psserver/config.go @@ -38,9 +38,8 @@ type Config struct { // Run implements provider.Responsibility func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) { defer mon.Task()(&ctx)(&err) - ctx, cancel := context.WithCancel(ctx) - //piecestore + // piecestore Storage Driver storage := pstore.NewStorage(filepath.Join(c.Path, "piece-store-data")) db, err := psdb.Open(ctx, storage, filepath.Join(c.Path, "piecestore.db")) @@ -48,38 +47,43 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) return ServerError.Wrap(err) } - //kademlia + // Load kademlia from context kad := kademlia.LoadFromContext(ctx) if kad == nil { return ServerError.New("Failed to load Kademlia from context") } + // Initialize piecestore server struct s, err := NewEndpoint(zap.L(), c, storage, db, server.Identity().Key, kad) if err != nil { return err } + defer func() { + if stopErr := s.Stop(ctx); stopErr != nil { + log.Fatal(stopErr) + } + }() + pb.RegisterPieceStoreRoutesServer(server.GRPC(), s) rt, err := kad.GetRoutingTable(ctx) if err != nil { return ServerError.Wrap(err) } + krt, ok := rt.(*kademlia.RoutingTable) if !ok { return ServerError.New("Could not convert dht.RoutingTable to *kademlia.RoutingTable") } - refreshProcess := newService(zap.L(), c.KBucketRefreshInterval, krt, s) - go func() { - if err := refreshProcess.Run(ctx); err != nil { - cancel() - } - }() - //agreementsender + // Initialize Refresh process for updating storage node meta in kademlia + refreshProcess := newService(zap.L(), c.KBucketRefreshInterval, krt, s) + go refreshProcess.Run(ctx) + + // Initialize agreementsender process for sending received bandwidth agreements to satellites agreementSender := agreementsender.New(zap.L(), s.DB, server.Identity(), kad, c.AgreementSenderCheckInterval) go agreementSender.Run(ctx) - defer func() { log.Fatal(s.Stop(ctx)) }() s.log.Info("Started Node", zap.String("ID", fmt.Sprint(server.Identity().ID))) return server.Run(ctx) } diff --git a/pkg/piecestore/psserver/server.go b/pkg/piecestore/psserver/server.go index 77232cf7e..4edd7412c 100644 --- a/pkg/piecestore/psserver/server.go +++ b/pkg/piecestore/psserver/server.go @@ -228,6 +228,10 @@ func (s *Server) Dashboard(in *pb.DashboardReq, stream pb.PieceStoreRoutes_Dashb for { select { case <-ctx.Done(): + if ctx.Err() == context.Canceled { + return nil + } + return ctx.Err() case <-ticker.C: data, err := s.getDashboardData(ctx) diff --git a/pkg/server/config.go b/pkg/server/config.go index 69171bce5..efc10541c 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -52,7 +52,13 @@ func (sc Config) Run(ctx context.Context, if err != nil { return err } - defer func() { _ = s.Close() }() + + go func() { + <-ctx.Done() + if closeErr := s.Close(); closeErr != nil { + zap.S().Errorf("Failed to close server: %s", closeErr) + } + }() zap.S().Infof("Node %s started on %s", s.Identity().ID, sc.Address) return s.Run(ctx)