From b7f8e309d92a001426b1c25a7623edb28f971f42 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Wed, 27 Feb 2019 11:56:16 +0200 Subject: [PATCH] add piecestore.Storage interface (#1369) --- pkg/piecestore/psserver/readerwriter.go | 2 +- pkg/piecestore/psserver/retrieve.go | 2 +- pkg/piecestore/psserver/server.go | 33 +++++++++++++++++++------ pkg/piecestore/psserver/store.go | 2 +- 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/pkg/piecestore/psserver/readerwriter.go b/pkg/piecestore/psserver/readerwriter.go index fbcabf2fb..7a3567301 100644 --- a/pkg/piecestore/psserver/readerwriter.go +++ b/pkg/piecestore/psserver/readerwriter.go @@ -89,7 +89,7 @@ func NewStreamReader(s *Server, stream pb.PieceStoreRoutes_StoreServer, bandwidt return sr } -// Read -- Read method for piece download from stream +// Read reads piece from stream func (s *StreamReader) Read(b []byte) (int, error) { if s.sofar >= s.bandwidthRemaining { return 0, StreamWriterError.New("out of bandwidth") diff --git a/pkg/piecestore/psserver/retrieve.go b/pkg/piecestore/psserver/retrieve.go index dce1d8041..ed52e738a 100644 --- a/pkg/piecestore/psserver/retrieve.go +++ b/pkg/piecestore/psserver/retrieve.go @@ -21,7 +21,7 @@ import ( // RetrieveError is a type of error for failures in Server.Retrieve() var RetrieveError = errs.Class("retrieve error") -// Retrieve -- Retrieve data from piecestore and send to client +// Retrieve servers data from piecestore and sends to client func (s *Server) Retrieve(stream pb.PieceStoreRoutes_RetrieveServer) (err error) { ctx := stream.Context() defer mon.Task()(&ctx)(&err) diff --git a/pkg/piecestore/psserver/server.go b/pkg/piecestore/psserver/server.go index d1eb05616..37cd10ed3 100644 --- a/pkg/piecestore/psserver/server.go +++ b/pkg/piecestore/psserver/server.go @@ -10,6 +10,7 @@ import ( "crypto/sha512" "errors" "fmt" + "io" "os" "path/filepath" "strings" @@ -35,7 +36,7 @@ var ( ServerError = errs.Class("PSServer error") ) -//DirSize returns the total size of the files in that directory +// DirSize returns the total size of the files in that directory func DirSize(path string) (int64, error) { var size int64 _, err := os.Stat(path) @@ -53,11 +54,29 @@ func DirSize(path string) (int64, error) { return size, err } -// Server -- GRPC server meta data used in route calls +// Storage describes storing blobs on disk +type Storage interface { + // Writer returns a writer for the specified pieceID + Writer(pieceID string) (io.WriteCloser, error) + // Reader returns a reader for the specified pieceID + Reader(ctx context.Context, pieceID string, offset int64, length int64) (io.ReadCloser, error) + // Delete deletes the specified pieceID + Delete(pieceID string) error + + // Close closes the underlying database. + Close() error + + // PiecePath returns path of the specified piece on disk. + PiecePath(pieceID string) (string, error) + // Info returns the current status of the disk. + Info() (pstore.DiskInfo, error) +} + +// Server implements serving and storing pieces type Server struct { startTime time.Time log *zap.Logger - storage *pstore.Storage + storage Storage DB *psdb.DB identity *identity.FullIdentity totalAllocated int64 // TODO: use memory.Size @@ -67,7 +86,7 @@ type Server struct { } // NewEndpoint creates a new endpoint -func NewEndpoint(log *zap.Logger, config Config, storage *pstore.Storage, db *psdb.DB, identity *identity.FullIdentity, k *kademlia.Kademlia) (*Server, error) { +func NewEndpoint(log *zap.Logger, config Config, storage Storage, db *psdb.DB, identity *identity.FullIdentity, k *kademlia.Kademlia) (*Server, error) { // read the allocated disk space from the config file allocatedDiskSpace := config.AllocatedDiskSpace.Int64() allocatedBandwidth := config.AllocatedBandwidth.Int64() @@ -156,7 +175,7 @@ func (s *Server) Stop(ctx context.Context) error { ) } -// Piece -- Send meta data about a piece stored by Id +// Piece servers meta information about a piece. func (s *Server) Piece(ctx context.Context, in *pb.PieceId) (*pb.PieceSummary, error) { s.log.Debug("Getting Meta", zap.String("Piece ID", in.GetId())) @@ -185,7 +204,7 @@ func (s *Server) Piece(ctx context.Context, in *pb.PieceId) (*pb.PieceSummary, e return &pb.PieceSummary{Id: in.GetId(), PieceSize: fileInfo.Size(), ExpirationUnixSec: ttl}, nil } -// Stats will return statistics about the Server +// Stats returns current statistics about the server. func (s *Server) Stats(ctx context.Context, in *pb.StatsReq) (*pb.StatSummary, error) { s.log.Debug("Getting Stats...") @@ -241,7 +260,7 @@ func (s *Server) Dashboard(in *pb.DashboardReq, stream pb.PieceStoreRoutes_Dashb } } -// Delete -- Delete data by Id from piecestore +// Delete deletes data based on the specified ID. func (s *Server) Delete(ctx context.Context, in *pb.PieceDelete) (*pb.PieceDeleteSummary, error) { s.log.Debug("Deleting", zap.String("Piece ID", fmt.Sprint(in.GetId()))) diff --git a/pkg/piecestore/psserver/store.go b/pkg/piecestore/psserver/store.go index b86380f7d..7b167ffb7 100644 --- a/pkg/piecestore/psserver/store.go +++ b/pkg/piecestore/psserver/store.go @@ -23,7 +23,7 @@ const OK = "OK" // StoreError is a type of error for failures in Server.Store() var StoreError = errs.Class("store error") -// Store incoming data using piecestore +// Store stores incoming data using piecestore func (s *Server) Store(reqStream pb.PieceStoreRoutes_StoreServer) (err error) { ctx := reqStream.Context() defer mon.Task()(&ctx)(&err)