storagenode/piecestore: add large timeouts to read/write operations
this is to help protect against intentional or unintentional slowloris style problems where a client keeps a tcp connection alive but never sends any data. because grpc is great, we have to spawn a separate goroutine for every read/write to the stream so that we can return from the server handler to cancel it if necessary. yep. really. additionally, we update the rpcstatus package to do some stack trace capture and add a Wrap method for the times where we want to just use the existing error. also fixes a number of TODOs where we attach status codes to the returned errors in the endpoints. Change-Id: Id8bb8ff84aa34e0f711b0cf9bce3908b36a1d3c1
This commit is contained in:
parent
89a148047d
commit
16bb374deb
6
go.sum
6
go.sum
@ -597,12 +597,6 @@ storj.io/common v0.0.0-20200114152414-8dcdd4c9d250 h1:JDScdUShGqfHyiSWtfXhYEK1Ok
|
||||
storj.io/common v0.0.0-20200114152414-8dcdd4c9d250/go.mod h1:0yn1ANoDXETNBREGQHq8d7m1Kq0vWMu6Ul7C2YPZo/E=
|
||||
storj.io/drpc v0.0.7-0.20191115031725-2171c57838d2 h1:8SgLYEhe99R8QlAD1EAOBPRyIR+cn2hqkXtWlAUPf/c=
|
||||
storj.io/drpc v0.0.7-0.20191115031725-2171c57838d2/go.mod h1:/ascUDbzNAv0A3Jj7wUIKFBH2JdJ2uJIBO/b9+2yHgQ=
|
||||
storj.io/uplink v0.0.0-20200108132132-c2c5e0d46c1a h1:w/588H+U5IfTXCHA2GTFVLzpUbworS0DtoB4sR9h/8M=
|
||||
storj.io/uplink v0.0.0-20200108132132-c2c5e0d46c1a/go.mod h1:3498FK1ewiOxrVTbPwGJmE/kwIWA3q9ULtAU/WAreys=
|
||||
storj.io/uplink v0.0.0-20200109100422-69086b6ee4a8 h1:WG1rX2uc815ZkUz1xrebuZA+JWFBF9Y2n64gvVKZFko=
|
||||
storj.io/uplink v0.0.0-20200109100422-69086b6ee4a8/go.mod h1:3498FK1ewiOxrVTbPwGJmE/kwIWA3q9ULtAU/WAreys=
|
||||
storj.io/uplink v0.0.0-20200120140436-7a380b5d3c87 h1:1Kgi6COupqAkdypsgPWaqItAeRQF++L7jAMbjthVQuI=
|
||||
storj.io/uplink v0.0.0-20200120140436-7a380b5d3c87/go.mod h1:3498FK1ewiOxrVTbPwGJmE/kwIWA3q9ULtAU/WAreys=
|
||||
storj.io/uplink v0.0.0-20200120151236-94b86a219ae3 h1:MzE8aqmrdINQKlpYq9KqQNzTyXh7hoDVIay6hyoIyhY=
|
||||
storj.io/uplink v0.0.0-20200120151236-94b86a219ae3/go.mod h1:3498FK1ewiOxrVTbPwGJmE/kwIWA3q9ULtAU/WAreys=
|
||||
storj.io/uplink v0.0.0-20200122124300-07c62f7fe572 h1:UyIMPgwjAmEiJZux4ZUMLYiMcTS1pg7JjII/IGxDsPU=
|
||||
|
@ -110,10 +110,11 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
|
||||
StaticDir: filepath.Join(developmentRoot, "web/storagenode/"),
|
||||
},
|
||||
Storage2: piecestore.Config{
|
||||
CacheSyncInterval: defaultInterval,
|
||||
ExpirationGracePeriod: 0,
|
||||
MaxConcurrentRequests: 100,
|
||||
OrderLimitGracePeriod: time.Hour,
|
||||
CacheSyncInterval: defaultInterval,
|
||||
ExpirationGracePeriod: 0,
|
||||
MaxConcurrentRequests: 100,
|
||||
OrderLimitGracePeriod: time.Hour,
|
||||
StreamOperationTimeout: time.Hour,
|
||||
Orders: orders.Config{
|
||||
SenderInterval: defaultInterval,
|
||||
SenderTimeout: 10 * time.Minute,
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/rpc/rpcstatus"
|
||||
"storj.io/common/rpc/rpctimeout"
|
||||
"storj.io/common/signing"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/sync2"
|
||||
@ -36,13 +37,6 @@ import (
|
||||
|
||||
var (
|
||||
mon = monkit.Package()
|
||||
|
||||
// Error is the default error class for piecestore errors
|
||||
Error = errs.Class("piecestore")
|
||||
// ErrProtocol is the default error class for protocol errors.
|
||||
ErrProtocol = errs.Class("piecestore protocol")
|
||||
// ErrInternal is the default error class for internal piecestore errors.
|
||||
ErrInternal = errs.Class("piecestore internal")
|
||||
)
|
||||
|
||||
var _ pb.PiecestoreServer = (*Endpoint)(nil)
|
||||
@ -58,12 +52,12 @@ type OldConfig struct {
|
||||
|
||||
// Config defines parameters for piecestore endpoint.
|
||||
type Config struct {
|
||||
ExpirationGracePeriod time.Duration `help:"how soon before expiration date should things be considered expired" default:"48h0m0s"`
|
||||
MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected. 0 represents unlimited." default:"0"`
|
||||
OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"24h0m0s"`
|
||||
CacheSyncInterval time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"`
|
||||
|
||||
RetainTimeBuffer time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"48h0m0s"`
|
||||
ExpirationGracePeriod time.Duration `help:"how soon before expiration date should things be considered expired" default:"48h0m0s"`
|
||||
MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected. 0 represents unlimited." default:"0"`
|
||||
OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"24h0m0s"`
|
||||
CacheSyncInterval time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"`
|
||||
StreamOperationTimeout time.Duration `help:"how long to spend waiting for a stream operation before canceling" default:"30m"`
|
||||
RetainTimeBuffer time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"48h0m0s"`
|
||||
|
||||
Trust trust.Config
|
||||
|
||||
@ -151,12 +145,12 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ
|
||||
endpoint.pingStats.WasPinged(time.Now())
|
||||
|
||||
if delete.Limit.Action != pb.PieceAction_DELETE {
|
||||
return nil, Error.New("expected delete action got %v", delete.Limit.Action) // TODO: report rpc status unauthorized or bad request
|
||||
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument,
|
||||
"expected delete action got %v", delete.Limit.Action)
|
||||
}
|
||||
|
||||
if err := endpoint.verifyOrderLimit(ctx, delete.Limit); err != nil {
|
||||
// TODO: report rpc status unauthorized or bad request
|
||||
return nil, Error.Wrap(err)
|
||||
return nil, rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
|
||||
}
|
||||
|
||||
if err := endpoint.store.Delete(ctx, delete.Limit.SatelliteId, delete.Limit.PieceId); err != nil {
|
||||
@ -182,14 +176,12 @@ func (endpoint *Endpoint) DeletePieces(
|
||||
|
||||
peer, err := identity.PeerIdentityFromContext(ctx)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.Unauthenticated, Error.Wrap(err).Error())
|
||||
return nil, rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
|
||||
}
|
||||
|
||||
err = endpoint.trust.VerifySatelliteID(ctx, peer.ID)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.PermissionDenied,
|
||||
Error.New("%s", "delete pieces called with untrusted ID").Error(),
|
||||
)
|
||||
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "delete pieces called with untrusted ID")
|
||||
}
|
||||
|
||||
for _, pieceID := range req.PieceIds {
|
||||
@ -200,7 +192,7 @@ func (endpoint *Endpoint) DeletePieces(
|
||||
endpoint.log.Error("delete failed",
|
||||
zap.Stringer("Satellite ID", peer.ID),
|
||||
zap.Stringer("Piece ID", pieceID),
|
||||
zap.Error(Error.Wrap(err)),
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
endpoint.log.Info("deleted",
|
||||
@ -222,14 +214,12 @@ func (endpoint *Endpoint) DeletePiece(
|
||||
|
||||
peer, err := identity.PeerIdentityFromContext(ctx)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.Unauthenticated, Error.Wrap(err).Error())
|
||||
return nil, rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
|
||||
}
|
||||
|
||||
err = endpoint.trust.VerifySatelliteID(ctx, peer.ID)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.PermissionDenied,
|
||||
Error.New("%s", "delete piece called with untrusted ID").Error(),
|
||||
)
|
||||
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "delete piece called with untrusted ID")
|
||||
}
|
||||
|
||||
err = endpoint.store.Delete(ctx, peer.ID, req.PieceId)
|
||||
@ -241,14 +231,12 @@ func (endpoint *Endpoint) DeletePiece(
|
||||
}
|
||||
|
||||
endpoint.log.Error("delete failed",
|
||||
zap.Error(Error.Wrap(err)),
|
||||
zap.Error(err),
|
||||
zap.Stringer("Satellite ID", peer.ID),
|
||||
zap.Stringer("Piece ID", req.PieceId),
|
||||
)
|
||||
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal,
|
||||
Error.New("%s", "delete failed").Error(),
|
||||
)
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal, "delete failed")
|
||||
}
|
||||
|
||||
endpoint.log.Info("deleted",
|
||||
@ -294,19 +282,22 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
|
||||
|
||||
startTime := time.Now().UTC()
|
||||
|
||||
// TODO: set connection timeouts
|
||||
// TODO: set maximum message size
|
||||
|
||||
// N.B.: we are only allowed to use message if the returned error is nil. it would be
|
||||
// a race condition otherwise as Run does not wait for the closure to exit.
|
||||
var message *pb.PieceUploadRequest
|
||||
|
||||
message, err = stream.Recv()
|
||||
err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
|
||||
message, err = stream.Recv()
|
||||
return err
|
||||
})
|
||||
switch {
|
||||
case err != nil:
|
||||
return ErrProtocol.Wrap(err)
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
case message == nil:
|
||||
return ErrProtocol.New("expected a message")
|
||||
return rpcstatus.Error(rpcstatus.InvalidArgument, "expected a message")
|
||||
case message.Limit == nil:
|
||||
return ErrProtocol.New("expected order limit as the first message")
|
||||
return rpcstatus.Error(rpcstatus.InvalidArgument, "expected order limit as the first message")
|
||||
}
|
||||
limit := message.Limit
|
||||
endpoint.log.Info("upload started", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
|
||||
@ -314,7 +305,7 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
|
||||
// TODO: verify that we have have expected amount of storage before continuing
|
||||
|
||||
if limit.Action != pb.PieceAction_PUT && limit.Action != pb.PieceAction_PUT_REPAIR {
|
||||
return ErrProtocol.New("expected put or put repair action got %v", limit.Action) // TODO: report rpc status unauthorized or bad request
|
||||
return rpcstatus.Errorf(rpcstatus.InvalidArgument, "expected put or put repair action got %v", limit.Action)
|
||||
}
|
||||
|
||||
if err := endpoint.verifyOrderLimit(ctx, limit); err != nil {
|
||||
@ -352,7 +343,7 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
|
||||
|
||||
pieceWriter, err = endpoint.store.Writer(ctx, limit.SatelliteId, limit.PieceId)
|
||||
if err != nil {
|
||||
return ErrInternal.Wrap(err) // TODO: report rpc status internal server error
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
defer func() {
|
||||
// cancel error if it hasn't been committed
|
||||
@ -363,12 +354,12 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
|
||||
|
||||
availableBandwidth, err := endpoint.monitor.AvailableBandwidth(ctx)
|
||||
if err != nil {
|
||||
return ErrInternal.Wrap(err)
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
availableSpace, err := endpoint.monitor.AvailableSpace(ctx)
|
||||
if err != nil {
|
||||
return ErrInternal.Wrap(err)
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
orderSaved := false
|
||||
@ -383,17 +374,25 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
|
||||
}()
|
||||
|
||||
for {
|
||||
message, err = stream.Recv() // TODO: reuse messages to avoid allocations
|
||||
if err == io.EOF {
|
||||
return ErrProtocol.New("unexpected EOF")
|
||||
// TODO: reuse messages to avoid allocations
|
||||
|
||||
// N.B.: we are only allowed to use message if the returned error is nil. it would be
|
||||
// a race condition otherwise as Run does not wait for the closure to exit.
|
||||
err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
|
||||
message, err = stream.Recv()
|
||||
return err
|
||||
})
|
||||
if errs.Is(err, io.EOF) {
|
||||
return rpcstatus.Error(rpcstatus.InvalidArgument, "unexpected EOF")
|
||||
} else if err != nil {
|
||||
return ErrProtocol.Wrap(err) // TODO: report rpc status bad message
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
if message == nil {
|
||||
return ErrProtocol.New("expected a message") // TODO: report rpc status bad message
|
||||
return rpcstatus.Error(rpcstatus.InvalidArgument, "expected a message")
|
||||
}
|
||||
if message.Order == nil && message.Chunk == nil && message.Done == nil {
|
||||
return ErrProtocol.New("expected a message") // TODO: report rpc status bad message
|
||||
return rpcstatus.Error(rpcstatus.InvalidArgument, "expected a message")
|
||||
}
|
||||
|
||||
if message.Order != nil {
|
||||
@ -405,36 +404,39 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
|
||||
|
||||
if message.Chunk != nil {
|
||||
if message.Chunk.Offset != pieceWriter.Size() {
|
||||
return ErrProtocol.New("chunk out of order") // TODO: report rpc status bad message
|
||||
return rpcstatus.Error(rpcstatus.InvalidArgument, "chunk out of order")
|
||||
}
|
||||
|
||||
chunkSize := int64(len(message.Chunk.Data))
|
||||
if largestOrder.Amount < pieceWriter.Size()+chunkSize {
|
||||
// TODO: should we write currently and give a chance for uplink to remedy the situation?
|
||||
return ErrProtocol.New("not enough allocated, allocated=%v writing=%v", largestOrder.Amount, pieceWriter.Size()+int64(len(message.Chunk.Data))) // TODO: report rpc status ?
|
||||
return rpcstatus.Errorf(rpcstatus.InvalidArgument,
|
||||
"not enough allocated, allocated=%v writing=%v",
|
||||
largestOrder.Amount, pieceWriter.Size()+int64(len(message.Chunk.Data)))
|
||||
}
|
||||
|
||||
availableBandwidth -= chunkSize
|
||||
if availableBandwidth < 0 {
|
||||
return ErrProtocol.New("out of bandwidth")
|
||||
return rpcstatus.Error(rpcstatus.Internal, "out of bandwidth")
|
||||
}
|
||||
availableSpace -= chunkSize
|
||||
if availableSpace < 0 {
|
||||
return ErrProtocol.New("out of space")
|
||||
return rpcstatus.Error(rpcstatus.Internal, "out of space")
|
||||
}
|
||||
|
||||
if _, err := pieceWriter.Write(message.Chunk.Data); err != nil {
|
||||
return ErrInternal.Wrap(err) // TODO: report rpc status internal server error
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
}
|
||||
|
||||
if message.Done != nil {
|
||||
calculatedHash := pieceWriter.Hash()
|
||||
if err := endpoint.VerifyPieceHash(ctx, limit, message.Done, calculatedHash); err != nil {
|
||||
return err // TODO: report rpc status internal server error
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
if message.Done.PieceSize != pieceWriter.Size() {
|
||||
return ErrProtocol.New("Size of finished piece does not match size declared by uplink! %d != %d",
|
||||
return rpcstatus.Errorf(rpcstatus.InvalidArgument,
|
||||
"Size of finished piece does not match size declared by uplink! %d != %d",
|
||||
message.Done.PieceSize, pieceWriter.Size())
|
||||
}
|
||||
|
||||
@ -446,12 +448,12 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
|
||||
OrderLimit: *limit,
|
||||
}
|
||||
if err := pieceWriter.Commit(ctx, info); err != nil {
|
||||
return ErrInternal.Wrap(err) // TODO: report rpc status internal server error
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
if !limit.PieceExpiration.IsZero() {
|
||||
err := endpoint.store.SetExpiration(ctx, limit.SatelliteId, limit.PieceId, limit.PieceExpiration)
|
||||
if err != nil {
|
||||
return ErrInternal.Wrap(err) // TODO: report rpc status internal server error
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -463,7 +465,7 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
|
||||
Timestamp: time.Now(),
|
||||
})
|
||||
if err != nil {
|
||||
return ErrInternal.Wrap(err)
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
// Save the order before completing the call. Set orderSaved so
|
||||
@ -471,10 +473,16 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
|
||||
orderSaved = true
|
||||
endpoint.saveOrder(ctx, limit, &largestOrder)
|
||||
|
||||
closeErr := stream.SendAndClose(&pb.PieceUploadResponse{
|
||||
Done: storageNodeHash,
|
||||
closeErr := rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
|
||||
return stream.SendAndClose(&pb.PieceUploadResponse{Done: storageNodeHash})
|
||||
})
|
||||
return ErrProtocol.Wrap(ignoreEOF(closeErr))
|
||||
if errs.Is(closeErr, io.EOF) {
|
||||
closeErr = nil
|
||||
}
|
||||
if closeErr != nil {
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, closeErr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -509,33 +517,37 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) {
|
||||
|
||||
endpoint.pingStats.WasPinged(time.Now())
|
||||
|
||||
// TODO: set connection timeouts
|
||||
// TODO: set maximum message size
|
||||
|
||||
var message *pb.PieceDownloadRequest
|
||||
|
||||
// receive limit and chunk from uplink
|
||||
message, err = stream.Recv()
|
||||
// N.B.: we are only allowed to use message if the returned error is nil. it would be
|
||||
// a race condition otherwise as Run does not wait for the closure to exit.
|
||||
err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
|
||||
message, err = stream.Recv()
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return ErrProtocol.Wrap(err)
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
if message.Limit == nil || message.Chunk == nil {
|
||||
return ErrProtocol.New("expected order limit and chunk as the first message")
|
||||
return rpcstatus.Error(rpcstatus.InvalidArgument, "expected order limit and chunk as the first message")
|
||||
}
|
||||
limit, chunk := message.Limit, message.Chunk
|
||||
|
||||
endpoint.log.Info("download started", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
|
||||
|
||||
if limit.Action != pb.PieceAction_GET && limit.Action != pb.PieceAction_GET_REPAIR && limit.Action != pb.PieceAction_GET_AUDIT {
|
||||
return ErrProtocol.New("expected get or get repair or audit action got %v", limit.Action) // TODO: report rpc status unauthorized or bad request
|
||||
return rpcstatus.Errorf(rpcstatus.InvalidArgument,
|
||||
"expected get or get repair or audit action got %v", limit.Action)
|
||||
}
|
||||
|
||||
if chunk.ChunkSize > limit.Limit {
|
||||
return ErrProtocol.New("requested more that order limit allows, limit=%v requested=%v", limit.Limit, chunk.ChunkSize)
|
||||
return rpcstatus.Errorf(rpcstatus.InvalidArgument,
|
||||
"requested more that order limit allows, limit=%v requested=%v", limit.Limit, chunk.ChunkSize)
|
||||
}
|
||||
|
||||
if err := endpoint.verifyOrderLimit(ctx, limit); err != nil {
|
||||
return Error.Wrap(err) // TODO: report rpc status unauthorized or bad request
|
||||
return err
|
||||
}
|
||||
|
||||
var pieceReader *pieces.Reader
|
||||
@ -569,9 +581,9 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) {
|
||||
pieceReader, err = endpoint.store.Reader(ctx, limit.SatelliteId, limit.PieceId)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return rpcstatus.Error(rpcstatus.NotFound, err.Error())
|
||||
return rpcstatus.Wrap(rpcstatus.NotFound, err)
|
||||
}
|
||||
return rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
defer func() {
|
||||
err := pieceReader.Close() // similarly how transcation Rollback works
|
||||
@ -587,25 +599,29 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) {
|
||||
pieceHash, orderLimit, err := endpoint.store.GetHashAndLimit(ctx, limit.SatelliteId, limit.PieceId, pieceReader)
|
||||
if err != nil {
|
||||
endpoint.log.Error("could not get hash and order limit", zap.Error(err))
|
||||
return rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
err = stream.Send(&pb.PieceDownloadResponse{Hash: &pieceHash, Limit: &orderLimit})
|
||||
err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
|
||||
return stream.Send(&pb.PieceDownloadResponse{Hash: &pieceHash, Limit: &orderLimit})
|
||||
})
|
||||
if err != nil {
|
||||
endpoint.log.Error("error sending hash and order limit", zap.Error(err))
|
||||
return rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: verify chunk.Size behavior logic with regards to reading all
|
||||
if chunk.Offset+chunk.ChunkSize > pieceReader.Size() {
|
||||
return Error.New("requested more data than available, requesting=%v available=%v", chunk.Offset+chunk.ChunkSize, pieceReader.Size())
|
||||
return rpcstatus.Errorf(rpcstatus.InvalidArgument,
|
||||
"requested more data than available, requesting=%v available=%v",
|
||||
chunk.Offset+chunk.ChunkSize, pieceReader.Size())
|
||||
}
|
||||
|
||||
availableBandwidth, err := endpoint.monitor.AvailableBandwidth(ctx)
|
||||
if err != nil {
|
||||
endpoint.log.Error("error getting available bandwidth", zap.Error(err))
|
||||
return rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
throttle := sync2.NewThrottle()
|
||||
@ -631,26 +647,31 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) {
|
||||
_, err = pieceReader.Seek(currentOffset, io.SeekStart)
|
||||
if err != nil {
|
||||
endpoint.log.Error("error seeking on piecereader", zap.Error(err))
|
||||
return rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
// ReadFull is required to ensure we are sending the right amount of data.
|
||||
_, err = io.ReadFull(pieceReader, chunkData)
|
||||
if err != nil {
|
||||
endpoint.log.Error("error reading from piecereader", zap.Error(err))
|
||||
return rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
err = stream.Send(&pb.PieceDownloadResponse{
|
||||
Chunk: &pb.PieceDownloadResponse_Chunk{
|
||||
Offset: currentOffset,
|
||||
Data: chunkData,
|
||||
},
|
||||
err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
|
||||
return stream.Send(&pb.PieceDownloadResponse{
|
||||
Chunk: &pb.PieceDownloadResponse_Chunk{
|
||||
Offset: currentOffset,
|
||||
Data: chunkData,
|
||||
},
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
if errs.Is(err, io.EOF) {
|
||||
// err is io.EOF when uplink asked for a piece, but decided not to retrieve it,
|
||||
// no need to propagate it
|
||||
return ErrProtocol.Wrap(ignoreEOF(err))
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
currentOffset += chunkSize
|
||||
@ -667,20 +688,26 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) {
|
||||
defer throttle.Fail(io.EOF)
|
||||
|
||||
for {
|
||||
// TODO: check errors
|
||||
// TODO: add timeout here
|
||||
message, err = stream.Recv()
|
||||
if err != nil {
|
||||
// N.B.: we are only allowed to use message if the returned error is nil. it would be
|
||||
// a race condition otherwise as Run does not wait for the closure to exit.
|
||||
err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
|
||||
message, err = stream.Recv()
|
||||
return err
|
||||
})
|
||||
if errs.Is(err, io.EOF) {
|
||||
// err is io.EOF or canceled when uplink closed the connection, no need to return error
|
||||
if errs2.IsCanceled(err) {
|
||||
endpoint.log.Debug("client canceled connection")
|
||||
return nil
|
||||
}
|
||||
return ErrProtocol.Wrap(ignoreEOF(err))
|
||||
return nil
|
||||
}
|
||||
if errs2.IsCanceled(err) {
|
||||
endpoint.log.Debug("client canceled connection")
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
if message == nil || message.Order == nil {
|
||||
return ErrProtocol.New("expected order as the message")
|
||||
return rpcstatus.Error(rpcstatus.InvalidArgument, "expected order as the message")
|
||||
}
|
||||
|
||||
if err := endpoint.VerifyOrder(ctx, limit, message.Order, largestOrder.Amount); err != nil {
|
||||
@ -690,12 +717,12 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) {
|
||||
chunkSize := message.Order.Amount - largestOrder.Amount
|
||||
availableBandwidth -= chunkSize
|
||||
if availableBandwidth < 0 {
|
||||
return ErrProtocol.New("out of bandwidth")
|
||||
return rpcstatus.Error(rpcstatus.ResourceExhausted, "out of bandwidth")
|
||||
}
|
||||
|
||||
if err := throttle.Produce(chunkSize); err != nil {
|
||||
// shouldn't happen since only receiving side is calling Fail
|
||||
return ErrInternal.Wrap(err)
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
largestOrder = *message.Order
|
||||
}
|
||||
@ -703,7 +730,7 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) {
|
||||
|
||||
// ensure we wait for sender to complete
|
||||
sendErr := group.Wait()
|
||||
return Error.Wrap(errs.Combine(sendErr, recvErr))
|
||||
return rpcstatus.Wrap(rpcstatus.Internal, errs.Combine(sendErr, recvErr))
|
||||
}
|
||||
|
||||
// saveOrder saves the order with all necessary information. It assumes it has been already verified.
|
||||
@ -738,17 +765,17 @@ func (endpoint *Endpoint) RestoreTrash(ctx context.Context, restoreTrashReq *pb.
|
||||
|
||||
peer, err := identity.PeerIdentityFromContext(ctx)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.Unauthenticated, Error.Wrap(err).Error())
|
||||
return nil, rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
|
||||
}
|
||||
|
||||
err = endpoint.trust.VerifySatelliteID(ctx, peer.ID)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, Error.New("RestoreTrash called with untrusted ID").Error())
|
||||
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "RestoreTrash called with untrusted ID")
|
||||
}
|
||||
|
||||
err = endpoint.store.RestoreTrash(ctx, peer.ID)
|
||||
if err != nil {
|
||||
return nil, ErrInternal.Wrap(err)
|
||||
return nil, rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
return &pb.RestoreTrashResponse{}, nil
|
||||
@ -765,17 +792,17 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques
|
||||
|
||||
peer, err := identity.PeerIdentityFromContext(ctx)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.Unauthenticated, Error.Wrap(err).Error())
|
||||
return nil, rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
|
||||
}
|
||||
|
||||
err = endpoint.trust.VerifySatelliteID(ctx, peer.ID)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, Error.New("retain called with untrusted ID").Error())
|
||||
return nil, rpcstatus.Errorf(rpcstatus.PermissionDenied, "retain called with untrusted ID")
|
||||
}
|
||||
|
||||
filter, err := bloomfilter.NewFromBytes(retainReq.GetFilter())
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, Error.Wrap(err).Error())
|
||||
return nil, rpcstatus.Wrap(rpcstatus.InvalidArgument, err)
|
||||
}
|
||||
|
||||
// the queue function will update the created before time based on the configurable retain buffer
|
||||
@ -803,12 +830,3 @@ func min(a, b int64) int64 {
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// ignoreEOF ignores io.EOF error.
|
||||
func ignoreEOF(err error) error {
|
||||
// gRPC gives us an io.EOF but dRPC gives us a wrapped io.EOF
|
||||
if errs.Is(err, io.EOF) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -56,15 +56,14 @@ func (endpoint *Endpoint) verifyOrderLimit(ctx context.Context, limit *pb.OrderL
|
||||
}
|
||||
|
||||
if err := endpoint.trust.VerifySatelliteID(ctx, limit.SatelliteId); err != nil {
|
||||
return rpcstatus.Errorf(rpcstatus.PermissionDenied, "untrusted: %+v", err)
|
||||
return rpcstatus.Wrap(rpcstatus.PermissionDenied, err)
|
||||
}
|
||||
|
||||
if err := endpoint.VerifyOrderLimitSignature(ctx, limit); err != nil {
|
||||
if errs2.IsCanceled(err) {
|
||||
return rpcstatus.Error(rpcstatus.Canceled, "context has been canceled")
|
||||
return rpcstatus.Wrap(rpcstatus.Canceled, err)
|
||||
}
|
||||
|
||||
return rpcstatus.Errorf(rpcstatus.Unauthenticated, "untrusted: %+v", err)
|
||||
return rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
|
||||
}
|
||||
|
||||
serialExpiration := limit.OrderExpiration
|
||||
@ -75,7 +74,7 @@ func (endpoint *Endpoint) verifyOrderLimit(ctx context.Context, limit *pb.OrderL
|
||||
}
|
||||
|
||||
if err := endpoint.usedSerials.Add(ctx, limit.SatelliteId, limit.SerialNumber, serialExpiration); err != nil {
|
||||
return rpcstatus.Errorf(rpcstatus.Unauthenticated, "serial number is already used: %+v", err)
|
||||
return rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -86,18 +85,22 @@ func (endpoint *Endpoint) VerifyOrder(ctx context.Context, limit *pb.OrderLimit,
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if order.SerialNumber != limit.SerialNumber {
|
||||
return ErrProtocol.New("order serial number changed during upload") // TODO: report rpc status bad message
|
||||
return rpcstatus.Error(rpcstatus.InvalidArgument, "order serial number changed during upload")
|
||||
}
|
||||
// TODO: add check for minimum allocation step
|
||||
if order.Amount < largestOrderAmount {
|
||||
return ErrProtocol.New("order contained smaller amount=%v, previous=%v", order.Amount, largestOrderAmount) // TODO: report rpc status bad message
|
||||
return rpcstatus.Errorf(rpcstatus.InvalidArgument,
|
||||
"order contained smaller amount=%v, previous=%v",
|
||||
order.Amount, largestOrderAmount)
|
||||
}
|
||||
if order.Amount > limit.Limit {
|
||||
return ErrProtocol.New("order exceeded allowed amount=%v, limit=%v", order.Amount, limit.Limit) // TODO: report rpc status bad message
|
||||
return rpcstatus.Errorf(rpcstatus.InvalidArgument,
|
||||
"order exceeded allowed amount=%v, limit=%v",
|
||||
order.Amount, limit.Limit)
|
||||
}
|
||||
|
||||
if err := signing.VerifyUplinkOrderSignature(ctx, limit.UplinkPublicKey, order); err != nil {
|
||||
return ErrVerifyUntrusted.Wrap(err)
|
||||
return rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -108,17 +111,17 @@ func (endpoint *Endpoint) VerifyPieceHash(ctx context.Context, limit *pb.OrderLi
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if limit == nil || hash == nil || len(expectedHash) == 0 {
|
||||
return ErrProtocol.New("invalid arguments")
|
||||
return rpcstatus.Error(rpcstatus.InvalidArgument, "invalid arguments")
|
||||
}
|
||||
if limit.PieceId != hash.PieceId {
|
||||
return ErrProtocol.New("piece id changed") // TODO: report rpc status bad message
|
||||
return rpcstatus.Error(rpcstatus.InvalidArgument, "piece id changed")
|
||||
}
|
||||
if !bytes.Equal(hash.Hash, expectedHash) {
|
||||
return ErrProtocol.New("hashes don't match") // TODO: report rpc status bad message
|
||||
return rpcstatus.Error(rpcstatus.InvalidArgument, "hashes don't match")
|
||||
}
|
||||
|
||||
if err := signing.VerifyUplinkPieceHashSignature(ctx, limit.UplinkPublicKey, hash); err != nil {
|
||||
return ErrVerifyUntrusted.New("invalid piece hash signature") // TODO: report rpc status bad message
|
||||
return rpcstatus.Error(rpcstatus.Unauthenticated, "invalid piece hash signature")
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -131,13 +134,15 @@ func (endpoint *Endpoint) VerifyOrderLimitSignature(ctx context.Context, limit *
|
||||
signee, err := endpoint.trust.GetSignee(ctx, limit.SatelliteId)
|
||||
if err != nil {
|
||||
if errs2.IsCanceled(err) {
|
||||
return err
|
||||
return rpcstatus.Wrap(rpcstatus.Canceled, err)
|
||||
}
|
||||
return ErrVerifyUntrusted.New("unable to get signee: %w", err) // TODO: report rpc status bad message
|
||||
return rpcstatus.Wrap(rpcstatus.Unauthenticated,
|
||||
ErrVerifyUntrusted.New("unable to get signee: %w", err))
|
||||
}
|
||||
|
||||
if err := signing.VerifyOrderLimitSignature(ctx, signee, limit); err != nil {
|
||||
return ErrVerifyUntrusted.New("invalid order limit signature: %w", err) // TODO: report rpc status bad message
|
||||
return rpcstatus.Wrap(rpcstatus.Unauthenticated,
|
||||
ErrVerifyUntrusted.New("invalid order limit signature: %w", err))
|
||||
}
|
||||
|
||||
return nil
|
||||
|
Loading…
Reference in New Issue
Block a user