2019-01-24 20:15:10 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
2018-06-27 19:42:54 +01:00
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
2018-11-06 17:49:17 +00:00
|
|
|
package psclient
|
2018-06-27 19:42:54 +01:00
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"io"
|
|
|
|
"io/ioutil"
|
|
|
|
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
|
2018-09-18 05:39:06 +01:00
|
|
|
"storj.io/storj/pkg/pb"
|
2018-06-27 19:42:54 +01:00
|
|
|
"storj.io/storj/pkg/ranger"
|
|
|
|
)
|
|
|
|
|
2018-07-16 20:22:34 +01:00
|
|
|
// Error is the error class for pieceRanger.
var Error = errs.Class("pieceRanger error")
|
|
|
|
|
|
|
|
// pieceRanger implements ranger.Ranger over a single piece held on a
// remote piece store node, fetching byte ranges via an open retrieve stream.
type pieceRanger struct {
	c      *PieceStore                        // client used to talk to the piece store
	id     PieceID                            // ID of the piece being ranged over
	size   int64                              // total piece size in bytes, fixed at construction
	stream pb.PieceStoreRoutes_RetrieveClient // already-open retrieve stream to the node
	pba    *pb.OrderLimit                     // payer bandwidth allocation (order limit) for this download
}
|
|
|
|
|
2018-09-14 15:10:43 +01:00
|
|
|
// PieceRanger PieceRanger returns a Ranger from a PieceID.
|
2019-02-22 21:17:35 +00:00
|
|
|
func PieceRanger(ctx context.Context, c *PieceStore, stream pb.PieceStoreRoutes_RetrieveClient, id PieceID, pba *pb.OrderLimit) (ranger.Ranger, error) {
|
2018-08-27 18:28:16 +01:00
|
|
|
piece, err := c.Meta(ctx, id)
|
2018-06-27 19:42:54 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2019-02-20 05:36:08 +00:00
|
|
|
return &pieceRanger{c: c, id: id, size: piece.PieceSize, stream: stream, pba: pba}, nil
|
2018-06-27 19:42:54 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// PieceRangerSize creates a PieceRanger with known size.
|
|
|
|
// Use it if you know the piece size. This will safe the extra request for
|
|
|
|
// retrieving the piece size from the piece storage.
|
2019-02-22 21:17:35 +00:00
|
|
|
func PieceRangerSize(c *PieceStore, stream pb.PieceStoreRoutes_RetrieveClient, id PieceID, size int64, pba *pb.OrderLimit) ranger.Ranger {
|
2019-02-20 05:36:08 +00:00
|
|
|
return &pieceRanger{c: c, id: id, size: size, stream: stream, pba: pba}
|
2018-06-27 19:42:54 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// Size implements Ranger.Size
func (r *pieceRanger) Size() int64 {
	// size is fixed at construction (either looked up via Meta or supplied
	// by the caller) and never mutated afterwards.
	return r.size
}
|
|
|
|
|
|
|
|
// Range implements Ranger.Range
//
// It validates the requested [offset, offset+length) window against the known
// piece size, sends a retrieve request for that window over the already-open
// stream, and returns a reader that yields the streamed piece data.
func (r *pieceRanger) Range(ctx context.Context, offset, length int64) (io.ReadCloser, error) {
	// Reject any window that falls outside the piece.
	if offset < 0 {
		return nil, Error.New("negative offset")
	}
	if length < 0 {
		return nil, Error.New("negative length")
	}
	if offset+length > r.size {
		return nil, Error.New("range beyond end")
	}
	// A zero-length range needs no network traffic at all.
	if length == 0 {
		return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
	}

	// Making a copy, otherwise there will be a data race
	// when another goroutine tries to write the cached size
	// of this instance at the same time.
	pbaClone := r.pba.Clone()

	// Build the renter bandwidth allocation (order) that accompanies the
	// retrieve request, addressed to the node we are downloading from.
	rba := &pb.Order{
		PayerAllocation: pbaClone,
		StorageNodeId:   r.c.remoteID,
	}

	// send piece data
	if err := r.stream.Send(&pb.PieceRetrieval{
		PieceData:           &pb.PieceRetrieval_PieceData{Id: r.id.String(), PieceSize: length, Offset: offset},
		BandwidthAllocation: rba,
	}); err != nil {
		return nil, err
	}

	// NOTE(review): the reader is sized with r.size rather than the requested
	// length — presumably NewStreamReader is bounded by what the stream
	// actually delivers; confirm against NewStreamReader's implementation.
	return NewStreamReader(r.c, r.stream, rba, r.size), nil
}
|