piecestore: pipeline chunks with orders (#2451)

This commit is contained in:
JT Olio 2019-07-08 08:26:19 -06:00 committed by Egon Elbre
parent 88732188cb
commit 65aa8f227f
2 changed files with 12 additions and 18 deletions

View File

@ -244,18 +244,18 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error)
if message == nil {
return ErrProtocol.New("expected a message") // TODO: report grpc status bad message
}
if message.Order == nil && message.Chunk == nil && message.Done == nil {
return ErrProtocol.New("expected a message") // TODO: report grpc status bad message
}
switch {
default:
return ErrProtocol.New("message didn't contain any of order, chunk or done") // TODO: report grpc status bad message
case message.Order != nil:
if message.Order != nil {
if err := endpoint.VerifyOrder(ctx, peer, limit, message.Order, largestOrder.Amount); err != nil {
return err
}
largestOrder = *message.Order
}
case message.Chunk != nil:
if message.Chunk != nil {
if message.Chunk.Offset != pieceWriter.Size() {
return ErrProtocol.New("chunk out of order") // TODO: report grpc status bad message
}
@ -278,8 +278,9 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error)
if _, err := pieceWriter.Write(message.Chunk.Data); err != nil {
return ErrInternal.Wrap(err) // TODO: report grpc status internal server error
}
}
case message.Done != nil:
if message.Done != nil {
expectedHash := pieceWriter.Hash()
if err := endpoint.VerifyPieceHash(ctx, peer, limit, message.Done, expectedHash); err != nil {
return err // TODO: report grpc status internal server error

View File

@ -98,7 +98,7 @@ func (client *Upload) Write(data []byte) (written int, err error) {
}
// if we already encountered an error, keep returning it
if client.sendError != nil {
return 0, ErrProtocol.Wrap(client.sendError)
return 0, client.sendError
}
fullData := data
@ -126,25 +126,18 @@ func (client *Upload) Write(data []byte) (written int, err error) {
return written, ErrInternal.Wrap(err)
}
// send signed order so that storagenode will accept data
// send signed order + data
err = client.stream.Send(&pb.PieceUploadRequest{
Order: order,
})
if err != nil {
client.sendError = err
return written, ErrProtocol.Wrap(client.sendError)
}
// send data as the next message
err = client.stream.Send(&pb.PieceUploadRequest{
Chunk: &pb.PieceUploadRequest_Chunk{
Offset: client.offset,
Data: sendData,
},
})
if err != nil {
err = ErrProtocol.Wrap(err)
client.sendError = err
return written, ErrProtocol.Wrap(client.sendError)
return written, err
}
// update our offset