098cbc9c67
all of the packages and tests work with both grpc and drpc. we'll probably need to do some jenkins pipelines to run the tests with drpc as well. most of the changes are really due to a bit of cleanup of the pkg/transport.Client api into an rpc.Dialer in the spirit of a net.Dialer. now that we don't need observers, we can pass around stateless configuration to everything rather than stateful things that issue observations. it also adds a DialAddressID for the case where we don't have a pb.Node, but we do have an address and want to assert some ID. this happened pretty frequently, and now there's no more weird contortions creating custom tls options, etc. a lot of the other changes are being consistent/using the abstractions in the rpc package to do rpc style things like finding peer information, or checking status codes. Change-Id: Ief62875e21d80a21b3c56a5a37f45887679f9412
309 lines
8.2 KiB
Go
309 lines
8.2 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package piecestore
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
"storj.io/storj/internal/errs2"
|
|
"storj.io/storj/pkg/identity"
|
|
"storj.io/storj/pkg/pb"
|
|
"storj.io/storj/pkg/signing"
|
|
"storj.io/storj/pkg/storj"
|
|
)
|
|
|
|
// Downloader is interface that can be used for downloading content.
|
|
// It matches signature of `io.ReadCloser`, with one extra function,
|
|
// GetHashAndLimit(), used for accessing information during GET_REPAIR.
|
|
type Downloader interface {
|
|
Read([]byte) (int, error)
|
|
Close() error
|
|
GetHashAndLimit() (*pb.PieceHash, *pb.OrderLimit)
|
|
}
|
|
|
|
// Download implements downloading from a piecestore.
//
// It carries the state of a single download stream: the stream itself, the
// order limit and key used to sign orders, progress counters, the buffer of
// received but not yet returned bytes, and the closing state.
type Download struct {
	// client is the piecestore client that started this download.
	client *Client
	// limit is the order limit the download was started with; its serial
	// number goes into every order and its piece ID into closing errors.
	limit *pb.OrderLimit
	// privateKey signs the orders sent while reading.
	privateKey storj.PiecePrivateKey
	// peer is the identity of the remote side, used for its node ID.
	peer *identity.PeerIdentity
	// stream is the underlying download stream.
	stream downloadStream
	// ctx is the context the download was started with; Read reuses it.
	ctx context.Context

	read         int64 // how much data we have read so far
	allocated    int64 // how far have we sent orders
	downloaded   int64 // how much data have we downloaded
	downloadSize int64 // how much do we want to download

	// allocationStep is the step used as the base amount of the next order;
	// Read replaces it with client.nextAllocationStep after each order sent.
	allocationStep int64

	// unread holds the most recently received chunk until it has been handed
	// out, together with any error queued behind it.
	unread ReadBuffer

	// hash and originLimit are received in the event of a GET_REPAIR
	hash        *pb.PieceHash
	originLimit *pb.OrderLimit

	// closed is set once the download has been closed, either by Close or
	// by one of the error paths in Read.
	closed bool
	// closingError is the error recorded when closing; Close returns it.
	closingError error
}
|
|
|
|
type downloadStream interface {
|
|
CloseSend() error
|
|
Send(*pb.PieceDownloadRequest) error
|
|
Recv() (*pb.PieceDownloadResponse, error)
|
|
}
|
|
|
|
// Download starts a new download using the specified order limit at the specified offset and size.
|
|
func (client *Client) Download(ctx context.Context, limit *pb.OrderLimit, piecePrivateKey storj.PiecePrivateKey, offset, size int64) (_ Downloader, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
peer, err := client.conn.PeerIdentity()
|
|
if err != nil {
|
|
return nil, ErrInternal.Wrap(err)
|
|
}
|
|
|
|
stream, err := client.client.Download(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = stream.Send(&pb.PieceDownloadRequest{
|
|
Limit: limit,
|
|
Chunk: &pb.PieceDownloadRequest_Chunk{
|
|
Offset: offset,
|
|
ChunkSize: size,
|
|
},
|
|
})
|
|
if err != nil {
|
|
_, recvErr := stream.Recv()
|
|
return nil, ErrProtocol.Wrap(errs.Combine(err, recvErr))
|
|
}
|
|
|
|
download := &Download{
|
|
client: client,
|
|
limit: limit,
|
|
privateKey: piecePrivateKey,
|
|
peer: peer,
|
|
stream: stream,
|
|
ctx: ctx,
|
|
|
|
read: 0,
|
|
|
|
allocated: 0,
|
|
downloaded: 0,
|
|
downloadSize: size,
|
|
|
|
allocationStep: client.config.InitialStep,
|
|
}
|
|
|
|
if client.config.DownloadBufferSize <= 0 {
|
|
return &LockingDownload{download: download}, nil
|
|
}
|
|
return &LockingDownload{
|
|
download: NewBufferedDownload(download, int(client.config.DownloadBufferSize)),
|
|
}, nil
|
|
}
|
|
|
|
// Read downloads data from the storage node allocating as necessary.
|
|
func (client *Download) Read(data []byte) (read int, err error) {
|
|
ctx := client.ctx
|
|
defer mon.Task()(&ctx, "node: "+client.peer.ID.String()[0:8])(&err)
|
|
|
|
if client.closed {
|
|
return 0, io.ErrClosedPipe
|
|
}
|
|
|
|
for client.read < client.downloadSize {
|
|
// read from buffer
|
|
n, err := client.unread.Read(data)
|
|
client.read += int64(n)
|
|
read += n
|
|
|
|
// if we have an error return the error
|
|
if err != nil {
|
|
return read, err
|
|
}
|
|
// if we are pending for an error, avoid further requests, but try to finish what's in unread buffer.
|
|
if client.unread.Errored() {
|
|
return read, nil
|
|
}
|
|
|
|
// do we need to send a new order to storagenode
|
|
if client.allocated-client.downloaded < client.allocationStep {
|
|
newAllocation := client.allocationStep
|
|
|
|
// have we downloaded more than we have allocated due to a generous storagenode?
|
|
if client.allocated-client.downloaded < 0 {
|
|
newAllocation += client.downloaded - client.allocated
|
|
}
|
|
|
|
// ensure we don't allocate more than we intend to read
|
|
if client.allocated+newAllocation > client.downloadSize {
|
|
newAllocation = client.downloadSize - client.allocated
|
|
}
|
|
|
|
// send an order
|
|
if newAllocation > 0 {
|
|
order, err := signing.SignUplinkOrder(ctx, client.privateKey, &pb.Order{
|
|
SerialNumber: client.limit.SerialNumber,
|
|
Amount: newAllocation,
|
|
})
|
|
if err != nil {
|
|
// we are signing so we shouldn't propagate this into close,
|
|
// however we should include this as a read error
|
|
client.unread.IncludeError(err)
|
|
client.closeWithError(nil)
|
|
return read, nil
|
|
}
|
|
|
|
err = client.stream.Send(&pb.PieceDownloadRequest{
|
|
Order: order,
|
|
})
|
|
if err != nil {
|
|
// other side doesn't want to talk to us anymore or network went down
|
|
client.unread.IncludeError(err)
|
|
// if it's a cancellation, then we'll just close with context.Canceled
|
|
if errs2.IsCanceled(err) {
|
|
client.closeWithError(err)
|
|
return read, err
|
|
}
|
|
// otherwise, something else happened and we should try to ask the other side
|
|
client.closeAndTryFetchError()
|
|
return read, nil
|
|
}
|
|
|
|
// update our allocation step
|
|
client.allocationStep = client.client.nextAllocationStep(client.allocationStep)
|
|
}
|
|
}
|
|
|
|
// we have data, no need to wait for a chunk
|
|
if read > 0 {
|
|
return read, nil
|
|
}
|
|
|
|
// we don't have data, wait for a chunk from storage node
|
|
response, err := client.stream.Recv()
|
|
if response != nil && response.Chunk != nil {
|
|
client.downloaded += int64(len(response.Chunk.Data))
|
|
client.unread.Fill(response.Chunk.Data)
|
|
}
|
|
// This is a GET_REPAIR because we got a piece hash and the original order limit.
|
|
if response != nil && response.Hash != nil && response.Limit != nil {
|
|
client.hash = response.Hash
|
|
client.originLimit = response.Limit
|
|
}
|
|
|
|
// we may have some data buffered, so we cannot immediately return the error
|
|
// we'll queue the error and use the received error as the closing error
|
|
if err != nil {
|
|
client.unread.IncludeError(err)
|
|
client.handleClosingError(err)
|
|
}
|
|
}
|
|
|
|
// all downloaded
|
|
if read == 0 {
|
|
return 0, io.EOF
|
|
}
|
|
return read, nil
|
|
}
|
|
|
|
// handleClosingError should be used for an error that also closed the stream.
|
|
func (client *Download) handleClosingError(err error) {
|
|
if client.closed {
|
|
return
|
|
}
|
|
client.closed = true
|
|
client.closingError = err
|
|
}
|
|
|
|
// closeWithError is used when we include the err in the closing error and also close the stream.
|
|
func (client *Download) closeWithError(err error) {
|
|
if client.closed {
|
|
return
|
|
}
|
|
client.closed = true
|
|
client.closingError = errs.Combine(err, client.stream.CloseSend())
|
|
}
|
|
|
|
// closeAndTryFetchError closes the stream and also tries to fetch the actual error from the stream.
|
|
func (client *Download) closeAndTryFetchError() {
|
|
if client.closed {
|
|
return
|
|
}
|
|
client.closed = true
|
|
|
|
client.closingError = client.stream.CloseSend()
|
|
if client.closingError == nil || client.closingError == io.EOF {
|
|
_, client.closingError = client.stream.Recv()
|
|
}
|
|
}
|
|
|
|
// Close closes the downloading.
|
|
func (client *Download) Close() (err error) {
|
|
defer func() {
|
|
if err != nil {
|
|
details := errs.Class(fmt.Sprintf("(Node ID: %s, Piece ID: %s)", client.peer.ID.String(), client.limit.PieceId.String()))
|
|
err = details.Wrap(err)
|
|
err = Error.Wrap(err)
|
|
}
|
|
}()
|
|
|
|
client.closeWithError(nil)
|
|
return client.closingError
|
|
}
|
|
|
|
// GetHashAndLimit gets the download's hash and original order limit.
//
// Read stores both values when a response carries a piece hash together with
// an order limit, which it treats as a GET_REPAIR. The stored pointers are
// returned as they are, without copying.
func (client *Download) GetHashAndLimit() (*pb.PieceHash, *pb.OrderLimit) {
	return client.hash, client.originLimit
}
|
|
|
|
// ReadBuffer implements buffered reading with an error.
//
// It holds bytes that are still to be read plus an optional error; Read hands
// out the bytes first and reports the error only once no bytes are left.
type ReadBuffer struct {
	// data is the part of the buffer that has not been read yet.
	data []byte
	// err is the queued error, nil when there is none.
	err error
}
|
|
|
|
// Error returns the error queued on the buffer, or nil if none was
// encountered. It does not clear the error.
func (buffer *ReadBuffer) Error() error { return buffer.err }
|
|
|
|
// Errored returns whether the buffer contains an error.
|
|
func (buffer *ReadBuffer) Errored() bool { return buffer.err != nil }
|
|
|
|
// Empty checks whether buffer needs to be filled.
|
|
func (buffer *ReadBuffer) Empty() bool {
|
|
return len(buffer.data) == 0 && buffer.err == nil
|
|
}
|
|
|
|
// IncludeError adds error at the end of the buffer.
//
// The error is merged with any previously queued error using errs.Combine.
// Read reports the queued error only after the buffered bytes have been
// handed out.
func (buffer *ReadBuffer) IncludeError(err error) {
	buffer.err = errs.Combine(buffer.err, err)
}
|
|
|
|
// Fill fills the buffer with the specified bytes.
//
// The slice is stored as is, without copying, and replaces whatever was still
// buffered, so any unread bytes are dropped. The queued error is left
// untouched.
func (buffer *ReadBuffer) Fill(data []byte) {
	buffer.data = data
}
|
|
|
|
// Read reads from the buffer.
|
|
func (buffer *ReadBuffer) Read(data []byte) (n int, err error) {
|
|
if len(buffer.data) > 0 {
|
|
n = copy(data, buffer.data)
|
|
buffer.data = buffer.data[n:]
|
|
return n, nil
|
|
}
|
|
|
|
if buffer.err != nil {
|
|
return 0, buffer.err
|
|
}
|
|
|
|
return 0, nil
|
|
}
|