multinode/console: add reputation satellite api

Change-Id: I7cef6c1c271607f7485f604d5b61587558a31878
This commit is contained in:
Yaroslav Vorobiov 2021-06-25 14:09:54 +03:00 committed by Nikolai Siedov
parent bf5194d134
commit 68627e7d80
8 changed files with 761 additions and 248 deletions

View File

@ -0,0 +1,92 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package controllers
import (
"encoding/json"
"net/http"
"github.com/gorilla/mux"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/storj/multinode/nodes"
"storj.io/storj/multinode/reputation"
)
var (
// ErrReputation is an error type for reputation web api controller.
ErrReputation = errs.Class("reputation web api controller")
)
// Reputation is a reputation web api controller.
type Reputation struct {
log *zap.Logger
service *reputation.Service
}
// NewReputation is a constructor of reputation controller.
func NewReputation(log *zap.Logger, service *reputation.Service) *Reputation {
return &Reputation{
log: log,
service: service,
}
}
// Stats handles retrieval of a node reputation for particular satellite.
func (controller *Reputation) Stats(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var err error
defer mon.Task()(&ctx)(&err)
w.Header().Add("Content-Type", "application/json")
segments := mux.Vars(r)
satelliteIDEnc, ok := segments["satelliteID"]
if !ok {
controller.serveError(w, http.StatusBadRequest, ErrReputation.New("could not retrieve satellite id segment"))
return
}
satelliteID, err := storj.NodeIDFromString(satelliteIDEnc)
if err != nil {
controller.serveError(w, http.StatusBadRequest, ErrReputation.Wrap(err))
return
}
stats, err := controller.service.Stats(ctx, satelliteID)
if err != nil {
if nodes.ErrNoNode.Has(err) {
controller.serveError(w, http.StatusNotFound, ErrReputation.Wrap(err))
return
}
controller.log.Error("reputation stats internal error", zap.Error(ErrReputation.Wrap(err)))
controller.serveError(w, http.StatusInternalServerError, ErrReputation.Wrap(err))
return
}
if len(stats) == 0 {
stats = make([]reputation.Stats, 0)
}
if err = json.NewEncoder(w).Encode(stats); err != nil {
controller.log.Error("failed to write json response", zap.Error(ErrReputation.Wrap(err)))
return
}
}
// serveError set http statuses and send json error.
func (controller *Reputation) serveError(w http.ResponseWriter, status int, err error) {
w.WriteHeader(status)
var response struct {
Error string `json:"error"`
}
response.Error = err.Error()
err = json.NewEncoder(w).Encode(response)
if err != nil {
controller.log.Error("failed to write json error response", zap.Error(err))
}
}

View File

