multinode: handling offline nodes added

nodes/service and payouts/service now return a node with status offline,
or 0, when the node is offline.

Change-Id: I74c03dcba9ddc9c05885ab329f80f3b14fe8c0fa
This commit is contained in:
Qweder93 2021-03-27 00:17:34 +02:00 committed by Nikolai Siedov
parent 73b9223758
commit 8f4505f532
7 changed files with 110 additions and 50 deletions

View File

@ -46,16 +46,20 @@ func (service *Service) Monthly(ctx context.Context) (_ Monthly, err error) {
defer mon.Task()(&ctx)(&err)
var totalMonthly Monthly
nodes, err := service.nodes.List(ctx)
listNodes, err := service.nodes.List(ctx)
if err != nil {
return Monthly{}, Error.Wrap(err)
}
cache := make(UsageRollupDailyCache)
for _, node := range nodes {
for _, node := range listNodes {
monthly, err := service.getMonthly(ctx, node)
if err != nil {
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return Monthly{}, Error.Wrap(err)
}
totalMonthly.IngressSummary += monthly.IngressSummary
@ -93,16 +97,20 @@ func (service *Service) MonthlySatellite(ctx context.Context, satelliteID storj.
defer mon.Task()(&ctx)(&err)
var totalMonthly Monthly
nodes, err := service.nodes.List(ctx)
listNodes, err := service.nodes.List(ctx)
if err != nil {
return Monthly{}, Error.Wrap(err)
}
cache := make(UsageRollupDailyCache)
for _, node := range nodes {
for _, node := range listNodes {
monthly, err := service.getMonthlySatellite(ctx, node, satelliteID)
if err != nil {
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return Monthly{}, Error.Wrap(err)
}
@ -145,7 +153,7 @@ func (service *Service) getMonthlySatellite(ctx context.Context, node nodes.Node
Address: node.PublicAddress,
})
if err != nil {
return Monthly{}, Error.Wrap(err)
return Monthly{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
@ -223,7 +231,7 @@ func (service *Service) getMonthly(ctx context.Context, node nodes.Node) (_ Mont
Address: node.PublicAddress,
})
if err != nil {
return Monthly{}, Error.Wrap(err)
return Monthly{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {

View File

@ -33,8 +33,10 @@ type DB interface {
UpdateName(ctx context.Context, id storj.NodeID, name string) error
}
// ErrNoNode is a special error type that indicates about absence of node in NodesDB.
var ErrNoNode = errs.Class("no such node")
var (
// ErrNoNode is a special error type that indicates the absence of a node in NodesDB.
ErrNoNode = errs.Class("no such node")
)
// Node is a representation of storagenode, that SNO could add to the Multinode Dashboard.
type Node struct {

View File

@ -22,6 +22,8 @@ var (
// Error is an error class for nodes service error.
Error = errs.Class("nodes")
// ErrNodeNotReachable is an error class that indicates that we are not able to establish drpc connection with node.
ErrNodeNotReachable = errs.Class("node is not reachable")
)
// Service exposes all nodes related logic.
@ -261,15 +263,19 @@ func (service *Service) ListInfosSatellite(ctx context.Context, satelliteID stor
func (service *Service) TrustedSatellites(ctx context.Context) (_ storj.NodeURLs, err error) {
defer mon.Task()(&ctx)(&err)
nodes, err := service.nodes.List(ctx)
listNodes, err := service.nodes.List(ctx)
if err != nil {
return nil, Error.Wrap(err)
}
var trustedSatellites storj.NodeURLs
for _, node := range nodes {
for _, node := range listNodes {
nodeURLs, err := service.trustedSatellites(ctx, node)
if err != nil {
if ErrNodeNotReachable.Has(err) {
continue
}
return nil, Error.Wrap(err)
}
@ -288,7 +294,7 @@ func (service *Service) trustedSatellites(ctx context.Context, node Node) (_ sto
Address: node.PublicAddress,
})
if err != nil {
return nil, Error.Wrap(err)
return storj.NodeURLs{}, ErrNodeNotReachable.Wrap(err)
}
defer func() {

View File

@ -67,10 +67,11 @@ func (service *Service) ListPaginated(ctx context.Context, cursor Cursor) (_ Pag
for _, node := range page.Nodes {
operator, err := service.GetOperator(ctx, node)
if err != nil {
// TODO: handle ass offline operators in future.
// TODO: we should count number of offline operators and make new query to db to save
// TODO: the size of elements on the page.
continue
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return Page{}, Error.Wrap(err)
}
operators = append(operators, operator)
}
@ -94,7 +95,7 @@ func (service *Service) GetOperator(ctx context.Context, node nodes.Node) (_ Ope
Address: node.PublicAddress,
})
if err != nil {
return Operator{}, Error.Wrap(err)
return Operator{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
err = errs.Combine(err, conn.Close())

View File

@ -53,8 +53,11 @@ func (service *Service) Earned(ctx context.Context) (earned int64, err error) {
for _, node := range storageNodes {
amount, err := service.earned(ctx, node)
if err != nil {
service.log.Error("failed to getAmount", zap.Error(err))
continue
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return 0, Error.Wrap(err)
}
earned += amount
@ -78,8 +81,11 @@ func (service *Service) EarnedSatellite(ctx context.Context) (earned []Satellite
for _, node := range storageNodes {
earnedPerSatellite, err := service.earnedSatellite(ctx, node)
if err != nil {
service.log.Error("failed to getEarnedFromSatellite", zap.Error(err))
continue
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return nil, Error.Wrap(err)
}
listNodesEarnedPerSatellite = append(listNodesEarnedPerSatellite, earnedPerSatellite)
@ -119,14 +125,18 @@ func (service *Service) Summary(ctx context.Context) (_ Summary, err error) {
var summary Summary
list, err := service.nodes.List(ctx)
listNodes, err := service.nodes.List(ctx)
if err != nil {
return Summary{}, Error.Wrap(err)
}
for _, node := range list {
for _, node := range listNodes {
info, err := service.summary(ctx, node)
if err != nil {
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return Summary{}, Error.Wrap(err)
}
@ -142,14 +152,18 @@ func (service *Service) SummaryPeriod(ctx context.Context, period string) (_ Sum
var summary Summary
list, err := service.nodes.List(ctx)
listNodes, err := service.nodes.List(ctx)
if err != nil {
return Summary{}, Error.Wrap(err)
}
for _, node := range list {
for _, node := range listNodes {
info, err := service.summaryPeriod(ctx, node, period)
if err != nil {
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return Summary{}, Error.Wrap(err)
}
@ -164,14 +178,18 @@ func (service *Service) SummarySatellite(ctx context.Context, satelliteID storj.
defer mon.Task()(&ctx)(&err)
var summary Summary
list, err := service.nodes.List(ctx)
listNodes, err := service.nodes.List(ctx)
if err != nil {
return Summary{}, Error.Wrap(err)
}
for _, node := range list {
for _, node := range listNodes {
info, err := service.summarySatellite(ctx, node, satelliteID)
if err != nil {
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return Summary{}, Error.Wrap(err)
}
@ -186,14 +204,18 @@ func (service *Service) SummarySatellitePeriod(ctx context.Context, satelliteID
defer mon.Task()(&ctx)(&err)
var summary Summary
list, err := service.nodes.List(ctx)
listNodes, err := service.nodes.List(ctx)
if err != nil {
return Summary{}, Error.Wrap(err)
}
for _, node := range list {
for _, node := range listNodes {
info, err := service.summarySatellitePeriod(ctx, node, satelliteID, period)
if err != nil {
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return Summary{}, Error.Wrap(err)
}
@ -210,7 +232,7 @@ func (service *Service) summarySatellite(ctx context.Context, node nodes.Node, s
Address: node.PublicAddress,
})
if err != nil {
return &multinodepb.PayoutInfo{}, Error.Wrap(err)
return &multinodepb.PayoutInfo{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
@ -237,7 +259,7 @@ func (service *Service) summarySatellitePeriod(ctx context.Context, node nodes.N
Address: node.PublicAddress,
})
if err != nil {
return &multinodepb.PayoutInfo{}, Error.Wrap(err)
return &multinodepb.PayoutInfo{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
@ -264,7 +286,7 @@ func (service *Service) summaryPeriod(ctx context.Context, node nodes.Node, peri
Address: node.PublicAddress,
})
if err != nil {
return &multinodepb.PayoutInfo{}, Error.Wrap(err)
return &multinodepb.PayoutInfo{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
@ -291,7 +313,7 @@ func (service *Service) summary(ctx context.Context, node nodes.Node) (info *mul
Address: node.PublicAddress,
})
if err != nil {
return &multinodepb.PayoutInfo{}, Error.Wrap(err)
return &multinodepb.PayoutInfo{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
@ -334,14 +356,18 @@ func (service *Service) Expectations(ctx context.Context) (_ Expectations, err e
var expectations Expectations
list, err := service.nodes.List(ctx)
listNodes, err := service.nodes.List(ctx)
if err != nil {
return Expectations{}, Error.Wrap(err)
}
for _, node := range list {
for _, node := range listNodes {
expectation, err := service.nodeExpectations(ctx, node)
if err != nil {
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return Expectations{}, Error.Wrap(err)
}
@ -366,7 +392,7 @@ func (service *Service) HeldAmountSummary(ctx context.Context, nodeID storj.Node
Address: node.PublicAddress,
})
if err != nil {
return nil, Error.Wrap(err)
return nil, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
err = errs.Combine(err, conn.Close())
@ -469,7 +495,7 @@ func (service *Service) nodeExpectations(ctx context.Context, node nodes.Node) (
Address: node.PublicAddress,
})
if err != nil {
return Expectations{}, Error.Wrap(err)
return Expectations{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
@ -501,7 +527,7 @@ func (service *Service) earned(ctx context.Context, node nodes.Node) (_ int64, e
Address: node.PublicAddress,
})
if err != nil {
return 0, Error.Wrap(err)
return 0, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
@ -528,7 +554,7 @@ func (service *Service) earnedSatellite(ctx context.Context, node nodes.Node) (_
Address: node.PublicAddress,
})
if err != nil {
return multinodepb.EarnedPerSatelliteResponse{}, Error.Wrap(err)
return multinodepb.EarnedPerSatelliteResponse{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
@ -562,7 +588,7 @@ func (service *Service) PaystubSatellitePeriod(ctx context.Context, period strin
Address: node.PublicAddress,
})
if err != nil {
return Paystub{}, Error.Wrap(err)
return Paystub{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
@ -613,7 +639,7 @@ func (service *Service) PaystubPeriod(ctx context.Context, period string, nodeID
Address: node.PublicAddress,
})
if err != nil {
return Paystub{}, Error.Wrap(err)
return Paystub{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
@ -663,7 +689,7 @@ func (service *Service) PaystubSatellite(ctx context.Context, nodeID, satelliteI
Address: node.PublicAddress,
})
if err != nil {
return Paystub{}, Error.Wrap(err)
return Paystub{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
@ -713,7 +739,7 @@ func (service *Service) Paystub(ctx context.Context, nodeID storj.NodeID) (_ Pay
Address: node.PublicAddress,
})
if err != nil {
return Paystub{}, Error.Wrap(err)
return Paystub{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {

View File

@ -59,6 +59,11 @@ func (service *Service) Stats(ctx context.Context, satelliteID storj.NodeID) (_
if ErrorNoStats.Has(err) {
continue
}
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return nil, Error.Wrap(err)
}
@ -77,7 +82,7 @@ func (service *Service) dialStats(ctx context.Context, node nodes.Node, satellit
Address: node.PublicAddress,
})
if err != nil {
return Stats{}, Error.Wrap(err)
return Stats{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
err = errs.Combine(err, conn.Close())

View File

@ -90,6 +90,10 @@ func (service *Service) TotalUsage(ctx context.Context, from, to time.Time) (_ U
for _, node := range nodesList {
usage, err := service.dialUsage(ctx, node, from, to)
if err != nil {
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return Usage{}, Error.Wrap(err)
}
@ -120,6 +124,10 @@ func (service *Service) TotalUsageSatellite(ctx context.Context, satelliteID sto
for _, node := range nodesList {
usage, err := service.dialUsageSatellite(ctx, node, satelliteID, from, to)
if err != nil {
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return Usage{}, Error.Wrap(err)
}
@ -139,17 +147,21 @@ func (service *Service) TotalUsageSatellite(ctx context.Context, satelliteID sto
func (service *Service) TotalDiskSpace(ctx context.Context) (totalDiskSpace DiskSpace, err error) {
defer mon.Task()(&ctx)(&err)
nodes, err := service.nodes.List(ctx)
listNodes, err := service.nodes.List(ctx)
if err != nil {
return DiskSpace{}, Error.Wrap(err)
}
for _, node := range nodes {
for _, node := range listNodes {
diskSpace, err := service.dialDiskSpace(ctx, node)
if err != nil {
// TODO: how should we handle offline node?
continue
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return DiskSpace{}, Error.Wrap(err)
}
totalDiskSpace.Add(diskSpace)
}
@ -175,7 +187,7 @@ func (service *Service) dialDiskSpace(ctx context.Context, node nodes.Node) (dis
Address: node.PublicAddress,
})
if err != nil {
return DiskSpace{}, Error.Wrap(err)
return DiskSpace{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
err = errs.Combine(err, conn.Close())
@ -211,7 +223,7 @@ func (service *Service) dialUsage(ctx context.Context, node nodes.Node, from, to
Address: node.PublicAddress,
})
if err != nil {
return Usage{}, Error.Wrap(err)
return Usage{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
err = errs.Combine(err, conn.Close())
@ -254,7 +266,7 @@ func (service *Service) dialUsageSatellite(ctx context.Context, node nodes.Node,
Address: node.PublicAddress,
})
if err != nil {
return Usage{}, Error.Wrap(err)
return Usage{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
err = errs.Combine(err, conn.Close())