storagenode/heldamount/service added, console/heldamountapi added, console/server updated

Change-Id: I6290a6ea1b07b222908440defbbd7aec5f2a4cdf
This commit is contained in:
Qweder93 2020-03-13 16:01:12 +02:00
parent 5ccce04338
commit 7b0371e9e2
4 changed files with 253 additions and 1 deletions

View File

@ -0,0 +1,100 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package consoleheldamount
import (
"encoding/json"
"net/http"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/pkg/storj"
"storj.io/storj/storagenode/heldamount"
)
const (
contentType = "Content-Type"
applicationJSON = "application/json"
)
var mon = monkit.Package()
// Error is error type of storagenode web console.
var Error = errs.Class("heldamount console web error")
// HeldAmount represents heldmount service.
// architecture: Service
type HeldAmount struct {
service *heldamount.Service
log *zap.Logger
}
// jsonOutput defines json structure of api response data.
type jsonOutput struct {
Data interface{} `json:"data"`
Error string `json:"error"`
}
// NewHeldAmount creates new instance of heldamount service.
func NewHeldAmount(log *zap.Logger, service *heldamount.Service) *HeldAmount {
return &HeldAmount{
log: log,
service: service,
}
}
// GetMonthlyHeldAmount returns heldamount, storage holding and prices data for specific month from satellite.
func (heldamount *HeldAmount) GetMonthlyHeldAmount(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
defer mon.Task()(&ctx)(nil)
period := r.URL.Query().Get("period")
id := r.URL.Query().Get("satelliteID")
satelliteID, err := storj.NodeIDFromString(id)
if err != nil {
heldamount.writeError(w, http.StatusBadRequest, Error.Wrap(err))
return
}
paystubData, err := heldamount.service.GetPaystubStats(ctx, satelliteID, period)
if err != nil {
heldamount.writeError(w, http.StatusInternalServerError, Error.Wrap(err))
return
}
heldamount.writeData(w, paystubData)
}
// writeData is helper method to write JSON to http.ResponseWriter and log encoding error.
func (heldamount *HeldAmount) writeData(w http.ResponseWriter, data interface{}) {
w.Header().Set(contentType, applicationJSON)
w.WriteHeader(http.StatusOK)
output := jsonOutput{Data: data}
if err := json.NewEncoder(w).Encode(output); err != nil {
heldamount.log.Error("json encoder error", zap.Error(err))
}
}
// writeError writes a JSON error payload to http.ResponseWriter log encoding error.
func (heldamount *HeldAmount) writeError(w http.ResponseWriter, status int, err error) {
if status >= http.StatusInternalServerError {
heldamount.log.Error("api handler server error", zap.Int("status code", status), zap.Error(err))
}
w.Header().Set(contentType, applicationJSON)
w.WriteHeader(status)
output := jsonOutput{Error: err.Error()}
if err := json.NewEncoder(w).Encode(output); err != nil {
heldamount.log.Error("json encoder error", zap.Error(err))
}
}

View File

