diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index 56d8a1faa..df3c37fc4 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -407,14 +407,110 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err limit: endpoint.config.MinUploadSpeed, } - for { + handleMessage := func(ctx context.Context, message *pb.PieceUploadRequest) (done bool, err error) { + if message.Order != nil { + if err := endpoint.VerifyOrder(ctx, limit, message.Order, largestOrder.Amount); err != nil { + return true, err + } + largestOrder = *message.Order + } + if message.Chunk != nil { + if message.Chunk.Offset != pieceWriter.Size() { + return true, rpcstatus.Error(rpcstatus.InvalidArgument, "chunk out of order") + } + + chunkSize := int64(len(message.Chunk.Data)) + if largestOrder.Amount < pieceWriter.Size()+chunkSize { + // TODO: should we write currently and give a chance for uplink to remedy the situation? + return true, rpcstatus.Errorf(rpcstatus.InvalidArgument, + "not enough allocated, allocated=%v writing=%v", + largestOrder.Amount, pieceWriter.Size()+int64(len(message.Chunk.Data))) + } + + availableSpace -= chunkSize + if availableSpace < 0 { + return true, rpcstatus.Error(rpcstatus.Internal, "out of space") + } + if _, err := pieceWriter.Write(message.Chunk.Data); err != nil { + return true, rpcstatus.Wrap(rpcstatus.Internal, err) + } + } + + if message.Done == nil { + return false, nil + } + + if message.Done.HashAlgorithm != hashAlgorithm { + return true, rpcstatus.Wrap(rpcstatus.Internal, errs.New("Hash algorithm in the first and last upload message are different %s %s", hashAlgorithm, message.Done.HashAlgorithm)) + } + + calculatedHash := pieceWriter.Hash() + if err := endpoint.VerifyPieceHash(ctx, limit, message.Done, calculatedHash); err != nil { + return true, rpcstatus.Wrap(rpcstatus.Internal, err) + } + if message.Done.PieceSize != pieceWriter.Size() { + return true, rpcstatus.Errorf(rpcstatus.InvalidArgument, + "Size of finished piece does not match size declared by uplink! %d != %d", + message.Done.PieceSize, pieceWriter.Size()) + } + + { + info := &pb.PieceHeader{ + Hash: calculatedHash, + HashAlgorithm: hashAlgorithm, + CreationTime: message.Done.Timestamp, + Signature: message.Done.GetSignature(), + OrderLimit: *limit, + } + if err := pieceWriter.Commit(ctx, info); err != nil { + return true, rpcstatus.Wrap(rpcstatus.Internal, err) + } + committed = true + if !limit.PieceExpiration.IsZero() { + err := endpoint.store.SetExpiration(ctx, limit.SatelliteId, limit.PieceId, limit.PieceExpiration) + if err != nil { + return true, rpcstatus.Wrap(rpcstatus.Internal, err) + } + } + } + + storageNodeHash, err := signing.SignPieceHash(ctx, signing.SignerFromFullIdentity(endpoint.ident), &pb.PieceHash{ + PieceId: limit.PieceId, + Hash: calculatedHash, + HashAlgorithm: hashAlgorithm, + PieceSize: pieceWriter.Size(), + Timestamp: time.Now(), + }) + if err != nil { + return true, rpcstatus.Wrap(rpcstatus.Internal, err) + } + + closeErr := rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) { + return stream.SendAndClose(&pb.PieceUploadResponse{ + Done: storageNodeHash, + NodeCertchain: identity.EncodePeerIdentity(endpoint.ident.PeerIdentity())}) + }) + if errs.Is(closeErr, io.EOF) { + closeErr = nil + } + if closeErr != nil { + return true, rpcstatus.Wrap(rpcstatus.Internal, closeErr) + } + return true, nil + } + + // handle any data in the first message we already received. + if done, err := handleMessage(ctx, message); err != nil || done { + return err + } + + for { if err := speedEstimate.EnsureLimit(memory.Size(pieceWriter.Size()), endpoint.isCongested(), time.Now()); err != nil { return rpcstatus.Wrap(rpcstatus.Aborted, err) } // TODO: reuse messages to avoid allocations - // N.B.: we are only allowed to use message if the returned error is nil. it would be // a race condition otherwise as Run does not wait for the closure to exit. err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) { @@ -426,101 +522,12 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err } else if err != nil { return rpcstatus.Wrap(rpcstatus.Internal, err) } - if message == nil { return rpcstatus.Error(rpcstatus.InvalidArgument, "expected a message") } - if message.Order == nil && message.Chunk == nil && message.Done == nil { - return rpcstatus.Error(rpcstatus.InvalidArgument, "expected a message") - } - if message.Order != nil { - if err := endpoint.VerifyOrder(ctx, limit, message.Order, largestOrder.Amount); err != nil { - return err - } - largestOrder = *message.Order - } - - if message.Chunk != nil { - if message.Chunk.Offset != pieceWriter.Size() { - return rpcstatus.Error(rpcstatus.InvalidArgument, "chunk out of order") - } - - chunkSize := int64(len(message.Chunk.Data)) - if largestOrder.Amount < pieceWriter.Size()+chunkSize { - // TODO: should we write currently and give a chance for uplink to remedy the situation? - return rpcstatus.Errorf(rpcstatus.InvalidArgument, - "not enough allocated, allocated=%v writing=%v", - largestOrder.Amount, pieceWriter.Size()+int64(len(message.Chunk.Data))) - } - - availableSpace -= chunkSize - if availableSpace < 0 { - return rpcstatus.Error(rpcstatus.Internal, "out of space") - } - if _, err := pieceWriter.Write(message.Chunk.Data); err != nil { - return rpcstatus.Wrap(rpcstatus.Internal, err) - } - } - - if message.Done != nil { - if message.Done.HashAlgorithm != hashAlgorithm { - return rpcstatus.Wrap(rpcstatus.Internal, errs.New("Hash algorithm in the first and last upload message are different %s %s", hashAlgorithm, message.Done.HashAlgorithm)) - } - - calculatedHash := pieceWriter.Hash() - if err := endpoint.VerifyPieceHash(ctx, limit, message.Done, calculatedHash); err != nil { - return rpcstatus.Wrap(rpcstatus.Internal, err) - } - if message.Done.PieceSize != pieceWriter.Size() { - return rpcstatus.Errorf(rpcstatus.InvalidArgument, - "Size of finished piece does not match size declared by uplink! %d != %d", - message.Done.PieceSize, pieceWriter.Size()) - } - - { - info := &pb.PieceHeader{ - Hash: calculatedHash, - HashAlgorithm: hashAlgorithm, - CreationTime: message.Done.Timestamp, - Signature: message.Done.GetSignature(), - OrderLimit: *limit, - } - if err := pieceWriter.Commit(ctx, info); err != nil { - return rpcstatus.Wrap(rpcstatus.Internal, err) - } - committed = true - if !limit.PieceExpiration.IsZero() { - err := endpoint.store.SetExpiration(ctx, limit.SatelliteId, limit.PieceId, limit.PieceExpiration) - if err != nil { - return rpcstatus.Wrap(rpcstatus.Internal, err) - } - } - } - - storageNodeHash, err := signing.SignPieceHash(ctx, signing.SignerFromFullIdentity(endpoint.ident), &pb.PieceHash{ - PieceId: limit.PieceId, - Hash: calculatedHash, - HashAlgorithm: hashAlgorithm, - PieceSize: pieceWriter.Size(), - Timestamp: time.Now(), - }) - if err != nil { - return rpcstatus.Wrap(rpcstatus.Internal, err) - } - - closeErr := rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) { - return stream.SendAndClose(&pb.PieceUploadResponse{ - Done: storageNodeHash, - NodeCertchain: identity.EncodePeerIdentity(endpoint.ident.PeerIdentity())}) - }) - if errs.Is(closeErr, io.EOF) { - closeErr = nil - } - if closeErr != nil { - return rpcstatus.Wrap(rpcstatus.Internal, closeErr) - } - return nil + if done, err := handleMessage(ctx, message); err != nil || done { + return err } } }