storj/storagenode/console/service.go
Clement Sam 59b37db670 storagenode: overhaul QUIC check implementation
The current implementation blocks the the startup until one or none
of the trusted satellites is able to reach the node via QUIC.
This can cause delayed startup. Also, the quic check is done
once during startup, and if there is a misconfiguration later,
snos would have to restart to node.

In this change, we reuse the contact service which pings the satellite
periodically for node checkin. During checkin the satellite tries
pinging the node back via both TCP and QUIC and reports both statuses.
WIth this, we are able to get a periodic update of the QUIC status
without restarting the node.

Also adds the time the node was last pinged via QUIC to the tooltip
on the QUIC status tab.

Resolves https://github.com/storj/storj/issues/4398

Change-Id: I18aa2a8e8d44e8187f8f2eb51f398fa6073882a4
2022-11-09 03:15:57 +00:00

481 lines
14 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package console
import (
"context"
"math"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/private/version"
"storj.io/storj/private/date"
"storj.io/storj/private/version/checker"
"storj.io/storj/storagenode/bandwidth"
"storj.io/storj/storagenode/contact"
"storj.io/storj/storagenode/operator"
"storj.io/storj/storagenode/payouts/estimatedpayouts"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/pricing"
"storj.io/storj/storagenode/reputation"
"storj.io/storj/storagenode/satellites"
"storj.io/storj/storagenode/storageusage"
"storj.io/storj/storagenode/trust"
)
var (
// SNOServiceErr defines sno service error.
SNOServiceErr = errs.Class("console")
mon = monkit.Package()
)
// Service is handling storage node operator related logic.
//
// architecture: Service
type Service struct {
log *zap.Logger
trust *trust.Pool
usageCache *pieces.BlobsUsageCache
bandwidthDB bandwidth.DB
reputationDB reputation.DB
storageUsageDB storageusage.DB
pricingDB pricing.DB
satelliteDB satellites.DB
pieceStore *pieces.Store
contact *contact.Service
estimation *estimatedpayouts.Service
version *checker.Service
pingStats *contact.PingStats
allocatedDiskSpace memory.Size
walletAddress string
walletFeatures operator.WalletFeatures
startedAt time.Time
versionInfo version.Info
quicStats *contact.QUICStats
configuredPort string
}
// NewService returns new instance of Service.
func NewService(log *zap.Logger, bandwidth bandwidth.DB, pieceStore *pieces.Store, version *checker.Service,
allocatedDiskSpace memory.Size, walletAddress string, versionInfo version.Info, trust *trust.Pool,
reputationDB reputation.DB, storageUsageDB storageusage.DB, pricingDB pricing.DB, satelliteDB satellites.DB,
pingStats *contact.PingStats, contact *contact.Service, estimation *estimatedpayouts.Service, usageCache *pieces.BlobsUsageCache,
walletFeatures operator.WalletFeatures, port string, quicStats *contact.QUICStats) (*Service, error) {
if log == nil {
return nil, errs.New("log can't be nil")
}
if usageCache == nil {
return nil, errs.New("usage cache can't be nil")
}
if bandwidth == nil {
return nil, errs.New("bandwidth can't be nil")
}
if pieceStore == nil {
return nil, errs.New("pieceStore can't be nil")
}
if version == nil {
return nil, errs.New("version can't be nil")
}
if pingStats == nil {
return nil, errs.New("pingStats can't be nil")
}
if contact == nil {
return nil, errs.New("contact service can't be nil")
}
if estimation == nil {
return nil, errs.New("estimation service can't be nil")
}
return &Service{
log: log,
trust: trust,
usageCache: usageCache,
bandwidthDB: bandwidth,
reputationDB: reputationDB,
storageUsageDB: storageUsageDB,
pricingDB: pricingDB,
satelliteDB: satelliteDB,
pieceStore: pieceStore,
version: version,
pingStats: pingStats,
allocatedDiskSpace: allocatedDiskSpace,
contact: contact,
estimation: estimation,
walletAddress: walletAddress,
startedAt: time.Now(),
versionInfo: versionInfo,
walletFeatures: walletFeatures,
quicStats: quicStats,
configuredPort: port,
}, nil
}
// SatelliteInfo encapsulates satellite ID and disqualification.
type SatelliteInfo struct {
ID storj.NodeID `json:"id"`
URL string `json:"url"`
Disqualified *time.Time `json:"disqualified"`
Suspended *time.Time `json:"suspended"`
CurrentStorageUsed int64 `json:"currentStorageUsed"`
}
// Dashboard encapsulates dashboard stale data.
type Dashboard struct {
NodeID storj.NodeID `json:"nodeID"`
Wallet string `json:"wallet"`
WalletFeatures []string `json:"walletFeatures"`
Satellites []SatelliteInfo `json:"satellites"`
DiskSpace DiskSpaceInfo `json:"diskSpace"`
Bandwidth BandwidthInfo `json:"bandwidth"`
LastPinged time.Time `json:"lastPinged"`
Version version.SemVer `json:"version"`
AllowedVersion version.SemVer `json:"allowedVersion"`
UpToDate bool `json:"upToDate"`
StartedAt time.Time `json:"startedAt"`
ConfiguredPort string `json:"configuredPort"`
QUICStatus string `json:"quicStatus"`
LastQUICPingedAt time.Time `json:"lastQuicPingedAt"`
}
// GetDashboardData returns stale dashboard data.
func (s *Service) GetDashboardData(ctx context.Context) (_ *Dashboard, err error) {
defer mon.Task()(&ctx)(&err)
data := new(Dashboard)
data.NodeID = s.contact.Local().ID
data.Wallet = s.walletAddress
data.WalletFeatures = s.walletFeatures
data.Version = s.versionInfo.Version
data.StartedAt = s.startedAt
data.LastPinged = s.pingStats.WhenLastPinged()
data.AllowedVersion, data.UpToDate = s.version.IsAllowed(ctx)
data.QUICStatus = s.quicStats.Status()
data.LastQUICPingedAt = s.quicStats.WhenLastPinged()
data.ConfiguredPort = s.configuredPort
stats, err := s.reputationDB.All(ctx)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
for _, rep := range stats {
url, err := s.trust.GetNodeURL(ctx, rep.SatelliteID)
if err != nil {
s.log.Warn("unable to get Satellite URL", zap.String("Satellite ID", rep.SatelliteID.String()),
zap.Error(SNOServiceErr.Wrap(err)))
continue
}
_, currentStorageUsed, err := s.usageCache.SpaceUsedBySatellite(ctx, rep.SatelliteID)
if err != nil {
s.log.Warn("unable to get Satellite Current Storage Used", zap.String("Satellite ID", rep.SatelliteID.String()),
zap.Error(SNOServiceErr.Wrap(err)))
continue
}
data.Satellites = append(data.Satellites,
SatelliteInfo{
ID: rep.SatelliteID,
Disqualified: rep.DisqualifiedAt,
Suspended: rep.SuspendedAt,
URL: url.Address,
CurrentStorageUsed: currentStorageUsed,
},
)
}
pieceTotal, _, err := s.pieceStore.SpaceUsedForPieces(ctx)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
trash, err := s.pieceStore.SpaceUsedForTrash(ctx)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
bandwidthUsage, err := s.bandwidthDB.MonthSummary(ctx, time.Now())
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
data.DiskSpace = DiskSpaceInfo{
Used: pieceTotal,
Available: s.allocatedDiskSpace.Int64(),
Trash: trash,
}
overused := s.allocatedDiskSpace.Int64() - pieceTotal - trash
if overused < 0 {
data.DiskSpace.Overused = int64(math.Abs(float64(overused)))
}
data.Bandwidth = BandwidthInfo{
Used: bandwidthUsage,
}
return data, nil
}
// PriceModel is a satellite prices for storagenode usage TB/H.
type PriceModel struct {
EgressBandwidth int64
RepairBandwidth int64
AuditBandwidth int64
DiskSpace int64
}
// Satellite encapsulates satellite related data.
type Satellite struct {
ID storj.NodeID `json:"id"`
StorageDaily []storageusage.Stamp `json:"storageDaily"`
BandwidthDaily []bandwidth.UsageRollup `json:"bandwidthDaily"`
StorageSummary float64 `json:"storageSummary"`
BandwidthSummary int64 `json:"bandwidthSummary"`
EgressSummary int64 `json:"egressSummary"`
IngressSummary int64 `json:"ingressSummary"`
CurrentStorageUsed int64 `json:"currentStorageUsed"`
Audits Audits `json:"audits"`
AuditHistory reputation.AuditHistory `json:"auditHistory"`
PriceModel PriceModel `json:"priceModel"`
NodeJoinedAt time.Time `json:"nodeJoinedAt"`
}
// GetSatelliteData returns satellite related data.
func (s *Service) GetSatelliteData(ctx context.Context, satelliteID storj.NodeID) (_ *Satellite, err error) {
defer mon.Task()(&ctx)(&err)
from, to := date.MonthBoundary(time.Now().UTC())
bandwidthDaily, err := s.bandwidthDB.GetDailySatelliteRollups(ctx, satelliteID, from, to)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
storageDaily, err := s.storageUsageDB.GetDaily(ctx, satelliteID, from, to)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
bandwidthSummary, err := s.bandwidthDB.SatelliteSummary(ctx, satelliteID, from, to)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
egressSummary, err := s.bandwidthDB.SatelliteEgressSummary(ctx, satelliteID, from, to)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
ingressSummary, err := s.bandwidthDB.SatelliteIngressSummary(ctx, satelliteID, from, to)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
storageSummary, err := s.storageUsageDB.SatelliteSummary(ctx, satelliteID, from, to)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
_, currentStorageUsed, err := s.usageCache.SpaceUsedBySatellite(ctx, satelliteID)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
rep, err := s.reputationDB.Get(ctx, satelliteID)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
pricingModel, err := s.pricingDB.Get(ctx, satelliteID)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
satellitePricing := PriceModel{
EgressBandwidth: pricingModel.EgressBandwidth,
RepairBandwidth: pricingModel.RepairBandwidth,
AuditBandwidth: pricingModel.AuditBandwidth,
DiskSpace: pricingModel.DiskSpace,
}
url, err := s.trust.GetNodeURL(ctx, satelliteID)
if err != nil {
s.log.Warn("unable to get Satellite URL", zap.String("Satellite ID", satelliteID.String()),
zap.Error(SNOServiceErr.Wrap(err)))
}
return &Satellite{
ID: satelliteID,
StorageDaily: storageDaily,
BandwidthDaily: bandwidthDaily,
StorageSummary: storageSummary,
BandwidthSummary: bandwidthSummary.Total(),
CurrentStorageUsed: currentStorageUsed,
EgressSummary: egressSummary.Total(),
IngressSummary: ingressSummary.Total(),
Audits: Audits{
AuditScore: rep.Audit.Score,
SuspensionScore: rep.Audit.UnknownScore,
OnlineScore: rep.OnlineScore,
SatelliteName: url.Address,
},
AuditHistory: reputation.GetAuditHistoryFromPB(rep.AuditHistory),
PriceModel: satellitePricing,
NodeJoinedAt: rep.JoinedAt,
}, nil
}
// Satellites represents consolidated data across all satellites.
type Satellites struct {
StorageDaily []storageusage.Stamp `json:"storageDaily"`
BandwidthDaily []bandwidth.UsageRollup `json:"bandwidthDaily"`
StorageSummary float64 `json:"storageSummary"`
BandwidthSummary int64 `json:"bandwidthSummary"`
EgressSummary int64 `json:"egressSummary"`
IngressSummary int64 `json:"ingressSummary"`
EarliestJoinedAt time.Time `json:"earliestJoinedAt"`
Audits []Audits `json:"audits"`
}
// Audits represents audit, suspension and online scores of SNO across all satellites.
type Audits struct {
AuditScore float64 `json:"auditScore"`
SuspensionScore float64 `json:"suspensionScore"`
OnlineScore float64 `json:"onlineScore"`
SatelliteName string `json:"satelliteName"`
}
// GetAllSatellitesData returns bandwidth and storage daily usage consolidate
// among all satellites from the node's trust pool.
func (s *Service) GetAllSatellitesData(ctx context.Context) (_ *Satellites, err error) {
defer mon.Task()(&ctx)(nil)
from, to := date.MonthBoundary(time.Now().UTC())
var audits []Audits
bandwidthDaily, err := s.bandwidthDB.GetDailyRollups(ctx, from, to)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
storageDaily, err := s.storageUsageDB.GetDailyTotal(ctx, from, to)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
bandwidthSummary, err := s.bandwidthDB.Summary(ctx, from, to)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
egressSummary, err := s.bandwidthDB.EgressSummary(ctx, from, to)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
ingressSummary, err := s.bandwidthDB.IngressSummary(ctx, from, to)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
storageSummary, err := s.storageUsageDB.Summary(ctx, from, to)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
satellitesIDs := s.trust.GetSatellites(ctx)
joinedAt := time.Now().UTC()
for i := 0; i < len(satellitesIDs); i++ {
stats, err := s.reputationDB.Get(ctx, satellitesIDs[i])
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
url, err := s.trust.GetNodeURL(ctx, satellitesIDs[i])
if err != nil {
s.log.Warn("unable to get Satellite URL", zap.String("Satellite ID", satellitesIDs[i].String()),
zap.Error(SNOServiceErr.Wrap(err)))
continue
}
audits = append(audits, Audits{
AuditScore: stats.Audit.Score,
SuspensionScore: stats.Audit.UnknownScore,
OnlineScore: stats.OnlineScore,
SatelliteName: url.Address,
})
if !stats.JoinedAt.IsZero() && stats.JoinedAt.Before(joinedAt) {
joinedAt = stats.JoinedAt
}
}
return &Satellites{
StorageDaily: storageDaily,
BandwidthDaily: bandwidthDaily,
StorageSummary: storageSummary,
BandwidthSummary: bandwidthSummary.Total(),
EgressSummary: egressSummary.Total(),
IngressSummary: ingressSummary.Total(),
EarliestJoinedAt: joinedAt,
Audits: audits,
}, nil
}
// GetSatelliteEstimatedPayout returns estimated payouts for current and previous months for selected satellite.
func (s *Service) GetSatelliteEstimatedPayout(ctx context.Context, satelliteID storj.NodeID, now time.Time) (estimatedPayout estimatedpayouts.EstimatedPayout, err error) {
estimatedPayout, err = s.estimation.GetSatelliteEstimatedPayout(ctx, satelliteID, now)
if err != nil {
return estimatedpayouts.EstimatedPayout{}, SNOServiceErr.Wrap(err)
}
return estimatedPayout, nil
}
// GetAllSatellitesEstimatedPayout returns estimated payouts for current and previous months for all satellites.
func (s *Service) GetAllSatellitesEstimatedPayout(ctx context.Context, now time.Time) (estimatedPayout estimatedpayouts.EstimatedPayout, err error) {
estimatedPayout, err = s.estimation.GetAllSatellitesEstimatedPayout(ctx, now)
if err != nil {
return estimatedpayouts.EstimatedPayout{}, SNOServiceErr.Wrap(err)
}
return estimatedPayout, nil
}
// VerifySatelliteID verifies if the satellite belongs to the trust pool.
func (s *Service) VerifySatelliteID(ctx context.Context, satelliteID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
err = s.trust.VerifySatelliteID(ctx, satelliteID)
if err != nil {
return SNOServiceErr.Wrap(err)
}
return nil
}