storagenode/piecestore: handle upload write if provided in first message

we may in the future want to accept writes and commits as part of the
initial request message, just like
https://review.dev.storj.io/c/storj/storj/+/9245

this change is forward compatible while continuing to work with
existing clients.

Change-Id: Ifd3ac8606d498a43bb35d0a3751859656e1e8995
JT Olio 2023-01-04 22:43:10 -05:00 committed by Storj Robot
parent 1a2ad602d7
commit 529e3674e4
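
For context, a combined first message would look roughly like the sketch below. This is illustrative only: the Order, Chunk, and Done fields match what the handler in this diff reads, but the nested chunk type name (pb.PieceUploadRequest_Chunk) and the order, data, and pieceHash variables are assumptions made for the example, not taken from the diff.

// Hypothetical client-side request that carries the order, the piece data,
// and the final ("done") piece hash in one message. With this change the
// storage node can verify, write, and commit from this single message.
first := &pb.PieceUploadRequest{
	Order: order, // *pb.Order allocating at least len(data) bytes
	Chunk: &pb.PieceUploadRequest_Chunk{ // assumed protobuf-generated name for the chunk message
		Offset: 0,    // must equal pieceWriter.Size(), which is 0 for the first write
		Data:   data, // the entire piece payload
	},
	Done: pieceHash, // *pb.PieceHash signed by the uplink; its presence triggers the commit path
}
// The order limit and hash algorithm are established earlier in Upload;
// the handler below refers to them as limit and hashAlgorithm.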


@@ -407,14 +407,110 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err error) {
limit: endpoint.config.MinUploadSpeed,
}
for {
handleMessage := func(ctx context.Context, message *pb.PieceUploadRequest) (done bool, err error) {
if message.Order != nil {
if err := endpoint.VerifyOrder(ctx, limit, message.Order, largestOrder.Amount); err != nil {
return true, err
}
largestOrder = *message.Order
}
if message.Chunk != nil {
if message.Chunk.Offset != pieceWriter.Size() {
return true, rpcstatus.Error(rpcstatus.InvalidArgument, "chunk out of order")
}
chunkSize := int64(len(message.Chunk.Data))
if largestOrder.Amount < pieceWriter.Size()+chunkSize {
// TODO: should we write currently and give a chance for uplink to remedy the situation?
return true, rpcstatus.Errorf(rpcstatus.InvalidArgument,
"not enough allocated, allocated=%v writing=%v",
largestOrder.Amount, pieceWriter.Size()+int64(len(message.Chunk.Data)))
}
availableSpace -= chunkSize
if availableSpace < 0 {
return true, rpcstatus.Error(rpcstatus.Internal, "out of space")
}
if _, err := pieceWriter.Write(message.Chunk.Data); err != nil {
return true, rpcstatus.Wrap(rpcstatus.Internal, err)
}
}
if message.Done == nil {
return false, nil
}
if message.Done.HashAlgorithm != hashAlgorithm {
return true, rpcstatus.Wrap(rpcstatus.Internal, errs.New("Hash algorithm in the first and last upload message are different %s %s", hashAlgorithm, message.Done.HashAlgorithm))
}
calculatedHash := pieceWriter.Hash()
if err := endpoint.VerifyPieceHash(ctx, limit, message.Done, calculatedHash); err != nil {
return true, rpcstatus.Wrap(rpcstatus.Internal, err)
}
if message.Done.PieceSize != pieceWriter.Size() {
return true, rpcstatus.Errorf(rpcstatus.InvalidArgument,
"Size of finished piece does not match size declared by uplink! %d != %d",
message.Done.PieceSize, pieceWriter.Size())
}
{
info := &pb.PieceHeader{
Hash: calculatedHash,
HashAlgorithm: hashAlgorithm,
CreationTime: message.Done.Timestamp,
Signature: message.Done.GetSignature(),
OrderLimit: *limit,
}
if err := pieceWriter.Commit(ctx, info); err != nil {
return true, rpcstatus.Wrap(rpcstatus.Internal, err)
}
committed = true
if !limit.PieceExpiration.IsZero() {
err := endpoint.store.SetExpiration(ctx, limit.SatelliteId, limit.PieceId, limit.PieceExpiration)
if err != nil {
return true, rpcstatus.Wrap(rpcstatus.Internal, err)
}
}
}
storageNodeHash, err := signing.SignPieceHash(ctx, signing.SignerFromFullIdentity(endpoint.ident), &pb.PieceHash{
PieceId: limit.PieceId,
Hash: calculatedHash,
HashAlgorithm: hashAlgorithm,
PieceSize: pieceWriter.Size(),
Timestamp: time.Now(),
})
if err != nil {
return true, rpcstatus.Wrap(rpcstatus.Internal, err)
}
closeErr := rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
return stream.SendAndClose(&pb.PieceUploadResponse{
Done: storageNodeHash,
NodeCertchain: identity.EncodePeerIdentity(endpoint.ident.PeerIdentity())})
})
if errs.Is(closeErr, io.EOF) {
closeErr = nil
}
if closeErr != nil {
return true, rpcstatus.Wrap(rpcstatus.Internal, closeErr)
}
return true, nil
}
// handle any data in the first message we already received.
if done, err := handleMessage(ctx, message); err != nil || done {
return err
}
for {
if err := speedEstimate.EnsureLimit(memory.Size(pieceWriter.Size()), endpoint.isCongested(), time.Now()); err != nil {
return rpcstatus.Wrap(rpcstatus.Aborted, err)
}
// TODO: reuse messages to avoid allocations
// N.B.: we are only allowed to use message if the returned error is nil. it would be
// a race condition otherwise as Run does not wait for the closure to exit.
err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
@@ -426,101 +522,12 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err error) {
} else if err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
if message == nil {
return rpcstatus.Error(rpcstatus.InvalidArgument, "expected a message")
}
if message.Order == nil && message.Chunk == nil && message.Done == nil {
return rpcstatus.Error(rpcstatus.InvalidArgument, "expected a message")
}
if message.Order != nil {
if err := endpoint.VerifyOrder(ctx, limit, message.Order, largestOrder.Amount); err != nil {
return err
}
largestOrder = *message.Order
}
if message.Chunk != nil {
if message.Chunk.Offset != pieceWriter.Size() {
return rpcstatus.Error(rpcstatus.InvalidArgument, "chunk out of order")
}
chunkSize := int64(len(message.Chunk.Data))
if largestOrder.Amount < pieceWriter.Size()+chunkSize {
// TODO: should we write currently and give a chance for uplink to remedy the situation?
return rpcstatus.Errorf(rpcstatus.InvalidArgument,
"not enough allocated, allocated=%v writing=%v",
largestOrder.Amount, pieceWriter.Size()+int64(len(message.Chunk.Data)))
}
availableSpace -= chunkSize
if availableSpace < 0 {
return rpcstatus.Error(rpcstatus.Internal, "out of space")
}
if _, err := pieceWriter.Write(message.Chunk.Data); err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
}
if message.Done != nil {
if message.Done.HashAlgorithm != hashAlgorithm {
return rpcstatus.Wrap(rpcstatus.Internal, errs.New("Hash algorithm in the first and last upload message are different %s %s", hashAlgorithm, message.Done.HashAlgorithm))
}
calculatedHash := pieceWriter.Hash()
if err := endpoint.VerifyPieceHash(ctx, limit, message.Done, calculatedHash); err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
if message.Done.PieceSize != pieceWriter.Size() {
return rpcstatus.Errorf(rpcstatus.InvalidArgument,
"Size of finished piece does not match size declared by uplink! %d != %d",
message.Done.PieceSize, pieceWriter.Size())
}
{
info := &pb.PieceHeader{
Hash: calculatedHash,
HashAlgorithm: hashAlgorithm,
CreationTime: message.Done.Timestamp,
Signature: message.Done.GetSignature(),
OrderLimit: *limit,
}
if err := pieceWriter.Commit(ctx, info); err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
committed = true
if !limit.PieceExpiration.IsZero() {
err := endpoint.store.SetExpiration(ctx, limit.SatelliteId, limit.PieceId, limit.PieceExpiration)
if err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
}
}
storageNodeHash, err := signing.SignPieceHash(ctx, signing.SignerFromFullIdentity(endpoint.ident), &pb.PieceHash{
PieceId: limit.PieceId,
Hash: calculatedHash,
HashAlgorithm: hashAlgorithm,
PieceSize: pieceWriter.Size(),
Timestamp: time.Now(),
})
if err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
closeErr := rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
return stream.SendAndClose(&pb.PieceUploadResponse{
Done: storageNodeHash,
NodeCertchain: identity.EncodePeerIdentity(endpoint.ident.PeerIdentity())})
})
if errs.Is(closeErr, io.EOF) {
closeErr = nil
}
if closeErr != nil {
return rpcstatus.Wrap(rpcstatus.Internal, closeErr)
}
return nil
if done, err := handleMessage(ctx, message); err != nil || done {
return err
}
}
}
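
The structural effect of the change is that the initial request and the receive loop now share one handler closure, so a client that piggybacks data (and even the commit) on its first message takes the same code path as one that streams messages one at a time. The toy sketch below uses stand-in types instead of the real pb and stream machinery to illustrate the pattern; it is not Storj code.

package main

import (
	"errors"
	"fmt"
	"io"
)

// msg stands in for pb.PieceUploadRequest: it may carry chunk data, a final
// marker, or both.
type msg struct {
	data []byte
	done bool
}

// upload mirrors the refactored Upload: the first message goes through the
// same handler closure as every later message, so a client that packs data
// and the final marker into its first message finishes without entering the
// receive loop.
func upload(first msg, recv func() (msg, error)) error {
	var written int
	handle := func(m msg) (bool, error) {
		written += len(m.data)
		if m.done {
			fmt.Println("committed", written, "bytes")
			return true, nil
		}
		return false, nil
	}

	// Handle any data already present in the first message.
	if done, err := handle(first); err != nil || done {
		return err
	}
	// Otherwise keep receiving until a message carries the final marker.
	for {
		m, err := recv()
		if errors.Is(err, io.EOF) {
			return errors.New("stream ended before the done message")
		} else if err != nil {
			return err
		}
		if done, err := handle(m); err != nil || done {
			return err
		}
	}
}

func main() {
	// A future client: everything in the first message; recv is never called.
	_ = upload(msg{data: []byte("piece"), done: true}, nil)

	// An existing client: data and the done marker arrive in later messages.
	queue := []msg{{data: []byte("pie")}, {data: []byte("ce")}, {done: true}}
	_ = upload(msg{}, func() (msg, error) {
		m := queue[0]
		queue = queue[1:]
		return m, nil
	})
}

Existing clients correspond to the second call in main: their first message carries no chunk or done marker, the handler reports done=false, and the loop proceeds exactly as the old inline code did.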