@ -18,7 +18,9 @@ import (
"storj.io/common/storj"
"storj.io/storj/storagenode/console"
"storj.io/storj/storagenode/console/consoleheldamount"
"storj.io/storj/storagenode/console/consolenotifications"
"storj.io/storj/storagenode/heldamount"
"storj.io/storj/storagenode/notifications"
)
@ -48,24 +50,28 @@ type Server struct {
service *console.Service
notifications *notifications.Service
heldAmount *heldamount.Service
listener net.Listener
server http.Server
}
// NewServer creates new instance of storagenode console web server.
func NewServer(logger *zap.Logger, assets http.FileSystem, notifications *notifications.Service, service *console.Service, listener net.Listener) *Server {
func NewServer(logger *zap.Logger, assets http.FileSystem, notifications *notifications.Service, service *console.Service, heldAmount *heldamount.Service, listener net.Listener) *Server {
server := Server{
log: logger,
service: service,
listener: listener,
notifications: notifications,
heldAmount: heldAmount,
}
router := mux.NewRouter()
apiRouter := router.PathPrefix("/api").Subrouter()
notificationRouter := router.PathPrefix("/api/notifications").Subrouter()
notificationController := consolenotifications.NewNotifications(server.log, server.notifications)
heldamountRouter := router.PathPrefix("/api/heldamount").Subrouter()
heldamountController := consoleheldamount.NewHeldAmount(server.log, server.heldAmount)
if assets != nil {
fs := http.FileServer(assets)
@ -84,6 +90,7 @@ func NewServer(logger *zap.Logger, assets http.FileSystem, notifications *notifi
notificationRouter.Handle("/list", http.HandlerFunc(notificationController.ListNotifications)).Methods(http.MethodGet)
notificationRouter.Handle("/{id}/read", http.HandlerFunc(notificationController.ReadNotification)).Methods(http.MethodPost)
notificationRouter.Handle("/readall", http.HandlerFunc(notificationController.ReadAllNotifications)).Methods(http.MethodPost)
heldamountRouter.Handle("/{period}", http.HandlerFunc(heldamountController.GetMonthlyHeldAmount)).Methods(http.MethodGet)
server.server = http.Server{
Handler: router,

View File

@ -0,0 +1,132 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package heldamount
import (
"context"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/storj/pkg/storj"
"storj.io/storj/storagenode/trust"
)
var (
// HeldAmountServiceErr defines held amount service error
HeldAmountServiceErr = errs.Class("node stats service error")
mon = monkit.Package()
)
// Client encapsulates HeldAmountClient with underlying connection
//
// architecture: Client
type Client struct {
conn *rpc.Conn
pb.DRPCHeldAmountClient
}
// Close closes underlying client connection
func (c *Client) Close() error {
return c.conn.Close()
}
// Service retrieves info from satellites using an rpc client
//
// architecture: Service
type Service struct {
log *zap.Logger
dialer rpc.Dialer
trust *trust.Pool
}
// NewService creates new instance of service
func NewService(log *zap.Logger, dialer rpc.Dialer, trust *trust.Pool) *Service {
return &Service{
log: log,
dialer: dialer,
trust: trust,
}
}
// GetPaystubStats retrieves held amount for particular satellite
func (service *Service) GetPaystubStats(ctx context.Context, satelliteID storj.NodeID, period string) (_ *PayStub, err error) {
defer mon.Task()(&ctx)(&err)
client, err := service.dial(ctx, satelliteID)
if err != nil {
return nil, HeldAmountServiceErr.Wrap(err)
}
defer func() { err = errs.Combine(err, client.Close()) }()
requestedPeriod, err := stringToTime(period)
if err != nil {
return nil, HeldAmountServiceErr.Wrap(err)
}
resp, err := client.GetPayStub(ctx, &pb.GetHeldAmountRequest{Period: requestedPeriod})
if err != nil {
return nil, HeldAmountServiceErr.Wrap(err)
}
return &PayStub{
Period: period,
SatelliteID: satelliteID,
Created: resp.CreatedAt,
Codes: resp.Codes,
UsageAtRest: float64(resp.UsageAtRest),
UsageGet: resp.UsageGet,
UsagePut: resp.UsagePut,
UsageGetRepair: resp.CompGetRepair,
UsagePutRepair: resp.CompPutRepair,
UsageGetAudit: resp.UsageGetAudit,
CompAtRest: resp.CompAtRest,
CompGet: resp.CompGet,
CompPut: resp.CompPut,
CompGetRepair: resp.CompGetRepair,
CompPutRepair: resp.CompPutRepair,
CompGetAudit: resp.CompGetAudit,
SurgePercent: resp.SurgePercent,
Held: resp.Held,
Owed: resp.Owed,
Disposed: resp.Disposed,
Paid: resp.Paid,
}, nil
}
// dial dials the HeldAmount client for the satellite by id
func (service *Service) dial(ctx context.Context, satelliteID storj.NodeID) (_ *Client, err error) {
defer mon.Task()(&ctx)(&err)
address, err := service.trust.GetAddress(ctx, satelliteID)
if err != nil {
return nil, errs.New("unable to find satellite %s: %w", satelliteID, err)
}
conn, err := service.dialer.DialAddressID(ctx, address, satelliteID)
if err != nil {
return nil, errs.New("unable to connect to the satellite %s: %w", satelliteID, err)
}
return &Client{
conn: conn,
DRPCHeldAmountClient: pb.NewDRPCHeldAmountClient(conn.Raw()),
}, nil
}
func stringToTime(period string) (_ time.Time, err error) {
layout := "2006-01"
result, err := time.Parse(layout, period)
if err != nil {
return time.Time{}, err
}
return result, nil
}

View File

@ -230,6 +230,10 @@ type Peer struct {
Service *notifications.Service
}
Heldamount struct {
Service *heldamount.Service
}
Bandwidth *bandwidth.Service
}
@ -484,6 +488,14 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
debug.Cycle("Orders Cleanup", peer.Storage2.Orders.Cleanup))
}
{ // setub heldamount service.
peer.Heldamount.Service = heldamount.NewService(
peer.Log.Named("heldamount:service"),
peer.Dialer,
peer.Storage2.Trust,
)
}
{ // setup node stats service
peer.NodeStats.Service = nodestats.NewService(
peer.Log.Named("nodestats:service"),
@ -547,6 +559,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
assets,
peer.Notifications.Service,
peer.Console.Service,
peer.Heldamount.Service,
peer.Console.Listener,
)
peer.Services.Add(lifecycle.Item{