b0db33f919
* captplanet standalone farmer setup * Bandwidth Allocation * utils.Close method changed to utils.LogClose * Get build temporarily working * Get/Put for PSClient should take payer bandwidth allocations rather than the NewPSClient function * Update example client to reflect changes in client API * Update ecclient to use latest PSClient, Make NewPSClient return error also * Updated pieceranger tests to check for errors; sign method should take byte array * Handle defers in store.go better * Fix defer functions in psdb.go * fun times * Protobuf bandwidthallocation data is now a byte array * Remove psservice package and merge it into pstore server * Write wrapper for database calls * Change all expiration names in protobuf to be more informative; add defer in retrieve; remove old comment * Make PSDB tests implementation independent rather than method independent * get rid of payer, renter in ecclient * add context monitoring in store and retrieve
132 lines
3.0 KiB
Go
132 lines
3.0 KiB
Go
// Copyright (C) 2018 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package client
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
"storj.io/storj/pkg/utils"
|
|
pb "storj.io/storj/protos/piecestore"
|
|
)
|
|
|
|
// StreamWriter creates a StreamWriter for writing data to the piece store server
|
|
type StreamWriter struct {
|
|
stream pb.PieceStoreRoutes_StoreClient
|
|
signer *Client // We need this for signing
|
|
totalWritten int64
|
|
pba *pb.PayerBandwidthAllocation
|
|
}
|
|
|
|
// Write Piece data to a piece store server upload stream
|
|
func (s *StreamWriter) Write(b []byte) (int, error) {
|
|
updatedAllocation := s.totalWritten + int64(len(b))
|
|
allocationData := &pb.RenterBandwidthAllocation_Data{
|
|
PayerAllocation: s.pba,
|
|
Total: updatedAllocation,
|
|
}
|
|
|
|
serializedAllocation, err := proto.Marshal(allocationData)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
sig, err := s.signer.sign(serializedAllocation)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
msg := &pb.PieceStore{
|
|
Piecedata: &pb.PieceStore_PieceData{Content: b},
|
|
Bandwidthallocation: &pb.RenterBandwidthAllocation{
|
|
Data: serializedAllocation, Signature: sig,
|
|
},
|
|
}
|
|
|
|
s.totalWritten = updatedAllocation
|
|
|
|
// Second we send the actual content
|
|
if err := s.stream.Send(msg); err != nil {
|
|
return 0, fmt.Errorf("%v.Send() = %v", s.stream, err)
|
|
}
|
|
|
|
return len(b), nil
|
|
}
|
|
|
|
// Close the piece store Write Stream
|
|
func (s *StreamWriter) Close() error {
|
|
reply, err := s.stream.CloseAndRecv()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Printf("Route summary: %v", reply)
|
|
|
|
return nil
|
|
}
|
|
|
|
// StreamReader is a struct for reading piece download stream from server
|
|
type StreamReader struct {
|
|
stream pb.PieceStoreRoutes_RetrieveClient
|
|
src *utils.ReaderSource
|
|
totalRead int64
|
|
}
|
|
|
|
// NewStreamReader creates a StreamReader for reading data from the piece store server
|
|
func NewStreamReader(signer *Client, stream pb.PieceStoreRoutes_RetrieveClient, pba *pb.PayerBandwidthAllocation) *StreamReader {
|
|
sr := &StreamReader{
|
|
stream: stream,
|
|
}
|
|
|
|
sr.src = utils.NewReaderSource(func() ([]byte, error) {
|
|
updatedAllocation := int64(signer.bandwidthMsgSize) + sr.totalRead
|
|
allocationData := &pb.RenterBandwidthAllocation_Data{
|
|
PayerAllocation: pba,
|
|
Total: updatedAllocation,
|
|
}
|
|
|
|
serializedAllocation, err := proto.Marshal(allocationData)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sig, err := signer.sign(serializedAllocation)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
msg := &pb.PieceRetrieval{
|
|
Bandwidthallocation: &pb.RenterBandwidthAllocation{
|
|
Signature: sig,
|
|
Data: serializedAllocation,
|
|
},
|
|
}
|
|
|
|
sr.totalRead = updatedAllocation
|
|
|
|
if err = stream.Send(msg); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp.GetContent(), nil
|
|
})
|
|
|
|
return sr
|
|
}
|
|
|
|
// Read Piece data from piece store server download stream
|
|
func (s *StreamReader) Read(b []byte) (int, error) {
|
|
return s.src.Read(b)
|
|
}
|
|
|
|
// Close the piece store server Read Stream
|
|
func (s *StreamReader) Close() error {
|
|
return s.stream.CloseSend()
|
|
}
|