2019-03-18 10:55:06 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package piecestore
import (
	"context"
	"errors"
	"io"
	"os"
	"sync/atomic"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	monkit "gopkg.in/spacemonkeygo/monkit.v2"

	"storj.io/storj/internal/errs2"
	"storj.io/storj/internal/memory"
	"storj.io/storj/internal/sync2"
	"storj.io/storj/pkg/bloomfilter"
	"storj.io/storj/pkg/identity"
	"storj.io/storj/pkg/pb"
	"storj.io/storj/pkg/rpc/rpcstatus"
	"storj.io/storj/pkg/signing"
	"storj.io/storj/pkg/storj"
	"storj.io/storj/storagenode/bandwidth"
	"storj.io/storj/storagenode/monitor"
	"storj.io/storj/storagenode/orders"
	"storj.io/storj/storagenode/pieces"
	"storj.io/storj/storagenode/retain"
	"storj.io/storj/storagenode/trust"
)
var (
	// mon is the package-level monkit scope used for all metrics in this package.
	mon = monkit.Package()

	// Error is the default error class for piecestore errors.
	Error = errs.Class("piecestore")
	// ErrProtocol is the default error class for protocol errors.
	ErrProtocol = errs.Class("piecestore protocol")
	// ErrInternal is the default error class for internal piecestore errors.
	ErrInternal = errs.Class("piecestore internal")
)
2019-08-02 23:49:39 +01:00
2019-03-18 10:55:06 +00:00
// Compile-time check that Endpoint satisfies the gRPC piecestore server interface.
var _ pb.PiecestoreServer = (*Endpoint)(nil)
2019-04-26 06:17:18 +01:00
// OldConfig contains everything necessary for a server.
type OldConfig struct {
	Path                   string         `help:"path to store data in" default:"$CONFDIR/storage"`
	WhitelistedSatellites  storj.NodeURLs `help:"a comma-separated list of approved satellite node urls" devDefault:"" releaseDefault:"12EayRS2V1kEsWESU9QMRseFhdxYxKicsiFmxrsLZHeLUtdps3S@us-central-1.tardigrade.io:7777,118UWpMCHzs6CvSgWd9BfFVjw5K9pZbJjkfZJexMtSkmKxvvAW@satellite.stefan-benten.de:7777,121RTSDpyNZVcEU84Ticf2L1ntiuUimbWgfATz21tuvgk3vzoA6@asia-east-1.tardigrade.io:7777,12L9ZFwhzVpuEKMUNUqkaTLGzwY9G24tbiigLiXpmZWKwmcNDDs@europe-west-1.tardigrade.io:7777"`
	AllocatedDiskSpace     memory.Size    `user:"true" help:"total allocated disk space in bytes" default:"1TB"`
	AllocatedBandwidth     memory.Size    `user:"true" help:"total allocated bandwidth in bytes" default:"2TB"`
	KBucketRefreshInterval time.Duration  `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"`
}
2019-03-18 10:55:06 +00:00
// Config defines parameters for piecestore endpoint.
type Config struct {
	ExpirationGracePeriod time.Duration `help:"how soon before expiration date should things be considered expired" default:"48h0m0s"`
	MaxConcurrentRequests int           `help:"how many concurrent requests are allowed, before uploads are rejected. 0 represents unlimited." default:"0"`
	OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"24h0m0s"`
	CacheSyncInterval     time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"`
	RetainTimeBuffer      time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"48h0m0s"`

	// Monitor configures the disk space / bandwidth monitor service.
	Monitor monitor.Config
	// Orders configures the order settlement behavior.
	Orders orders.Config
}
2019-10-03 19:31:39 +01:00
// pingStatsSource is the subset of the ping stats service needed by the
// endpoint to record that this node was contacted.
type pingStatsSource interface {
	// WasPinged records the time of the most recent contact.
	WasPinged(when time.Time)
}
2019-09-10 14:24:16 +01:00
// Endpoint implements uploading, downloading and deleting for a storage node.
//
// architecture: Endpoint
type Endpoint struct {
	log          *zap.Logger
	config       Config
	grpcReqLimit int // gRPC concurrent request cap computed in NewEndpoint; always > 0

	signer    signing.Signer   // signs piece hashes returned to the uplink
	trust     *trust.Pool      // set of trusted satellites
	monitor   *monitor.Service // reports available disk space and bandwidth
	retain    *retain.Service  // queues retain (garbage collection) jobs
	pingStats pingStatsSource  // records when the node was contacted

	store       *pieces.Store
	orders      orders.DB
	usage       bandwidth.DB
	usedSerials UsedSerials

	// liveRequests tracks the total number of incoming rpc requests. For gRPC
	// requests only, this number is compared to config.MaxConcurrentRequests
	// and limits the number of gRPC requests. dRPC requests are tracked but
	// not limited.
	liveRequests int32
}
2019-09-12 22:09:46 +01:00
// drpcEndpoint wraps streaming methods so that they can be used with drpc.
type drpcEndpoint struct{ *Endpoint }
// DRPC returns a DRPC form of the endpoint.
func (endpoint *Endpoint) DRPC() pb.DRPCPiecestoreServer { return &drpcEndpoint{Endpoint: endpoint} }
2019-03-18 10:55:06 +00:00
// NewEndpoint creates a new piecestore endpoint.
2019-10-03 19:31:39 +01:00
func NewEndpoint ( log * zap . Logger , signer signing . Signer , trust * trust . Pool , monitor * monitor . Service , retain * retain . Service , pingStats pingStatsSource , store * pieces . Store , orders orders . DB , usage bandwidth . DB , usedSerials UsedSerials , config Config ) ( * Endpoint , error ) {
2019-10-28 19:12:49 +00:00
// If config.MaxConcurrentRequests is set we want to repsect it for grpc.
// However, if it is 0 (unlimited) we force a limit.
grpcReqLimit := config . MaxConcurrentRequests
if grpcReqLimit <= 0 {
grpcReqLimit = 7
}
2019-03-18 10:55:06 +00:00
return & Endpoint {
2019-10-28 19:12:49 +00:00
log : log ,
config : config ,
grpcReqLimit : grpcReqLimit ,
2019-03-18 10:55:06 +00:00
2019-10-03 19:31:39 +01:00
signer : signer ,
trust : trust ,
monitor : monitor ,
retain : retain ,
pingStats : pingStats ,
2019-03-18 10:55:06 +00:00
store : store ,
orders : orders ,
usage : usage ,
usedSerials : usedSerials ,
2019-07-03 14:47:55 +01:00
2019-10-23 20:43:43 +01:00
liveRequests : 0 ,
2019-03-18 10:55:06 +00:00
} , nil
}
2019-08-02 23:49:39 +01:00
// monLiveRequests is a shared monkit task used to trace every live request.
var monLiveRequests = mon.TaskNamed("live-request")
2019-03-18 10:55:06 +00:00
// Delete handles deleting a piece on piece store.
//
// The order limit must carry a DELETE action and pass order limit
// verification; a failed store deletion is logged but still reported as
// success to the caller.
func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequest) (_ *pb.PieceDeleteResponse, err error) {
	defer monLiveRequests(&ctx)(&err)
	defer mon.Task()(&ctx)(&err)

	// Count this request in the live-request gauge for its duration.
	atomic.AddInt32(&endpoint.liveRequests, 1)
	defer atomic.AddInt32(&endpoint.liveRequests, -1)

	endpoint.pingStats.WasPinged(time.Now())

	if delete.Limit.Action != pb.PieceAction_DELETE {
		return nil, Error.New("expected delete action got %v", delete.Limit.Action) // TODO: report rpc status unauthorized or bad request
	}

	if err := endpoint.verifyOrderLimit(ctx, delete.Limit); err != nil {
		// TODO: report rpc status unauthorized or bad request
		return nil, Error.Wrap(err)
	}

	if err := endpoint.store.Delete(ctx, delete.Limit.SatelliteId, delete.Limit.PieceId); err != nil {
		// explicitly ignoring the error: the RPC reports success either way
		// TODO: add more debug info
		endpoint.log.Error("delete failed", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId), zap.Error(err))
		// TODO: report rpc status of internal server error or missing error,
		// e.g. missing might happen when we get a deletion request after garbage collection has deleted it
	} else {
		endpoint.log.Info("deleted", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId))
	}

	return &pb.PieceDeleteResponse{}, nil
}
// Upload handles uploading a piece on piece store.
// gRPC uploads are subject to the endpoint's concurrent request limit.
func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error) {
	return endpoint.doUpload(stream, endpoint.grpcReqLimit)
}
// Upload handles uploading a piece on piece store.
// dRPC uploads use the raw configured limit, where 0 means unlimited.
func (endpoint *drpcEndpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err error) {
	return endpoint.doUpload(stream, endpoint.config.MaxConcurrentRequests)
}
// uploadStream is the minimum interface required to handle an upload;
// it is satisfied by both the gRPC and dRPC upload streams.
type uploadStream interface {
	Context() context.Context
	Recv() (*pb.PieceUploadRequest, error)
	SendAndClose(*pb.PieceUploadResponse) error
}
// doUpload handles uploading a piece on piece store.
//
// requestLimit caps the number of simultaneous live requests before an upload
// is rejected; a value of 0 or less disables the check. The first message on
// the stream must carry the order limit; subsequent messages carry orders,
// chunks, and finally a Done message that commits the piece.
func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err error) {
	ctx := stream.Context()
	defer monLiveRequests(&ctx)(&err)
	defer mon.Task()(&ctx)(&err)

	// Count this request in the live-request gauge for its duration.
	liveRequests := atomic.AddInt32(&endpoint.liveRequests, 1)
	defer atomic.AddInt32(&endpoint.liveRequests, -1)

	endpoint.pingStats.WasPinged(time.Now())

	// Reject early when the node is already handling too many requests.
	if requestLimit > 0 && int(liveRequests) > requestLimit {
		endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveRequests))
		return rpcstatus.Error(rpcstatus.Unavailable, "storage node overloaded")
	}

	startTime := time.Now().UTC()

	// TODO: set connection timeouts
	// TODO: set maximum message size

	var message *pb.PieceUploadRequest

	// The first message must contain the order limit.
	message, err = stream.Recv()
	switch {
	case err != nil:
		return ErrProtocol.Wrap(err)
	case message == nil:
		return ErrProtocol.New("expected a message")
	case message.Limit == nil:
		return ErrProtocol.New("expected order limit as the first message")
	}
	limit := message.Limit

	endpoint.log.Info("upload started", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action))

	// TODO: verify that we have have expected amount of storage before continuing

	if limit.Action != pb.PieceAction_PUT && limit.Action != pb.PieceAction_PUT_REPAIR {
		return ErrProtocol.New("expected put or put repair action got %v", limit.Action) // TODO: report rpc status unauthorized or bad request
	}

	if err := endpoint.verifyOrderLimit(ctx, limit); err != nil {
		return err
	}

	var pieceWriter *pieces.Writer
	// Record success/failure metrics and log the outcome when the upload
	// finishes; the deferred closure reads the named return err.
	defer func() {
		endTime := time.Now().UTC()
		dt := endTime.Sub(startTime)
		uploadSize := int64(0)
		if pieceWriter != nil {
			uploadSize = pieceWriter.Size()
		}
		uploadRate := float64(0)
		if dt.Seconds() > 0 {
			uploadRate = float64(uploadSize) / dt.Seconds()
		}
		uploadDuration := dt.Nanoseconds()

		if err != nil {
			mon.Meter("upload_failure_byte_meter").Mark64(uploadSize)
			mon.IntVal("upload_failure_size_bytes").Observe(uploadSize)
			mon.IntVal("upload_failure_duration_ns").Observe(uploadDuration)
			mon.FloatVal("upload_failure_rate_bytes_per_sec").Observe(uploadRate)
			endpoint.log.Info("upload failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err))
		} else {
			mon.Meter("upload_success_byte_meter").Mark64(uploadSize)
			mon.IntVal("upload_success_size_bytes").Observe(uploadSize)
			mon.IntVal("upload_success_duration_ns").Observe(uploadDuration)
			mon.FloatVal("upload_success_rate_bytes_per_sec").Observe(uploadRate)
			endpoint.log.Info("uploaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
		}
	}()

	pieceWriter, err = endpoint.store.Writer(ctx, limit.SatelliteId, limit.PieceId)
	if err != nil {
		return ErrInternal.Wrap(err) // TODO: report rpc status internal server error
	}
	defer func() {
		// cancel error if it hasn't been committed
		if cancelErr := pieceWriter.Cancel(ctx); cancelErr != nil {
			endpoint.log.Error("error during canceling a piece write", zap.Error(cancelErr))
		}
	}()

	availableBandwidth, err := endpoint.monitor.AvailableBandwidth(ctx)
	if err != nil {
		return ErrInternal.Wrap(err)
	}
	availableSpace, err := endpoint.monitor.AvailableSpace(ctx)
	if err != nil {
		return ErrInternal.Wrap(err)
	}

	orderSaved := false
	largestOrder := pb.Order{}
	// Ensure that the order is saved even in the face of an error. In the
	// success path, the order will be saved just before sending the response
	// and closing the stream (in which case, orderSaved will be true).
	defer func() {
		if !orderSaved {
			endpoint.saveOrder(ctx, limit, &largestOrder)
		}
	}()

	for {
		message, err = stream.Recv() // TODO: reuse messages to avoid allocations
		if err == io.EOF {
			return ErrProtocol.New("unexpected EOF")
		} else if err != nil {
			return ErrProtocol.Wrap(err) // TODO: report rpc status bad message
		}
		if message == nil {
			return ErrProtocol.New("expected a message") // TODO: report rpc status bad message
		}
		if message.Order == nil && message.Chunk == nil && message.Done == nil {
			return ErrProtocol.New("expected a message") // TODO: report rpc status bad message
		}

		// A message may carry an order, a chunk, and/or a done marker;
		// handle each part that is present, in that sequence.
		if message.Order != nil {
			if err := endpoint.VerifyOrder(ctx, limit, message.Order, largestOrder.Amount); err != nil {
				return err
			}
			largestOrder = *message.Order
		}

		if message.Chunk != nil {
			// Chunks must arrive contiguously, starting at the current size.
			if message.Chunk.Offset != pieceWriter.Size() {
				return ErrProtocol.New("chunk out of order") // TODO: report rpc status bad message
			}

			chunkSize := int64(len(message.Chunk.Data))
			// The uplink must have ordered at least as many bytes as it writes.
			if largestOrder.Amount < pieceWriter.Size()+chunkSize {
				// TODO: should we write currently and give a chance for uplink to remedy the situation?
				return ErrProtocol.New("not enough allocated, allocated=%v writing=%v", largestOrder.Amount, pieceWriter.Size()+int64(len(message.Chunk.Data))) // TODO: report rpc status ?
			}

			// Deduct from the locally cached allowances; reject once exhausted.
			availableBandwidth -= chunkSize
			if availableBandwidth < 0 {
				return ErrProtocol.New("out of bandwidth")
			}
			availableSpace -= chunkSize
			if availableSpace < 0 {
				return ErrProtocol.New("out of space")
			}

			if _, err := pieceWriter.Write(message.Chunk.Data); err != nil {
				return ErrInternal.Wrap(err) // TODO: report rpc status internal server error
			}
		}

		if message.Done != nil {
			// Verify the uplink's declared hash and size against what was written.
			calculatedHash := pieceWriter.Hash()
			if err := endpoint.VerifyPieceHash(ctx, limit, message.Done, calculatedHash); err != nil {
				return err // TODO: report rpc status internal server error
			}
			if message.Done.PieceSize != pieceWriter.Size() {
				return ErrProtocol.New("Size of finished piece does not match size declared by uplink! %d != %d",
					message.Done.PieceSize, pieceWriter.Size())
			}

			{
				// Commit the piece with its header, and record expiration if set.
				info := &pb.PieceHeader{
					Hash:         calculatedHash,
					CreationTime: message.Done.Timestamp,
					Signature:    message.Done.GetSignature(),
					OrderLimit:   *limit,
				}
				if err := pieceWriter.Commit(ctx, info); err != nil {
					return ErrInternal.Wrap(err) // TODO: report rpc status internal server error
				}
				if !limit.PieceExpiration.IsZero() {
					err := endpoint.store.SetExpiration(ctx, limit.SatelliteId, limit.PieceId, limit.PieceExpiration)
					if err != nil {
						return ErrInternal.Wrap(err) // TODO: report rpc status internal server error
					}
				}
			}

			// Sign the piece hash so the uplink can prove the node stored it.
			storageNodeHash, err := signing.SignPieceHash(ctx, endpoint.signer, &pb.PieceHash{
				PieceId:   limit.PieceId,
				Hash:      calculatedHash,
				PieceSize: pieceWriter.Size(),
				Timestamp: time.Now(),
			})
			if err != nil {
				return ErrInternal.Wrap(err)
			}

			// Save the order before completing the call. Set orderSaved so
			// that the defer above does not also save.
			orderSaved = true
			endpoint.saveOrder(ctx, limit, &largestOrder)

			closeErr := stream.SendAndClose(&pb.PieceUploadResponse{
				Done: storageNodeHash,
			})
			return ErrProtocol.Wrap(ignoreEOF(closeErr))
		}
	}
}
2019-09-12 22:09:46 +01:00
// Download handles Downloading a piece on piece store.
func (endpoint *Endpoint) Download(stream pb.Piecestore_DownloadServer) (err error) {
	return endpoint.doDownload(stream)
}
// Download handles Downloading a piece on piece store.
func (endpoint *drpcEndpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err error) {
	return endpoint.doDownload(stream)
}
// downloadStream is the minimum interface required to handle a download;
// it is satisfied by both the gRPC and dRPC download streams.
type downloadStream interface {
	Context() context.Context
	Recv() (*pb.PieceDownloadRequest, error)
	Send(*pb.PieceDownloadResponse) error
}
// doDownload implements downloading a piece from piece store.
//
// The first message on the stream must carry both the order limit and the
// requested chunk. A sender goroutine streams chunk data, throttled by the
// orders the uplink sends; the receive loop below consumes those orders and
// releases bandwidth to the sender accordingly.
func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) {
	ctx := stream.Context()
	defer monLiveRequests(&ctx)(&err)
	defer mon.Task()(&ctx)(&err)

	// Count this request in the live-request gauge for its duration.
	atomic.AddInt32(&endpoint.liveRequests, 1)
	defer atomic.AddInt32(&endpoint.liveRequests, -1)

	startTime := time.Now().UTC()

	endpoint.pingStats.WasPinged(time.Now())

	// TODO: set connection timeouts
	// TODO: set maximum message size

	var message *pb.PieceDownloadRequest

	// receive limit and chunk from uplink
	message, err = stream.Recv()
	if err != nil {
		return ErrProtocol.Wrap(err)
	}
	if message.Limit == nil || message.Chunk == nil {
		return ErrProtocol.New("expected order limit and chunk as the first message")
	}
	limit, chunk := message.Limit, message.Chunk

	endpoint.log.Info("download started", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action))

	if limit.Action != pb.PieceAction_GET && limit.Action != pb.PieceAction_GET_REPAIR && limit.Action != pb.PieceAction_GET_AUDIT {
		return ErrProtocol.New("expected get or get repair or audit action got %v", limit.Action) // TODO: report rpc status unauthorized or bad request
	}

	if chunk.ChunkSize > limit.Limit {
		return ErrProtocol.New("requested more that order limit allows, limit=%v requested=%v", limit.Limit, chunk.ChunkSize)
	}

	if err := endpoint.verifyOrderLimit(ctx, limit); err != nil {
		return Error.Wrap(err) // TODO: report rpc status unauthorized or bad request
	}

	var pieceReader *pieces.Reader
	// Record success/failure metrics and log the outcome when the download
	// finishes; the deferred closure reads the named return err.
	defer func() {
		endTime := time.Now().UTC()
		dt := endTime.Sub(startTime)
		downloadSize := int64(0)
		if pieceReader != nil {
			downloadSize = pieceReader.Size()
		}
		downloadRate := float64(0)
		if dt.Seconds() > 0 {
			downloadRate = float64(downloadSize) / dt.Seconds()
		}
		downloadDuration := dt.Nanoseconds()

		if err != nil {
			mon.Meter("download_failure_byte_meter").Mark64(downloadSize)
			mon.IntVal("download_failure_size_bytes").Observe(downloadSize)
			mon.IntVal("download_failure_duration_ns").Observe(downloadDuration)
			mon.FloatVal("download_failure_rate_bytes_per_sec").Observe(downloadRate)
			endpoint.log.Info("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err))
		} else {
			mon.Meter("download_success_byte_meter").Mark64(downloadSize)
			mon.IntVal("download_success_size_bytes").Observe(downloadSize)
			mon.IntVal("download_success_duration_ns").Observe(downloadDuration)
			mon.FloatVal("download_success_rate_bytes_per_sec").Observe(downloadRate)
			endpoint.log.Info("downloaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
		}
	}()

	pieceReader, err = endpoint.store.Reader(ctx, limit.SatelliteId, limit.PieceId)
	if err != nil {
		if os.IsNotExist(err) {
			return rpcstatus.Error(rpcstatus.NotFound, err.Error())
		}
		return rpcstatus.Error(rpcstatus.Internal, err.Error())
	}
	defer func() {
		err := pieceReader.Close() // similarly how transaction Rollback works
		if err != nil {
			// no reason to report this error to the uplink
			endpoint.log.Error("failed to close piece reader", zap.Error(err))
		}
	}()

	// for repair traffic, send along the PieceHash and original OrderLimit for validation
	// before sending the piece itself
	if message.Limit.Action == pb.PieceAction_GET_REPAIR {
		var orderLimit pb.OrderLimit
		var pieceHash pb.PieceHash
		if pieceReader.StorageFormatVersion() == 0 {
			// v0 stores this information in SQL
			info, err := endpoint.store.GetV0PieceInfoDB().Get(ctx, limit.SatelliteId, limit.PieceId)
			if err != nil {
				endpoint.log.Error("error getting piece from v0 pieceinfo db", zap.Error(err))
				return rpcstatus.Error(rpcstatus.Internal, err.Error())
			}
			orderLimit = *info.OrderLimit
			pieceHash = *info.UplinkPieceHash
		} else {
			//v1+ stores this information in the file
			header, err := pieceReader.GetPieceHeader()
			if err != nil {
				endpoint.log.Error("error getting header from piecereader", zap.Error(err))
				return rpcstatus.Error(rpcstatus.Internal, err.Error())
			}
			orderLimit = header.OrderLimit
			pieceHash = pb.PieceHash{
				PieceId:   limit.PieceId,
				Hash:      header.GetHash(),
				PieceSize: pieceReader.Size(),
				Timestamp: header.GetCreationTime(),
				Signature: header.GetSignature(),
			}
		}
		err = stream.Send(&pb.PieceDownloadResponse{Hash: &pieceHash, Limit: &orderLimit})
		if err != nil {
			endpoint.log.Error("error sending hash and order limit", zap.Error(err))
			return rpcstatus.Error(rpcstatus.Internal, err.Error())
		}
	}

	// TODO: verify chunk.Size behavior logic with regards to reading all
	if chunk.Offset+chunk.ChunkSize > pieceReader.Size() {
		return Error.New("requested more data than available, requesting=%v available=%v", chunk.Offset+chunk.ChunkSize, pieceReader.Size())
	}

	availableBandwidth, err := endpoint.monitor.AvailableBandwidth(ctx)
	if err != nil {
		endpoint.log.Error("error getting available bandwidth", zap.Error(err))
		return rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	throttle := sync2.NewThrottle()
	// TODO: see whether this can be implemented without a goroutine
	group, ctx := errgroup.WithContext(ctx)
	// Sender goroutine: reads piece data and streams it out, pausing on the
	// throttle until the receive loop below has consumed matching orders.
	group.Go(func() (err error) {
		var maximumChunkSize = 1 * memory.MiB.Int64()

		currentOffset := chunk.Offset
		unsentAmount := chunk.ChunkSize
		for unsentAmount > 0 {
			tryToSend := min(unsentAmount, maximumChunkSize)

			// TODO: add timeout here
			chunkSize, err := throttle.ConsumeOrWait(tryToSend)
			if err != nil {
				// this can happen only because uplink decided to close the connection
				return nil
			}

			chunkData := make([]byte, chunkSize)
			_, err = pieceReader.Seek(currentOffset, io.SeekStart)
			if err != nil {
				endpoint.log.Error("error seeking on piecereader", zap.Error(err))
				return rpcstatus.Error(rpcstatus.Internal, err.Error())
			}

			// ReadFull is required to ensure we are sending the right amount of data.
			_, err = io.ReadFull(pieceReader, chunkData)
			if err != nil {
				endpoint.log.Error("error reading from piecereader", zap.Error(err))
				return rpcstatus.Error(rpcstatus.Internal, err.Error())
			}

			err = stream.Send(&pb.PieceDownloadResponse{
				Chunk: &pb.PieceDownloadResponse_Chunk{
					Offset: currentOffset,
					Data:   chunkData,
				},
			})
			if err != nil {
				// err is io.EOF when uplink asked for a piece, but decided not to retrieve it,
				// no need to propagate it
				return ErrProtocol.Wrap(ignoreEOF(err))
			}

			currentOffset += chunkSize
			unsentAmount -= chunkSize
		}
		return nil
	})

	// Receive loop: consumes orders from the uplink, verifies them, and
	// releases the corresponding amount of data to the sender goroutine.
	recvErr := func() (err error) {
		largestOrder := pb.Order{}
		defer endpoint.saveOrder(ctx, limit, &largestOrder)

		// ensure that we always terminate sending goroutine
		defer throttle.Fail(io.EOF)

		for {
			// TODO: check errors
			// TODO: add timeout here
			message, err = stream.Recv()
			if err != nil {
				// err is io.EOF or canceled when uplink closed the connection, no need to return error
				if errs2.IsCanceled(err) {
					endpoint.log.Debug("client canceled connection")
					return nil
				}
				return ErrProtocol.Wrap(ignoreEOF(err))
			}

			if message == nil || message.Order == nil {
				return ErrProtocol.New("expected order as the message")
			}

			if err := endpoint.VerifyOrder(ctx, limit, message.Order, largestOrder.Amount); err != nil {
				return err
			}

			// Only the delta over the previous largest order is new bandwidth.
			chunkSize := message.Order.Amount - largestOrder.Amount
			availableBandwidth -= chunkSize
			if availableBandwidth < 0 {
				return ErrProtocol.New("out of bandwidth")
			}

			if err := throttle.Produce(chunkSize); err != nil {
				// shouldn't happen since only receiving side is calling Fail
				return ErrInternal.Wrap(err)
			}

			largestOrder = *message.Order
		}
	}()

	// ensure we wait for sender to complete
	sendErr := group.Wait()
	return Error.Wrap(errs.Combine(sendErr, recvErr))
}
2019-07-31 15:13:39 +01:00
// saveOrder saves the order with all necessary information. It assumes it has been already verified.
func ( endpoint * Endpoint ) saveOrder ( ctx context . Context , limit * pb . OrderLimit , order * pb . Order ) {
2019-06-04 13:31:39 +01:00
var err error
defer mon . Task ( ) ( & ctx ) ( & err )
2019-03-18 10:55:06 +00:00
// TODO: do this in a goroutine
if order == nil || order . Amount <= 0 {
return
}
2019-06-04 13:31:39 +01:00
err = endpoint . orders . Enqueue ( ctx , & orders . Info {
2019-07-09 22:33:45 +01:00
Limit : limit ,
Order : order ,
2019-03-18 10:55:06 +00:00
} )
if err != nil {
endpoint . log . Error ( "failed to add order" , zap . Error ( err ) )
} else {
2019-06-04 13:31:39 +01:00
err = endpoint . usage . Add ( ctx , limit . SatelliteId , limit . Action , order . Amount , time . Now ( ) )
2019-03-18 10:55:06 +00:00
if err != nil {
endpoint . log . Error ( "failed to add bandwidth usage" , zap . Error ( err ) )
}
}
}
2019-07-10 14:41:47 +01:00
// Retain keeps only piece ids specified in the request.
//
// The caller must be a trusted satellite; the request carries a bloom filter
// of pieces to keep and a creation-date cutoff. The actual work is queued on
// the retain service rather than performed inline.
func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainRequest) (res *pb.RetainResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	// if retain status is disabled, quit immediately
	if endpoint.retain.Status() == retain.Disabled {
		return &pb.RetainResponse{}, nil
	}

	peer, err := identity.PeerIdentityFromContext(ctx)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.Unauthenticated, Error.Wrap(err).Error())
	}

	// Only trusted satellites may trigger garbage collection.
	err = endpoint.trust.VerifySatelliteID(ctx, peer.ID)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.PermissionDenied, Error.New("retain called with untrusted ID").Error())
	}

	filter, err := bloomfilter.NewFromBytes(retainReq.GetFilter())
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, Error.Wrap(err).Error())
	}

	// the queue function will update the created before time based on the configurable retain buffer
	queued := endpoint.retain.Queue(retain.Request{
		SatelliteID:   peer.ID,
		CreatedBefore: retainReq.GetCreationDate(),
		Filter:        filter,
	})
	if !queued {
		// A job for this satellite is presumably already pending — TODO confirm
		// against retain.Service.Queue semantics.
		endpoint.log.Debug("Retain job not queued for satellite", zap.Stringer("Satellite ID", peer.ID))
	}

	return &pb.RetainResponse{}, nil
}
2019-09-12 10:26:55 +01:00
// TestLiveRequestCount returns the current number of live requests.
// It exists for use in tests.
func (endpoint *Endpoint) TestLiveRequestCount() int32 {
	return atomic.LoadInt32(&endpoint.liveRequests)
}
2019-03-18 10:55:06 +00:00
// min returns the smaller of the two given int64 values.
func min(a, b int64) int64 {
	if b < a {
		return b
	}
	return a
}
// ignoreEOF ignores io.EOF error.
func ignoreEOF ( err error ) error {
2019-10-15 17:13:53 +01:00
// gRPC gives us an io.EOF but dRPC gives us a wrapped io.EOF
if errs . Is ( err , io . EOF ) {
2019-03-18 10:55:06 +00:00
return nil
}
return err
}