// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package piecestore
import (
"context"
"io"
2019-06-03 10:17:09 +01:00
"os"
2019-07-10 14:41:47 +01:00
"runtime"
2019-07-03 14:47:55 +01:00
"sync/atomic"
2019-03-18 10:55:06 +00:00
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
2019-06-03 10:17:09 +01:00
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
2019-07-09 22:54:00 +01:00
"gopkg.in/spacemonkeygo/monkit.v2"
2019-03-18 10:55:06 +00:00
"storj.io/storj/internal/memory"
"storj.io/storj/internal/sync2"
2019-07-10 14:41:47 +01:00
"storj.io/storj/pkg/bloomfilter"
2019-03-18 10:55:06 +00:00
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
2019-07-28 06:55:36 +01:00
"storj.io/storj/pkg/signing"
2019-07-03 18:29:18 +01:00
"storj.io/storj/pkg/storj"
2019-03-18 10:55:06 +00:00
"storj.io/storj/storagenode/bandwidth"
"storj.io/storj/storagenode/monitor"
"storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/trust"
)
// Instrumentation and error classes shared across the piecestore package.
var (
	mon = monkit.Package()

	// Error is the default error class for piecestore errors
	Error = errs.Class("piecestore")
	// ErrProtocol is the default error class for protocol errors.
	ErrProtocol = errs.Class("piecestore protocol")
	// ErrInternal is the default error class for internal piecestore errors.
	ErrInternal = errs.Class("piecestore internal")
)

// Compile-time assertion that Endpoint implements pb.PiecestoreServer.
var _ pb.PiecestoreServer = (*Endpoint)(nil)
2019-04-26 06:17:18 +01:00
// OldConfig contains everything necessary for a server
type OldConfig struct {
2019-07-03 18:29:18 +01:00
Path string ` help:"path to store data in" default:"$CONFDIR/storage" `
WhitelistedSatellites storj . NodeURLs ` help:"a comma-separated list of approved satellite node urls" devDefault:"" releaseDefault:"12EayRS2V1kEsWESU9QMRseFhdxYxKicsiFmxrsLZHeLUtdps3S@mars.tardigrade.io:7777,118UWpMCHzs6CvSgWd9BfFVjw5K9pZbJjkfZJexMtSkmKxvvAW@satellite.stefan-benten.de:7777,121RTSDpyNZVcEU84Ticf2L1ntiuUimbWgfATz21tuvgk3vzoA6@saturn.tardigrade.io:7777,12L9ZFwhzVpuEKMUNUqkaTLGzwY9G24tbiigLiXpmZWKwmcNDDs@jupiter.tardigrade.io:7777" `
AllocatedDiskSpace memory . Size ` user:"true" help:"total allocated disk space in bytes" default:"1TB" `
AllocatedBandwidth memory . Size ` user:"true" help:"total allocated bandwidth in bytes" default:"2TB" `
KBucketRefreshInterval time . Duration ` help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s" `
2019-04-26 06:17:18 +01:00
}
2019-03-18 10:55:06 +00:00
// Config defines parameters for piecestore endpoint.
type Config struct {
ExpirationGracePeriod time . Duration ` help:"how soon before expiration date should things be considered expired" default:"48h0m0s" `
2019-07-03 14:47:55 +01:00
MaxConcurrentRequests int ` help:"how many concurrent requests are allowed, before uploads are rejected." default:"6" `
2019-07-02 17:06:12 +01:00
OrderLimitGracePeriod time . Duration ` help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"1h0m0s" `
2019-07-11 21:04:22 +01:00
RetainTimeBuffer time . Duration ` help:"allows for small differences in the satellite and storagenode clocks" default:"1h0m0s" `
2019-07-26 21:49:08 +01:00
RetainStatus RetainStatus ` help:"allows configuration to enable, disable, or test retain requests from the satellite. Options: (disabled/enabled/debug)" default:"disabled" `
2019-03-18 10:55:06 +00:00
Monitor monitor . Config
2019-03-27 10:24:35 +00:00
Sender orders . SenderConfig
2019-03-18 10:55:06 +00:00
}
// RetainStatus is a type defining the enabled/disabled status of retain requests
type RetainStatus uint32

