diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index a38ade86f..63e3ed429 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -244,18 +244,18 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error) if message == nil { return ErrProtocol.New("expected a message") // TODO: report grpc status bad message } + if message.Order == nil && message.Chunk == nil && message.Done == nil { + return ErrProtocol.New("expected a message") // TODO: report grpc status bad message + } - switch { - default: - return ErrProtocol.New("message didn't contain any of order, chunk or done") // TODO: report grpc status bad message - - case message.Order != nil: + if message.Order != nil { if err := endpoint.VerifyOrder(ctx, peer, limit, message.Order, largestOrder.Amount); err != nil { return err } largestOrder = *message.Order + } - case message.Chunk != nil: + if message.Chunk != nil { if message.Chunk.Offset != pieceWriter.Size() { return ErrProtocol.New("chunk out of order") // TODO: report grpc status bad message } @@ -278,8 +278,9 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error) if _, err := pieceWriter.Write(message.Chunk.Data); err != nil { return ErrInternal.Wrap(err) // TODO: report grpc status internal server error } + } - case message.Done != nil: + if message.Done != nil { expectedHash := pieceWriter.Hash() if err := endpoint.VerifyPieceHash(ctx, peer, limit, message.Done, expectedHash); err != nil { return err // TODO: report grpc status internal server error diff --git a/uplink/piecestore/upload.go b/uplink/piecestore/upload.go index 92af8ac8a..f0952ec6f 100644 --- a/uplink/piecestore/upload.go +++ b/uplink/piecestore/upload.go @@ -98,7 +98,7 @@ func (client *Upload) Write(data []byte) (written int, err error) { } // if we already encountered an error, keep returning it if client.sendError != nil { - return 0, ErrProtocol.Wrap(client.sendError) + return 0, client.sendError } fullData := data @@ -126,25 +126,18 @@ func (client *Upload) Write(data []byte) (written int, err error) { return written, ErrInternal.Wrap(err) } - // send signed order so that storagenode will accept data + // send signed order + data err = client.stream.Send(&pb.PieceUploadRequest{ Order: order, - }) - if err != nil { - client.sendError = err - return written, ErrProtocol.Wrap(client.sendError) - } - - // send data as the next message - err = client.stream.Send(&pb.PieceUploadRequest{ Chunk: &pb.PieceUploadRequest_Chunk{ Offset: client.offset, Data: sendData, }, }) if err != nil { + err = ErrProtocol.Wrap(err) client.sendError = err - return written, ErrProtocol.Wrap(client.sendError) + return written, err } // update our offset