2019-01-24 20:15:10 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
2018-08-17 18:40:15 +01:00
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
2018-11-06 17:49:17 +00:00
|
|
|
package psserver
|
2018-08-17 18:40:15 +01:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2018-09-10 10:18:42 +01:00
|
|
|
"fmt"
|
2018-08-17 18:40:15 +01:00
|
|
|
"io"
|
|
|
|
"os"
|
2018-09-10 10:18:42 +01:00
|
|
|
"sync/atomic"
|
2018-08-17 18:40:15 +01:00
|
|
|
|
|
|
|
"github.com/zeebo/errs"
|
2018-11-08 20:27:07 +00:00
|
|
|
"go.uber.org/zap"
|
2018-09-10 10:18:42 +01:00
|
|
|
|
2019-02-01 17:57:11 +00:00
|
|
|
"storj.io/storj/internal/memory"
|
2018-09-10 10:18:42 +01:00
|
|
|
"storj.io/storj/internal/sync2"
|
2018-09-18 05:39:06 +01:00
|
|
|
"storj.io/storj/pkg/pb"
|
2018-08-17 18:40:15 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
// RetrieveError is the error class used for failures in Server.Retrieve().
|
|
|
|
var RetrieveError = errs.Class("retrieve error")
|
|
|
|
|
|
|
|
// Retrieve -- Retrieve data from piecestore and send to client
|
|
|
|
func (s *Server) Retrieve(stream pb.PieceStoreRoutes_RetrieveServer) (err error) {
|
|
|
|
ctx := stream.Context()
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
// Receive Signature
|
|
|
|
recv, err := stream.Recv()
|
|
|
|
if err != nil {
|
|
|
|
return RetrieveError.Wrap(err)
|
|
|
|
}
|
|
|
|
if recv == nil {
|
|
|
|
return RetrieveError.New("error receiving piece data")
|
|
|
|
}
|
|
|
|
|
|
|
|
pd := recv.GetPieceData()
|
|
|
|
if pd == nil {
|
|
|
|
return RetrieveError.New("PieceStore message is nil")
|
|
|
|
}
|
|
|
|
|
2019-02-20 05:36:08 +00:00
|
|
|
rba := recv.GetBandwidthAllocation()
|
|
|
|
if rba == nil {
|
2019-02-22 21:17:35 +00:00
|
|
|
return RetrieveError.New("Order message is nil")
|
2019-02-20 05:36:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pba := rba.PayerAllocation
|
2019-02-22 21:17:35 +00:00
|
|
|
if pb.Equal(&pba, &pb.OrderLimit{}) {
|
|
|
|
return RetrieveError.New("OrderLimit message is empty")
|
2019-02-20 05:36:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
id, err := getNamespacedPieceID([]byte(pd.GetId()), pba.SatelliteId.Bytes())
|
2018-10-23 17:03:35 +01:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2019-02-01 17:57:11 +00:00
|
|
|
s.log.Debug("Retrieving",
|
|
|
|
zap.String("Piece ID", id),
|
|
|
|
zap.Int64("Offset", pd.GetOffset()),
|
|
|
|
zap.Int64("Size", pd.GetPieceSize()),
|
|
|
|
)
|
|
|
|
|
2018-08-17 18:40:15 +01:00
|
|
|
// Get path to data being retrieved
|
2019-01-11 11:26:39 +00:00
|
|
|
path, err := s.storage.PiecePath(id)
|
2018-08-17 18:40:15 +01:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Verify that the path exists
|
|
|
|
fileInfo, err := os.Stat(path)
|
|
|
|
if err != nil {
|
|
|
|
return RetrieveError.Wrap(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Read the size specified
|
2018-11-20 17:09:35 +00:00
|
|
|
totalToRead := pd.GetPieceSize()
|
2018-08-17 18:40:15 +01:00
|
|
|
fileSize := fileInfo.Size()
|
|
|
|
|
|
|
|
// Read the entire file if specified -1 but make sure we do it from the correct offset
|
2018-11-20 17:09:35 +00:00
|
|
|
if pd.GetPieceSize() <= -1 || totalToRead+pd.GetOffset() > fileSize {
|
2018-08-17 18:40:15 +01:00
|
|
|
totalToRead = fileSize - pd.GetOffset()
|
|
|
|
}
|
|
|
|
|
2018-10-23 17:03:35 +01:00
|
|
|
retrieved, allocated, err := s.retrieveData(ctx, stream, id, pd.GetOffset(), totalToRead)
|
2018-08-17 18:40:15 +01:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2019-01-24 16:09:37 +00:00
|
|
|
s.log.Info("Successfully retrieved",
|
2019-02-01 17:57:11 +00:00
|
|
|
zap.String("Piece ID", id),
|
2018-12-17 15:23:02 +00:00
|
|
|
zap.Int64("Allocated", allocated),
|
|
|
|
zap.Int64("Retrieved", retrieved),
|
|
|
|
)
|
2018-08-17 18:40:15 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) retrieveData(ctx context.Context, stream pb.PieceStoreRoutes_RetrieveServer, id string, offset, length int64) (retrieved, allocated int64, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2019-01-11 11:26:39 +00:00
|
|
|
storeFile, err := s.storage.Reader(ctx, id, offset, length)
|
2018-08-17 18:40:15 +01:00
|
|
|
if err != nil {
|
2019-01-29 15:41:01 +00:00
|
|
|
return 0, 0, RetrieveError.Wrap(err)
|
2018-08-17 18:40:15 +01:00
|
|
|
}
|
|
|
|
|
2019-01-29 15:41:01 +00:00
|
|
|
defer func() {
|
|
|
|
err = errs.Combine(err, storeFile.Close())
|
|
|
|
}()
|
2018-08-17 18:40:15 +01:00
|
|
|
|
|
|
|
writer := NewStreamWriter(s, stream)
|
2018-09-10 10:18:42 +01:00
|
|
|
allocationTracking := sync2.NewThrottle()
|
|
|
|
totalAllocated := int64(0)
|
|
|
|
|
|
|
|
// Bandwidth Allocation recv loop
|
|
|
|
go func() {
|
|
|
|
var lastTotal int64
|
2019-02-22 21:17:35 +00:00
|
|
|
var lastAllocation *pb.Order
|
2018-09-10 10:18:42 +01:00
|
|
|
defer func() {
|
|
|
|
if lastAllocation == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
err := s.DB.WriteBandwidthAllocToDB(lastAllocation)
|
|
|
|
if err != nil {
|
|
|
|
// TODO: handle error properly
|
2018-12-17 15:23:02 +00:00
|
|
|
s.log.Error("WriteBandwidthAllocToDB Error:", zap.Error(err))
|
2018-09-10 10:18:42 +01:00
|
|
|
}
|
|
|
|
}()
|
2018-08-17 18:40:15 +01:00
|
|
|
|
2018-09-10 10:18:42 +01:00
|
|
|
for {
|
|
|
|
recv, err := stream.Recv()
|
|
|
|
if err != nil {
|
2019-01-29 15:41:01 +00:00
|
|
|
allocationTracking.Fail(RetrieveError.Wrap(err))
|
2018-09-10 10:18:42 +01:00
|
|
|
return
|
|
|
|
}
|
2019-01-28 19:45:25 +00:00
|
|
|
rba := recv.BandwidthAllocation
|
|
|
|
if err = s.verifySignature(stream.Context(), rba); err != nil {
|
2019-01-29 15:41:01 +00:00
|
|
|
allocationTracking.Fail(RetrieveError.Wrap(err))
|
2018-09-10 10:18:42 +01:00
|
|
|
return
|
|
|
|
}
|
2019-01-28 19:45:25 +00:00
|
|
|
pba := rba.PayerAllocation
|
|
|
|
if err = s.verifyPayerAllocation(&pba, "GET"); err != nil {
|
2019-01-29 15:41:01 +00:00
|
|
|
allocationTracking.Fail(RetrieveError.Wrap(err))
|
2018-09-10 10:18:42 +01:00
|
|
|
return
|
|
|
|
}
|
2019-01-28 19:45:25 +00:00
|
|
|
//todo: figure out why this fails tests
|
|
|
|
// if rba.Total > pba.MaxSize {
|
|
|
|
// allocationTracking.Fail(fmt.Errorf("attempt to send more data than allocation %v got %v", rba.Total, pba.MaxSize))
|
|
|
|
// return
|
|
|
|
// }
|
|
|
|
if lastTotal > rba.Total {
|
|
|
|
allocationTracking.Fail(fmt.Errorf("got lower allocation was %v got %v", lastTotal, rba.Total))
|
2019-01-06 18:51:01 +00:00
|
|
|
return
|
|
|
|
}
|
2019-01-28 19:45:25 +00:00
|
|
|
atomic.StoreInt64(&totalAllocated, rba.Total)
|
|
|
|
if err = allocationTracking.Produce(rba.Total - lastTotal); err != nil {
|
2018-09-10 10:18:42 +01:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2019-01-28 19:45:25 +00:00
|
|
|
lastAllocation = rba
|
|
|
|
lastTotal = rba.Total
|
2018-08-17 18:40:15 +01:00
|
|
|
}
|
2018-09-10 10:18:42 +01:00
|
|
|
}()
|
2018-08-17 18:40:15 +01:00
|
|
|
|
2018-09-10 10:18:42 +01:00
|
|
|
// Data send loop
|
2019-02-01 17:57:11 +00:00
|
|
|
messageSize := int64(32 * memory.KiB)
|
2018-09-10 10:18:42 +01:00
|
|
|
used := int64(0)
|
2018-08-17 18:40:15 +01:00
|
|
|
|
2018-09-10 10:18:42 +01:00
|
|
|
for used < length {
|
|
|
|
nextMessageSize, err := allocationTracking.ConsumeOrWait(messageSize)
|
|
|
|
if err != nil {
|
2019-01-29 15:41:01 +00:00
|
|
|
allocationTracking.Fail(RetrieveError.Wrap(err))
|
2018-09-10 10:18:42 +01:00
|
|
|
break
|
2018-08-17 18:40:15 +01:00
|
|
|
}
|
|
|
|
|
2019-02-01 17:57:11 +00:00
|
|
|
toCopy := nextMessageSize
|
|
|
|
if length-used < nextMessageSize {
|
|
|
|
toCopy = length - used
|
|
|
|
}
|
|
|
|
|
2018-09-10 10:18:42 +01:00
|
|
|
used += nextMessageSize
|
2019-02-01 17:57:11 +00:00
|
|
|
|
|
|
|
n, err := io.CopyN(writer, storeFile, toCopy)
|
|
|
|
if err != nil {
|
|
|
|
// break on error
|
|
|
|
allocationTracking.Fail(RetrieveError.Wrap(err))
|
|
|
|
break
|
|
|
|
}
|
2018-09-10 10:18:42 +01:00
|
|
|
// correct errors when needed
|
|
|
|
if n != nextMessageSize {
|
2019-02-01 17:57:11 +00:00
|
|
|
if err := allocationTracking.Produce(nextMessageSize - n); err != nil {
|
2018-09-10 10:18:42 +01:00
|
|
|
break
|
|
|
|
}
|
|
|
|
used -= nextMessageSize - n
|
|
|
|
}
|
2018-08-17 18:40:15 +01:00
|
|
|
}
|
|
|
|
|
2018-09-10 10:18:42 +01:00
|
|
|
// TODO: handle errors
|
|
|
|
// _ = stream.Close()
|
|
|
|
|
|
|
|
return used, atomic.LoadInt64(&totalAllocated), allocationTracking.Err()
|
2018-08-17 18:40:15 +01:00
|
|
|
}
|