@ -20,6 +20,7 @@ import (
"storj.io/storj/multinode/nodes"
"storj.io/storj/multinode/operators"
"storj.io/storj/multinode/payouts"
"storj.io/storj/multinode/reputation"
"storj.io/storj/multinode/storage"
)
@ -36,11 +37,12 @@ type Config struct {
// Services contains services utilized by multinode dashboard.
type Services struct {
Nodes *nodes.Service
Payouts *payouts.Service
Operators *operators.Service
Storage *storage.Service
Bandwidth *bandwidth.Service
Nodes *nodes.Service
Payouts *payouts.Service
Operators *operators.Service
Storage *storage.Service
Bandwidth *bandwidth.Service
Reputation *reputation.Service
}
// Server represents Multinode Dashboard http server.
@ -52,11 +54,12 @@ type Server struct {
http http.Server
config Config
nodes *nodes.Service
payouts *payouts.Service
operators *operators.Service
bandwidth *bandwidth.Service
storage *storage.Service
nodes *nodes.Service
payouts *payouts.Service
operators *operators.Service
bandwidth *bandwidth.Service
storage *storage.Service
reputation *reputation.Service
index *template.Template
}
@ -64,14 +67,15 @@ type Server struct {
// NewServer returns new instance of Multinode Dashboard http server.
func NewServer(log *zap.Logger, listener net.Listener, config Config, services Services) (*Server, error) {
server := Server{
log: log,
listener: listener,
config: config,
nodes: services.Nodes,
operators: services.Operators,
payouts: services.Payouts,
storage: services.Storage,
bandwidth: services.Bandwidth,
log: log,
listener: listener,
config: config,
nodes: services.Nodes,
operators: services.Operators,
payouts: services.Payouts,
storage: services.Storage,
bandwidth: services.Bandwidth,
reputation: services.Reputation,
}
router := mux.NewRouter()
@ -125,6 +129,10 @@ func NewServer(log *zap.Logger, listener net.Listener, config Config, services S
storageRouter.HandleFunc("/disk-space", storageController.TotalDiskSpace).Methods(http.MethodGet)
storageRouter.HandleFunc("/disk-space/{nodeID}", storageController.DiskSpace).Methods(http.MethodGet)
reputationController := controllers.NewReputation(server.log, server.reputation)
reputationRouter := apiRouter.PathPrefix("/reputation").Subrouter()
reputationRouter.HandleFunc("/satellites/{satelliteID}", reputationController.Stats)
if server.config.StaticDir != "" {
router.PathPrefix("/static/").Handler(http.StripPrefix("/static", fs))
router.PathPrefix("/").HandlerFunc(server.appHandler)

View File

@ -20,6 +20,7 @@ import (
"storj.io/storj/multinode/nodes"
"storj.io/storj/multinode/operators"
"storj.io/storj/multinode/payouts"
"storj.io/storj/multinode/reputation"
"storj.io/storj/multinode/storage"
"storj.io/storj/private/lifecycle"
)
@ -84,6 +85,10 @@ type Peer struct {
Service *storage.Service
}
Reputation struct {
Service *reputation.Service
}
// Web server with web UI.
Console struct {
Listener net.Listener
@ -154,6 +159,14 @@ func New(log *zap.Logger, full *identity.FullIdentity, config Config, db DB) (_
)
}
{ // reputation setup
peer.Reputation.Service = reputation.NewService(
peer.Log.Named("reputation:service"),
peer.Dialer,
peer.DB.Nodes(),
)
}
{ // console setup
peer.Console.Listener, err = net.Listen("tcp", config.Console.Address)
if err != nil {
@ -165,11 +178,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, config Config, db DB) (_
peer.Console.Listener,
config.Console,
server.Services{
Nodes: peer.Nodes.Service,
Payouts: peer.Payouts.Service,
Operators: peer.Operators.Service,
Storage: peer.Storage.Service,
Bandwidth: peer.Bandwidth.Service,
Nodes: peer.Nodes.Service,
Payouts: peer.Payouts.Service,
Operators: peer.Operators.Service,
Storage: peer.Storage.Service,
Bandwidth: peer.Bandwidth.Service,
Reputation: peer.Reputation.Service,
},
)
if err != nil {

View File

@ -0,0 +1,44 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package reputation
import (
"time"
"storj.io/common/storj"
)
// AuditWindow contains audit count for particular time frame.
type AuditWindow struct {
WindowStart time.Time `json:"windowStart"`
TotalCount int32 `json:"totalCount"`
OnlineCount int32 `json:"onlineCount"`
}
// Audit contains audit reputation metrics.
type Audit struct {
TotalCount int64 `json:"totalCount"`
SuccessCount int64 `json:"successCount"`
Alpha float64 `json:"alpha"`
Beta float64 `json:"beta"`
UnknownAlpha float64 `json:"unknownAlpha"`
UnknownBeta float64 `json:"unknownBeta"`
Score float64 `json:"score"`
SuspensionScore float64 `json:"suspensionScore"`
History []AuditWindow `json:"history"`
}
// Stats encapsulates node reputation data.
type Stats struct {
NodeID storj.NodeID `json:"nodeId"`
NodeName string `json:"nodeName"`
Audit Audit `json:"audit"`
OnlineScore float64 `json:"onlineScore"`
DisqualifiedAt *time.Time `json:"disqualifiedAt"`
SuspendedAt *time.Time `json:"suspendedAt"`
OfflineSuspendedAt *time.Time `json:"offlineSuspendedAt"`
OfflineUnderReviewAt *time.Time `json:"offlineUnderReviewAt"`
UpdatedAt time.Time `json:"updatedAt"`
JoinedAt time.Time `json:"joinedAt"`
}

View File

@ -0,0 +1,133 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package reputation
import (
"context"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/rpc"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/storj"
"storj.io/storj/multinode/nodes"
"storj.io/storj/private/multinodepb"
)
var (
mon = monkit.Package()
// Error is an error class for reputation service error.
Error = errs.Class("reputation")
// ErrorNoStats is an error class for reputation is not found error.
ErrorNoStats = errs.Class("reputation stats not found")
)
// Service exposes all reputation related logic.
//
// architecture: Service
type Service struct {
log *zap.Logger
dialer rpc.Dialer
nodes nodes.DB
}
// NewService creates new instance of reputation Service.
func NewService(log *zap.Logger, dialer rpc.Dialer, nodes nodes.DB) *Service {
return &Service{
log: log,
dialer: dialer,
nodes: nodes,
}
}
// Stats retrieves node reputation stats list for satellite.
func (service *Service) Stats(ctx context.Context, satelliteID storj.NodeID) (_ []Stats, err error) {
defer mon.Task()(&ctx)(&err)
nodeList, err := service.nodes.List(ctx)
if err != nil {
return nil, Error.Wrap(err)
}
var statsList []Stats
for _, node := range nodeList {
stats, err := service.dialStats(ctx, node, satelliteID)
if err != nil {
if ErrorNoStats.Has(err) {
continue
}
return nil, Error.Wrap(err)
}
statsList = append(statsList, stats)
}
return statsList, nil
}
// dialStats dials node and retrieves reputation stats for particular satellite.
func (service *Service) dialStats(ctx context.Context, node nodes.Node, satelliteID storj.NodeID) (_ Stats, err error) {
defer mon.Task()(&ctx)(&err)
conn, err := service.dialer.DialNodeURL(ctx, storj.NodeURL{
ID: node.ID,
Address: node.PublicAddress,
})
if err != nil {
return Stats{}, Error.Wrap(err)
}
defer func() {
err = errs.Combine(err, conn.Close())
}()
nodeClient := multinodepb.NewDRPCNodeClient(conn)
req := &multinodepb.ReputationRequest{
Header: &multinodepb.RequestHeader{
ApiKey: node.APISecret,
},
SatelliteId: satelliteID,
}
resp, err := nodeClient.Reputation(ctx, req)
if err != nil {
if rpcstatus.Code(err) == rpcstatus.NotFound {
return Stats{}, ErrorNoStats.New("no stats for %s", satelliteID.String())
}
return Stats{}, Error.Wrap(err)
}
var auditWindows []AuditWindow
for _, window := range resp.Audit.History {
auditWindows = append(auditWindows, AuditWindow{
WindowStart: window.WindowStart,
TotalCount: window.TotalCount,
OnlineCount: window.OnlineCount,
})
}
return Stats{
NodeID: node.ID,
NodeName: node.Name,
Audit: Audit{
TotalCount: resp.Audit.TotalCount,
SuccessCount: resp.Audit.SuccessCount,
Alpha: resp.Audit.Alpha,
Beta: resp.Audit.Beta,
UnknownAlpha: resp.Audit.UnknownAlpha,
UnknownBeta: resp.Audit.UnknownBeta,
Score: resp.Audit.Score,
SuspensionScore: resp.Audit.SuspensionScore,
History: auditWindows,
},
OnlineScore: resp.Online.Score,
DisqualifiedAt: resp.DisqualifiedAt,
SuspendedAt: resp.SuspendedAt,
OfflineSuspendedAt: resp.OfflineSuspendedAt,
OfflineUnderReviewAt: resp.OfflineUnderReviewAt,
UpdatedAt: resp.UpdatedAt,
JoinedAt: resp.JoinedAt,
}, nil
}

File diff suppressed because it is too large Load Diff

View File

@ -193,6 +193,12 @@ message ReputationRequest {
bytes satellite_id = 2 [(gogoproto.customtype) = "NodeID", (gogoproto.nullable) = false];
}
message AuditWindow {
google.protobuf.Timestamp window_start = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
int32 online_count = 2;
int32 total_count = 3;
}
message ReputationResponse {
message Online {
double score = 1;
@ -200,10 +206,23 @@ message ReputationResponse {
message Audit {
double score = 1;
double suspension_score = 2;
int64 total_count = 3;
int64 success_count = 4;
double alpha = 5;
double beta = 6;
double unknown_alpha = 7;
double unknown_beta = 8;
repeated AuditWindow history = 9;
}
Online online = 1;
Audit audit = 2;
google.protobuf.Timestamp disqualified_at = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = true];
google.protobuf.Timestamp suspended_at = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = true];
google.protobuf.Timestamp offline_suspended_at = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = true];
google.protobuf.Timestamp offline_under_review_at = 6 [(gogoproto.stdtime) = true, (gogoproto.nullable) = true];
google.protobuf.Timestamp updated_at = 7 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp joined_at = 8 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}
message TrustedSatellitesRequest {

View File

@ -88,6 +88,20 @@ func (node *NodeEndpoint) Reputation(ctx context.Context, req *multinodepb.Reput
if err != nil {
return nil, rpcstatus.Wrap(rpcstatus.Internal, err)
}
if rep.JoinedAt.IsZero() {
return nil, rpcstatus.Error(rpcstatus.NotFound, "satellite reputation not found")
}
var windows []*multinodepb.AuditWindow
if rep.AuditHistory != nil {
for _, window := range rep.AuditHistory.Windows {
windows = append(windows, &multinodepb.AuditWindow{
WindowStart: window.WindowStart,
OnlineCount: window.OnlineCount,
TotalCount: window.TotalCount,
})
}
}
return &multinodepb.ReputationResponse{
Online: &multinodepb.ReputationResponse_Online{
@ -96,7 +110,20 @@ func (node *NodeEndpoint) Reputation(ctx context.Context, req *multinodepb.Reput
Audit: &multinodepb.ReputationResponse_Audit{
Score: rep.Audit.Score,
SuspensionScore: rep.Audit.UnknownScore,
TotalCount: rep.Audit.TotalCount,
SuccessCount: rep.Audit.SuccessCount,
Alpha: rep.Audit.Alpha,
Beta: rep.Audit.Beta,
UnknownAlpha: rep.Audit.UnknownAlpha,
UnknownBeta: rep.Audit.UnknownBeta,
History: windows,
},
DisqualifiedAt: rep.DisqualifiedAt,
SuspendedAt: rep.SuspendedAt,
OfflineSuspendedAt: rep.OfflineSuspendedAt,
OfflineUnderReviewAt: rep.OfflineUnderReviewAt,
UpdatedAt: rep.UpdatedAt,
JoinedAt: rep.JoinedAt,
}, nil
}