storj/pkg/piecestore/rpc/server/server.go
Alexander Leitner ff8e191a9a
Added rpc server (#40)
* Added piecestore

* gofmt

* Added requested changes

* Added cli

* Removed ranger because I wanted something that can stand alone

* Add example of http server using piece store

* Changed piecestore code to make it more optial for error handelling

* Merged with piecestore

* Added missing package

* Forgot io import

* gofmt

* gofmt

* Forgot io

* Make path by hash exported

* updated to simplify again whoops

* Updated server to work real good

* Forgot ampersand

* Updated to match FilePiece

* Merged in cam's delete code

* Remove unused io

* Added RPC code

* Give the download request a reader

* Removed http server stuff; changed receive stream to say io.reader

* Added expiration date to shardInfo

* Change all instances of Shard to Piece; change protobuf name; moved client insance to outside functions

* added ttl info request

* Move scripts to http server pr; added close method for Retrieve api

* added rpc server tests for getting piece meta data and retrieval routes

* Resolved linter errors, moved to prc server to pkg, updated go.mod to use latest protobuf

* Imported cams test

* Bump gometalinter deadline

* WIP adding tests

* added tests for store and delete routes

* Add changes as requested by Kaloyan, also cleaned up some code

* Get the code actually working whoops

* More cleanup

* Separating database calls from api.go

* need to rename expiration

* Added some changes requested by JT

* Fix read size

* Fixed total amount to read

* added tests

* Simplify protobuf, add store tests, edited api to handle invalid stores properly, return errors instead of messages

* Moved rpc client and server to piece store

* Moved piecestore protobuf to the /protos folder

* Cleaned up messages

* Clean up linter errors

* Added missing sqlite import

* Add ability to do iterative reads and writes to pstore

* Incrementally read data

* Fix linter and import errors

* Solve linter Error

* Change return types

* begin test refactor

* refactored to implement only 1 db connection, moved SQLite row checking into separate function and removed defer on rows.Close(), fixed os.tempDir in rpc_test.go

* Cleaning up tests

* Added retrieve tests

* refactored delete tests

* Deleted old tests

* Updated cmd/piecestore to reflect changes to piecestore

* Refactored server tests and server/client store code

* gofmt

* WIP implementing TTL struct

* Read 4k at a time when Retrieving

* implemented ttl struct

* Accidentally removed fpiece dependency?

* Double resolve merge conflict

* Moved client to the examples since it is an example

* Change hash to id in protobuf. TODO: Change client and server to reflect these changes

* Update client and protobuf

* changed hash to id

* Handle eof properly in retrieve

* Id -> ID

* forgot to change import for client after moving

* Moved client and server main to examples

* Made changes requested by JT

* checkEntries is now private, created currentTime variable for checkEntries, added defer rows.Close()

* Print not fatal

* Handle overflow when reading from server

* added const IDLength

* removed types from comments

* Add reader/writer for download data from and uploading data to server

* goimports and comments

* fixed nits, casts, added OK const, DBCleanup now exits program on error

* Add stream reader and writer for server

* Fix errors

* i beforee except after c lol

* customizable data dir

* Forgot to rename variable

* customizable data dir

* Handle closing of server stream properly

* linter

* pass on inserting the same data twice

* linter

* linter

* Do the easy things JT asked for

* Handle overflow from reads properly; handle custom db path

* Handle overflow for server stream read; TODO Combine server and client stream reads

* Allow for TTL of 0 to stay forever

* Make Client cleaner for user
2018-06-02 14:14:59 -04:00

143 lines
3.3 KiB
Go

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package server
import (
"fmt"
"io"
"log"
"os"
"golang.org/x/net/context"
"storj.io/storj/pkg/piecestore"
"storj.io/storj/pkg/piecestore/rpc/server/ttl"
pb "storj.io/storj/protos/piecestore"
)
// OK - Success!
const OK = "OK"
// Server -- GRPC server meta data used in route calls
type Server struct {
PieceStoreDir string
DB *ttl.TTL
}
// Store -- Store incoming data using piecestore
func (s *Server) Store(stream pb.PieceStoreRoutes_StoreServer) error {
log.Println("Storing data...")
// Receive initial meta data about what's being stored
piece, err := stream.Recv()
if err != nil {
return err
}
// If we put in the database first then that checks if the data already exists
if err := s.DB.AddTTLToDB(piece.Id, piece.Ttl); err != nil {
log.Println(err)
return err
}
// Initialize file for storing data
storeFile, err := pstore.StoreWriter(piece.Id, piece.Size, piece.StoreOffset, s.PieceStoreDir)
if err != nil {
return err
}
defer storeFile.Close()
reader := &StreamReader{stream: stream}
total, err := io.Copy(storeFile, reader)
if err != nil {
return err
}
if total < piece.Size {
return fmt.Errorf("Received %v bytes of total %v bytes", int64(total), piece.Size)
}
log.Println("Successfully stored data.")
return stream.SendAndClose(&pb.PieceStoreSummary{Message: OK, TotalReceived: int64(total)})
}
// Retrieve -- Retrieve data from piecestore and send to client
func (s *Server) Retrieve(pieceMeta *pb.PieceRetrieval, stream pb.PieceStoreRoutes_RetrieveServer) error {
log.Println("Retrieving data...")
path, err := pstore.PathByID(pieceMeta.Id, s.PieceStoreDir)
if err != nil {
return err
}
fileInfo, err := os.Stat(path)
if err != nil {
return err
}
// Read the size specified
totalToRead := pieceMeta.Size
// Read the entire file if specified -1
if pieceMeta.Size <= -1 {
totalToRead = fileInfo.Size()
}
storeFile, err := pstore.RetrieveReader(pieceMeta.Id, totalToRead, pieceMeta.StoreOffset, s.PieceStoreDir)
if err != nil {
return err
}
defer storeFile.Close()
writer := &StreamWriter{stream: stream}
_, err = io.Copy(writer, storeFile)
if err != nil {
return err
}
log.Println("Successfully retrieved data.")
return nil
}
// Piece -- Send meta data about a stored by by Id
func (s *Server) Piece(ctx context.Context, in *pb.PieceId) (*pb.PieceSummary, error) {
log.Println("Getting Meta data...")
path, err := pstore.PathByID(in.Id, s.PieceStoreDir)
if err != nil {
return nil, err
}
fileInfo, err := os.Stat(path)
if err != nil {
return nil, err
}
// Read database to calculate expiration
ttl, err := s.DB.GetTTLByID(in.Id)
if err != nil {
return nil, err
}
log.Println("Meta data retrieved.")
return &pb.PieceSummary{Id: in.Id, Size: fileInfo.Size(), Expiration: ttl}, nil
}
// Delete -- Delete data by Id from piecestore
func (s *Server) Delete(ctx context.Context, in *pb.PieceDelete) (*pb.PieceDeleteSummary, error) {
log.Println("Deleting data...")
if err := pstore.Delete(in.Id, s.PieceStoreDir); err != nil {
return nil, err
}
if err := s.DB.DeleteTTLByID(in.Id); err != nil {
return nil, err
}
log.Println("Successfully deleted data.")
return &pb.PieceDeleteSummary{Message: OK}, nil
}