storagenode/piecestore: handle order if provided in first message

we may in the future want to accept orders as part of the initial
request message
(e.g. https://review.dev.storj.io/c/storj/uplink/+/9246).

this change is forward compatible but continues to work with
existing clients.

Change-Id: I475ad50d6cbfee8a1f843383230698e4ef9b9e54
This commit is contained in:
JT Olio 2023-01-04 22:43:10 -05:00 committed by Egon Elbre
parent 7ffa9ef914
commit 8d69837f02

View File

@ -727,6 +727,25 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
// ensure that we always terminate sending goroutine
defer throttle.Fail(io.EOF)
handleOrder := func(order *pb.Order) error {
if err := endpoint.VerifyOrder(ctx, limit, order, largestOrder.Amount); err != nil {
return err
}
chunkSize := order.Amount - largestOrder.Amount
if err := throttle.Produce(chunkSize); err != nil {
// shouldn't happen since only receiving side is calling Fail
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
largestOrder = *order
return nil
}
if message.Order != nil {
if err = handleOrder(message.Order); err != nil {
return err
}
}
for {
// N.B.: we are only allowed to use message if the returned error is nil. it would be
// a race condition otherwise as Run does not wait for the closure to exit.
@ -749,16 +768,9 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
return rpcstatus.Error(rpcstatus.InvalidArgument, "expected order as the message")
}
if err := endpoint.VerifyOrder(ctx, limit, message.Order, largestOrder.Amount); err != nil {
if err = handleOrder(message.Order); err != nil {
return err
}
chunkSize := message.Order.Amount - largestOrder.Amount
if err := throttle.Produce(chunkSize); err != nil {
// shouldn't happen since only receiving side is calling Fail
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
largestOrder = *message.Order
}
}()