Add logger to piecestore server/node started log (#856)
* add logger to psserver struct * node started log * rename initialize to NewEndpoint * return err from WriteBandwidthAllocToDB
This commit is contained in:
parent
d8df4b5f6b
commit
0d17c21a1b
@ -144,7 +144,7 @@ func New(t zaptest.TestingT, satelliteCount, storageNodeCount, uplinkCount int)
|
||||
return nil, utils.CombineErrors(err, planet.Shutdown())
|
||||
}
|
||||
|
||||
server := pieceserver.New(storageDir, serverdb, pieceserver.Config{
|
||||
server := pieceserver.New(node.Log, storageDir, serverdb, pieceserver.Config{
|
||||
Path: storageDir,
|
||||
AllocatedDiskSpace: memory.GB.Int64(),
|
||||
AllocatedBandwidth: 100 * memory.GB.Int64(),
|
||||
|
@ -7,7 +7,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
|
||||
@ -48,7 +47,11 @@ func (s *Server) Retrieve(stream pb.PieceStoreRoutes_RetrieveServer) (err error)
|
||||
return RetrieveError.New("PieceStore message is nil")
|
||||
}
|
||||
|
||||
zap.S().Infof("Retrieving piece %s with offset %d and length %d.", pd.GetId(), pd.GetOffset(), pd.GetPieceSize())
|
||||
s.log.Debug("Retrieving",
|
||||
zap.String("Piece ID", fmt.Sprint(pd.GetId())),
|
||||
zap.Int64("Offset", pd.GetOffset()),
|
||||
zap.Int64("Size", pd.GetPieceSize()),
|
||||
)
|
||||
|
||||
id, err := getNamespacedPieceID([]byte(pd.GetId()), getNamespace(authorization))
|
||||
if err != nil {
|
||||
@ -81,7 +84,11 @@ func (s *Server) Retrieve(stream pb.PieceStoreRoutes_RetrieveServer) (err error)
|
||||
return err
|
||||
}
|
||||
|
||||
zap.S().Infof("Successfully retrieved %s: Allocated: %v, Retrieved: %v\n", pd.GetId(), allocated, retrieved)
|
||||
s.log.Debug("Successfully retrieved",
|
||||
zap.String("Piece ID", fmt.Sprint(pd.GetId())),
|
||||
zap.Int64("Allocated", allocated),
|
||||
zap.Int64("Retrieved", retrieved),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -110,7 +117,7 @@ func (s *Server) retrieveData(ctx context.Context, stream pb.PieceStoreRoutes_Re
|
||||
err := s.DB.WriteBandwidthAllocToDB(lastAllocation)
|
||||
if err != nil {
|
||||
// TODO: handle error properly
|
||||
log.Println("WriteBandwidthAllocToDB Error:", err)
|
||||
s.log.Error("WriteBandwidthAllocToDB Error:", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"crypto/hmac"
|
||||
"crypto/sha512"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -52,7 +53,12 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
s, err := Initialize(ctx, c, server.Identity().Key)
|
||||
db, err := psdb.Open(ctx, filepath.Join(c.Path, "piece-store-data"), filepath.Join(c.Path, "piecestore.db"))
|
||||
if err != nil {
|
||||
return ServerError.Wrap(err)
|
||||
}
|
||||
|
||||
s, err := NewEndpoint(zap.L(), c, db, server.Identity().Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -74,6 +80,7 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error)
|
||||
log.Fatal(s.Stop(ctx))
|
||||
}()
|
||||
|
||||
s.log.Info("Started Node", zap.String("ID", fmt.Sprint(server.Identity().ID)))
|
||||
return server.Run(ctx)
|
||||
}
|
||||
|
||||
@ -97,6 +104,7 @@ func DirSize(path string) (int64, error) {
|
||||
|
||||
// Server -- GRPC server meta data used in route calls
|
||||
type Server struct {
|
||||
log *zap.Logger
|
||||
DataDir string
|
||||
DB *psdb.DB
|
||||
pkey crypto.PrivateKey
|
||||
@ -105,10 +113,8 @@ type Server struct {
|
||||
verifier auth.SignedMessageVerifier
|
||||
}
|
||||
|
||||
// Initialize -- initializes a server struct
|
||||
func Initialize(ctx context.Context, config Config, pkey crypto.PrivateKey) (*Server, error) {
|
||||
dbPath := filepath.Join(config.Path, "piecestore.db")
|
||||
dataDir := filepath.Join(config.Path, "piece-store-data")
|
||||
// NewEndpoint -- initializes a new endpoint for a piecestore server
|
||||
func NewEndpoint(log *zap.Logger, config Config, db *psdb.DB, pkey crypto.PrivateKey) (*Server, error) {
|
||||
|
||||
// read the allocated disk space from the config file
|
||||
allocatedDiskSpace := config.AllocatedDiskSpace
|
||||
@ -123,11 +129,6 @@ func Initialize(ctx context.Context, config Config, pkey crypto.PrivateKey) (*Se
|
||||
}
|
||||
freeDiskSpace := int64(diskSpace.Free)
|
||||
|
||||
db, err := psdb.Open(ctx, dataDir, dbPath)
|
||||
if err != nil {
|
||||
return nil, ServerError.Wrap(err)
|
||||
}
|
||||
|
||||
// get how much is currently used, if for the first time totalUsed = 0
|
||||
totalUsed, err := db.SumTTLSizes()
|
||||
if err != nil {
|
||||
@ -141,34 +142,35 @@ func Initialize(ctx context.Context, config Config, pkey crypto.PrivateKey) (*Se
|
||||
}
|
||||
|
||||
if usedBandwidth > allocatedBandwidth {
|
||||
zap.S().Warnf("Exceed the allowed Bandwidth setting")
|
||||
log.Warn("Exceed the allowed Bandwidth setting")
|
||||
} else {
|
||||
zap.S().Info("Remaining Bandwidth ", allocatedBandwidth-usedBandwidth)
|
||||
log.Info("Remaining Bandwidth", zap.Int64("bytes", allocatedBandwidth-usedBandwidth))
|
||||
}
|
||||
|
||||
// check your hard drive is big enough
|
||||
// first time setup as a piece node server
|
||||
if (totalUsed == 0x00) && (freeDiskSpace < allocatedDiskSpace) {
|
||||
allocatedDiskSpace = freeDiskSpace
|
||||
zap.S().Warnf("Disk space is less than requested allocated space, allocating = %d Bytes", allocatedDiskSpace)
|
||||
log.Warn("Disk space is less than requested. Allocating space", zap.Int64("bytes", allocatedDiskSpace))
|
||||
}
|
||||
|
||||
// on restarting the Piece node server, assuming already been working as a node
|
||||
// used above the alloacated space, user changed the allocation space setting
|
||||
// before restarting
|
||||
if totalUsed >= allocatedDiskSpace {
|
||||
zap.S().Warnf("Used more space then allocated, allocating = %d Bytes", allocatedDiskSpace)
|
||||
log.Warn("Used more space than allocated. Allocating space", zap.Int64("bytes", allocatedDiskSpace))
|
||||
}
|
||||
|
||||
// the available diskspace is less than remaining allocated space,
|
||||
// due to change of setting before restarting
|
||||
if freeDiskSpace < (allocatedDiskSpace - totalUsed) {
|
||||
allocatedDiskSpace = freeDiskSpace
|
||||
zap.S().Warnf("Disk space is less than requested allocated space, allocating = %d Bytes", allocatedDiskSpace)
|
||||
log.Warn("Disk space is less than requested. Allocating space", zap.Int64("bytes", allocatedDiskSpace))
|
||||
}
|
||||
|
||||
return &Server{
|
||||
DataDir: dataDir,
|
||||
log: log,
|
||||
DataDir: filepath.Join(config.Path, "piece-store-data"),
|
||||
DB: db,
|
||||
pkey: pkey,
|
||||
totalAllocated: allocatedDiskSpace,
|
||||
@ -178,8 +180,9 @@ func Initialize(ctx context.Context, config Config, pkey crypto.PrivateKey) (*Se
|
||||
}
|
||||
|
||||
// New creates a Server with custom db
|
||||
func New(dataDir string, db *psdb.DB, config Config, pkey crypto.PrivateKey) *Server {
|
||||
func New(log *zap.Logger, dataDir string, db *psdb.DB, config Config, pkey crypto.PrivateKey) *Server {
|
||||
return &Server{
|
||||
log: log,
|
||||
DataDir: dataDir,
|
||||
DB: db,
|
||||
pkey: pkey,
|
||||
@ -196,7 +199,7 @@ func (s *Server) Stop(ctx context.Context) (err error) {
|
||||
|
||||
// Piece -- Send meta data about a stored by by Id
|
||||
func (s *Server) Piece(ctx context.Context, in *pb.PieceId) (*pb.PieceSummary, error) {
|
||||
zap.S().Infof("Getting Meta for %s...", in.GetId())
|
||||
s.log.Debug("Getting Meta", zap.String("Piece ID", in.GetId()))
|
||||
|
||||
authorization := in.GetAuthorization()
|
||||
if err := s.verifier(authorization); err != nil {
|
||||
@ -233,13 +236,13 @@ func (s *Server) Piece(ctx context.Context, in *pb.PieceId) (*pb.PieceSummary, e
|
||||
return nil, err
|
||||
}
|
||||
|
||||
zap.S().Infof("Successfully retrieved meta for %s.", in.GetId())
|
||||
s.log.Debug("Successfully retrieved meta", zap.String("Piece ID", in.GetId()))
|
||||
return &pb.PieceSummary{Id: in.GetId(), PieceSize: fileInfo.Size(), ExpirationUnixSec: ttl}, nil
|
||||
}
|
||||
|
||||
// Stats will return statistics about the Server
|
||||
func (s *Server) Stats(ctx context.Context, in *pb.StatsReq) (*pb.StatSummary, error) {
|
||||
zap.S().Infof("Getting Stats...\n")
|
||||
s.log.Debug("Getting Stats...")
|
||||
|
||||
totalUsed, err := s.DB.SumTTLSizes()
|
||||
if err != nil {
|
||||
@ -256,7 +259,7 @@ func (s *Server) Stats(ctx context.Context, in *pb.StatsReq) (*pb.StatSummary, e
|
||||
|
||||
// Delete -- Delete data by Id from piecestore
|
||||
func (s *Server) Delete(ctx context.Context, in *pb.PieceDelete) (*pb.PieceDeleteSummary, error) {
|
||||
zap.S().Infof("Deleting %s...", in.GetId())
|
||||
s.log.Debug("Deleting", zap.String("Piece ID", fmt.Sprint(in.GetId())))
|
||||
|
||||
authorization := in.GetAuthorization()
|
||||
if err := s.verifier(authorization); err != nil {
|
||||
@ -271,7 +274,6 @@ func (s *Server) Delete(ctx context.Context, in *pb.PieceDelete) (*pb.PieceDelet
|
||||
return nil, err
|
||||
}
|
||||
|
||||
zap.S().Infof("Successfully deleted %s.", in.GetId())
|
||||
return &pb.PieceDeleteSummary{Message: OK}, nil
|
||||
}
|
||||
|
||||
@ -284,7 +286,7 @@ func (s *Server) deleteByID(id string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
zap.S().Infof("Deleted data of id (%s)\n", id)
|
||||
s.log.Debug("Deleted", zap.String("Piece ID", id))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"github.com/gtank/cryptopasta"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap/zaptest"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
@ -498,6 +499,7 @@ func newTestServerStruct(t *testing.T) (*Server, func()) {
|
||||
return nil
|
||||
}
|
||||
server := &Server{
|
||||
log: zaptest.NewLogger(t),
|
||||
DataDir: tempDir,
|
||||
DB: psDB,
|
||||
verifier: verifier,
|
||||
|
@ -5,6 +5,7 @@ package psserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
@ -45,7 +46,7 @@ func (s *Server) Store(reqStream pb.PieceStoreRoutes_StoreServer) (err error) {
|
||||
return StoreError.New("PieceStore message is nil")
|
||||
}
|
||||
|
||||
zap.S().Infof("Storing %s...", pd.GetId())
|
||||
s.log.Debug("Storing", zap.String("Piece ID", fmt.Sprint(pd.GetId())))
|
||||
|
||||
if pd.GetId() == "" {
|
||||
return StoreError.New("piece ID not specified")
|
||||
@ -68,7 +69,7 @@ func (s *Server) Store(reqStream pb.PieceStoreRoutes_StoreServer) (err error) {
|
||||
if err = s.DB.AddBandwidthUsed(total); err != nil {
|
||||
return StoreError.New("failed to write bandwidth info to database: %v", err)
|
||||
}
|
||||
zap.S().Infof("Successfully stored %s.", pd.GetId())
|
||||
s.log.Debug("Successfully stored", zap.String("Piece ID", fmt.Sprint(pd.GetId())))
|
||||
|
||||
return reqStream.SendAndClose(&pb.PieceStoreSummary{Message: OK, TotalReceived: total})
|
||||
}
|
||||
@ -80,7 +81,7 @@ func (s *Server) storeData(ctx context.Context, stream pb.PieceStoreRoutes_Store
|
||||
defer func() {
|
||||
if err != nil && err != io.EOF {
|
||||
if deleteErr := s.deleteByID(id); deleteErr != nil {
|
||||
zap.S().Errorf("Failed on deleteByID in Store: %s", deleteErr.Error())
|
||||
s.log.Error("Failed on deleteByID in Store", zap.Error(deleteErr))
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -105,18 +106,13 @@ func (s *Server) storeData(ctx context.Context, stream pb.PieceStoreRoutes_Store
|
||||
spaceLeft := s.totalAllocated - spaceUsed
|
||||
reader := NewStreamReader(s, stream, bwLeft, spaceLeft)
|
||||
|
||||
defer func() {
|
||||
baWriteErr := s.DB.WriteBandwidthAllocToDB(reader.bandwidthAllocation)
|
||||
if baWriteErr != nil {
|
||||
zap.S().Errorf("Error while writing Bandwidth Alloc to DB: %s\n", baWriteErr.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
total, err = io.Copy(storeFile, reader)
|
||||
|
||||
if err != nil && err != io.EOF {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return total, nil
|
||||
err = s.DB.WriteBandwidthAllocToDB(reader.bandwidthAllocation)
|
||||
|
||||
return total, err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user