73cdefbc41
Change-Id: I12c90169f8959e5eafe2a64bf8b412f7afc48c1c
402 lines
11 KiB
Go
402 lines
11 KiB
Go
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.

package nodes
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/spacemonkeygo/monkit/v3"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/common/rpc"
|
|
"storj.io/common/rpc/rpcstatus"
|
|
"storj.io/common/storj"
|
|
"storj.io/storj/private/multinodepb"
|
|
)
|
|
|
|
var (
|
|
mon = monkit.Package()
|
|
|
|
// Error is an error class for nodes service error.
|
|
Error = errs.Class("nodes")
|
|
// ErrNodeNotReachable is an error class that indicates that we are not able to establish drpc connection with node.
|
|
ErrNodeNotReachable = errs.Class("node is not reachable")
|
|
// ErrNodeAPIKeyInvalid is an error class that indicates that we uses wrong api key.
|
|
ErrNodeAPIKeyInvalid = errs.Class("node api key is invalid")
|
|
)
|
|
|
|
// Service exposes all nodes related logic.
|
|
//
|
|
// architecture: Service
|
|
type Service struct {
|
|
log *zap.Logger
|
|
dialer rpc.Dialer
|
|
nodes DB
|
|
}
|
|
|
|
// NewService creates new instance of Service.
|
|
func NewService(log *zap.Logger, dialer rpc.Dialer, nodes DB) *Service {
|
|
return &Service{
|
|
log: log,
|
|
dialer: dialer,
|
|
nodes: nodes,
|
|
}
|
|
}
|
|
|
|
// Add adds new node to the system.
|
|
func (service *Service) Add(ctx context.Context, id storj.NodeID, apiSecret []byte, publicAddress string) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
// trying to connect to node to check its availability.
|
|
conn, err := service.dialer.DialNodeURL(ctx, storj.NodeURL{
|
|
ID: id,
|
|
Address: publicAddress,
|
|
})
|
|
if err != nil {
|
|
return ErrNodeNotReachable.Wrap(err)
|
|
}
|
|
defer func() {
|
|
err = errs.Combine(err, conn.Close())
|
|
}()
|
|
|
|
nodeClient := multinodepb.NewDRPCNodeClient(conn)
|
|
header := &multinodepb.RequestHeader{
|
|
ApiKey: apiSecret,
|
|
}
|
|
|
|
// making test request to check node api key.
|
|
_, err = nodeClient.Version(ctx, &multinodepb.VersionRequest{Header: header})
|
|
if err != nil {
|
|
if rpcstatus.Code(err) == rpcstatus.Unauthenticated {
|
|
return ErrNodeAPIKeyInvalid.Wrap(err)
|
|
}
|
|
return Error.Wrap(err)
|
|
}
|
|
|
|
return Error.Wrap(service.nodes.Add(ctx, id, apiSecret, publicAddress))
|
|
}
|
|
|
|
// List returns list of all nodes.
|
|
func (service *Service) List(ctx context.Context) (_ []Node, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
nodes, err := service.nodes.List(ctx)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
return nodes, nil
|
|
}
|
|
|
|
// UpdateName will update name of the specified node.
|
|
func (service *Service) UpdateName(ctx context.Context, id storj.NodeID, name string) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
return Error.Wrap(service.nodes.UpdateName(ctx, id, name))
|
|
}
|
|
|
|
// Get retrieves node by id.
|
|
func (service *Service) Get(ctx context.Context, id storj.NodeID) (_ Node, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
node, err := service.nodes.Get(ctx, id)
|
|
if err != nil {
|
|
return Node{}, Error.Wrap(err)
|
|
}
|
|
|
|
return node, nil
|
|
}
|
|
|
|
// Remove removes node from the system.
|
|
func (service *Service) Remove(ctx context.Context, id storj.NodeID) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
return Error.Wrap(service.nodes.Remove(ctx, id))
|
|
}
|
|
|
|
// ListInfos queries node basic info from all nodes via rpc.
|
|
func (service *Service) ListInfos(ctx context.Context) (_ []NodeInfo, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
nodes, err := service.nodes.List(ctx)
|
|
if err != nil {
|
|
if ErrNoNode.Has(err) {
|
|
return []NodeInfo{}, nil
|
|
}
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
var infos []NodeInfo
|
|
for _, node := range nodes {
|
|
info := func() NodeInfo {
|
|
nodeInfo := NodeInfo{
|
|
ID: node.ID,
|
|
Name: node.Name,
|
|
}
|
|
conn, err := service.dialer.DialNodeURL(ctx, storj.NodeURL{
|
|
ID: node.ID,
|
|
Address: node.PublicAddress,
|
|
})
|
|
if err != nil {
|
|
nodeInfo.Status = StatusNotReachable
|
|
return nodeInfo
|
|
}
|
|
|
|
defer func() {
|
|
err = errs.Combine(err, conn.Close())
|
|
}()
|
|
|
|
nodeClient := multinodepb.NewDRPCNodeClient(conn)
|
|
storageClient := multinodepb.NewDRPCStorageClient(conn)
|
|
bandwidthClient := multinodepb.NewDRPCBandwidthClient(conn)
|
|
payoutClient := multinodepb.NewDRPCPayoutClient(conn)
|
|
|
|
header := &multinodepb.RequestHeader{
|
|
ApiKey: node.APISecret,
|
|
}
|
|
|
|
nodeVersion, err := nodeClient.Version(ctx, &multinodepb.VersionRequest{Header: header})
|
|
if err != nil {
|
|
if rpcstatus.Code(err) == rpcstatus.Unauthenticated {
|
|
nodeInfo.Status = StatusUnauthorized
|
|
return nodeInfo
|
|
}
|
|
|
|
nodeInfo.Status = StatusStorageNodeInternalError
|
|
return nodeInfo
|
|
}
|
|
|
|
lastContact, err := nodeClient.LastContact(ctx, &multinodepb.LastContactRequest{Header: header})
|
|
if err != nil {
|
|
// TODO: since rpcstatus.Unauthenticated was checked in nodeVersion this sort of error can be caused
|
|
// only if new apikey was issued during ListInfos method call.
|
|
nodeInfo.Status = StatusStorageNodeInternalError
|
|
return nodeInfo
|
|
}
|
|
|
|
diskSpace, err := storageClient.DiskSpace(ctx, &multinodepb.DiskSpaceRequest{Header: header})
|
|
if err != nil {
|
|
nodeInfo.Status = StatusStorageNodeInternalError
|
|
return nodeInfo
|
|
}
|
|
|
|
earned, err := payoutClient.Earned(ctx, &multinodepb.EarnedRequest{Header: header})
|
|
if err != nil {
|
|
nodeInfo.Status = StatusStorageNodeInternalError
|
|
return nodeInfo
|
|
}
|
|
|
|
bandwidthSummaryRequest := &multinodepb.BandwidthMonthSummaryRequest{
|
|
Header: header,
|
|
}
|
|
bandwidthSummary, err := bandwidthClient.MonthSummary(ctx, bandwidthSummaryRequest)
|
|
if err != nil {
|
|
nodeInfo.Status = StatusStorageNodeInternalError
|
|
return nodeInfo
|
|
}
|
|
|
|
nodeInfo.Version = nodeVersion.Version
|
|
nodeInfo.LastContact = lastContact.LastContact
|
|
nodeInfo.DiskSpaceUsed = diskSpace.GetUsedPieces() + diskSpace.GetUsedTrash()
|
|
nodeInfo.DiskSpaceLeft = diskSpace.GetAvailable()
|
|
nodeInfo.BandwidthUsed = bandwidthSummary.GetUsed()
|
|
nodeInfo.TotalEarned = earned.Total
|
|
nodeInfo.Status = nodeStatus(lastContact.LastContact)
|
|
|
|
return nodeInfo
|
|
}()
|
|
|
|
infos = append(infos, info)
|
|
}
|
|
|
|
return infos, nil
|
|
}
|
|
|
|
// ListInfosSatellite queries node satellite specific info from all nodes via rpc.
|
|
func (service *Service) ListInfosSatellite(ctx context.Context, satelliteID storj.NodeID) (_ []NodeInfoSatellite, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
nodes, err := service.nodes.List(ctx)
|
|
if err != nil {
|
|
if ErrNoNode.Has(err) {
|
|
return []NodeInfoSatellite{}, nil
|
|
}
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
var infos []NodeInfoSatellite
|
|
for _, node := range nodes {
|
|
info := func() NodeInfoSatellite {
|
|
nodeInfoSatellite := NodeInfoSatellite{
|
|
ID: node.ID,
|
|
Name: node.Name,
|
|
}
|
|
conn, err := service.dialer.DialNodeURL(ctx, storj.NodeURL{
|
|
ID: node.ID,
|
|
Address: node.PublicAddress,
|
|
})
|
|
if err != nil {
|
|
nodeInfoSatellite.Status = StatusNotReachable
|
|
return nodeInfoSatellite
|
|
}
|
|
|
|
defer func() {
|
|
err = errs.Combine(err, conn.Close())
|
|
}()
|
|
|
|
nodeClient := multinodepb.NewDRPCNodeClient(conn)
|
|
payoutClient := multinodepb.NewDRPCPayoutClient(conn)
|
|
|
|
header := &multinodepb.RequestHeader{
|
|
ApiKey: node.APISecret,
|
|
}
|
|
|
|
nodeVersion, err := nodeClient.Version(ctx, &multinodepb.VersionRequest{Header: header})
|
|
if err != nil {
|
|
if rpcstatus.Code(err) == rpcstatus.Unauthenticated {
|
|
nodeInfoSatellite.Status = StatusUnauthorized
|
|
return nodeInfoSatellite
|
|
}
|
|
|
|
nodeInfoSatellite.Status = StatusStorageNodeInternalError
|
|
return nodeInfoSatellite
|
|
}
|
|
|
|
lastContact, err := nodeClient.LastContact(ctx, &multinodepb.LastContactRequest{Header: header})
|
|
if err != nil {
|
|
// TODO: since rpcstatus.Unauthenticated was checked in Version this sort of error can be caused
|
|
// only if new apikey was issued during ListInfosSatellite method call.
|
|
nodeInfoSatellite.Status = StatusStorageNodeInternalError
|
|
return nodeInfoSatellite
|
|
}
|
|
|
|
rep, err := nodeClient.Reputation(ctx, &multinodepb.ReputationRequest{
|
|
Header: header,
|
|
SatelliteId: satelliteID,
|
|
})
|
|
if err != nil {
|
|
nodeInfoSatellite.Status = StatusStorageNodeInternalError
|
|
return nodeInfoSatellite
|
|
}
|
|
|
|
earned, err := payoutClient.Earned(ctx, &multinodepb.EarnedRequest{Header: header})
|
|
if err != nil {
|
|
nodeInfoSatellite.Status = StatusStorageNodeInternalError
|
|
return nodeInfoSatellite
|
|
}
|
|
|
|
nodeInfoSatellite.Version = nodeVersion.Version
|
|
nodeInfoSatellite.LastContact = lastContact.LastContact
|
|
nodeInfoSatellite.OnlineScore = rep.Online.Score
|
|
nodeInfoSatellite.AuditScore = rep.Audit.Score
|
|
nodeInfoSatellite.SuspensionScore = rep.Audit.SuspensionScore
|
|
nodeInfoSatellite.TotalEarned = earned.Total
|
|
nodeInfoSatellite.Status = nodeStatus(lastContact.LastContact)
|
|
|
|
return nodeInfoSatellite
|
|
}()
|
|
|
|
infos = append(infos, info)
|
|
}
|
|
|
|
return infos, nil
|
|
}
|
|
|
|
// TrustedSatellites returns list of unique trusted satellites node urls.
|
|
func (service *Service) TrustedSatellites(ctx context.Context) (_ storj.NodeURLs, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
listNodes, err := service.nodes.List(ctx)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
var trustedSatellites storj.NodeURLs
|
|
for _, node := range listNodes {
|
|
nodeURLs, err := service.trustedSatellites(ctx, node)
|
|
if err != nil {
|
|
if ErrNodeNotReachable.Has(err) {
|
|
continue
|
|
}
|
|
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
trustedSatellites = appendUniqueNodeURLs(trustedSatellites, nodeURLs)
|
|
}
|
|
|
|
return trustedSatellites, nil
|
|
}
|
|
|
|
// trustedSatellites retrieves list of trusted satellites node urls for a node.
|
|
func (service *Service) trustedSatellites(ctx context.Context, node Node) (_ storj.NodeURLs, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
conn, err := service.dialer.DialNodeURL(ctx, storj.NodeURL{
|
|
ID: node.ID,
|
|
Address: node.PublicAddress,
|
|
})
|
|
if err != nil {
|
|
return storj.NodeURLs{}, ErrNodeNotReachable.Wrap(err)
|
|
}
|
|
|
|
defer func() {
|
|
err = errs.Combine(err, conn.Close())
|
|
}()
|
|
|
|
nodeClient := multinodepb.NewDRPCNodeClient(conn)
|
|
|
|
header := &multinodepb.RequestHeader{
|
|
ApiKey: node.APISecret,
|
|
}
|
|
|
|
resp, err := nodeClient.TrustedSatellites(ctx, &multinodepb.TrustedSatellitesRequest{Header: header})
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
var nodeURLs storj.NodeURLs
|
|
for _, url := range resp.TrustedSatellites {
|
|
nodeURLs = append(nodeURLs, storj.NodeURL{
|
|
ID: url.NodeId,
|
|
Address: url.GetAddress(),
|
|
})
|
|
}
|
|
|
|
return nodeURLs, nil
|
|
}
|
|
|
|
// nodeStatus chooses node status offline or online depends on LastContact.
|
|
func nodeStatus(lastContact time.Time) Status {
|
|
now := time.Now().UTC()
|
|
|
|
if now.Sub(lastContact) < time.Hour*3 {
|
|
return StatusOnline
|
|
}
|
|
|
|
return StatusOffline
|
|
}
|
|
|
|
// appendUniqueNodeURLs appends unique node urls from incoming slice.
|
|
func appendUniqueNodeURLs(slice storj.NodeURLs, nodeURLs storj.NodeURLs) storj.NodeURLs {
|
|
for _, nodeURL := range nodeURLs {
|
|
slice = appendUniqueNodeURL(slice, nodeURL)
|
|
}
|
|
|
|
return slice
|
|
}
|
|
|
|
// appendUniqueNodeURL appends node url if it is unique.
|
|
func appendUniqueNodeURL(slice storj.NodeURLs, nodeURL storj.NodeURL) storj.NodeURLs {
|
|
for _, existing := range slice {
|
|
if bytes.Equal(existing.ID.Bytes(), nodeURL.ID.Bytes()) {
|
|
return slice
|
|
}
|
|
}
|
|
|
|
slice = append(slice, nodeURL)
|
|
return slice
|
|
}
|