storj/multinode/operators/service.go

127 lines
3.0 KiB
Go
Raw Normal View History

// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package operators
import (
"context"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/storj/multinode/nodes"
"storj.io/storj/private/multinodepb"
)
// MaxOperatorsOnPage defines maximum limit on operators page.
const MaxOperatorsOnPage = 5
var (
mon = monkit.Package()
// Error is an error class for operators service error.
Error = errs.Class("operators")
)
// Service exposes all operators related logic.
//
// architecture: Service
type Service struct {
log *zap.Logger
dialer rpc.Dialer
nodes nodes.DB
}
// NewService creates new instance of Service.
func NewService(log *zap.Logger, dialer rpc.Dialer, nodes nodes.DB) *Service {
return &Service{
log: log,
dialer: dialer,
nodes: nodes,
}
}
// ListPaginated returns paginated list of operators.
func (service *Service) ListPaginated(ctx context.Context, cursor Cursor) (_ Page, err error) {
defer mon.Task()(&ctx)(&err)
if cursor.Limit > MaxOperatorsOnPage {
cursor.Limit = MaxOperatorsOnPage
}
if cursor.Limit < 1 {
cursor.Limit = 1
}
if cursor.Page == 0 {
return Page{}, Error.Wrap(errs.New("page can not be 0"))
}
page, err := service.nodes.ListPaged(ctx, nodes.Cursor{
Limit: cursor.Limit,
Page: cursor.Page,
})
if err != nil {
return Page{}, Error.Wrap(err)
}
var operators []Operator
for _, node := range page.Nodes {
operator, err := service.GetOperator(ctx, node)
if err != nil {
if nodes.ErrNodeNotReachable.Has(err) {
continue
}
return Page{}, Error.Wrap(err)
}
operators = append(operators, operator)
}
return Page{
Operators: operators,
Offset: page.Offset,
Limit: page.Limit,
CurrentPage: page.CurrentPage,
PageCount: page.PageCount,
TotalCount: page.TotalCount,
}, nil
}
// GetOperator retrieves operator form node via rpc.
func (service *Service) GetOperator(ctx context.Context, node nodes.Node) (_ Operator, err error) {
defer mon.Task()(&ctx)(&err)
conn, err := service.dialer.DialNodeURL(ctx, storj.NodeURL{
ID: node.ID,
Address: node.PublicAddress,
})
if err != nil {
return Operator{}, nodes.ErrNodeNotReachable.Wrap(err)
}
defer func() {
err = errs.Combine(err, conn.Close())
}()
nodeClient := multinodepb.NewDRPCNodeClient(conn)
payoutClient := multinodepb.NewDRPCPayoutClient(conn)
header := &multinodepb.RequestHeader{
ApiKey: node.APISecret,
}
operatorResponse, err := nodeClient.Operator(ctx, &multinodepb.OperatorRequest{Header: header})
if err != nil {
return Operator{}, Error.Wrap(err)
}
undistributedResponse, err := payoutClient.Undistributed(ctx, &multinodepb.UndistributedRequest{Header: header})
if err != nil {
return Operator{}, Error.Wrap(err)
}
return Operator{
NodeID: node.ID,
Email: operatorResponse.Email,
Wallet: operatorResponse.Wallet,
WalletFeatures: operatorResponse.WalletFeatures,
Undistributed: undistributedResponse.Total,
}, nil
}