From bf97ef06fcc7829a72f722aad5cbce3f81a71978 Mon Sep 17 00:00:00 2001 From: Ivan Fraixedes Date: Tue, 26 Nov 2019 18:47:19 +0100 Subject: [PATCH] =?UTF-8?q?storagenode:=20Add=20new=20endpoint=20to=20rece?= =?UTF-8?q?ive=20satellite=20requests=20for=E2=80=A6=20(#3590)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * pkg/pg: Add new service function storage node Add a new service function to the storage node piece store for deleting pieces when satellites request them. * storagenode/piecestore: Add endpoint to delete piece Add a new endpoint to receive from trusted satellites to delete a piece. * private/testplanet: Fix storagenode mock Add to the storagenode mock the new endpoint method. * proto.lock: Update it with the last protbuff changes * storagenode/piecestore: Reuse test piece upload Extract the repeated logic from several tests functions for uploading a test piece to a test helper function. * uplink/piecestore: Implement client side method Implement the client side method of the new piecestore RPC function. * storagenode/piecestore: Add test DeletePiece endpoint Implement a test for the DeletePiece new endpoint method. --- pkg/pb/piecestore2.pb.go | 245 ++++++++++++++++++----- pkg/pb/piecestore2.proto | 13 +- private/testplanet/uplink_test.go | 3 + proto.lock | 36 +++- storagenode/piecestore/endpoint.go | 47 ++++- storagenode/piecestore/endpoint_test.go | 247 +++++++++++++++--------- uplink/piecestore/client.go | 9 + 7 files changed, 454 insertions(+), 146 deletions(-) diff --git a/pkg/pb/piecestore2.pb.go b/pkg/pb/piecestore2.pb.go index 032b89640..bb55d120c 100644 --- a/pkg/pb/piecestore2.pb.go +++ b/pkg/pb/piecestore2.pb.go @@ -49,7 +49,7 @@ func (x PieceHeader_FormatVersion) String() string { } func (PieceHeader_FormatVersion) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_23ff32dd550c2439, []int{10, 0} + return fileDescriptor_23ff32dd550c2439, []int{12, 0} } // Expected order of messages from uplink: @@ -489,6 +489,67 @@ func (m *PieceDeleteResponse) XXX_DiscardUnknown() { var xxx_messageInfo_PieceDeleteResponse proto.InternalMessageInfo +type PieceDeletePieceRequest struct { + PieceId PieceID `protobuf:"bytes,1,opt,name=piece_id,json=pieceId,proto3,customtype=PieceID" json:"piece_id"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PieceDeletePieceRequest) Reset() { *m = PieceDeletePieceRequest{} } +func (m *PieceDeletePieceRequest) String() string { return proto.CompactTextString(m) } +func (*PieceDeletePieceRequest) ProtoMessage() {} +func (*PieceDeletePieceRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_23ff32dd550c2439, []int{6} +} +func (m *PieceDeletePieceRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_PieceDeletePieceRequest.Unmarshal(m, b) +} +func (m *PieceDeletePieceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_PieceDeletePieceRequest.Marshal(b, m, deterministic) +} +func (m *PieceDeletePieceRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_PieceDeletePieceRequest.Merge(m, src) +} +func (m *PieceDeletePieceRequest) XXX_Size() int { + return xxx_messageInfo_PieceDeletePieceRequest.Size(m) +} +func (m *PieceDeletePieceRequest) XXX_DiscardUnknown() { + xxx_messageInfo_PieceDeletePieceRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_PieceDeletePieceRequest proto.InternalMessageInfo + +type PieceDeletePieceResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PieceDeletePieceResponse) Reset() { *m = PieceDeletePieceResponse{} } +func (m *PieceDeletePieceResponse) String() string { return proto.CompactTextString(m) } +func (*PieceDeletePieceResponse) ProtoMessage() {} +func (*PieceDeletePieceResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_23ff32dd550c2439, []int{7} +} +func (m *PieceDeletePieceResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_PieceDeletePieceResponse.Unmarshal(m, b) +} +func (m *PieceDeletePieceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_PieceDeletePieceResponse.Marshal(b, m, deterministic) +} +func (m *PieceDeletePieceResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_PieceDeletePieceResponse.Merge(m, src) +} +func (m *PieceDeletePieceResponse) XXX_Size() int { + return xxx_messageInfo_PieceDeletePieceResponse.Size(m) +} +func (m *PieceDeletePieceResponse) XXX_DiscardUnknown() { + xxx_messageInfo_PieceDeletePieceResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_PieceDeletePieceResponse proto.InternalMessageInfo + type RetainRequest struct { CreationDate time.Time `protobuf:"bytes,1,opt,name=creation_date,json=creationDate,proto3,stdtime" json:"creation_date"` Filter []byte `protobuf:"bytes,2,opt,name=filter,proto3" json:"filter,omitempty"` @@ -501,7 +562,7 @@ func (m *RetainRequest) Reset() { *m = RetainRequest{} } func (m *RetainRequest) String() string { return proto.CompactTextString(m) } func (*RetainRequest) ProtoMessage() {} func (*RetainRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_23ff32dd550c2439, []int{6} + return fileDescriptor_23ff32dd550c2439, []int{8} } func (m *RetainRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RetainRequest.Unmarshal(m, b) @@ -545,7 +606,7 @@ func (m *RetainResponse) Reset() { *m = RetainResponse{} } func (m *RetainResponse) String() string { return proto.CompactTextString(m) } func (*RetainResponse) ProtoMessage() {} func (*RetainResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_23ff32dd550c2439, []int{7} + return fileDescriptor_23ff32dd550c2439, []int{9} } func (m *RetainResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RetainResponse.Unmarshal(m, b) @@ -575,7 +636,7 @@ func (m *RestoreTrashRequest) Reset() { *m = RestoreTrashRequest{} } func (m *RestoreTrashRequest) String() string { return proto.CompactTextString(m) } func (*RestoreTrashRequest) ProtoMessage() {} func (*RestoreTrashRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_23ff32dd550c2439, []int{8} + return fileDescriptor_23ff32dd550c2439, []int{10} } func (m *RestoreTrashRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RestoreTrashRequest.Unmarshal(m, b) @@ -605,7 +666,7 @@ func (m *RestoreTrashResponse) Reset() { *m = RestoreTrashResponse{} } func (m *RestoreTrashResponse) String() string { return proto.CompactTextString(m) } func (*RestoreTrashResponse) ProtoMessage() {} func (*RestoreTrashResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_23ff32dd550c2439, []int{9} + return fileDescriptor_23ff32dd550c2439, []int{11} } func (m *RestoreTrashResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RestoreTrashResponse.Unmarshal(m, b) @@ -650,7 +711,7 @@ func (m *PieceHeader) Reset() { *m = PieceHeader{} } func (m *PieceHeader) String() string { return proto.CompactTextString(m) } func (*PieceHeader) ProtoMessage() {} func (*PieceHeader) Descriptor() ([]byte, []int) { - return fileDescriptor_23ff32dd550c2439, []int{10} + return fileDescriptor_23ff32dd550c2439, []int{12} } func (m *PieceHeader) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PieceHeader.Unmarshal(m, b) @@ -716,6 +777,8 @@ func init() { proto.RegisterType((*PieceDownloadResponse_Chunk)(nil), "piecestore.PieceDownloadResponse.Chunk") proto.RegisterType((*PieceDeleteRequest)(nil), "piecestore.PieceDeleteRequest") proto.RegisterType((*PieceDeleteResponse)(nil), "piecestore.PieceDeleteResponse") + proto.RegisterType((*PieceDeletePieceRequest)(nil), "piecestore.PieceDeletePieceRequest") + proto.RegisterType((*PieceDeletePieceResponse)(nil), "piecestore.PieceDeletePieceResponse") proto.RegisterType((*RetainRequest)(nil), "piecestore.RetainRequest") proto.RegisterType((*RetainResponse)(nil), "piecestore.RetainResponse") proto.RegisterType((*RestoreTrashRequest)(nil), "piecestore.RestoreTrashRequest") @@ -726,50 +789,54 @@ func init() { func init() { proto.RegisterFile("piecestore2.proto", fileDescriptor_23ff32dd550c2439) } var fileDescriptor_23ff32dd550c2439 = []byte{ - // 685 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0x4b, 0x6f, 0xd3, 0x4e, - 0x10, 0x8f, 0x9b, 0x87, 0xda, 0xa9, 0x5d, 0xb5, 0xdb, 0x87, 0xf2, 0xb7, 0xfe, 0x90, 0x62, 0x28, - 0xf4, 0x82, 0x5b, 0xd2, 0x13, 0xa8, 0x14, 0x51, 0xaa, 0x0a, 0x44, 0xab, 0x56, 0xdb, 0xc7, 0x81, - 0x4b, 0xb4, 0x6d, 0xd6, 0x89, 0x45, 0xe2, 0x35, 0xde, 0x0d, 0x48, 0xfd, 0x14, 0x5c, 0xf8, 0x42, - 0x9c, 0xf8, 0x10, 0x08, 0x0e, 0x7c, 0x0c, 0x2e, 0x68, 0x1f, 0x4e, 0xe2, 0x3c, 0x55, 0x24, 0x4e, - 0xf6, 0xce, 0xfc, 0x66, 0x67, 0xf6, 0x37, 0xbf, 0x19, 0x58, 0x8a, 0x43, 0x7a, 0x4d, 0xb9, 0x60, - 0x09, 0xad, 0xfa, 0x71, 0xc2, 0x04, 0x43, 0xd0, 0x33, 0xb9, 0xd0, 0x60, 0x0d, 0xa6, 0xed, 0x6e, - 0xa5, 0xc1, 0x58, 0xa3, 0x45, 0xb7, 0xd4, 0xe9, 0xaa, 0x13, 0x6c, 0x89, 0xb0, 0x4d, 0xb9, 0x20, - 0xed, 0xd8, 0x00, 0x6c, 0x96, 0xd4, 0x69, 0xc2, 0xf5, 0xc9, 0xfb, 0x6d, 0x01, 0x3a, 0x95, 0x37, - 0x5d, 0xc4, 0x2d, 0x46, 0xea, 0x98, 0x7e, 0xe8, 0x50, 0x2e, 0xd0, 0x26, 0x14, 0x5b, 0x61, 0x3b, - 0x14, 0x65, 0x6b, 0xdd, 0xda, 0x9c, 0xaf, 0x22, 0xdf, 0x04, 0x9d, 0xc8, 0xcf, 0x91, 0xf4, 0x60, - 0x0d, 0x40, 0xf7, 0xa1, 0xa8, 0x7c, 0xe5, 0x19, 0x85, 0x74, 0x32, 0x48, 0xac, 0x7d, 0xe8, 0x19, - 0x14, 0xaf, 0x9b, 0x9d, 0xe8, 0x7d, 0x39, 0xaf, 0x40, 0x0f, 0xfc, 0x5e, 0xf1, 0xfe, 0x70, 0x76, - 0xff, 0x95, 0xc4, 0x62, 0x1d, 0x82, 0x36, 0xa0, 0x50, 0x67, 0x11, 0x2d, 0x17, 0x54, 0xe8, 0x52, - 0x7a, 0xbf, 0x0a, 0x7b, 0x4d, 0x78, 0x13, 0x2b, 0xb7, 0xbb, 0x03, 0x45, 0x15, 0x86, 0xd6, 0xa0, - 0xc4, 0x82, 0x80, 0x53, 0x5d, 0x7b, 0x1e, 0x9b, 0x13, 0x42, 0x50, 0xa8, 0x13, 0x41, 0x54, 0x9d, - 0x36, 0x56, 0xff, 0xde, 0x2e, 0x2c, 0x67, 0xd2, 0xf3, 0x98, 0x45, 0x9c, 0x76, 0x53, 0x5a, 0x13, - 0x53, 0x7a, 0xbf, 0x2c, 0x58, 0x51, 0xb6, 0x03, 0xf6, 0x29, 0xfa, 0x87, 0xec, 0xed, 0x66, 0xd9, - 0x7b, 0x38, 0xc4, 0xde, 0x40, 0xfe, 0x0c, 0x7f, 0xee, 0xde, 0x34, 0x62, 0xee, 0x00, 0x28, 0x64, - 0x8d, 0x87, 0x37, 0x54, 0x15, 0x92, 0xc7, 0x73, 0xca, 0x72, 0x16, 0xde, 0x50, 0xef, 0xbb, 0x05, - 0xab, 0x03, 0x59, 0x0c, 0x4d, 0xcf, 0xd3, 0xba, 0xf4, 0x33, 0x1f, 0x4d, 0xa8, 0x4b, 0x47, 0x0c, - 0x35, 0xb6, 0x49, 0x78, 0xd3, 0x3c, 0x7d, 0x14, 0xcb, 0xd2, 0xdd, 0x23, 0x33, 0x3f, 0x85, 0xcc, - 0xbf, 0x93, 0xc0, 0x9e, 0xd1, 0xff, 0x01, 0x6d, 0x51, 0x41, 0x6f, 0xdd, 0x41, 0x6f, 0xd5, 0x48, - 0x28, 0x8d, 0xd7, 0x2f, 0xf5, 0x12, 0x70, 0x30, 0x15, 0x24, 0x8c, 0xd2, 0x1b, 0xdf, 0x80, 0x73, - 0x9d, 0x50, 0x22, 0x42, 0x16, 0xd5, 0xea, 0x44, 0xa4, 0xe2, 0x72, 0x7d, 0x3d, 0xaf, 0x7e, 0x3a, - 0xaf, 0xfe, 0x79, 0x3a, 0xaf, 0xfb, 0xb3, 0xdf, 0x7e, 0x54, 0x72, 0x9f, 0x7f, 0x56, 0x2c, 0x6c, - 0xa7, 0xa1, 0x07, 0x44, 0x50, 0xf9, 0xbc, 0x20, 0x6c, 0x09, 0xa3, 0x1a, 0x1b, 0x9b, 0x93, 0xb7, - 0x08, 0x0b, 0x69, 0x4e, 0x53, 0xc5, 0x2a, 0x2c, 0x63, 0xdd, 0x90, 0xf3, 0x44, 0x32, 0xaa, 0x6b, - 0xf1, 0xd6, 0x60, 0x25, 0x6b, 0x36, 0xf0, 0xaf, 0x33, 0x30, 0xaf, 0xe9, 0xa7, 0x44, 0x0a, 0xef, - 0x08, 0x16, 0x02, 0x96, 0xb4, 0x89, 0xa8, 0x7d, 0xa4, 0x09, 0x0f, 0x59, 0xa4, 0x8a, 0x5e, 0xa8, - 0x6e, 0x0c, 0x75, 0x5a, 0x07, 0xf8, 0x87, 0x0a, 0x7d, 0xa9, 0xc1, 0xd8, 0x09, 0xfa, 0x8f, 0x92, - 0xfd, 0x6e, 0xbf, 0x6d, 0xd3, 0xdc, 0x7e, 0x56, 0xe4, 0xa2, 0x32, 0x4d, 0xbe, 0x25, 0x2b, 0xd2, - 0x89, 0xfe, 0x87, 0x39, 0x1e, 0x36, 0x22, 0x22, 0x3a, 0x89, 0x5e, 0x16, 0x36, 0xee, 0x19, 0xd0, - 0x53, 0x98, 0x57, 0x2d, 0xac, 0xe9, 0xb6, 0x16, 0xc7, 0xb5, 0x75, 0xbf, 0x20, 0xaf, 0xc7, 0xc0, - 0xba, 0x16, 0xef, 0x31, 0x38, 0x99, 0x77, 0x21, 0x07, 0xe6, 0x0e, 0x4f, 0xf0, 0xf1, 0xcb, 0xf3, - 0xda, 0xe5, 0xf6, 0x62, 0xae, 0xff, 0xf8, 0x64, 0xd1, 0xaa, 0x7e, 0xc9, 0x03, 0x9c, 0x76, 0xe9, - 0x41, 0xc7, 0x50, 0xd2, 0xdb, 0x05, 0xdd, 0x9d, 0xbc, 0xf5, 0xdc, 0xca, 0x58, 0xbf, 0x69, 0x4f, - 0x6e, 0xd3, 0x42, 0x17, 0x30, 0x9b, 0x4e, 0x15, 0x5a, 0x9f, 0xb6, 0x08, 0xdc, 0x7b, 0x53, 0x47, - 0x52, 0x5e, 0xba, 0x6d, 0xa1, 0xb7, 0x50, 0xd2, 0x02, 0x1e, 0x51, 0x65, 0x66, 0x32, 0x46, 0x54, - 0x39, 0xa0, 0xfc, 0x1c, 0x7a, 0x01, 0x25, 0xad, 0x43, 0xf4, 0x5f, 0x3f, 0x38, 0x33, 0x0f, 0xae, - 0x3b, 0xca, 0x65, 0x16, 0xcb, 0x19, 0xd8, 0xfd, 0xfa, 0x44, 0x95, 0x2c, 0x76, 0x48, 0xd0, 0xee, - 0xfa, 0x78, 0x40, 0x5a, 0xd5, 0x7e, 0xe1, 0xdd, 0x4c, 0x7c, 0x75, 0x55, 0x52, 0x8a, 0xda, 0xf9, - 0x13, 0x00, 0x00, 0xff, 0xff, 0x8e, 0x06, 0xc3, 0xd8, 0x52, 0x07, 0x00, 0x00, + // 747 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0x4b, 0x4f, 0x13, 0x51, + 0x14, 0x66, 0xfa, 0x02, 0x4e, 0xa7, 0x08, 0x97, 0x87, 0x75, 0xa2, 0x16, 0x07, 0x50, 0x62, 0xe2, + 0x80, 0x65, 0xa5, 0x41, 0x8c, 0xb5, 0x12, 0x49, 0x40, 0xc8, 0xe5, 0x11, 0xe3, 0xa6, 0x19, 0x98, + 0xdb, 0x76, 0x62, 0x3b, 0xb7, 0xce, 0xdc, 0x6a, 0xc2, 0x2f, 0x70, 0xe9, 0x6f, 0x72, 0xe5, 0x6f, + 0x30, 0x06, 0x17, 0xfe, 0x03, 0xb7, 0x6e, 0xcc, 0x7d, 0x4c, 0x3b, 0xd3, 0x67, 0x30, 0x71, 0xd5, + 0xde, 0x73, 0xbe, 0xf3, 0x98, 0xef, 0x7c, 0xe7, 0xc0, 0x5c, 0xcb, 0x25, 0x17, 0x24, 0x60, 0xd4, + 0x27, 0x45, 0xab, 0xe5, 0x53, 0x46, 0x11, 0x74, 0x4d, 0x06, 0xd4, 0x68, 0x8d, 0x4a, 0xbb, 0x51, + 0xa8, 0x51, 0x5a, 0x6b, 0x90, 0x0d, 0xf1, 0x3a, 0x6f, 0x57, 0x37, 0x98, 0xdb, 0x24, 0x01, 0xb3, + 0x9b, 0x2d, 0x05, 0xd0, 0xa9, 0xef, 0x10, 0x3f, 0x90, 0x2f, 0xf3, 0x8f, 0x06, 0xe8, 0x88, 0x67, + 0x3a, 0x6d, 0x35, 0xa8, 0xed, 0x60, 0xf2, 0xa1, 0x4d, 0x02, 0x86, 0xd6, 0x21, 0xdd, 0x70, 0x9b, + 0x2e, 0xcb, 0x6b, 0xcb, 0xda, 0x7a, 0xb6, 0x88, 0x2c, 0x15, 0x74, 0xc8, 0x7f, 0xf6, 0xb9, 0x07, + 0x4b, 0x00, 0x5a, 0x81, 0xb4, 0xf0, 0xe5, 0x13, 0x02, 0x99, 0x8b, 0x21, 0xb1, 0xf4, 0xa1, 0xa7, + 0x90, 0xbe, 0xa8, 0xb7, 0xbd, 0xf7, 0xf9, 0xa4, 0x00, 0xad, 0x5a, 0xdd, 0xe6, 0xad, 0xfe, 0xea, + 0xd6, 0x4b, 0x8e, 0xc5, 0x32, 0x04, 0xad, 0x41, 0xca, 0xa1, 0x1e, 0xc9, 0xa7, 0x44, 0xe8, 0x5c, + 0x98, 0x5f, 0x84, 0xbd, 0xb6, 0x83, 0x3a, 0x16, 0x6e, 0x63, 0x0b, 0xd2, 0x22, 0x0c, 0x2d, 0x41, + 0x86, 0x56, 0xab, 0x01, 0x91, 0xbd, 0x27, 0xb1, 0x7a, 0x21, 0x04, 0x29, 0xc7, 0x66, 0xb6, 0xe8, + 0x53, 0xc7, 0xe2, 0xbf, 0xb9, 0x0d, 0xf3, 0xb1, 0xf2, 0x41, 0x8b, 0x7a, 0x01, 0xe9, 0x94, 0xd4, + 0x46, 0x96, 0x34, 0x7f, 0x69, 0xb0, 0x20, 0x6c, 0x65, 0xfa, 0xc9, 0xfb, 0x8f, 0xec, 0x6d, 0xc7, + 0xd9, 0xbb, 0xdf, 0xc7, 0x5e, 0x4f, 0xfd, 0x18, 0x7f, 0xc6, 0xce, 0x38, 0x62, 0xee, 0x00, 0x08, + 0x64, 0x25, 0x70, 0x2f, 0x89, 0x68, 0x24, 0x89, 0xa7, 0x85, 0xe5, 0xd8, 0xbd, 0x24, 0xe6, 0x0f, + 0x0d, 0x16, 0x7b, 0xaa, 0x28, 0x9a, 0x9e, 0x85, 0x7d, 0xc9, 0xcf, 0x7c, 0x30, 0xa2, 0x2f, 0x19, + 0xd1, 0x37, 0xd8, 0xba, 0x1d, 0xd4, 0xd5, 0xa7, 0x0f, 0x62, 0x99, 0xbb, 0xbb, 0x64, 0x26, 0xc7, + 0x90, 0xf9, 0x6f, 0x12, 0xd8, 0x51, 0xfa, 0x2f, 0x93, 0x06, 0x61, 0xe4, 0xda, 0x13, 0x34, 0x17, + 0x95, 0x84, 0xc2, 0x78, 0xf9, 0xa5, 0xe6, 0x2b, 0xb8, 0x19, 0x31, 0x8b, 0xbf, 0x61, 0xee, 0x87, + 0x30, 0x25, 0x88, 0xaa, 0xb8, 0x8e, 0x48, 0xaf, 0x97, 0x6e, 0x7c, 0xbb, 0x2a, 0x4c, 0x7c, 0xbf, + 0x2a, 0x4c, 0x0a, 0xdc, 0x5e, 0x19, 0x4f, 0x0a, 0xc0, 0x9e, 0x63, 0x1a, 0x90, 0xef, 0x4f, 0xa3, + 0x4a, 0xf8, 0x90, 0xc3, 0x84, 0xd9, 0xae, 0x17, 0x26, 0xde, 0x83, 0xdc, 0x85, 0x4f, 0x6c, 0xe6, + 0x52, 0xaf, 0xe2, 0xd8, 0x2c, 0xd4, 0xaf, 0x61, 0xc9, 0x93, 0x60, 0x85, 0x27, 0xc1, 0x3a, 0x09, + 0x4f, 0x42, 0x69, 0x8a, 0x57, 0xfe, 0xf2, 0xb3, 0xa0, 0x61, 0x3d, 0x0c, 0x2d, 0xdb, 0x8c, 0x70, + 0x06, 0xab, 0x6e, 0x83, 0x29, 0x61, 0xea, 0x58, 0xbd, 0xcc, 0x59, 0x98, 0x09, 0x6b, 0xaa, 0x2e, + 0x16, 0x61, 0x1e, 0xcb, 0x99, 0x9f, 0xf8, 0x7c, 0x68, 0xb2, 0x17, 0x73, 0x09, 0x16, 0xe2, 0x66, + 0x05, 0xff, 0x9a, 0x80, 0xac, 0x9c, 0x30, 0xb1, 0xb9, 0xb6, 0xf7, 0x61, 0xa6, 0x4a, 0xfd, 0xa6, + 0xcd, 0x2a, 0x1f, 0x89, 0x1f, 0xb8, 0xd4, 0x13, 0x4d, 0xcf, 0x14, 0xd7, 0xfa, 0xc4, 0x24, 0x03, + 0xac, 0x5d, 0x81, 0x3e, 0x93, 0x60, 0x9c, 0xab, 0x46, 0x9f, 0x7c, 0xc0, 0x1d, 0x49, 0xe9, 0x4a, + 0x3f, 0x51, 0x56, 0xf8, 0x2d, 0x54, 0x3a, 0xba, 0x26, 0x2b, 0xdc, 0x89, 0x6e, 0xc3, 0x74, 0xe0, + 0xd6, 0x3c, 0x9b, 0xb5, 0x7d, 0x79, 0x8f, 0x74, 0xdc, 0x35, 0xa0, 0x27, 0x90, 0x15, 0x2a, 0xa9, + 0x48, 0xe5, 0xa4, 0x87, 0x29, 0xa7, 0x94, 0xe2, 0xe9, 0x31, 0xd0, 0x8e, 0xc5, 0x7c, 0x04, 0xb9, + 0xd8, 0x77, 0xa1, 0x1c, 0x4c, 0xef, 0x1e, 0xe2, 0x83, 0x17, 0x27, 0x95, 0xb3, 0xcd, 0xd9, 0x89, + 0xe8, 0xf3, 0xf1, 0xac, 0x56, 0xfc, 0x9d, 0x04, 0x38, 0xea, 0xd0, 0x83, 0x0e, 0x20, 0x23, 0x0f, + 0x18, 0xba, 0x3b, 0xfa, 0xb0, 0x1a, 0x85, 0xa1, 0x7e, 0x35, 0x9e, 0x89, 0x75, 0x0d, 0x9d, 0xc2, + 0x54, 0xb8, 0xb8, 0x68, 0x79, 0xdc, 0xad, 0x31, 0xee, 0x8d, 0xdd, 0x7a, 0x9e, 0x74, 0x53, 0x43, + 0x6f, 0x20, 0x23, 0x55, 0x3c, 0xa0, 0xcb, 0xd8, 0xf2, 0x0d, 0xe8, 0xb2, 0x67, 0xb9, 0x92, 0x9f, + 0x13, 0x1a, 0x7a, 0x0b, 0xd9, 0xc8, 0x56, 0xa0, 0x95, 0x21, 0x41, 0xd1, 0xd5, 0x33, 0x56, 0x47, + 0x83, 0xd4, 0x5d, 0x7b, 0x0e, 0x19, 0x29, 0x72, 0x74, 0x2b, 0x8a, 0x8f, 0x2d, 0x9b, 0x61, 0x0c, + 0x72, 0xa9, 0x04, 0xc7, 0xa0, 0x47, 0xc5, 0x8f, 0x0a, 0x71, 0x6c, 0xdf, 0xb6, 0x18, 0xcb, 0xc3, + 0x01, 0x21, 0x87, 0xa5, 0xd4, 0xbb, 0x44, 0xeb, 0xfc, 0x3c, 0x23, 0xe4, 0xba, 0xf5, 0x37, 0x00, + 0x00, 0xff, 0xff, 0xfb, 0xe1, 0x1b, 0xbe, 0x12, 0x08, 0x00, 0x00, } type DRPCPiecestoreClient interface { @@ -778,6 +845,8 @@ type DRPCPiecestoreClient interface { Upload(ctx context.Context) (DRPCPiecestore_UploadClient, error) Download(ctx context.Context) (DRPCPiecestore_DownloadClient, error) Delete(ctx context.Context, in *PieceDeleteRequest) (*PieceDeleteResponse, error) + // DeletePiece deletes a piece from a satellite request + DeletePiece(ctx context.Context, in *PieceDeletePieceRequest) (*PieceDeletePieceResponse, error) Retain(ctx context.Context, in *RetainRequest) (*RetainResponse, error) RestoreTrash(ctx context.Context, in *RestoreTrashRequest) (*RestoreTrashResponse, error) } @@ -866,6 +935,15 @@ func (c *drpcPiecestoreClient) Delete(ctx context.Context, in *PieceDeleteReques return out, nil } +func (c *drpcPiecestoreClient) DeletePiece(ctx context.Context, in *PieceDeletePieceRequest) (*PieceDeletePieceResponse, error) { + out := new(PieceDeletePieceResponse) + err := c.cc.Invoke(ctx, "/piecestore.Piecestore/DeletePiece", in, out) + if err != nil { + return nil, err + } + return out, nil +} + func (c *drpcPiecestoreClient) Retain(ctx context.Context, in *RetainRequest) (*RetainResponse, error) { out := new(RetainResponse) err := c.cc.Invoke(ctx, "/piecestore.Piecestore/Retain", in, out) @@ -888,13 +966,15 @@ type DRPCPiecestoreServer interface { Upload(DRPCPiecestore_UploadStream) error Download(DRPCPiecestore_DownloadStream) error Delete(context.Context, *PieceDeleteRequest) (*PieceDeleteResponse, error) + // DeletePiece deletes a piece from a satellite request + DeletePiece(context.Context, *PieceDeletePieceRequest) (*PieceDeletePieceResponse, error) Retain(context.Context, *RetainRequest) (*RetainResponse, error) RestoreTrash(context.Context, *RestoreTrashRequest) (*RestoreTrashResponse, error) } type DRPCPiecestoreDescription struct{} -func (DRPCPiecestoreDescription) NumMethods() int { return 5 } +func (DRPCPiecestoreDescription) NumMethods() int { return 6 } func (DRPCPiecestoreDescription) Method(n int) (string, drpc.Handler, interface{}, bool) { switch n { @@ -924,6 +1004,15 @@ func (DRPCPiecestoreDescription) Method(n int) (string, drpc.Handler, interface{ ) }, DRPCPiecestoreServer.Delete, true case 3: + return "/piecestore.Piecestore/DeletePiece", + func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { + return srv.(DRPCPiecestoreServer). + DeletePiece( + ctx, + in1.(*PieceDeletePieceRequest), + ) + }, DRPCPiecestoreServer.DeletePiece, true + case 4: return "/piecestore.Piecestore/Retain", func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return srv.(DRPCPiecestoreServer). @@ -932,7 +1021,7 @@ func (DRPCPiecestoreDescription) Method(n int) (string, drpc.Handler, interface{ in1.(*RetainRequest), ) }, DRPCPiecestoreServer.Retain, true - case 4: + case 5: return "/piecestore.Piecestore/RestoreTrash", func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return srv.(DRPCPiecestoreServer). @@ -1013,6 +1102,22 @@ func (x *drpcPiecestoreDeleteStream) SendAndClose(m *PieceDeleteResponse) error return x.CloseSend() } +type DRPCPiecestore_DeletePieceStream interface { + drpc.Stream + SendAndClose(*PieceDeletePieceResponse) error +} + +type drpcPiecestoreDeletePieceStream struct { + drpc.Stream +} + +func (x *drpcPiecestoreDeletePieceStream) SendAndClose(m *PieceDeletePieceResponse) error { + if err := x.MsgSend(m); err != nil { + return err + } + return x.CloseSend() +} + type DRPCPiecestore_RetainStream interface { drpc.Stream SendAndClose(*RetainResponse) error @@ -1060,6 +1165,8 @@ type PiecestoreClient interface { Upload(ctx context.Context, opts ...grpc.CallOption) (Piecestore_UploadClient, error) Download(ctx context.Context, opts ...grpc.CallOption) (Piecestore_DownloadClient, error) Delete(ctx context.Context, in *PieceDeleteRequest, opts ...grpc.CallOption) (*PieceDeleteResponse, error) + // DeletePiece deletes a piece from a satellite request + DeletePiece(ctx context.Context, in *PieceDeletePieceRequest, opts ...grpc.CallOption) (*PieceDeletePieceResponse, error) Retain(ctx context.Context, in *RetainRequest, opts ...grpc.CallOption) (*RetainResponse, error) RestoreTrash(ctx context.Context, in *RestoreTrashRequest, opts ...grpc.CallOption) (*RestoreTrashResponse, error) } @@ -1137,6 +1244,7 @@ func (x *piecestoreDownloadClient) Recv() (*PieceDownloadResponse, error) { return m, nil } +// Deprecated: Do not use. func (c *piecestoreClient) Delete(ctx context.Context, in *PieceDeleteRequest, opts ...grpc.CallOption) (*PieceDeleteResponse, error) { out := new(PieceDeleteResponse) err := c.cc.Invoke(ctx, "/piecestore.Piecestore/Delete", in, out, opts...) @@ -1146,6 +1254,15 @@ func (c *piecestoreClient) Delete(ctx context.Context, in *PieceDeleteRequest, o return out, nil } +func (c *piecestoreClient) DeletePiece(ctx context.Context, in *PieceDeletePieceRequest, opts ...grpc.CallOption) (*PieceDeletePieceResponse, error) { + out := new(PieceDeletePieceResponse) + err := c.cc.Invoke(ctx, "/piecestore.Piecestore/DeletePiece", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *piecestoreClient) Retain(ctx context.Context, in *RetainRequest, opts ...grpc.CallOption) (*RetainResponse, error) { out := new(RetainResponse) err := c.cc.Invoke(ctx, "/piecestore.Piecestore/Retain", in, out, opts...) @@ -1169,6 +1286,8 @@ type PiecestoreServer interface { Upload(Piecestore_UploadServer) error Download(Piecestore_DownloadServer) error Delete(context.Context, *PieceDeleteRequest) (*PieceDeleteResponse, error) + // DeletePiece deletes a piece from a satellite request + DeletePiece(context.Context, *PieceDeletePieceRequest) (*PieceDeletePieceResponse, error) Retain(context.Context, *RetainRequest) (*RetainResponse, error) RestoreTrash(context.Context, *RestoreTrashRequest) (*RestoreTrashResponse, error) } @@ -1247,6 +1366,24 @@ func _Piecestore_Delete_Handler(srv interface{}, ctx context.Context, dec func(i return interceptor(ctx, in, info, handler) } +func _Piecestore_DeletePiece_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PieceDeletePieceRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PiecestoreServer).DeletePiece(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/piecestore.Piecestore/DeletePiece", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(PiecestoreServer).DeletePiece(ctx, req.(*PieceDeletePieceRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Piecestore_Retain_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(RetainRequest) if err := dec(in); err != nil { @@ -1291,6 +1428,10 @@ var _Piecestore_serviceDesc = grpc.ServiceDesc{ MethodName: "Delete", Handler: _Piecestore_Delete_Handler, }, + { + MethodName: "DeletePiece", + Handler: _Piecestore_DeletePiece_Handler, + }, { MethodName: "Retain", Handler: _Piecestore_Retain_Handler, diff --git a/pkg/pb/piecestore2.proto b/pkg/pb/piecestore2.proto index ef3c9774b..3bf06bde8 100644 --- a/pkg/pb/piecestore2.proto +++ b/pkg/pb/piecestore2.proto @@ -13,7 +13,11 @@ import "orders.proto"; service Piecestore { rpc Upload(stream PieceUploadRequest) returns (PieceUploadResponse) {} rpc Download(stream PieceDownloadRequest) returns (stream PieceDownloadResponse) {} - rpc Delete(PieceDeleteRequest) returns (PieceDeleteResponse) {} + rpc Delete(PieceDeleteRequest) returns (PieceDeleteResponse) { + option deprecated = true; + } + // DeletePiece deletes a piece from a satellite request + rpc DeletePiece(PieceDeletePieceRequest) returns (PieceDeletePieceResponse); rpc Retain(RetainRequest) returns (RetainResponse); rpc RestoreTrash(RestoreTrashRequest) returns (RestoreTrashResponse) {} } @@ -86,6 +90,13 @@ message PieceDeleteRequest { message PieceDeleteResponse { } +message PieceDeletePieceRequest { + bytes piece_id = 1 [(gogoproto.customtype) = "PieceID", (gogoproto.nullable) = false]; +} + +message PieceDeletePieceResponse { +} + message RetainRequest { google.protobuf.Timestamp creation_date = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; bytes filter = 2; diff --git a/private/testplanet/uplink_test.go b/private/testplanet/uplink_test.go index 983e91d2d..ce03b4195 100644 --- a/private/testplanet/uplink_test.go +++ b/private/testplanet/uplink_test.go @@ -176,6 +176,9 @@ func (mock *piecestoreMock) Download(server pb.Piecestore_DownloadServer) error func (mock *piecestoreMock) Delete(ctx context.Context, delete *pb.PieceDeleteRequest) (_ *pb.PieceDeleteResponse, err error) { return nil, nil } +func (mock *piecestoreMock) DeletePiece(ctx context.Context, delete *pb.PieceDeletePieceRequest) (_ *pb.PieceDeletePieceResponse, err error) { + return nil, nil +} func (mock *piecestoreMock) Retain(ctx context.Context, retain *pb.RetainRequest) (_ *pb.RetainResponse, err error) { return nil, nil } diff --git a/proto.lock b/proto.lock index 076f90ffc..937443a84 100644 --- a/proto.lock +++ b/proto.lock @@ -5570,6 +5570,29 @@ { "name": "PieceDeleteResponse" }, + { + "name": "PieceDeletePieceRequest", + "fields": [ + { + "id": 1, + "name": "piece_id", + "type": "bytes", + "options": [ + { + "name": "(gogoproto.customtype)", + "value": "PieceID" + }, + { + "name": "(gogoproto.nullable)", + "value": "false" + } + ] + } + ] + }, + { + "name": "PieceDeletePieceResponse" + }, { "name": "RetainRequest", "fields": [ @@ -5671,7 +5694,18 @@ { "name": "Delete", "in_type": "PieceDeleteRequest", - "out_type": "PieceDeleteResponse" + "out_type": "PieceDeleteResponse", + "options": [ + { + "name": "deprecated", + "value": "true" + } + ] + }, + { + "name": "DeletePiece", + "in_type": "PieceDeletePieceRequest", + "out_type": "PieceDeletePieceResponse" }, { "name": "Retain", diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index bb80868c1..be37c3376 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -7,6 +7,7 @@ import ( "context" "io" "os" + "strings" "sync/atomic" "time" @@ -134,7 +135,9 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, moni var monLiveRequests = mon.TaskNamed("live-request") -// Delete handles deleting a piece on piece store. +// Delete handles deleting a piece on piece store requested by uplink. +// +// DEPRECATED in favor of DeletePiece. func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequest) (_ *pb.PieceDeleteResponse, err error) { defer monLiveRequests(&ctx)(&err) defer mon.Task()(&ctx)(&err) @@ -166,6 +169,48 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ return &pb.PieceDeleteResponse{}, nil } +// DeletePiece handles deleting a piece on piece store requested by satellite. +// +// It doesn't return an error if the piece isn't found by any reason. +func (endpoint *Endpoint) DeletePiece( + ctx context.Context, req *pb.PieceDeletePieceRequest, +) (_ *pb.PieceDeletePieceResponse, err error) { + defer mon.Task()(&ctx, req.PieceId.String())(&err) + + peer, err := identity.PeerIdentityFromContext(ctx) + if err != nil { + return nil, rpcstatus.Error(rpcstatus.Unauthenticated, Error.Wrap(err).Error()) + } + + err = endpoint.trust.VerifySatelliteID(ctx, peer.ID) + if err != nil { + return nil, rpcstatus.Error(rpcstatus.PermissionDenied, + Error.New("%s", "delete piece called with untrusted ID").Error(), + ) + } + + err = endpoint.store.Delete(ctx, peer.ID, req.PieceId) + if err != nil { + // TODO: https://storjlabs.atlassian.net/browse/V3-3222 + // Once this method returns error classes change the following conditional + if strings.Contains(err.Error(), "file does not exist") { + return nil, rpcstatus.Error(rpcstatus.NotFound, "piece not found") + } + + endpoint.log.Error("delete piece failed", + zap.Error(Error.Wrap(err)), + zap.Stringer("Satellite ID", peer.ID), + zap.Stringer("Piece ID", req.PieceId), + ) + + return nil, rpcstatus.Error(rpcstatus.Internal, + Error.New("%s", "delete piece failed").Error(), + ) + } + + return &pb.PieceDeletePieceResponse{}, nil +} + // Upload handles uploading a piece on piece store. func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error) { return endpoint.doUpload(stream, endpoint.grpcReqLimit) diff --git a/storagenode/piecestore/endpoint_test.go b/storagenode/piecestore/endpoint_test.go index 6abe5dc30..0693a8e14 100644 --- a/storagenode/piecestore/endpoint_test.go +++ b/storagenode/piecestore/endpoint_test.go @@ -178,37 +178,10 @@ func TestDownload(t *testing.T) { planet.Start(ctx) - // upload test piece + pieceID := storj.PieceID{1} + expectedData, _, _ := uploadPiece(t, ctx, pieceID, planet.StorageNodes[0], planet.Uplinks[0], planet.Satellites[0]) client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0]) require.NoError(t, err) - defer ctx.Check(client.Close) - - expectedData := testrand.Bytes(10 * memory.KiB) - serialNumber := testrand.SerialNumber() - - orderLimit, piecePrivateKey := GenerateOrderLimit( - t, - planet.Satellites[0].ID(), - planet.StorageNodes[0].ID(), - storj.PieceID{1}, - pb.PieceAction_PUT, - serialNumber, - 24*time.Hour, - 24*time.Hour, - int64(len(expectedData)), - ) - signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity) - orderLimit, err = signing.SignOrderLimit(ctx, signer, orderLimit) - require.NoError(t, err) - - uploader, err := client.Upload(ctx, orderLimit, piecePrivateKey) - require.NoError(t, err) - - _, err = uploader.Write(expectedData) - require.NoError(t, err) - - _, err = uploader.Commit(ctx) - require.NoError(t, err) for _, tt := range []struct { pieceID storj.PieceID @@ -216,7 +189,7 @@ func TestDownload(t *testing.T) { errs []string }{ { // should successfully download data - pieceID: orderLimit.PieceId, + pieceID: pieceID, action: pb.PieceAction_GET, }, { // should err with piece ID not specified @@ -230,7 +203,7 @@ func TestDownload(t *testing.T) { errs: []string{"file does not exist", "The system cannot find the path specified"}, }, { // should err with invalid action - pieceID: orderLimit.PieceId, + pieceID: pieceID, action: pb.PieceAction_PUT, errs: []string{"expected get or get repair or audit action got PUT"}, }, @@ -289,40 +262,15 @@ func TestDownloadGetRepair(t *testing.T) { planet.Start(ctx) - // upload test piece + pieceID := storj.PieceID{1} + expectedData, ulOrderLimit, originHash := uploadPiece( + t, ctx, pieceID, planet.StorageNodes[0], planet.Uplinks[0], planet.Satellites[0], + ) client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0]) require.NoError(t, err) - defer ctx.Check(client.Close) - expectedData := testrand.Bytes(10 * memory.KiB) serialNumber := testrand.SerialNumber() - ulOrderLimit, piecePrivateKey := GenerateOrderLimit( - t, - planet.Satellites[0].ID(), - planet.StorageNodes[0].ID(), - storj.PieceID{1}, - pb.PieceAction_PUT, - serialNumber, - 24*time.Hour, - 24*time.Hour, - int64(len(expectedData)), - ) - signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity) - ulOrderLimit, err = signing.SignOrderLimit(ctx, signer, ulOrderLimit) - require.NoError(t, err) - - uploader, err := client.Upload(ctx, ulOrderLimit, piecePrivateKey) - require.NoError(t, err) - - _, err = uploader.Write(expectedData) - require.NoError(t, err) - - originHash, err := uploader.Commit(ctx) - require.NoError(t, err) - - serialNumber = testrand.SerialNumber() - dlOrderLimit, piecePrivateKey := GenerateOrderLimit( t, planet.Satellites[0].ID(), @@ -334,6 +282,7 @@ func TestDownloadGetRepair(t *testing.T) { 24*time.Hour, int64(len(expectedData)), ) + signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity) dlOrderLimit, err = signing.SignOrderLimit(ctx, signer, dlOrderLimit) require.NoError(t, err) @@ -374,37 +323,10 @@ func TestDelete(t *testing.T) { planet.Start(ctx) - // upload test piece + pieceID := storj.PieceID{1} + uploadPiece(t, ctx, pieceID, planet.StorageNodes[0], planet.Uplinks[0], planet.Satellites[0]) client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0]) require.NoError(t, err) - defer ctx.Check(client.Close) - - expectedData := testrand.Bytes(10 * memory.KiB) - serialNumber := testrand.SerialNumber() - - orderLimit, piecePrivateKey := GenerateOrderLimit( - t, - planet.Satellites[0].ID(), - planet.StorageNodes[0].ID(), - storj.PieceID{1}, - pb.PieceAction_PUT, - serialNumber, - 24*time.Hour, - 24*time.Hour, - int64(len(expectedData)), - ) - signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity) - orderLimit, err = signing.SignOrderLimit(ctx, signer, orderLimit) - require.NoError(t, err) - - uploader, err := client.Upload(ctx, orderLimit, piecePrivateKey) - require.NoError(t, err) - - _, err = uploader.Write(expectedData) - require.NoError(t, err) - - _, err = uploader.Commit(ctx) - require.NoError(t, err) for _, tt := range []struct { pieceID storj.PieceID @@ -412,7 +334,7 @@ func TestDelete(t *testing.T) { err string }{ { // should successfully delete data - pieceID: orderLimit.PieceId, + pieceID: pieceID, action: pb.PieceAction_DELETE, err: "", }, @@ -427,7 +349,7 @@ func TestDelete(t *testing.T) { err: "missing piece id", }, { // should err due to incorrect action - pieceID: orderLimit.PieceId, + pieceID: pieceID, action: pb.PieceAction_GET, err: "expected delete action got GET", }, @@ -459,6 +381,64 @@ func TestDelete(t *testing.T) { } } +func TestDeletePiece(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + planet, err := testplanet.New(t, 1, 1, 1) + require.NoError(t, err) + defer ctx.Check(planet.Shutdown) + + planet.Start(ctx) + + var ( + planetSat = planet.Satellites[0] + planetSN = planet.StorageNodes[0] + ) + + var client *piecestore.Client + { + dossier, err := planetSat.Overlay.DB.Get(ctx.Context, planetSN.ID()) + require.NoError(t, err) + + client, err = piecestore.Dial( + ctx.Context, planetSat.Dialer, &dossier.Node, zaptest.NewLogger(t), piecestore.Config{}, + ) + require.NoError(t, err) + } + + t.Run("Ok", func(t *testing.T) { + pieceID := storj.PieceID{1} + data, _, _ := uploadPiece(t, ctx, pieceID, planetSN, planet.Uplinks[0], planetSat) + + err := client.DeletePiece(ctx.Context, pieceID) + require.NoError(t, err) + + _, err = downloadPiece(t, ctx, pieceID, int64(len(data)), planetSN, planet.Uplinks[0], planetSat) + require.Error(t, err) + + require.Condition(t, func() bool { + return strings.Contains(err.Error(), "file does not exist") || + strings.Contains(err.Error(), "The system cannot find the path specified") + }, "unexpected error message") + }) + + t.Run("error: Not found", func(t *testing.T) { + err := client.DeletePiece(ctx.Context, storj.PieceID{2}) + require.Error(t, err) + require.Equal(t, rpcstatus.NotFound, rpcstatus.Code(err)) + }) + + t.Run("error: permission denied", func(t *testing.T) { + client, err := planet.Uplinks[0].DialPiecestore(ctx, planetSN) + require.NoError(t, err) + + err = client.DeletePiece(ctx.Context, storj.PieceID{}) + require.Error(t, err) + require.Equal(t, rpcstatus.PermissionDenied, rpcstatus.Code(err)) + }) +} + func TestTooManyRequests(t *testing.T) { t.Skip("flaky, because of EOF issues") @@ -588,3 +568,88 @@ func GenerateOrderLimit(t *testing.T, satellite storj.NodeID, storageNode storj. Limit: limit, }, piecePrivateKey } + +// uploadPiece uploads piece to storageNode. +func uploadPiece( + t *testing.T, ctx *testcontext.Context, piece storj.PieceID, storageNode *storagenode.Peer, + uplink *testplanet.Uplink, satellite *testplanet.SatelliteSystem, +) (uploadedData []byte, _ *pb.OrderLimit, _ *pb.PieceHash) { + t.Helper() + + client, err := uplink.DialPiecestore(ctx, storageNode) + require.NoError(t, err) + defer ctx.Check(client.Close) + + serialNumber := testrand.SerialNumber() + uploadedData = testrand.Bytes(10 * memory.KiB) + + orderLimit, piecePrivateKey := GenerateOrderLimit( + t, + satellite.ID(), + storageNode.ID(), + piece, + pb.PieceAction_PUT, + serialNumber, + 24*time.Hour, + 24*time.Hour, + int64(len(uploadedData)), + ) + signer := signing.SignerFromFullIdentity(satellite.Identity) + orderLimit, err = signing.SignOrderLimit(ctx, signer, orderLimit) + require.NoError(t, err) + + uploader, err := client.Upload(ctx, orderLimit, piecePrivateKey) + require.NoError(t, err) + + _, err = uploader.Write(uploadedData) + require.NoError(t, err) + + hash, err := uploader.Commit(ctx) + require.NoError(t, err) + + return uploadedData, orderLimit, hash +} + +// downloadPiece downlodads piece from storageNode. +func downloadPiece( + t *testing.T, ctx *testcontext.Context, piece storj.PieceID, limit int64, + storageNode *storagenode.Peer, uplink *testplanet.Uplink, satellite *testplanet.SatelliteSystem, +) (pieceData []byte, err error) { + t.Helper() + + serialNumber := testrand.SerialNumber() + orderLimit, piecePrivateKey := GenerateOrderLimit( + t, + satellite.ID(), + storageNode.ID(), + piece, + pb.PieceAction_GET, + serialNumber, + 24*time.Hour, + 24*time.Hour, + limit, + ) + signer := signing.SignerFromFullIdentity(satellite.Identity) + orderLimit, err = signing.SignOrderLimit(ctx.Context, signer, orderLimit) + require.NoError(t, err) + + client, err := uplink.DialPiecestore(ctx, storageNode) + require.NoError(t, err) + + downloader, err := client.Download(ctx.Context, orderLimit, piecePrivateKey, 0, limit) + require.NoError(t, err) + defer func() { + if err != nil { + // Ignore err in Close if an error happened in Download because it's also + // returned by Close. + _ = downloader.Close() + return + } + + err = downloader.Close() + }() + + buffer := make([]byte, limit) + n, err := downloader.Read(buffer) + return buffer[:n], err +} diff --git a/uplink/piecestore/client.go b/uplink/piecestore/client.go index 04a8f2cd3..28bcecee4 100644 --- a/uplink/piecestore/client.go +++ b/uplink/piecestore/client.go @@ -70,6 +70,15 @@ func (client *Client) Delete(ctx context.Context, limit *pb.OrderLimit, privateK return Error.Wrap(err) } +// DeletePiece deletes a piece. +func (client *Client) DeletePiece(ctx context.Context, id storj.PieceID) (err error) { + defer mon.Task()(&ctx, id.String())(&err) + _, err = client.client.DeletePiece(ctx, &pb.PieceDeletePieceRequest{ + PieceId: id, + }) + return Error.Wrap(err) +} + // Retain uses a bloom filter to tell the piece store which pieces to keep. func (client *Client) Retain(ctx context.Context, req *pb.RetainRequest) (err error) { defer mon.Task()(&ctx)(&err)