storagenode/piecestore + uplink/piecestore: return PieceHash and original OrderLimit during GET_REPAIR (#2775)

This commit is contained in:
Bill Thorp 2019-08-26 14:57:41 -04:00 committed by Maximillian von Briesen
parent 977472ed32
commit a250551b6d
8 changed files with 255 additions and 47 deletions

View File

@ -10,6 +10,8 @@ import (
proto "github.com/gogo/protobuf/proto"
_ "github.com/golang/protobuf/ptypes/timestamp"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
math "math"
time "time"
)
@ -321,6 +323,8 @@ func (m *PieceDownloadRequest_Chunk) GetChunkSize() int64 {
type PieceDownloadResponse struct {
Chunk *PieceDownloadResponse_Chunk `protobuf:"bytes,1,opt,name=chunk,proto3" json:"chunk,omitempty"`
Hash *PieceHash `protobuf:"bytes,2,opt,name=hash,proto3" json:"hash,omitempty"`
Limit *OrderLimit `protobuf:"bytes,3,opt,name=limit,proto3" json:"limit,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -357,6 +361,20 @@ func (m *PieceDownloadResponse) GetChunk() *PieceDownloadResponse_Chunk {
return nil
}
func (m *PieceDownloadResponse) GetHash() *PieceHash {
if m != nil {
return m.Hash
}
return nil
}
func (m *PieceDownloadResponse) GetLimit() *OrderLimit {
if m != nil {
return m.Limit
}
return nil
}
// Chunk response for download request
type PieceDownloadResponse_Chunk struct {
Offset int64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"`
@ -647,47 +665,48 @@ func init() {
func init() { proto.RegisterFile("piecestore2.proto", fileDescriptor_23ff32dd550c2439) }
var fileDescriptor_23ff32dd550c2439 = []byte{
// 634 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0x4f, 0x6f, 0x12, 0x4f,
0x18, 0x66, 0x29, 0x90, 0xf2, 0xc2, 0x12, 0x3a, 0xbf, 0x5f, 0x0d, 0x6e, 0x54, 0xea, 0x6a, 0x95,
0x8b, 0xdb, 0x4a, 0x4f, 0x9a, 0x5a, 0x23, 0x92, 0x46, 0x63, 0x9b, 0x36, 0x63, 0xdb, 0x83, 0x17,
0x32, 0xc0, 0x2c, 0x6c, 0x84, 0x9d, 0x75, 0x67, 0xd0, 0xa4, 0x5f, 0xc1, 0x8b, 0x1f, 0xc9, 0x78,
0xf2, 0x53, 0xe8, 0xc1, 0x8f, 0xe1, 0xc5, 0xcc, 0xcc, 0x2e, 0xb0, 0x40, 0x21, 0x35, 0xf1, 0x04,
0xef, 0xff, 0x67, 0x9e, 0xf7, 0x79, 0x17, 0x36, 0x02, 0x8f, 0x76, 0x28, 0x17, 0x2c, 0xa4, 0x75,
0x27, 0x08, 0x99, 0x60, 0x08, 0x26, 0x2e, 0x0b, 0x7a, 0xac, 0xc7, 0xb4, 0xdf, 0xaa, 0xf6, 0x18,
0xeb, 0x0d, 0xe8, 0x8e, 0xb2, 0xda, 0x23, 0x77, 0x47, 0x78, 0x43, 0xca, 0x05, 0x19, 0x06, 0x51,
0x42, 0x91, 0x85, 0x5d, 0x1a, 0x72, 0x6d, 0xd9, 0xbf, 0x0d, 0x40, 0xa7, 0xb2, 0xd3, 0x79, 0x30,
0x60, 0xa4, 0x8b, 0xe9, 0x87, 0x11, 0xe5, 0x02, 0xd5, 0x20, 0x3b, 0xf0, 0x86, 0x9e, 0xa8, 0x18,
0x5b, 0x46, 0xad, 0x50, 0x47, 0x4e, 0x54, 0x74, 0x22, 0x7f, 0x8e, 0x64, 0x04, 0xeb, 0x04, 0x74,
0x0f, 0xb2, 0x2a, 0x56, 0x49, 0xab, 0x4c, 0x33, 0x91, 0x89, 0x75, 0x0c, 0x3d, 0x85, 0x6c, 0xa7,
0x3f, 0xf2, 0xdf, 0x57, 0xd6, 0x54, 0xd2, 0x7d, 0x67, 0x02, 0xde, 0x99, 0x9f, 0xee, 0xbc, 0x94,
0xb9, 0x58, 0x97, 0xa0, 0x6d, 0xc8, 0x74, 0x99, 0x4f, 0x2b, 0x19, 0x55, 0xba, 0x11, 0xf7, 0x57,
0x65, 0xaf, 0x08, 0xef, 0x63, 0x15, 0xb6, 0xf6, 0x20, 0xab, 0xca, 0xd0, 0x0d, 0xc8, 0x31, 0xd7,
0xe5, 0x54, 0x63, 0x5f, 0xc3, 0x91, 0x85, 0x10, 0x64, 0xba, 0x44, 0x10, 0x85, 0xb3, 0x88, 0xd5,
0x7f, 0x7b, 0x1f, 0xfe, 0x4b, 0x8c, 0xe7, 0x01, 0xf3, 0x39, 0x1d, 0x8f, 0x34, 0x96, 0x8e, 0xb4,
0x7f, 0x19, 0xf0, 0xbf, 0xf2, 0x35, 0xd9, 0x27, 0xff, 0x1f, 0xb2, 0xb7, 0x9f, 0x64, 0xef, 0xc1,
0x1c, 0x7b, 0x33, 0xf3, 0x13, 0xfc, 0x59, 0x07, 0xab, 0x88, 0xb9, 0x0d, 0xa0, 0x32, 0x5b, 0xdc,
0xbb, 0xa4, 0x0a, 0xc8, 0x1a, 0xce, 0x2b, 0xcf, 0x5b, 0xef, 0x92, 0xda, 0x9f, 0x0d, 0xd8, 0x9c,
0x99, 0x12, 0xd1, 0xf4, 0x2c, 0xc6, 0xa5, 0x9f, 0xf9, 0x70, 0x09, 0x2e, 0x5d, 0x91, 0x04, 0xf6,
0x57, 0x1b, 0x3b, 0x88, 0xe4, 0xda, 0xa4, 0x03, 0x2a, 0xe8, 0xb5, 0x09, 0xb7, 0x37, 0xa3, 0x8d,
0xc7, 0xf5, 0x1a, 0x98, 0x1d, 0x82, 0x89, 0xa9, 0x20, 0x9e, 0x1f, 0x77, 0x7c, 0x0d, 0x66, 0x27,
0xa4, 0x44, 0x78, 0xcc, 0x6f, 0x75, 0x89, 0x88, 0xb5, 0x60, 0x39, 0xfa, 0xbc, 0x9c, 0xf8, 0xbc,
0x9c, 0xb3, 0xf8, 0xbc, 0x1a, 0xeb, 0xdf, 0x7f, 0x54, 0x53, 0x5f, 0x7e, 0x56, 0x0d, 0x5c, 0x8c,
0x4b, 0x9b, 0x44, 0x50, 0xf9, 0x3c, 0xd7, 0x1b, 0x88, 0x68, 0xc9, 0x45, 0x1c, 0x59, 0x76, 0x19,
0x4a, 0xf1, 0xcc, 0x08, 0xc5, 0xb7, 0x34, 0x14, 0xb4, 0xc8, 0x28, 0x91, 0x8b, 0x3f, 0x82, 0x92,
0xcb, 0xc2, 0x21, 0x11, 0xad, 0x8f, 0x34, 0xe4, 0x1e, 0xf3, 0x15, 0x8a, 0x52, 0x7d, 0x7b, 0x8e,
0x69, 0x5d, 0xe0, 0x1c, 0xaa, 0xec, 0x0b, 0x9d, 0x8c, 0x4d, 0x77, 0xda, 0x94, 0x74, 0xf6, 0x09,
0xef, 0xc7, 0x74, 0xca, 0xff, 0x89, 0x67, 0xca, 0x0f, 0x45, 0x24, 0xb1, 0x6b, 0x3e, 0x53, 0x06,
0xd1, 0x2d, 0xc8, 0x73, 0xaf, 0xe7, 0x13, 0x31, 0x0a, 0xf5, 0xb1, 0x16, 0xf1, 0xc4, 0x81, 0x9e,
0x40, 0x41, 0xed, 0xa4, 0xa5, 0xf7, 0x94, 0xbd, 0x6a, 0x4f, 0x8d, 0x8c, 0x6c, 0x8f, 0x81, 0x8d,
0x3d, 0xf6, 0x23, 0x30, 0x13, 0xef, 0x42, 0x26, 0xe4, 0x0f, 0x4f, 0xf0, 0xf1, 0x8b, 0xb3, 0xd6,
0xc5, 0x6e, 0x39, 0x35, 0x6d, 0x3e, 0x2e, 0x1b, 0xf5, 0xaf, 0x69, 0x80, 0xd3, 0x31, 0x3d, 0xe8,
0x18, 0x72, 0xfa, 0xba, 0xd1, 0x9d, 0xe5, 0x5f, 0x1d, 0xab, 0x7a, 0x65, 0x3c, 0x5a, 0x4f, 0xaa,
0x66, 0xa0, 0x73, 0x58, 0x8f, 0x55, 0x8d, 0xb6, 0x56, 0x1d, 0xa2, 0x75, 0x77, 0xe5, 0x49, 0xc8,
0xa6, 0xbb, 0x06, 0x7a, 0x03, 0x39, 0xad, 0xc8, 0x05, 0x28, 0x13, 0x52, 0x5f, 0x80, 0x72, 0x46,
0xca, 0x29, 0xf4, 0x1c, 0x72, 0x5a, 0x58, 0xe8, 0xe6, 0x74, 0x72, 0x42, 0xe0, 0x96, 0xb5, 0x28,
0xa4, 0x5b, 0x34, 0x32, 0xef, 0xd2, 0x41, 0xbb, 0x9d, 0x53, 0xcb, 0xdf, 0xfb, 0x13, 0x00, 0x00,
0xff, 0xff, 0x64, 0x65, 0x86, 0x27, 0x7d, 0x06, 0x00, 0x00,
// 648 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0xcb, 0x6e, 0xd3, 0x4c,
0x14, 0x8e, 0x73, 0x53, 0x7b, 0x6a, 0x57, 0xed, 0xfc, 0x7f, 0x51, 0xb0, 0x80, 0x14, 0x43, 0xa1,
0x1b, 0xdc, 0x92, 0xae, 0x40, 0xa5, 0x88, 0x52, 0x55, 0x20, 0x5a, 0xb5, 0x1a, 0xda, 0x2e, 0xd8,
0x44, 0x93, 0x64, 0x9c, 0x58, 0x24, 0x1e, 0xe3, 0x99, 0x80, 0xd4, 0xa7, 0xe0, 0x91, 0x10, 0x2b,
0x1e, 0x02, 0xc1, 0x82, 0xc7, 0x60, 0x83, 0xe6, 0xe2, 0x24, 0x4e, 0xd2, 0x44, 0x45, 0x62, 0x95,
0xcc, 0xb9, 0x9f, 0xef, 0xfb, 0x8e, 0x61, 0x35, 0x0e, 0x69, 0x93, 0x72, 0xc1, 0x12, 0x5a, 0xf3,
0xe3, 0x84, 0x09, 0x86, 0x60, 0x68, 0x72, 0xa1, 0xcd, 0xda, 0x4c, 0xdb, 0xdd, 0x6a, 0x9b, 0xb1,
0x76, 0x97, 0x6e, 0xa9, 0x57, 0xa3, 0x1f, 0x6c, 0x89, 0xb0, 0x47, 0xb9, 0x20, 0xbd, 0xd8, 0x04,
0xd8, 0x2c, 0x69, 0xd1, 0x84, 0xeb, 0x97, 0xf7, 0xdb, 0x02, 0x74, 0x2a, 0x2b, 0x9d, 0xc7, 0x5d,
0x46, 0x5a, 0x98, 0x7e, 0xe8, 0x53, 0x2e, 0xd0, 0x26, 0x94, 0xba, 0x61, 0x2f, 0x14, 0x15, 0x6b,
0xdd, 0xda, 0x5c, 0xaa, 0x21, 0xdf, 0x24, 0x9d, 0xc8, 0x9f, 0x23, 0xe9, 0xc1, 0x3a, 0x00, 0xdd,
0x83, 0x92, 0xf2, 0x55, 0xf2, 0x2a, 0xd2, 0xc9, 0x44, 0x62, 0xed, 0x43, 0x4f, 0xa1, 0xd4, 0xec,
0xf4, 0xa3, 0xf7, 0x95, 0x82, 0x0a, 0xba, 0xef, 0x0f, 0x87, 0xf7, 0x27, 0xbb, 0xfb, 0x2f, 0x65,
0x2c, 0xd6, 0x29, 0x68, 0x03, 0x8a, 0x2d, 0x16, 0xd1, 0x4a, 0x51, 0xa5, 0xae, 0xa6, 0xf5, 0x55,
0xda, 0x2b, 0xc2, 0x3b, 0x58, 0xb9, 0xdd, 0x1d, 0x28, 0xa9, 0x34, 0x74, 0x03, 0xca, 0x2c, 0x08,
0x38, 0xd5, 0xb3, 0x17, 0xb0, 0x79, 0x21, 0x04, 0xc5, 0x16, 0x11, 0x44, 0xcd, 0x69, 0x63, 0xf5,
0xdf, 0xdb, 0x85, 0xff, 0x32, 0xed, 0x79, 0xcc, 0x22, 0x4e, 0x07, 0x2d, 0xad, 0x99, 0x2d, 0xbd,
0x5f, 0x16, 0xfc, 0xaf, 0x6c, 0x07, 0xec, 0x53, 0xf4, 0x0f, 0xd1, 0xdb, 0xcd, 0xa2, 0xf7, 0x60,
0x02, 0xbd, 0xb1, 0xfe, 0x19, 0xfc, 0xdc, 0xbd, 0x79, 0xc0, 0xdc, 0x06, 0x50, 0x91, 0x75, 0x1e,
0x5e, 0x52, 0x35, 0x48, 0x01, 0x2f, 0x2a, 0xcb, 0xdb, 0xf0, 0x92, 0x7a, 0xdf, 0x2d, 0x58, 0x1b,
0xeb, 0x62, 0x60, 0x7a, 0x96, 0xce, 0xa5, 0xd7, 0x7c, 0x38, 0x63, 0x2e, 0x9d, 0x31, 0x41, 0x6c,
0x87, 0xf0, 0x8e, 0x59, 0x7d, 0x1a, 0xca, 0xd2, 0x3d, 0x04, 0xb3, 0x30, 0x07, 0xcc, 0xbf, 0x93,
0xc0, 0x9e, 0xd1, 0xff, 0x01, 0xed, 0x52, 0x41, 0xaf, 0xcd, 0xa0, 0xb7, 0x66, 0x24, 0x94, 0xe6,
0xeb, 0x4d, 0xbd, 0x04, 0x1c, 0x4c, 0x05, 0x09, 0xa3, 0xb4, 0xe2, 0x6b, 0x70, 0x9a, 0x09, 0x25,
0x22, 0x64, 0x51, 0xbd, 0x45, 0x44, 0x2a, 0x2e, 0xd7, 0xd7, 0xf7, 0xea, 0xa7, 0xf7, 0xea, 0x9f,
0xa5, 0xf7, 0xba, 0xbf, 0xf0, 0xed, 0x47, 0x35, 0xf7, 0xf9, 0x67, 0xd5, 0xc2, 0x76, 0x9a, 0x7a,
0x40, 0x04, 0x95, 0xeb, 0x05, 0x61, 0x57, 0x18, 0xd5, 0xd8, 0xd8, 0xbc, 0xbc, 0x15, 0x58, 0x4e,
0x7b, 0x9a, 0x29, 0xbe, 0xe6, 0x61, 0x49, 0xe3, 0x49, 0x89, 0x54, 0xd2, 0x11, 0x2c, 0x07, 0x2c,
0xe9, 0x11, 0x51, 0xff, 0x48, 0x13, 0x1e, 0xb2, 0x48, 0x4d, 0xb1, 0x5c, 0xdb, 0x98, 0xa0, 0x4e,
0x27, 0xf8, 0x87, 0x2a, 0xfa, 0x42, 0x07, 0x63, 0x27, 0x18, 0x7d, 0x4a, 0x38, 0x07, 0x04, 0xda,
0x86, 0xad, 0xd1, 0x35, 0xe5, 0x97, 0xc7, 0xb0, 0x76, 0xcd, 0x35, 0xa5, 0x13, 0xdd, 0x82, 0x45,
0x1e, 0xb6, 0x23, 0x22, 0xfa, 0x89, 0xbe, 0x7e, 0x1b, 0x0f, 0x0d, 0xe8, 0x09, 0x2c, 0x29, 0x4e,
0xea, 0x9a, 0xa7, 0xd2, 0x55, 0x3c, 0xed, 0x17, 0x65, 0x79, 0x0c, 0x6c, 0x60, 0xf1, 0x1e, 0x81,
0x93, 0xd9, 0x0b, 0x39, 0xb0, 0x78, 0x78, 0x82, 0x8f, 0x5f, 0x9c, 0xd5, 0x2f, 0xb6, 0x57, 0x72,
0xa3, 0xcf, 0xc7, 0x2b, 0x56, 0xed, 0x4b, 0x1e, 0xe0, 0x74, 0x00, 0x0f, 0x3a, 0x86, 0xb2, 0xfe,
0x5c, 0xa0, 0x3b, 0xb3, 0x3f, 0x63, 0x6e, 0xf5, 0x4a, 0xbf, 0xa1, 0x27, 0xb7, 0x69, 0xa1, 0x73,
0x58, 0x48, 0xcf, 0x04, 0xad, 0xcf, 0xbb, 0x6c, 0xf7, 0xee, 0xdc, 0x1b, 0x93, 0x45, 0xb7, 0x2d,
0xf4, 0x06, 0xca, 0x5a, 0x91, 0x53, 0xa6, 0xcc, 0x48, 0x7d, 0xca, 0x94, 0x63, 0x52, 0xce, 0xa1,
0xe7, 0x50, 0xd6, 0xc2, 0x42, 0x37, 0x47, 0x83, 0x33, 0x02, 0x77, 0xdd, 0x69, 0x2e, 0x5d, 0x62,
0xbf, 0xf8, 0x2e, 0x1f, 0x37, 0x1a, 0x65, 0x45, 0xfe, 0xce, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff,
0x8a, 0x5d, 0xb7, 0x8b, 0xce, 0x06, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -807,6 +826,23 @@ type PiecestoreServer interface {
Retain(context.Context, *RetainRequest) (*RetainResponse, error)
}
// UnimplementedPiecestoreServer can be embedded to have forward compatible implementations.
type UnimplementedPiecestoreServer struct {
}
func (*UnimplementedPiecestoreServer) Upload(srv Piecestore_UploadServer) error {
return status.Errorf(codes.Unimplemented, "method Upload not implemented")
}
func (*UnimplementedPiecestoreServer) Download(srv Piecestore_DownloadServer) error {
return status.Errorf(codes.Unimplemented, "method Download not implemented")
}
func (*UnimplementedPiecestoreServer) Delete(ctx context.Context, req *PieceDeleteRequest) (*PieceDeleteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented")
}
func (*UnimplementedPiecestoreServer) Retain(ctx context.Context, req *RetainRequest) (*RetainResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Retain not implemented")
}
func RegisterPiecestoreServer(s *grpc.Server, srv PiecestoreServer) {
s.RegisterService(&_Piecestore_serviceDesc, srv)
}

View File

@ -74,6 +74,8 @@ message PieceDownloadResponse {
bytes data = 2;
}
Chunk chunk = 1;
orders.PieceHash hash = 2;
orders.OrderLimit limit = 3;
}
message PieceDeleteRequest {

View File

@ -5067,6 +5067,16 @@
"id": 1,
"name": "chunk",
"type": "Chunk"
},
{
"id": 2,
"name": "hash",
"type": "orders.PieceHash"
},
{
"id": 3,
"name": "limit",
"type": "orders.OrderLimit"
}
],
"messages": [

View File

@ -56,6 +56,9 @@ const (
v1PieceHeaderFramingSize = 2
)
// BadFormatVersion is returned when a storage format cannot support the request function
var BadFormatVersion = errs.Class("Incompatible storage format version")
// Writer implements a piece writer that writes content to blob store and calculates a hash.
type Writer struct {
hash hash.Hash
@ -229,7 +232,7 @@ func (r *Reader) StorageFormatVersion() storage.FormatVersion {
// of performance we need to understand why and how often that would happen.)
func (r *Reader) GetPieceHeader() (*pb.PieceHeader, error) {
if r.formatVersion < filestore.FormatV1 {
return nil, Error.New("Can't get piece header from storage format V0 reader")
return nil, BadFormatVersion.New("Can't get piece header from storage format V0 reader")
}
if r.pos != 0 {
return nil, Error.New("GetPieceHeader called when not at the beginning of the blob stream")

View File

@ -409,6 +409,45 @@ func (endpoint *Endpoint) Download(stream pb.Piecestore_DownloadServer) (err err
}
}()
// for repair traffic, send along the PieceHash and original OrderLimit for validation
// before sending the piece itself
if message.Limit.Action == pb.PieceAction_GET_REPAIR {
var orderLimit pb.OrderLimit
var pieceHash pb.PieceHash
if pieceReader.StorageFormatVersion() == 0 {
// v0 stores this information in SQL
info, err := endpoint.store.GetV0PieceInfoDB().Get(ctx, limit.SatelliteId, limit.PieceId)
if err != nil {
endpoint.log.Error("error getting piece from v0 pieceinfo db", zap.Error(err))
return status.Error(codes.Internal, err.Error())
}
orderLimit = *info.OrderLimit
pieceHash = *info.UplinkPieceHash
} else {
//v1+ stores this information in the file
header, err := pieceReader.GetPieceHeader()
if err != nil {
endpoint.log.Error("error getting header from piecereader", zap.Error(err))
return status.Error(codes.Internal, err.Error())
}
orderLimit = header.OrderLimit
pieceHash = pb.PieceHash{
PieceId: limit.PieceId,
Hash: header.GetHash(),
PieceSize: pieceReader.Size(),
Timestamp: header.GetCreationTime(),
Signature: header.GetSignature(),
}
}
err = stream.Send(&pb.PieceDownloadResponse{Hash: &pieceHash, Limit: &orderLimit})
if err != nil {
endpoint.log.Error("error sending hash and order limit", zap.Error(err))
return status.Error(codes.Internal, err.Error())
}
}
// TODO: verify chunk.Size behavior logic with regards to reading all
if chunk.Offset+chunk.ChunkSize > pieceReader.Size() {
return Error.New("requested more data than available, requesting=%v available=%v", chunk.Offset+chunk.ChunkSize, pieceReader.Size())
@ -416,7 +455,8 @@ func (endpoint *Endpoint) Download(stream pb.Piecestore_DownloadServer) (err err
availableBandwidth, err := endpoint.monitor.AvailableBandwidth(ctx)
if err != nil {
return ErrInternal.Wrap(err)
endpoint.log.Error("error getting available bandwidth", zap.Error(err))
return status.Error(codes.Internal, err.Error())
}
throttle := sync2.NewThrottle()
@ -441,13 +481,15 @@ func (endpoint *Endpoint) Download(stream pb.Piecestore_DownloadServer) (err err
chunkData := make([]byte, chunkSize)
_, err = pieceReader.Seek(currentOffset, io.SeekStart)
if err != nil {
return ErrInternal.Wrap(err)
endpoint.log.Error("error seeking on piecereader", zap.Error(err))
return status.Error(codes.Internal, err.Error())
}
// ReadFull is required to ensure we are sending the right amount of data.
_, err = io.ReadFull(pieceReader, chunkData)
if err != nil {
return ErrInternal.Wrap(err)
endpoint.log.Error("error reading from piecereader", zap.Error(err))
return status.Error(codes.Internal, err.Error())
}
err = stream.Send(&pb.PieceDownloadResponse{
@ -465,7 +507,6 @@ func (endpoint *Endpoint) Download(stream pb.Piecestore_DownloadServer) (err err
currentOffset += chunkSize
unsentAmount -= chunkSize
}
return nil
})

View File

@ -271,9 +271,99 @@ func TestDownload(t *testing.T) {
} else {
require.NoError(t, err)
}
// these should only be not-nil if action = pb.PieceAction_GET_REPAIR
hash, originalLimit := downloader.GetHashAndLimit()
require.Nil(t, hash)
require.Nil(t, originalLimit)
}
}
func TestDownloadGetRepair(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, err := testplanet.New(t, 1, 1, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
// upload test piece
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
require.NoError(t, err)
defer ctx.Check(client.Close)
expectedData := testrand.Bytes(10 * memory.KiB)
serialNumber := testrand.SerialNumber()
ulOrderLimit, piecePrivateKey := GenerateOrderLimit(
t,
planet.Satellites[0].ID(),
planet.StorageNodes[0].ID(),
storj.PieceID{1},
pb.PieceAction_PUT,
serialNumber,
24*time.Hour,
24*time.Hour,
int64(len(expectedData)),
)
signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
ulOrderLimit, err = signing.SignOrderLimit(ctx, signer, ulOrderLimit)
require.NoError(t, err)
uploader, err := client.Upload(ctx, ulOrderLimit, piecePrivateKey)
require.NoError(t, err)
_, err = uploader.Write(expectedData)
require.NoError(t, err)
originHash, err := uploader.Commit(ctx)
require.NoError(t, err)
serialNumber = testrand.SerialNumber()
dlOrderLimit, piecePrivateKey := GenerateOrderLimit(
t,
planet.Satellites[0].ID(),
planet.StorageNodes[0].ID(),
storj.PieceID{1},
pb.PieceAction_GET_REPAIR,
serialNumber,
24*time.Hour,
24*time.Hour,
int64(len(expectedData)),
)
dlOrderLimit, err = signing.SignOrderLimit(ctx, signer, dlOrderLimit)
require.NoError(t, err)
downloader, err := client.Download(ctx, dlOrderLimit, piecePrivateKey, 0, int64(len(expectedData)))
require.NoError(t, err)
buffer := make([]byte, len(expectedData))
n, err := downloader.Read(buffer)
require.NoError(t, err)
require.Equal(t, expectedData, buffer[:n])
err = downloader.Close()
require.NoError(t, err)
hash, originLimit := downloader.GetHashAndLimit()
require.NotNil(t, hash)
require.Equal(t, originHash.Hash, hash.Hash)
require.Equal(t, originHash.PieceId, hash.PieceId)
require.NotNil(t, originLimit)
require.Equal(t, originLimit.Action, ulOrderLimit.Action)
require.Equal(t, originLimit.Limit, ulOrderLimit.Limit)
require.Equal(t, originLimit.PieceId, ulOrderLimit.PieceId)
require.Equal(t, originLimit.SatelliteId, ulOrderLimit.SatelliteId)
require.Equal(t, originLimit.SerialNumber, ulOrderLimit.SerialNumber)
require.Equal(t, originLimit.SatelliteSignature, ulOrderLimit.SatelliteSignature)
require.Equal(t, originLimit.UplinkPublicKey, ulOrderLimit.UplinkPublicKey)
}
func TestDelete(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()

View File

@ -70,6 +70,11 @@ func (download *BufferedDownload) Close() error {
return download.download.Close()
}
// GetHashAndLimit gets the download's hash and original order limit.
func (download *BufferedDownload) GetHashAndLimit() (*pb.PieceHash, *pb.OrderLimit) {
return download.download.GetHashAndLimit()
}
// LockingUpload adds a lock around upload making it safe to use concurrently.
// TODO: this shouldn't be needed.
type LockingUpload struct {
@ -120,3 +125,8 @@ func (download *LockingDownload) Close() error {
defer download.mu.Unlock()
return download.download.Close()
}
// GetHashAndLimit gets the download's hash and original order limit
func (download *LockingDownload) GetHashAndLimit() (*pb.PieceHash, *pb.OrderLimit) {
return download.download.GetHashAndLimit()
}

View File

@ -18,10 +18,12 @@ import (
)
// Downloader is interface that can be used for downloading content.
// It matches signature of `io.ReadCloser`.
// It matches signature of `io.ReadCloser`, with one extra function,
// GetHashAndLimit(), used for accessing information during GET_REPAIR.
type Downloader interface {
Read([]byte) (int, error)
Close() error
GetHashAndLimit() (*pb.PieceHash, *pb.OrderLimit)
}
// Download implements downloading from a piecestore.
@ -43,6 +45,10 @@ type Download struct {
unread ReadBuffer
// hash and originLimit are received in the event of a GET_REPAIR
hash *pb.PieceHash
originLimit *pb.OrderLimit
closed bool
closingError error
}
@ -184,6 +190,11 @@ func (client *Download) Read(data []byte) (read int, err error) {
client.downloaded += int64(len(response.Chunk.Data))
client.unread.Fill(response.Chunk.Data)
}
// This is a GET_REPAIR because we got a piece hash and the original order limit.
if response != nil && response.Hash != nil && response.Limit != nil {
client.hash = response.Hash
client.originLimit = response.Limit
}
// we may have some data buffered, so we cannot immediately return the error
// we'll queue the error and use the received error as the closing error
@ -245,6 +256,11 @@ func (client *Download) Close() (err error) {
return client.closingError
}
// GetHashAndLimit gets the download's hash and original order limit.
func (client *Download) GetHashAndLimit() (*pb.PieceHash, *pb.OrderLimit) {
return client.hash, client.originLimit
}
// ReadBuffer implements buffered reading with an error.
type ReadBuffer struct {
data []byte