a760e48d4d
* protobuf for sending bandwidth agreements to satellite from storage nodes * Setup process for sending agreements * Add payer_id to db with bandwidth agreements for better sorting * Linter errors * Read agreements from PSDB * Try writing message to server * Cleanup * Basic functionality * Better error handelling * Fix test * setup config and server structure for receiving bandwidth agreements * Resolve linter issues * Optional commit for if we want to handle deletes all at once * add identity to Server, add logic for receiving bandwidth messsages * Bandwidth agreement DBX creation and integration with bw agreement endpoint Co-authored-by: Kishore <kishore@storj.io> Co-authored-by: Cam <cameron@storj.io> * protobuf for sending bandwidth agreements to satellite from storage nodes * Setup process for sending agreements * Add payer_id to db with bandwidth agreements for better sorting * Linter errors * Read agreements from PSDB * Try writing message to server * Cleanup * Basic functionality * Better error handelling * Fix test * setup config and server structure for receiving bandwidth agreements * Resolve linter issues * Optional commit for if we want to handle deletes all at once * add identity to Server, add logic for receiving bandwidth messsages * Bandwidth agreement DBX creation and integration with bw agreement endpoint Co-authored-by: Kishore <kishore@storj.io> Co-authored-by: Cam <cameron@storj.io> * added postgres create/read/delete test function Co-authored-by: kishore <kishore@storj.io Co-authored-by: cam <cameron@storj.io> * edit comment * removed sqlite3 driver from dbx * remove generated sqlite code, add dbx read limitoffset * remove getServerAndDB function, rename getDBPath to getPSQLInfo * WIP writing server endpoint test * code review changes
100 lines
2.1 KiB
Go
100 lines
2.1 KiB
Go
// Copyright (C) 2018 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package bwagreement
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
|
|
dbx "storj.io/storj/pkg/bwagreement/dbx"
|
|
"storj.io/storj/pkg/pb"
|
|
)
|
|
|
|
// Server is an implementation of the pb.BandwidthServer interface
|
|
type Server struct {
|
|
DB *dbx.DB
|
|
//identity *provider.FullIdentity
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// NewServer creates instance of Server
|
|
func NewServer(driver, source string, logger *zap.Logger) (*Server, error) {
|
|
db, err := dbx.Open(driver, source)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, err = db.Exec(db.Schema())
|
|
if err != nil && !strings.Contains(err.Error(), "already exists") {
|
|
return nil, err
|
|
}
|
|
|
|
return &Server{
|
|
DB: db,
|
|
logger: logger,
|
|
}, nil
|
|
}
|
|
|
|
// Create a db entry for the provided storagenode
|
|
func (s *Server) Create(ctx context.Context, createBwAgreement *pb.RenterBandwidthAllocation) (bwagreement *dbx.Bwagreement, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
s.logger.Debug("entering statdb Create")
|
|
|
|
signature := createBwAgreement.GetSignature()
|
|
data := createBwAgreement.GetData()
|
|
|
|
bwagreement, err = s.DB.Create_Bwagreement(
|
|
ctx,
|
|
dbx.Bwagreement_Signature(signature),
|
|
dbx.Bwagreement_Data(data),
|
|
)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, err.Error())
|
|
}
|
|
|
|
return bwagreement, nil
|
|
}
|
|
|
|
// BandwidthAgreements receives and stores bandwidth agreements from storage nodes
|
|
func (s *Server) BandwidthAgreements(stream pb.Bandwidth_BandwidthAgreementsServer) (err error) {
|
|
ctx := stream.Context()
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
ch := make(chan *pb.RenterBandwidthAllocation, 1)
|
|
errch := make(chan error, 1)
|
|
go func() {
|
|
for {
|
|
msg, err := stream.Recv()
|
|
if err != nil {
|
|
s.logger.Error("Grpc Receive Error", zap.Error(err))
|
|
errch <- err
|
|
return
|
|
}
|
|
ch <- msg
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case err := <-errch:
|
|
return err
|
|
case <-ctx.Done():
|
|
return nil
|
|
case agreement := <-ch:
|
|
go func() {
|
|
_, err = s.Create(ctx, agreement)
|
|
if err != nil {
|
|
s.logger.Error("DB entry creation Error", zap.Error(err))
|
|
errch <- err
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
}
|