const (
	// RetainDisabled means we do not do anything with retain requests
	RetainDisabled RetainStatus = iota + 1
	// RetainEnabled means we fully enable retain requests and delete data not defined by bloom filter
	RetainEnabled
	// RetainDebug means we partially enable retain requests, and print out pieces we should delete, without actually deleting them
	RetainDebug
)
// Set implements pflag.Value
func (v *RetainStatus) Set(s string) error {
	// Map the textual flag value onto its RetainStatus constant.
	modes := map[string]RetainStatus{
		"disabled": RetainDisabled,
		"enabled":  RetainEnabled,
		"debug":    RetainDebug,
	}
	mode, ok := modes[s]
	if !ok {
		return Error.New("invalid RetainStatus %q", s)
	}
	*v = mode
	return nil
}
// Type implements pflag.Value
func (*RetainStatus) Type() string { return "storj.RetainStatus" }
// String implements pflag.Value
func (v *RetainStatus) String() string {
	// Reverse mapping of the values accepted by Set.
	names := map[RetainStatus]string{
		RetainDisabled: "disabled",
		RetainEnabled:  "enabled",
		RetainDebug:    "debug",
	}
	if name, ok := names[*v]; ok {
		return name
	}
	return "invalid"
}
2019-03-18 10:55:06 +00:00
// Endpoint implements uploading, downloading and deleting for a storage node.
type Endpoint struct {
log * zap . Logger
config Config
2019-04-15 11:12:22 +01:00
signer signing . Signer
trust * trust . Pool
monitor * monitor . Service
2019-03-18 10:55:06 +00:00
store * pieces . Store
pieceinfo pieces . DB
orders orders . DB
usage bandwidth . DB
usedSerials UsedSerials
2019-07-03 14:47:55 +01:00
liveRequests int32
2019-03-18 10:55:06 +00:00
}
// NewEndpoint creates a new piecestore endpoint.
2019-04-15 11:12:22 +01:00
func NewEndpoint ( log * zap . Logger , signer signing . Signer , trust * trust . Pool , monitor * monitor . Service , store * pieces . Store , pieceinfo pieces . DB , orders orders . DB , usage bandwidth . DB , usedSerials UsedSerials , config Config ) ( * Endpoint , error ) {
2019-03-18 10:55:06 +00:00
return & Endpoint {
log : log ,
config : config ,
2019-04-15 11:12:22 +01:00
signer : signer ,
trust : trust ,
monitor : monitor ,
2019-03-18 10:55:06 +00:00
store : store ,
pieceinfo : pieceinfo ,
orders : orders ,
usage : usage ,
usedSerials : usedSerials ,
2019-07-03 14:47:55 +01:00
liveRequests : 0 ,
2019-03-18 10:55:06 +00:00
} , nil
}
// Delete handles deleting a piece on piece store.
func ( endpoint * Endpoint ) Delete ( ctx context . Context , delete * pb . PieceDeleteRequest ) ( _ * pb . PieceDeleteResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-03 14:47:55 +01:00
atomic . AddInt32 ( & endpoint . liveRequests , 1 )
defer atomic . AddInt32 ( & endpoint . liveRequests , - 1 )
2019-03-18 10:55:06 +00:00
if delete . Limit . Action != pb . PieceAction_DELETE {
return nil , Error . New ( "expected delete action got %v" , delete . Limit . Action ) // TODO: report grpc status unauthorized or bad request
}
2019-07-30 17:58:08 +01:00
if err := endpoint . verifyOrderLimit ( ctx , delete . Limit ) ; err != nil {
2019-03-18 10:55:06 +00:00
// TODO: report grpc status unauthorized or bad request
return nil , Error . Wrap ( err )
}
// TODO: parallelize this and maybe return early
pieceInfoErr := endpoint . pieceinfo . Delete ( ctx , delete . Limit . SatelliteId , delete . Limit . PieceId )
pieceErr := endpoint . store . Delete ( ctx , delete . Limit . SatelliteId , delete . Limit . PieceId )
if err := errs . Combine ( pieceInfoErr , pieceErr ) ; err != nil {
// explicitly ignoring error because the errors
// TODO: add more debug info
endpoint . log . Error ( "delete failed" , zap . Stringer ( "Piece ID" , delete . Limit . PieceId ) , zap . Error ( err ) )
// TODO: report internal server internal or missing error using grpc status,
// e.g. missing might happen when we get a deletion request after garbage collection has deleted it
} else {
2019-04-09 00:14:09 +01:00
endpoint . log . Info ( "deleted" , zap . Stringer ( "Piece ID" , delete . Limit . PieceId ) )
2019-03-18 10:55:06 +00:00
}
return & pb . PieceDeleteResponse { } , nil
}
// Upload handles uploading a piece on piece store.
func ( endpoint * Endpoint ) Upload ( stream pb . Piecestore_UploadServer ) ( err error ) {
ctx := stream . Context ( )
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-03 14:47:55 +01:00
liveRequests := atomic . AddInt32 ( & endpoint . liveRequests , 1 )
defer atomic . AddInt32 ( & endpoint . liveRequests , - 1 )
if int ( liveRequests ) > endpoint . config . MaxConcurrentRequests {
endpoint . log . Error ( "upload rejected, too many requests" , zap . Int32 ( "live requests" , liveRequests ) )
return status . Error ( codes . Unavailable , "storage node overloaded" )
}
2019-05-30 16:44:47 +01:00
startTime := time . Now ( ) . UTC ( )
2019-03-18 10:55:06 +00:00
// TODO: set connection timeouts
// TODO: set maximum message size
var message * pb . PieceUploadRequest
message , err = stream . Recv ( )
switch {
case err != nil :
return ErrProtocol . Wrap ( err )
case message == nil :
return ErrProtocol . New ( "expected a message" )
case message . Limit == nil :
return ErrProtocol . New ( "expected order limit as the first message" )
}
limit := message . Limit
2019-06-17 23:38:52 +01:00
endpoint . log . Info ( "upload started" , zap . Stringer ( "Piece ID" , limit . PieceId ) , zap . Stringer ( "SatelliteID" , limit . SatelliteId ) , zap . Stringer ( "Action" , limit . Action ) )
2019-06-11 04:30:17 +01:00
2019-03-18 10:55:06 +00:00
// TODO: verify that we have have expected amount of storage before continuing
if limit . Action != pb . PieceAction_PUT && limit . Action != pb . PieceAction_PUT_REPAIR {
return ErrProtocol . New ( "expected put or put repair action got %v" , limit . Action ) // TODO: report grpc status unauthorized or bad request
}
2019-07-30 17:58:08 +01:00
if err := endpoint . verifyOrderLimit ( ctx , limit ) ; err != nil {
return err
2019-03-18 10:55:06 +00:00
}
2019-05-30 16:44:47 +01:00
var pieceWriter * pieces . Writer
2019-03-18 10:55:06 +00:00
defer func ( ) {
2019-05-30 16:44:47 +01:00
endTime := time . Now ( ) . UTC ( )
dt := endTime . Sub ( startTime )
uploadSize := int64 ( 0 )
if pieceWriter != nil {
uploadSize = pieceWriter . Size ( )
}
uploadRate := float64 ( 0 )
if dt . Seconds ( ) > 0 {
uploadRate = float64 ( uploadSize ) / dt . Seconds ( )
}
uploadDuration := dt . Nanoseconds ( )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-06-04 14:22:00 +01:00
mon . Meter ( "upload_failure_byte_meter" ) . Mark64 ( uploadSize )
2019-05-30 16:44:47 +01:00
mon . IntVal ( "upload_failure_size_bytes" ) . Observe ( uploadSize )
mon . IntVal ( "upload_failure_duration_ns" ) . Observe ( uploadDuration )
mon . FloatVal ( "upload_failure_rate_bytes_per_sec" ) . Observe ( uploadRate )
2019-06-17 23:38:52 +01:00
endpoint . log . Info ( "upload failed" , zap . Stringer ( "Piece ID" , limit . PieceId ) , zap . Stringer ( "SatelliteID" , limit . SatelliteId ) , zap . Stringer ( "Action" , limit . Action ) , zap . Error ( err ) )
2019-03-18 10:55:06 +00:00
} else {
2019-06-04 14:22:00 +01:00
mon . Meter ( "upload_success_byte_meter" ) . Mark64 ( uploadSize )
2019-05-30 16:44:47 +01:00
mon . IntVal ( "upload_success_size_bytes" ) . Observe ( uploadSize )
mon . IntVal ( "upload_success_duration_ns" ) . Observe ( uploadDuration )
mon . FloatVal ( "upload_success_rate_bytes_per_sec" ) . Observe ( uploadRate )
2019-06-17 23:38:52 +01:00
endpoint . log . Info ( "uploaded" , zap . Stringer ( "Piece ID" , limit . PieceId ) , zap . Stringer ( "SatelliteID" , limit . SatelliteId ) , zap . Stringer ( "Action" , limit . Action ) )
2019-03-18 10:55:06 +00:00
}
} ( )
2019-05-30 16:44:47 +01:00
pieceWriter , err = endpoint . store . Writer ( ctx , limit . SatelliteId , limit . PieceId )
2019-03-18 10:55:06 +00:00
if err != nil {
return ErrInternal . Wrap ( err ) // TODO: report grpc status internal server error
}
defer func ( ) {
// cancel error if it hasn't been committed
2019-06-04 13:31:39 +01:00
if cancelErr := pieceWriter . Cancel ( ctx ) ; cancelErr != nil {
2019-04-17 11:09:44 +01:00
endpoint . log . Error ( "error during canceling a piece write" , zap . Error ( cancelErr ) )
2019-03-18 10:55:06 +00:00
}
} ( )
2019-04-15 11:12:22 +01:00
availableBandwidth , err := endpoint . monitor . AvailableBandwidth ( ctx )
if err != nil {
return ErrInternal . Wrap ( err )
}
availableSpace , err := endpoint . monitor . AvailableSpace ( ctx )
if err != nil {
return ErrInternal . Wrap ( err )
}
2019-07-01 16:54:11 +01:00
largestOrder := pb . Order { }
2019-07-11 21:51:40 +01:00
defer endpoint . SaveOrder ( ctx , limit , & largestOrder )
2019-03-18 10:55:06 +00:00
for {
message , err = stream . Recv ( ) // TODO: reuse messages to avoid allocations
if err == io . EOF {
return ErrProtocol . New ( "unexpected EOF" )
} else if err != nil {
return ErrProtocol . Wrap ( err ) // TODO: report grpc status bad message
}
if message == nil {
return ErrProtocol . New ( "expected a message" ) // TODO: report grpc status bad message
}
2019-07-08 15:26:19 +01:00
if message . Order == nil && message . Chunk == nil && message . Done == nil {
return ErrProtocol . New ( "expected a message" ) // TODO: report grpc status bad message
}
2019-03-18 10:55:06 +00:00
2019-07-08 15:26:19 +01:00
if message . Order != nil {
2019-07-11 21:51:40 +01:00
if err := endpoint . VerifyOrder ( ctx , limit , message . Order , largestOrder . Amount ) ; err != nil {
2019-03-18 10:55:06 +00:00
return err
}
largestOrder = * message . Order
2019-07-08 15:26:19 +01:00
}
2019-03-18 10:55:06 +00:00
2019-07-08 15:26:19 +01:00
if message . Chunk != nil {
2019-03-18 10:55:06 +00:00
if message . Chunk . Offset != pieceWriter . Size ( ) {
return ErrProtocol . New ( "chunk out of order" ) // TODO: report grpc status bad message
}
2019-04-15 11:12:22 +01:00
chunkSize := int64 ( len ( message . Chunk . Data ) )
if largestOrder . Amount < pieceWriter . Size ( ) + chunkSize {
2019-03-18 10:55:06 +00:00
// TODO: should we write currently and give a chance for uplink to remedy the situation?
return ErrProtocol . New ( "not enough allocated, allocated=%v writing=%v" , largestOrder . Amount , pieceWriter . Size ( ) + int64 ( len ( message . Chunk . Data ) ) ) // TODO: report grpc status ?
}
2019-04-15 11:12:22 +01:00
availableBandwidth -= chunkSize
if availableBandwidth < 0 {
return ErrProtocol . New ( "out of bandwidth" )
}
availableSpace -= chunkSize
if availableSpace < 0 {
return ErrProtocol . New ( "out of space" )
}
2019-03-18 10:55:06 +00:00
if _ , err := pieceWriter . Write ( message . Chunk . Data ) ; err != nil {
return ErrInternal . Wrap ( err ) // TODO: report grpc status internal server error
}
2019-07-08 15:26:19 +01:00
}
2019-03-18 10:55:06 +00:00
2019-07-08 15:26:19 +01:00
if message . Done != nil {
2019-03-18 10:55:06 +00:00
expectedHash := pieceWriter . Hash ( )
2019-07-11 21:51:40 +01:00
if err := endpoint . VerifyPieceHash ( ctx , limit , message . Done , expectedHash ) ; err != nil {
2019-03-18 10:55:06 +00:00
return err // TODO: report grpc status internal server error
}
2019-07-15 16:26:18 +01:00
if message . Done . PieceSize != pieceWriter . Size ( ) {
return ErrProtocol . New ( "Size of finished piece does not match size declared by uplink! %d != %d" ,
message . Done . GetPieceSize ( ) , pieceWriter . Size ( ) )
}
2019-07-01 10:36:35 +01:00
2019-06-04 13:31:39 +01:00
if err := pieceWriter . Commit ( ctx ) ; err != nil {
2019-03-18 10:55:06 +00:00
return ErrInternal . Wrap ( err ) // TODO: report grpc status internal server error
}
// TODO: do this in a goroutine
{
// TODO: maybe this should be as a pieceWriter.Commit(ctx, info)
info := & pieces . Info {
SatelliteID : limit . SatelliteId ,
PieceID : limit . PieceId ,
PieceSize : pieceWriter . Size ( ) ,
2019-07-09 22:54:00 +01:00
PieceCreation : limit . OrderCreation ,
PieceExpiration : limit . PieceExpiration ,
2019-03-18 10:55:06 +00:00
2019-07-11 21:51:40 +01:00
OrderLimit : limit ,
2019-03-18 10:55:06 +00:00
UplinkPieceHash : message . Done ,
}
if err := endpoint . pieceinfo . Add ( ctx , info ) ; err != nil {
2019-06-21 17:16:39 +01:00
ignoreCancelContext := context . Background ( )
deleteErr := endpoint . store . Delete ( ignoreCancelContext , limit . SatelliteId , limit . PieceId )
return ErrInternal . Wrap ( errs . Combine ( err , deleteErr ) )
2019-03-18 10:55:06 +00:00
}
}
2019-06-05 14:47:01 +01:00
storageNodeHash , err := signing . SignPieceHash ( ctx , endpoint . signer , & pb . PieceHash {
2019-07-03 17:14:37 +01:00
PieceId : limit . PieceId ,
Hash : expectedHash ,
PieceSize : pieceWriter . Size ( ) ,
Timestamp : time . Now ( ) ,
2019-03-18 10:55:06 +00:00
} )
if err != nil {
return ErrInternal . Wrap ( err )
}
closeErr := stream . SendAndClose ( & pb . PieceUploadResponse {
Done : storageNodeHash ,
} )
return ErrProtocol . Wrap ( ignoreEOF ( closeErr ) )
}
}
}
// Download implements downloading a piece from piece store.
func ( endpoint * Endpoint ) Download ( stream pb . Piecestore_DownloadServer ) ( err error ) {
ctx := stream . Context ( )
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-03 14:47:55 +01:00
atomic . AddInt32 ( & endpoint . liveRequests , 1 )
defer atomic . AddInt32 ( & endpoint . liveRequests , - 1 )
2019-05-30 16:44:47 +01:00
startTime := time . Now ( ) . UTC ( )
2019-03-18 10:55:06 +00:00
// TODO: set connection timeouts
// TODO: set maximum message size
var message * pb . PieceDownloadRequest
// receive limit and chunk from uplink
message , err = stream . Recv ( )
if err != nil {
return ErrProtocol . Wrap ( err )
}
if message . Limit == nil || message . Chunk == nil {
return ErrProtocol . New ( "expected order limit and chunk as the first message" )
}
limit , chunk := message . Limit , message . Chunk
2019-06-17 23:38:52 +01:00
endpoint . log . Info ( "download started" , zap . Stringer ( "Piece ID" , limit . PieceId ) , zap . Stringer ( "SatelliteID" , limit . SatelliteId ) , zap . Stringer ( "Action" , limit . Action ) )
2019-06-11 04:30:17 +01:00
2019-03-18 10:55:06 +00:00
if limit . Action != pb . PieceAction_GET && limit . Action != pb . PieceAction_GET_REPAIR && limit . Action != pb . PieceAction_GET_AUDIT {
return ErrProtocol . New ( "expected get or get repair or audit action got %v" , limit . Action ) // TODO: report grpc status unauthorized or bad request
}
if chunk . ChunkSize > limit . Limit {
return ErrProtocol . New ( "requested more that order limit allows, limit=%v requested=%v" , limit . Limit , chunk . ChunkSize )
}
2019-07-30 17:58:08 +01:00
if err := endpoint . verifyOrderLimit ( ctx , limit ) ; err != nil {
2019-03-18 10:55:06 +00:00
return Error . Wrap ( err ) // TODO: report grpc status unauthorized or bad request
}
2019-05-30 16:44:47 +01:00
var pieceReader * pieces . Reader
2019-03-18 10:55:06 +00:00
defer func ( ) {
2019-05-30 16:44:47 +01:00
endTime := time . Now ( ) . UTC ( )
dt := endTime . Sub ( startTime )
downloadSize := int64 ( 0 )
if pieceReader != nil {
downloadSize = pieceReader . Size ( )
}
downloadRate := float64 ( 0 )
if dt . Seconds ( ) > 0 {
downloadRate = float64 ( downloadSize ) / dt . Seconds ( )
}
downloadDuration := dt . Nanoseconds ( )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-06-04 14:22:00 +01:00
mon . Meter ( "download_failure_byte_meter" ) . Mark64 ( downloadSize )
2019-05-30 16:44:47 +01:00
mon . IntVal ( "download_failure_size_bytes" ) . Observe ( downloadSize )
mon . IntVal ( "download_failure_duration_ns" ) . Observe ( downloadDuration )
mon . FloatVal ( "download_failure_rate_bytes_per_sec" ) . Observe ( downloadRate )
2019-06-17 23:38:52 +01:00
endpoint . log . Info ( "download failed" , zap . Stringer ( "Piece ID" , limit . PieceId ) , zap . Stringer ( "SatelliteID" , limit . SatelliteId ) , zap . Stringer ( "Action" , limit . Action ) , zap . Error ( err ) )
2019-03-18 10:55:06 +00:00
} else {
2019-06-04 14:22:00 +01:00
mon . Meter ( "download_success_byte_meter" ) . Mark64 ( downloadSize )
2019-05-30 16:44:47 +01:00
mon . IntVal ( "download_success_size_bytes" ) . Observe ( downloadSize )
mon . IntVal ( "download_success_duration_ns" ) . Observe ( downloadDuration )
mon . FloatVal ( "download_success_rate_bytes_per_sec" ) . Observe ( downloadRate )
2019-06-17 23:38:52 +01:00
endpoint . log . Info ( "downloaded" , zap . Stringer ( "Piece ID" , limit . PieceId ) , zap . Stringer ( "SatelliteID" , limit . SatelliteId ) , zap . Stringer ( "Action" , limit . Action ) )
2019-03-18 10:55:06 +00:00
}
} ( )
2019-05-30 16:44:47 +01:00
pieceReader , err = endpoint . store . Reader ( ctx , limit . SatelliteId , limit . PieceId )
2019-03-18 10:55:06 +00:00
if err != nil {
2019-06-03 10:17:09 +01:00
if os . IsNotExist ( err ) {
return status . Error ( codes . NotFound , err . Error ( ) )
}
return status . Error ( codes . Internal , err . Error ( ) )
2019-03-18 10:55:06 +00:00
}
defer func ( ) {
err := pieceReader . Close ( ) // similarly how transcation Rollback works
if err != nil {
// no reason to report this error to the uplink
endpoint . log . Error ( "failed to close piece reader" , zap . Error ( err ) )
}
} ( )
// TODO: verify chunk.Size behavior logic with regards to reading all
if chunk . Offset + chunk . ChunkSize > pieceReader . Size ( ) {
return Error . New ( "requested more data than available, requesting=%v available=%v" , chunk . Offset + chunk . ChunkSize , pieceReader . Size ( ) )
}
2019-04-15 11:12:22 +01:00
availableBandwidth , err := endpoint . monitor . AvailableBandwidth ( ctx )
if err != nil {
return ErrInternal . Wrap ( err )
}
2019-03-18 10:55:06 +00:00
throttle := sync2 . NewThrottle ( )
// TODO: see whether this can be implemented without a goroutine
group , ctx := errgroup . WithContext ( ctx )
group . Go ( func ( ) ( err error ) {
var maximumChunkSize = 1 * memory . MiB . Int64 ( )
currentOffset := chunk . Offset
unsentAmount := chunk . ChunkSize
for unsentAmount > 0 {
tryToSend := min ( unsentAmount , maximumChunkSize )
// TODO: add timeout here
chunkSize , err := throttle . ConsumeOrWait ( tryToSend )
if err != nil {
// this can happen only because uplink decided to close the connection
return nil
}
chunkData := make ( [ ] byte , chunkSize )
_ , err = pieceReader . Seek ( currentOffset , io . SeekStart )
if err != nil {
return ErrInternal . Wrap ( err )
}
2019-07-25 09:22:15 +01:00
// ReadFull is required to ensure we are sending the right amount of data.
_ , err = io . ReadFull ( pieceReader , chunkData )
2019-03-18 10:55:06 +00:00
if err != nil {
return ErrInternal . Wrap ( err )
}
err = stream . Send ( & pb . PieceDownloadResponse {
Chunk : & pb . PieceDownloadResponse_Chunk {
Offset : currentOffset ,
Data : chunkData ,
} ,
} )
if err != nil {
// err is io.EOF when uplink asked for a piece, but decided not to retrieve it,
// no need to propagate it
return ErrProtocol . Wrap ( ignoreEOF ( err ) )
}
currentOffset += chunkSize
unsentAmount -= chunkSize
}
return nil
} )
recvErr := func ( ) ( err error ) {
2019-07-01 16:54:11 +01:00
largestOrder := pb . Order { }
2019-07-11 21:51:40 +01:00
defer endpoint . SaveOrder ( ctx , limit , & largestOrder )
2019-03-18 10:55:06 +00:00
// ensure that we always terminate sending goroutine
defer throttle . Fail ( io . EOF )
for {
// TODO: check errors
// TODO: add timeout here
message , err = stream . Recv ( )
if err != nil {
// err is io.EOF when uplink closed the connection, no need to return error
return ErrProtocol . Wrap ( ignoreEOF ( err ) )
}
if message == nil || message . Order == nil {
return ErrProtocol . New ( "expected order as the message" )
}
2019-07-11 21:51:40 +01:00
if err := endpoint . VerifyOrder ( ctx , limit , message . Order , largestOrder . Amount ) ; err != nil {
2019-03-18 10:55:06 +00:00
return err
}
2019-04-15 11:12:22 +01:00
chunkSize := message . Order . Amount - largestOrder . Amount
availableBandwidth -= chunkSize
if availableBandwidth < 0 {
return ErrProtocol . New ( "out of bandwidth" )
}
if err := throttle . Produce ( chunkSize ) ; err != nil {
2019-03-18 10:55:06 +00:00
// shouldn't happen since only receiving side is calling Fail
return ErrInternal . Wrap ( err )
}
largestOrder = * message . Order
}
} ( )
// ensure we wait for sender to complete
sendErr := group . Wait ( )
return Error . Wrap ( errs . Combine ( sendErr , recvErr ) )
}
// SaveOrder saves the order with all necessary information. It assumes it has been already verified.
2019-07-11 21:51:40 +01:00
func ( endpoint * Endpoint ) SaveOrder ( ctx context . Context , limit * pb . OrderLimit , order * pb . Order ) {
2019-06-04 13:31:39 +01:00
var err error
defer mon . Task ( ) ( & ctx ) ( & err )
2019-03-18 10:55:06 +00:00
// TODO: do this in a goroutine
if order == nil || order . Amount <= 0 {
return
}
2019-06-04 13:31:39 +01:00
err = endpoint . orders . Enqueue ( ctx , & orders . Info {
2019-07-09 22:33:45 +01:00
Limit : limit ,
Order : order ,
2019-03-18 10:55:06 +00:00
} )
if err != nil {
endpoint . log . Error ( "failed to add order" , zap . Error ( err ) )
} else {
2019-06-04 13:31:39 +01:00
err = endpoint . usage . Add ( ctx , limit . SatelliteId , limit . Action , order . Amount , time . Now ( ) )
2019-03-18 10:55:06 +00:00
if err != nil {
endpoint . log . Error ( "failed to add bandwidth usage" , zap . Error ( err ) )
}
}
}
2019-07-10 14:41:47 +01:00
// Retain keeps only piece ids specified in the request
2019-07-11 21:04:22 +01:00
func ( endpoint * Endpoint ) Retain ( ctx context . Context , retainReq * pb . RetainRequest ) ( res * pb . RetainResponse , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-26 21:49:08 +01:00
// if retain status is disabled, quit immediately
if endpoint . config . RetainStatus == RetainDisabled {
return & pb . RetainResponse { } , nil
}
2019-07-10 14:41:47 +01:00
peer , err := identity . PeerIdentityFromContext ( ctx )
if err != nil {
2019-07-11 21:04:22 +01:00
return nil , status . Error ( codes . Unauthenticated , Error . Wrap ( err ) . Error ( ) )
}
err = endpoint . trust . VerifySatelliteID ( ctx , peer . ID )
if err != nil {
return nil , status . Error ( codes . PermissionDenied , Error . New ( "retain called with untrusted ID" ) . Error ( ) )
2019-07-10 14:41:47 +01:00
}
filter , err := bloomfilter . NewFromBytes ( retainReq . GetFilter ( ) )
if err != nil {
2019-07-11 21:04:22 +01:00
return nil , status . Error ( codes . InvalidArgument , Error . Wrap ( err ) . Error ( ) )
2019-07-10 14:41:47 +01:00
}
const limit = 1000
offset := 0
2019-07-11 21:04:22 +01:00
numDeleted := 0
2019-07-10 14:41:47 +01:00
hasMorePieces := true
for hasMorePieces {
2019-07-11 21:04:22 +01:00
// subtract some time to leave room for clock difference between the satellite and storage node
createdBefore := retainReq . GetCreationDate ( ) . Add ( - endpoint . config . RetainTimeBuffer )
2019-07-10 14:41:47 +01:00
pieceIDs , err := endpoint . pieceinfo . GetPieceIDs ( ctx , peer . ID , createdBefore , limit , offset )
if err != nil {
2019-07-11 21:04:22 +01:00
return nil , status . Error ( codes . Internal , Error . Wrap ( err ) . Error ( ) )
2019-07-10 14:41:47 +01:00
}
for _ , pieceID := range pieceIDs {
if ! filter . Contains ( pieceID ) {
2019-07-26 21:49:08 +01:00
endpoint . log . Sugar ( ) . Debugf ( "About to delete piece id (%s) from satellite (%s). RetainStatus: %s" , pieceID . String ( ) , peer . ID . String ( ) , endpoint . config . RetainStatus . String ( ) )
// if retain status is enabled, delete pieceid
if endpoint . config . RetainStatus == RetainEnabled {
if err = endpoint . store . Delete ( ctx , peer . ID , pieceID ) ; err != nil {
endpoint . log . Error ( "failed to delete a piece" , zap . Error ( err ) )
// continue because if we fail to delete from file system,
// we need to keep the pieceinfo so we can delete next time
continue
}
if err = endpoint . pieceinfo . Delete ( ctx , peer . ID , pieceID ) ; err != nil {
endpoint . log . Error ( "failed to delete piece info" , zap . Error ( err ) )
}
2019-07-10 14:41:47 +01:00
}
2019-07-26 21:49:08 +01:00
2019-07-11 21:04:22 +01:00
numDeleted ++
2019-07-10 14:41:47 +01:00
}
}
2019-07-26 21:49:08 +01:00
2019-07-10 14:41:47 +01:00
hasMorePieces = ( len ( pieceIDs ) == limit )
offset += len ( pieceIDs )
2019-07-11 21:04:22 +01:00
offset -= numDeleted
2019-07-10 14:41:47 +01:00
// We call Gosched() here because the GC process is expected to be long and we want to keep it at low priority,
// so other goroutines can continue serving requests.
runtime . Gosched ( )
}
2019-07-26 21:49:08 +01:00
endpoint . log . Sugar ( ) . Debugf ( "Deleted %d pieces during retain. RetainStatus: %s" , numDeleted , endpoint . config . RetainStatus . String ( ) )
2019-07-10 14:41:47 +01:00
return & pb . RetainResponse { } , nil
}
2019-03-18 10:55:06 +00:00
// min finds the min of two values
func min ( a , b int64 ) int64 {
if a < b {
return a
}
return b
}
// ignoreEOF ignores io.EOF error.
func ignoreEOF ( err error ) error {
if err == io . EOF {
return nil
}
return err
}