diff --git a/pkg/pb/piecestore2.pb.go b/pkg/pb/piecestore2.pb.go index c1b8841b9..032b89640 100644 --- a/pkg/pb/piecestore2.pb.go +++ b/pkg/pb/piecestore2.pb.go @@ -49,7 +49,7 @@ func (x PieceHeader_FormatVersion) String() string { } func (PieceHeader_FormatVersion) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_23ff32dd550c2439, []int{8, 0} + return fileDescriptor_23ff32dd550c2439, []int{10, 0} } // Expected order of messages from uplink: @@ -565,6 +565,66 @@ func (m *RetainResponse) XXX_DiscardUnknown() { var xxx_messageInfo_RetainResponse proto.InternalMessageInfo +type RestoreTrashRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RestoreTrashRequest) Reset() { *m = RestoreTrashRequest{} } +func (m *RestoreTrashRequest) String() string { return proto.CompactTextString(m) } +func (*RestoreTrashRequest) ProtoMessage() {} +func (*RestoreTrashRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_23ff32dd550c2439, []int{8} +} +func (m *RestoreTrashRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RestoreTrashRequest.Unmarshal(m, b) +} +func (m *RestoreTrashRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RestoreTrashRequest.Marshal(b, m, deterministic) +} +func (m *RestoreTrashRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_RestoreTrashRequest.Merge(m, src) +} +func (m *RestoreTrashRequest) XXX_Size() int { + return xxx_messageInfo_RestoreTrashRequest.Size(m) +} +func (m *RestoreTrashRequest) XXX_DiscardUnknown() { + xxx_messageInfo_RestoreTrashRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_RestoreTrashRequest proto.InternalMessageInfo + +type RestoreTrashResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RestoreTrashResponse) Reset() { *m = RestoreTrashResponse{} } +func (m *RestoreTrashResponse) String() string { return proto.CompactTextString(m) } +func (*RestoreTrashResponse) ProtoMessage() {} +func (*RestoreTrashResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_23ff32dd550c2439, []int{9} +} +func (m *RestoreTrashResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RestoreTrashResponse.Unmarshal(m, b) +} +func (m *RestoreTrashResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RestoreTrashResponse.Marshal(b, m, deterministic) +} +func (m *RestoreTrashResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_RestoreTrashResponse.Merge(m, src) +} +func (m *RestoreTrashResponse) XXX_Size() int { + return xxx_messageInfo_RestoreTrashResponse.Size(m) +} +func (m *RestoreTrashResponse) XXX_DiscardUnknown() { + xxx_messageInfo_RestoreTrashResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_RestoreTrashResponse proto.InternalMessageInfo + // PieceHeader is used in piece storage to keep track of piece attributes. type PieceHeader struct { // the storage format version being used for this piece. The piece filename should agree with this. @@ -590,7 +650,7 @@ func (m *PieceHeader) Reset() { *m = PieceHeader{} } func (m *PieceHeader) String() string { return proto.CompactTextString(m) } func (*PieceHeader) ProtoMessage() {} func (*PieceHeader) Descriptor() ([]byte, []int) { - return fileDescriptor_23ff32dd550c2439, []int{8} + return fileDescriptor_23ff32dd550c2439, []int{10} } func (m *PieceHeader) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PieceHeader.Unmarshal(m, b) @@ -658,54 +718,58 @@ func init() { proto.RegisterType((*PieceDeleteResponse)(nil), "piecestore.PieceDeleteResponse") proto.RegisterType((*RetainRequest)(nil), "piecestore.RetainRequest") proto.RegisterType((*RetainResponse)(nil), "piecestore.RetainResponse") + proto.RegisterType((*RestoreTrashRequest)(nil), "piecestore.RestoreTrashRequest") + proto.RegisterType((*RestoreTrashResponse)(nil), "piecestore.RestoreTrashResponse") proto.RegisterType((*PieceHeader)(nil), "piecestore.PieceHeader") } func init() { proto.RegisterFile("piecestore2.proto", fileDescriptor_23ff32dd550c2439) } var fileDescriptor_23ff32dd550c2439 = []byte{ - // 648 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0xcb, 0x6e, 0xd3, 0x4c, - 0x14, 0x8e, 0x73, 0x53, 0x7b, 0x6a, 0x57, 0xed, 0xfc, 0x7f, 0x51, 0xb0, 0x80, 0x14, 0x43, 0xa1, - 0x1b, 0xdc, 0x92, 0xae, 0x40, 0xa5, 0x88, 0x52, 0x55, 0x20, 0x5a, 0xb5, 0x1a, 0xda, 0x2e, 0xd8, - 0x44, 0x93, 0x64, 0x9c, 0x58, 0x24, 0x1e, 0xe3, 0x99, 0x80, 0xd4, 0xa7, 0xe0, 0x91, 0x10, 0x2b, - 0x1e, 0x02, 0xc1, 0x82, 0xc7, 0x60, 0x83, 0xe6, 0xe2, 0x24, 0x4e, 0xd2, 0x44, 0x45, 0x62, 0x95, - 0xcc, 0xb9, 0x9f, 0xef, 0xfb, 0x8e, 0x61, 0x35, 0x0e, 0x69, 0x93, 0x72, 0xc1, 0x12, 0x5a, 0xf3, - 0xe3, 0x84, 0x09, 0x86, 0x60, 0x68, 0x72, 0xa1, 0xcd, 0xda, 0x4c, 0xdb, 0xdd, 0x6a, 0x9b, 0xb1, - 0x76, 0x97, 0x6e, 0xa9, 0x57, 0xa3, 0x1f, 0x6c, 0x89, 0xb0, 0x47, 0xb9, 0x20, 0xbd, 0xd8, 0x04, - 0xd8, 0x2c, 0x69, 0xd1, 0x84, 0xeb, 0x97, 0xf7, 0xdb, 0x02, 0x74, 0x2a, 0x2b, 0x9d, 0xc7, 0x5d, - 0x46, 0x5a, 0x98, 0x7e, 0xe8, 0x53, 0x2e, 0xd0, 0x26, 0x94, 0xba, 0x61, 0x2f, 0x14, 0x15, 0x6b, - 0xdd, 0xda, 0x5c, 0xaa, 0x21, 0xdf, 0x24, 0x9d, 0xc8, 0x9f, 0x23, 0xe9, 0xc1, 0x3a, 0x00, 0xdd, - 0x83, 0x92, 0xf2, 0x55, 0xf2, 0x2a, 0xd2, 0xc9, 0x44, 0x62, 0xed, 0x43, 0x4f, 0xa1, 0xd4, 0xec, - 0xf4, 0xa3, 0xf7, 0x95, 0x82, 0x0a, 0xba, 0xef, 0x0f, 0x87, 0xf7, 0x27, 0xbb, 0xfb, 0x2f, 0x65, - 0x2c, 0xd6, 0x29, 0x68, 0x03, 0x8a, 0x2d, 0x16, 0xd1, 0x4a, 0x51, 0xa5, 0xae, 0xa6, 0xf5, 0x55, - 0xda, 0x2b, 0xc2, 0x3b, 0x58, 0xb9, 0xdd, 0x1d, 0x28, 0xa9, 0x34, 0x74, 0x03, 0xca, 0x2c, 0x08, - 0x38, 0xd5, 0xb3, 0x17, 0xb0, 0x79, 0x21, 0x04, 0xc5, 0x16, 0x11, 0x44, 0xcd, 0x69, 0x63, 0xf5, - 0xdf, 0xdb, 0x85, 0xff, 0x32, 0xed, 0x79, 0xcc, 0x22, 0x4e, 0x07, 0x2d, 0xad, 0x99, 0x2d, 0xbd, - 0x5f, 0x16, 0xfc, 0xaf, 0x6c, 0x07, 0xec, 0x53, 0xf4, 0x0f, 0xd1, 0xdb, 0xcd, 0xa2, 0xf7, 0x60, - 0x02, 0xbd, 0xb1, 0xfe, 0x19, 0xfc, 0xdc, 0xbd, 0x79, 0xc0, 0xdc, 0x06, 0x50, 0x91, 0x75, 0x1e, - 0x5e, 0x52, 0x35, 0x48, 0x01, 0x2f, 0x2a, 0xcb, 0xdb, 0xf0, 0x92, 0x7a, 0xdf, 0x2d, 0x58, 0x1b, - 0xeb, 0x62, 0x60, 0x7a, 0x96, 0xce, 0xa5, 0xd7, 0x7c, 0x38, 0x63, 0x2e, 0x9d, 0x31, 0x41, 0x6c, - 0x87, 0xf0, 0x8e, 0x59, 0x7d, 0x1a, 0xca, 0xd2, 0x3d, 0x04, 0xb3, 0x30, 0x07, 0xcc, 0xbf, 0x93, - 0xc0, 0x9e, 0xd1, 0xff, 0x01, 0xed, 0x52, 0x41, 0xaf, 0xcd, 0xa0, 0xb7, 0x66, 0x24, 0x94, 0xe6, - 0xeb, 0x4d, 0xbd, 0x04, 0x1c, 0x4c, 0x05, 0x09, 0xa3, 0xb4, 0xe2, 0x6b, 0x70, 0x9a, 0x09, 0x25, - 0x22, 0x64, 0x51, 0xbd, 0x45, 0x44, 0x2a, 0x2e, 0xd7, 0xd7, 0xf7, 0xea, 0xa7, 0xf7, 0xea, 0x9f, - 0xa5, 0xf7, 0xba, 0xbf, 0xf0, 0xed, 0x47, 0x35, 0xf7, 0xf9, 0x67, 0xd5, 0xc2, 0x76, 0x9a, 0x7a, - 0x40, 0x04, 0x95, 0xeb, 0x05, 0x61, 0x57, 0x18, 0xd5, 0xd8, 0xd8, 0xbc, 0xbc, 0x15, 0x58, 0x4e, - 0x7b, 0x9a, 0x29, 0xbe, 0xe6, 0x61, 0x49, 0xe3, 0x49, 0x89, 0x54, 0xd2, 0x11, 0x2c, 0x07, 0x2c, - 0xe9, 0x11, 0x51, 0xff, 0x48, 0x13, 0x1e, 0xb2, 0x48, 0x4d, 0xb1, 0x5c, 0xdb, 0x98, 0xa0, 0x4e, - 0x27, 0xf8, 0x87, 0x2a, 0xfa, 0x42, 0x07, 0x63, 0x27, 0x18, 0x7d, 0x4a, 0x38, 0x07, 0x04, 0xda, - 0x86, 0xad, 0xd1, 0x35, 0xe5, 0x97, 0xc7, 0xb0, 0x76, 0xcd, 0x35, 0xa5, 0x13, 0xdd, 0x82, 0x45, - 0x1e, 0xb6, 0x23, 0x22, 0xfa, 0x89, 0xbe, 0x7e, 0x1b, 0x0f, 0x0d, 0xe8, 0x09, 0x2c, 0x29, 0x4e, - 0xea, 0x9a, 0xa7, 0xd2, 0x55, 0x3c, 0xed, 0x17, 0x65, 0x79, 0x0c, 0x6c, 0x60, 0xf1, 0x1e, 0x81, - 0x93, 0xd9, 0x0b, 0x39, 0xb0, 0x78, 0x78, 0x82, 0x8f, 0x5f, 0x9c, 0xd5, 0x2f, 0xb6, 0x57, 0x72, - 0xa3, 0xcf, 0xc7, 0x2b, 0x56, 0xed, 0x4b, 0x1e, 0xe0, 0x74, 0x00, 0x0f, 0x3a, 0x86, 0xb2, 0xfe, - 0x5c, 0xa0, 0x3b, 0xb3, 0x3f, 0x63, 0x6e, 0xf5, 0x4a, 0xbf, 0xa1, 0x27, 0xb7, 0x69, 0xa1, 0x73, - 0x58, 0x48, 0xcf, 0x04, 0xad, 0xcf, 0xbb, 0x6c, 0xf7, 0xee, 0xdc, 0x1b, 0x93, 0x45, 0xb7, 0x2d, - 0xf4, 0x06, 0xca, 0x5a, 0x91, 0x53, 0xa6, 0xcc, 0x48, 0x7d, 0xca, 0x94, 0x63, 0x52, 0xce, 0xa1, - 0xe7, 0x50, 0xd6, 0xc2, 0x42, 0x37, 0x47, 0x83, 0x33, 0x02, 0x77, 0xdd, 0x69, 0x2e, 0x5d, 0x62, - 0xbf, 0xf8, 0x2e, 0x1f, 0x37, 0x1a, 0x65, 0x45, 0xfe, 0xce, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, - 0x8a, 0x5d, 0xb7, 0x8b, 0xce, 0x06, 0x00, 0x00, + // 685 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0x4b, 0x6f, 0xd3, 0x4e, + 0x10, 0x8f, 0x9b, 0x87, 0xda, 0xa9, 0x5d, 0xb5, 0xdb, 0x87, 0xf2, 0xb7, 0xfe, 0x90, 0x62, 0x28, + 0xf4, 0x82, 0x5b, 0xd2, 0x13, 0xa8, 0x14, 0x51, 0xaa, 0x0a, 0x44, 0xab, 0x56, 0xdb, 0xc7, 0x81, + 0x4b, 0xb4, 0x6d, 0xd6, 0x89, 0x45, 0xe2, 0x35, 0xde, 0x0d, 0x48, 0xfd, 0x14, 0x5c, 0xf8, 0x42, + 0x9c, 0xf8, 0x10, 0x08, 0x0e, 0x7c, 0x0c, 0x2e, 0x68, 0x1f, 0x4e, 0xe2, 0x3c, 0x55, 0x24, 0x4e, + 0xf6, 0xce, 0xfc, 0x66, 0x67, 0xf6, 0x37, 0xbf, 0x19, 0x58, 0x8a, 0x43, 0x7a, 0x4d, 0xb9, 0x60, + 0x09, 0xad, 0xfa, 0x71, 0xc2, 0x04, 0x43, 0xd0, 0x33, 0xb9, 0xd0, 0x60, 0x0d, 0xa6, 0xed, 0x6e, + 0xa5, 0xc1, 0x58, 0xa3, 0x45, 0xb7, 0xd4, 0xe9, 0xaa, 0x13, 0x6c, 0x89, 0xb0, 0x4d, 0xb9, 0x20, + 0xed, 0xd8, 0x00, 0x6c, 0x96, 0xd4, 0x69, 0xc2, 0xf5, 0xc9, 0xfb, 0x6d, 0x01, 0x3a, 0x95, 0x37, + 0x5d, 0xc4, 0x2d, 0x46, 0xea, 0x98, 0x7e, 0xe8, 0x50, 0x2e, 0xd0, 0x26, 0x14, 0x5b, 0x61, 0x3b, + 0x14, 0x65, 0x6b, 0xdd, 0xda, 0x9c, 0xaf, 0x22, 0xdf, 0x04, 0x9d, 0xc8, 0xcf, 0x91, 0xf4, 0x60, + 0x0d, 0x40, 0xf7, 0xa1, 0xa8, 0x7c, 0xe5, 0x19, 0x85, 0x74, 0x32, 0x48, 0xac, 0x7d, 0xe8, 0x19, + 0x14, 0xaf, 0x9b, 0x9d, 0xe8, 0x7d, 0x39, 0xaf, 0x40, 0x0f, 0xfc, 0x5e, 0xf1, 0xfe, 0x70, 0x76, + 0xff, 0x95, 0xc4, 0x62, 0x1d, 0x82, 0x36, 0xa0, 0x50, 0x67, 0x11, 0x2d, 0x17, 0x54, 0xe8, 0x52, + 0x7a, 0xbf, 0x0a, 0x7b, 0x4d, 0x78, 0x13, 0x2b, 0xb7, 0xbb, 0x03, 0x45, 0x15, 0x86, 0xd6, 0xa0, + 0xc4, 0x82, 0x80, 0x53, 0x5d, 0x7b, 0x1e, 0x9b, 0x13, 0x42, 0x50, 0xa8, 0x13, 0x41, 0x54, 0x9d, + 0x36, 0x56, 0xff, 0xde, 0x2e, 0x2c, 0x67, 0xd2, 0xf3, 0x98, 0x45, 0x9c, 0x76, 0x53, 0x5a, 0x13, + 0x53, 0x7a, 0xbf, 0x2c, 0x58, 0x51, 0xb6, 0x03, 0xf6, 0x29, 0xfa, 0x87, 0xec, 0xed, 0x66, 0xd9, + 0x7b, 0x38, 0xc4, 0xde, 0x40, 0xfe, 0x0c, 0x7f, 0xee, 0xde, 0x34, 0x62, 0xee, 0x00, 0x28, 0x64, + 0x8d, 0x87, 0x37, 0x54, 0x15, 0x92, 0xc7, 0x73, 0xca, 0x72, 0x16, 0xde, 0x50, 0xef, 0xbb, 0x05, + 0xab, 0x03, 0x59, 0x0c, 0x4d, 0xcf, 0xd3, 0xba, 0xf4, 0x33, 0x1f, 0x4d, 0xa8, 0x4b, 0x47, 0x0c, + 0x35, 0xb6, 0x49, 0x78, 0xd3, 0x3c, 0x7d, 0x14, 0xcb, 0xd2, 0xdd, 0x23, 0x33, 0x3f, 0x85, 0xcc, + 0xbf, 0x93, 0xc0, 0x9e, 0xd1, 0xff, 0x01, 0x6d, 0x51, 0x41, 0x6f, 0xdd, 0x41, 0x6f, 0xd5, 0x48, + 0x28, 0x8d, 0xd7, 0x2f, 0xf5, 0x12, 0x70, 0x30, 0x15, 0x24, 0x8c, 0xd2, 0x1b, 0xdf, 0x80, 0x73, + 0x9d, 0x50, 0x22, 0x42, 0x16, 0xd5, 0xea, 0x44, 0xa4, 0xe2, 0x72, 0x7d, 0x3d, 0xaf, 0x7e, 0x3a, + 0xaf, 0xfe, 0x79, 0x3a, 0xaf, 0xfb, 0xb3, 0xdf, 0x7e, 0x54, 0x72, 0x9f, 0x7f, 0x56, 0x2c, 0x6c, + 0xa7, 0xa1, 0x07, 0x44, 0x50, 0xf9, 0xbc, 0x20, 0x6c, 0x09, 0xa3, 0x1a, 0x1b, 0x9b, 0x93, 0xb7, + 0x08, 0x0b, 0x69, 0x4e, 0x53, 0xc5, 0x2a, 0x2c, 0x63, 0xdd, 0x90, 0xf3, 0x44, 0x32, 0xaa, 0x6b, + 0xf1, 0xd6, 0x60, 0x25, 0x6b, 0x36, 0xf0, 0xaf, 0x33, 0x30, 0xaf, 0xe9, 0xa7, 0x44, 0x0a, 0xef, + 0x08, 0x16, 0x02, 0x96, 0xb4, 0x89, 0xa8, 0x7d, 0xa4, 0x09, 0x0f, 0x59, 0xa4, 0x8a, 0x5e, 0xa8, + 0x6e, 0x0c, 0x75, 0x5a, 0x07, 0xf8, 0x87, 0x0a, 0x7d, 0xa9, 0xc1, 0xd8, 0x09, 0xfa, 0x8f, 0x92, + 0xfd, 0x6e, 0xbf, 0x6d, 0xd3, 0xdc, 0x7e, 0x56, 0xe4, 0xa2, 0x32, 0x4d, 0xbe, 0x25, 0x2b, 0xd2, + 0x89, 0xfe, 0x87, 0x39, 0x1e, 0x36, 0x22, 0x22, 0x3a, 0x89, 0x5e, 0x16, 0x36, 0xee, 0x19, 0xd0, + 0x53, 0x98, 0x57, 0x2d, 0xac, 0xe9, 0xb6, 0x16, 0xc7, 0xb5, 0x75, 0xbf, 0x20, 0xaf, 0xc7, 0xc0, + 0xba, 0x16, 0xef, 0x31, 0x38, 0x99, 0x77, 0x21, 0x07, 0xe6, 0x0e, 0x4f, 0xf0, 0xf1, 0xcb, 0xf3, + 0xda, 0xe5, 0xf6, 0x62, 0xae, 0xff, 0xf8, 0x64, 0xd1, 0xaa, 0x7e, 0xc9, 0x03, 0x9c, 0x76, 0xe9, + 0x41, 0xc7, 0x50, 0xd2, 0xdb, 0x05, 0xdd, 0x9d, 0xbc, 0xf5, 0xdc, 0xca, 0x58, 0xbf, 0x69, 0x4f, + 0x6e, 0xd3, 0x42, 0x17, 0x30, 0x9b, 0x4e, 0x15, 0x5a, 0x9f, 0xb6, 0x08, 0xdc, 0x7b, 0x53, 0x47, + 0x52, 0x5e, 0xba, 0x6d, 0xa1, 0xb7, 0x50, 0xd2, 0x02, 0x1e, 0x51, 0x65, 0x66, 0x32, 0x46, 0x54, + 0x39, 0xa0, 0xfc, 0x1c, 0x7a, 0x01, 0x25, 0xad, 0x43, 0xf4, 0x5f, 0x3f, 0x38, 0x33, 0x0f, 0xae, + 0x3b, 0xca, 0x65, 0x16, 0xcb, 0x19, 0xd8, 0xfd, 0xfa, 0x44, 0x95, 0x2c, 0x76, 0x48, 0xd0, 0xee, + 0xfa, 0x78, 0x40, 0x5a, 0xd5, 0x7e, 0xe1, 0xdd, 0x4c, 0x7c, 0x75, 0x55, 0x52, 0x8a, 0xda, 0xf9, + 0x13, 0x00, 0x00, 0xff, 0xff, 0x8e, 0x06, 0xc3, 0xd8, 0x52, 0x07, 0x00, 0x00, } type DRPCPiecestoreClient interface { @@ -715,6 +779,7 @@ type DRPCPiecestoreClient interface { Download(ctx context.Context) (DRPCPiecestore_DownloadClient, error) Delete(ctx context.Context, in *PieceDeleteRequest) (*PieceDeleteResponse, error) Retain(ctx context.Context, in *RetainRequest) (*RetainResponse, error) + RestoreTrash(ctx context.Context, in *RestoreTrashRequest) (*RestoreTrashResponse, error) } type drpcPiecestoreClient struct { @@ -810,16 +875,26 @@ func (c *drpcPiecestoreClient) Retain(ctx context.Context, in *RetainRequest) (* return out, nil } +func (c *drpcPiecestoreClient) RestoreTrash(ctx context.Context, in *RestoreTrashRequest) (*RestoreTrashResponse, error) { + out := new(RestoreTrashResponse) + err := c.cc.Invoke(ctx, "/piecestore.Piecestore/RestoreTrash", in, out) + if err != nil { + return nil, err + } + return out, nil +} + type DRPCPiecestoreServer interface { Upload(DRPCPiecestore_UploadStream) error Download(DRPCPiecestore_DownloadStream) error Delete(context.Context, *PieceDeleteRequest) (*PieceDeleteResponse, error) Retain(context.Context, *RetainRequest) (*RetainResponse, error) + RestoreTrash(context.Context, *RestoreTrashRequest) (*RestoreTrashResponse, error) } type DRPCPiecestoreDescription struct{} -func (DRPCPiecestoreDescription) NumMethods() int { return 4 } +func (DRPCPiecestoreDescription) NumMethods() int { return 5 } func (DRPCPiecestoreDescription) Method(n int) (string, drpc.Handler, interface{}, bool) { switch n { @@ -857,6 +932,15 @@ func (DRPCPiecestoreDescription) Method(n int) (string, drpc.Handler, interface{ in1.(*RetainRequest), ) }, DRPCPiecestoreServer.Retain, true + case 4: + return "/piecestore.Piecestore/RestoreTrash", + func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { + return srv.(DRPCPiecestoreServer). + RestoreTrash( + ctx, + in1.(*RestoreTrashRequest), + ) + }, DRPCPiecestoreServer.RestoreTrash, true default: return "", nil, nil, false } @@ -945,6 +1029,22 @@ func (x *drpcPiecestoreRetainStream) SendAndClose(m *RetainResponse) error { return x.CloseSend() } +type DRPCPiecestore_RestoreTrashStream interface { + drpc.Stream + SendAndClose(*RestoreTrashResponse) error +} + +type drpcPiecestoreRestoreTrashStream struct { + drpc.Stream +} + +func (x *drpcPiecestoreRestoreTrashStream) SendAndClose(m *RestoreTrashResponse) error { + if err := x.MsgSend(m); err != nil { + return err + } + return x.CloseSend() +} + // Reference imports to suppress errors if they are not otherwise used. var _ context.Context var _ grpc.ClientConn @@ -961,6 +1061,7 @@ type PiecestoreClient interface { Download(ctx context.Context, opts ...grpc.CallOption) (Piecestore_DownloadClient, error) Delete(ctx context.Context, in *PieceDeleteRequest, opts ...grpc.CallOption) (*PieceDeleteResponse, error) Retain(ctx context.Context, in *RetainRequest, opts ...grpc.CallOption) (*RetainResponse, error) + RestoreTrash(ctx context.Context, in *RestoreTrashRequest, opts ...grpc.CallOption) (*RestoreTrashResponse, error) } type piecestoreClient struct { @@ -1054,12 +1155,22 @@ func (c *piecestoreClient) Retain(ctx context.Context, in *RetainRequest, opts . return out, nil } +func (c *piecestoreClient) RestoreTrash(ctx context.Context, in *RestoreTrashRequest, opts ...grpc.CallOption) (*RestoreTrashResponse, error) { + out := new(RestoreTrashResponse) + err := c.cc.Invoke(ctx, "/piecestore.Piecestore/RestoreTrash", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // PiecestoreServer is the server API for Piecestore service. type PiecestoreServer interface { Upload(Piecestore_UploadServer) error Download(Piecestore_DownloadServer) error Delete(context.Context, *PieceDeleteRequest) (*PieceDeleteResponse, error) Retain(context.Context, *RetainRequest) (*RetainResponse, error) + RestoreTrash(context.Context, *RestoreTrashRequest) (*RestoreTrashResponse, error) } func RegisterPiecestoreServer(s *grpc.Server, srv PiecestoreServer) { @@ -1154,6 +1265,24 @@ func _Piecestore_Retain_Handler(srv interface{}, ctx context.Context, dec func(i return interceptor(ctx, in, info, handler) } +func _Piecestore_RestoreTrash_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RestoreTrashRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PiecestoreServer).RestoreTrash(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/piecestore.Piecestore/RestoreTrash", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(PiecestoreServer).RestoreTrash(ctx, req.(*RestoreTrashRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _Piecestore_serviceDesc = grpc.ServiceDesc{ ServiceName: "piecestore.Piecestore", HandlerType: (*PiecestoreServer)(nil), @@ -1166,6 +1295,10 @@ var _Piecestore_serviceDesc = grpc.ServiceDesc{ MethodName: "Retain", Handler: _Piecestore_Retain_Handler, }, + { + MethodName: "RestoreTrash", + Handler: _Piecestore_RestoreTrash_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/pkg/pb/piecestore2.proto b/pkg/pb/piecestore2.proto index 815dd149d..ef3c9774b 100644 --- a/pkg/pb/piecestore2.proto +++ b/pkg/pb/piecestore2.proto @@ -15,6 +15,7 @@ service Piecestore { rpc Download(stream PieceDownloadRequest) returns (stream PieceDownloadResponse) {} rpc Delete(PieceDeleteRequest) returns (PieceDeleteResponse) {} rpc Retain(RetainRequest) returns (RetainResponse); + rpc RestoreTrash(RestoreTrashRequest) returns (RestoreTrashResponse) {} } // Expected order of messages from uplink: @@ -62,7 +63,7 @@ message PieceDownloadRequest { int64 offset = 1; int64 chunk_size = 2; } - + // request for the chunk Chunk chunk = 3; } @@ -93,6 +94,9 @@ message RetainRequest { message RetainResponse { } +message RestoreTrashRequest {} +message RestoreTrashResponse {} + // PieceHeader is used in piece storage to keep track of piece attributes. message PieceHeader { enum FormatVersion { @@ -113,4 +117,4 @@ message PieceHeader { // the OrderLimit authorizing storage of this piece, as signed by the satellite and sent by // the uplink orders.OrderLimit order_limit = 5 [(gogoproto.nullable) = false]; -} \ No newline at end of file +} diff --git a/private/testplanet/uplink_test.go b/private/testplanet/uplink_test.go index f37dbac7f..983e91d2d 100644 --- a/private/testplanet/uplink_test.go +++ b/private/testplanet/uplink_test.go @@ -179,6 +179,9 @@ func (mock *piecestoreMock) Delete(ctx context.Context, delete *pb.PieceDeleteRe func (mock *piecestoreMock) Retain(ctx context.Context, retain *pb.RetainRequest) (_ *pb.RetainResponse, err error) { return nil, nil } +func (mock *piecestoreMock) RestoreTrash(context.Context, *pb.RestoreTrashRequest) (*pb.RestoreTrashResponse, error) { + return nil, nil +} func TestDownloadFromUnresponsiveNode(t *testing.T) { testplanet.Run(t, testplanet.Config{ diff --git a/proto.lock b/proto.lock index cf17a12e5..18b52e4e5 100644 --- a/proto.lock +++ b/proto.lock @@ -5588,6 +5588,12 @@ { "name": "RetainResponse" }, + { + "name": "RestoreTrashRequest" + }, + { + "name": "RestoreTrashResponse" + }, { "name": "PieceHeader", "fields": [ @@ -5661,6 +5667,11 @@ "name": "Retain", "in_type": "RetainRequest", "out_type": "RetainResponse" + }, + { + "name": "RestoreTrash", + "in_type": "RestoreTrashRequest", + "out_type": "RestoreTrashResponse" } ] } diff --git a/storagenode/pieces/store.go b/storagenode/pieces/store.go index a4910d59c..eda231bb6 100644 --- a/storagenode/pieces/store.go +++ b/storagenode/pieces/store.go @@ -67,6 +67,10 @@ type PieceExpirationDB interface { // DeleteFailed marks an expiration record as having experienced a failure in deleting the // piece from the disk DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, failedAt time.Time) error + // Trash marks a piece as in the trash + Trash(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error + // RestoreTrash marks all piece as not being in trash + RestoreTrash(ctx context.Context, satelliteID storj.NodeID) error } // V0PieceInfoDB stores meta information about pieces stored with storage format V0 (where @@ -275,6 +279,50 @@ func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID return Error.Wrap(err) } +// Trash moves the specified piece to the blob trash. If necessary, it converts +// the v0 piece to a v1 piece. It also marks the item as "trashed" in the +// pieceExpirationDB. +func (store *Store) Trash(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) { + defer mon.Task()(&ctx)(&err) + + // Check if the MaxFormatVersionSupported piece exists. If not, we assume + // this is an old piece version and attempt to migrate it. + _, err = store.blobs.StatWithStorageFormat(ctx, storage.BlobRef{ + Namespace: satellite.Bytes(), + Key: pieceID.Bytes(), + }, filestore.MaxFormatVersionSupported) + if err != nil && !errs.IsFunc(err, os.IsNotExist) { + return Error.Wrap(err) + } + + if errs.IsFunc(err, os.IsNotExist) { + // MaxFormatVersionSupported does not exist, migrate. + err = store.MigrateV0ToV1(ctx, satellite, pieceID) + if err != nil { + return Error.Wrap(err) + } + } + + err = store.expirationInfo.Trash(ctx, satellite, pieceID) + err = errs.Combine(err, store.blobs.Trash(ctx, storage.BlobRef{ + Namespace: satellite.Bytes(), + Key: pieceID.Bytes(), + })) + + return Error.Wrap(err) +} + +// RestoreTrash restores all pieces in the trash +func (store *Store) RestoreTrash(ctx context.Context, satelliteID storj.NodeID) (err error) { + defer mon.Task()(&ctx)(&err) + + err = store.blobs.RestoreTrash(ctx, satelliteID.Bytes()) + if err != nil { + return Error.Wrap(err) + } + return Error.Wrap(store.expirationInfo.RestoreTrash(ctx, satelliteID)) +} + // MigrateV0ToV1 will migrate a piece stored with storage format v0 to storage // format v1. If the piece is not stored as a v0 piece it will return an error. // The follow failures are possible: diff --git a/storagenode/pieces/store_test.go b/storagenode/pieces/store_test.go index 6736cc6c0..2d5ceed63 100644 --- a/storagenode/pieces/store_test.go +++ b/storagenode/pieces/store_test.go @@ -183,6 +183,240 @@ func tryOpeningAPiece(ctx context.Context, t testing.TB, store *pieces.Store, sa require.NoError(t, reader.Close()) } +func TestTrashAndRestore(t *testing.T) { + type testfile struct { + data []byte + formatVer storage.FormatVersion + } + type testpiece struct { + pieceID storj.PieceID + files []testfile + expiration time.Time + } + type testsatellite struct { + satelliteID storj.NodeID + pieces []testpiece + } + + size := memory.KB + + // Initialize pub/priv keys for signing piece hash + publicKeyBytes, err := hex.DecodeString("01eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53") + require.NoError(t, err) + privateKeyBytes, err := hex.DecodeString("afefcccadb3d17b1f241b7c83f88c088b54c01b5a25409c13cbeca6bfa22b06901eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53") + require.NoError(t, err) + publicKey, err := storj.PiecePublicKeyFromBytes(publicKeyBytes) + require.NoError(t, err) + privateKey, err := storj.PiecePrivateKeyFromBytes(privateKeyBytes) + require.NoError(t, err) + + satellites := []testsatellite{ + { + satelliteID: testrand.NodeID(), + pieces: []testpiece{ + { + expiration: time.Time{}, // no expiration + pieceID: testrand.PieceID(), + files: []testfile{ + { + data: testrand.Bytes(size), + formatVer: filestore.FormatV0, + }, + }, + }, + { + pieceID: testrand.PieceID(), + expiration: time.Now().Add(24 * time.Hour), + files: []testfile{ + { + data: testrand.Bytes(size), + formatVer: filestore.FormatV1, + }, + }, + }, + { + pieceID: testrand.PieceID(), + expiration: time.Now().Add(24 * time.Hour), + files: []testfile{ + { + data: testrand.Bytes(size), + formatVer: filestore.FormatV0, + }, + { + data: testrand.Bytes(size), + formatVer: filestore.FormatV1, + }, + }, + }, + }, + }, + { + satelliteID: testrand.NodeID(), + pieces: []testpiece{ + { + pieceID: testrand.PieceID(), + expiration: time.Now().Add(24 * time.Hour), + files: []testfile{ + { + data: testrand.Bytes(size), + formatVer: filestore.FormatV1, + }, + }, + }, + }, + }, + } + + storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + blobs, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store")) + require.NoError(t, err) + defer ctx.Check(blobs.Close) + + v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest) + require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest") + + store := pieces.NewStore(zaptest.NewLogger(t), blobs, v0PieceInfo, db.PieceExpirationDB(), nil) + tStore := &pieces.StoreForTest{store} + + for _, satellite := range satellites { + now := time.Now() + for _, piece := range satellite.pieces { + // If test has expiration, add to expiration db + if !piece.expiration.IsZero() { + require.NoError(t, store.SetExpiration(ctx, satellite.satelliteID, piece.pieceID, piece.expiration)) + } + + for _, file := range piece.files { + w, err := tStore.WriterForFormatVersion(ctx, satellite.satelliteID, piece.pieceID, file.formatVer) + require.NoError(t, err) + + _, err = w.Write(file.data) + require.NoError(t, err) + + // Create, sign, and commit piece hash (to piece or v0PieceInfo) + pieceHash := &pb.PieceHash{ + PieceId: piece.pieceID, + Hash: w.Hash(), + PieceSize: w.Size(), + Timestamp: now, + } + signedPieceHash, err := signing.SignUplinkPieceHash(ctx, privateKey, pieceHash) + require.NoError(t, err) + require.NoError(t, w.Commit(ctx, &pb.PieceHeader{ + Hash: signedPieceHash.GetHash(), + CreationTime: signedPieceHash.GetTimestamp(), + Signature: signedPieceHash.GetSignature(), + })) + + if file.formatVer == filestore.FormatV0 { + err = v0PieceInfo.Add(ctx, &pieces.Info{ + SatelliteID: satellite.satelliteID, + PieceID: piece.pieceID, + PieceSize: signedPieceHash.GetPieceSize(), + PieceCreation: signedPieceHash.GetTimestamp(), + OrderLimit: &pb.OrderLimit{}, + UplinkPieceHash: signedPieceHash, + }) + require.NoError(t, err) + } + + // Verify piece matches data, has correct signature and expiration + verifyPieceData(ctx, t, store, satellite.satelliteID, piece.pieceID, file.formatVer, file.data, piece.expiration, publicKey) + + } + + // Trash the piece + require.NoError(t, store.Trash(ctx, satellite.satelliteID, piece.pieceID)) + + // Confirm is missing + r, err := store.Reader(ctx, satellite.satelliteID, piece.pieceID) + require.Error(t, err) + require.Nil(t, r) + + // Verify no expiry information is returned for this piece + if !piece.expiration.IsZero() { + infos, err := store.GetExpired(ctx, time.Now().Add(720*time.Hour), 1000) + require.NoError(t, err) + var found bool + for _, info := range infos { + if info.SatelliteID == satellite.satelliteID && info.PieceID == piece.pieceID { + found = true + } + } + require.False(t, found) + } + } + } + + // Restore all pieces in the first satellite + require.NoError(t, store.RestoreTrash(ctx, satellites[0].satelliteID)) + + // Check that each piece for first satellite is back, that they are + // MaxFormatVersionSupported (regardless of which version they began + // with), and that signature matches. + for _, piece := range satellites[0].pieces { + lastFile := piece.files[len(piece.files)-1] + verifyPieceData(ctx, t, store, satellites[0].satelliteID, piece.pieceID, filestore.MaxFormatVersionSupported, lastFile.data, piece.expiration, publicKey) + } + + // Confirm 2nd satellite pieces are still missing + for _, piece := range satellites[1].pieces { + r, err := store.Reader(ctx, satellites[1].satelliteID, piece.pieceID) + require.Error(t, err) + require.Nil(t, r) + } + + }) +} + +func verifyPieceData(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, formatVer storage.FormatVersion, expected []byte, expiration time.Time, publicKey storj.PiecePublicKey) { + r, err := store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, formatVer) + require.NoError(t, err) + + // Get piece hash, verify signature + var pieceHash *pb.PieceHash + if formatVer > filestore.FormatV0 { + header, err := r.GetPieceHeader() + require.NoError(t, err) + pieceHash = &pb.PieceHash{ + PieceId: pieceID, + Hash: header.GetHash(), + PieceSize: r.Size(), + Timestamp: header.GetCreationTime(), + Signature: header.GetSignature(), + } + } else { + info, err := store.GetV0PieceInfo(ctx, satelliteID, pieceID) + require.NoError(t, err) + pieceHash = info.UplinkPieceHash + } + require.NoError(t, signing.VerifyUplinkPieceHashSignature(ctx, publicKey, pieceHash)) + + // Require piece data to match expected + buf, err := ioutil.ReadAll(r) + require.NoError(t, err) + require.NoError(t, r.Close()) + assert.True(t, bytes.Equal(buf, expected)) + + // Require expiration to match expected + infos, err := store.GetExpired(ctx, time.Now().Add(720*time.Hour), 1000) + require.NoError(t, err) + var found bool + for _, info := range infos { + if info.SatelliteID == satelliteID && info.PieceID == pieceID { + found = true + } + } + if expiration.IsZero() { + require.False(t, found) + } else { + require.True(t, found) + } +} + func TestPieceVersionMigrate(t *testing.T) { storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) { const pieceSize = 1024 diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index 4074005a2..bb80868c1 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -636,6 +636,28 @@ func (endpoint *Endpoint) saveOrder(ctx context.Context, limit *pb.OrderLimit, o } } +// RestoreTrash restores all trashed items for the satellite issuing the call +func (endpoint *Endpoint) RestoreTrash(ctx context.Context, restoreTrashReq *pb.RestoreTrashRequest) (res *pb.RestoreTrashResponse, err error) { + defer mon.Task()(&ctx)(&err) + + peer, err := identity.PeerIdentityFromContext(ctx) + if err != nil { + return nil, rpcstatus.Error(rpcstatus.Unauthenticated, Error.Wrap(err).Error()) + } + + err = endpoint.trust.VerifySatelliteID(ctx, peer.ID) + if err != nil { + return nil, rpcstatus.Error(rpcstatus.PermissionDenied, Error.New("RestoreTrash called with untrusted ID").Error()) + } + + err = endpoint.store.RestoreTrash(ctx, peer.ID) + if err != nil { + return nil, ErrInternal.Wrap(err) + } + + return &pb.RestoreTrashResponse{}, nil +} + // Retain keeps only piece ids specified in the request func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainRequest) (res *pb.RetainResponse, err error) { defer mon.Task()(&ctx)(&err) diff --git a/storagenode/retain/retain.go b/storagenode/retain/retain.go index 145b74198..5c3b893db 100644 --- a/storagenode/retain/retain.go +++ b/storagenode/retain/retain.go @@ -385,7 +385,7 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) { // if retain status is enabled, delete pieceid if s.config.Status == Enabled { - if err = s.store.Delete(ctx, satelliteID, pieceID); err != nil { + if err = s.store.Trash(ctx, satelliteID, pieceID); err != nil { s.log.Warn("failed to delete piece", zap.Stringer("Satellite ID", satelliteID), zap.Stringer("Piece ID", pieceID), diff --git a/storagenode/retain/retain_test.go b/storagenode/retain/retain_test.go index 28c0b9a12..de66891ec 100644 --- a/storagenode/retain/retain_test.go +++ b/storagenode/retain/retain_test.go @@ -17,9 +17,12 @@ import ( "storj.io/storj/pkg/signing" "storj.io/storj/pkg/storj" "storj.io/storj/private/errs2" + "storj.io/storj/private/memory" "storj.io/storj/private/testcontext" "storj.io/storj/private/testidentity" "storj.io/storj/private/testrand" + "storj.io/storj/storage" + "storj.io/storj/storage/filestore" "storj.io/storj/storagenode" "storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/retain" @@ -33,8 +36,8 @@ func TestRetainPieces(t *testing.T) { store := pieces.NewStore(zaptest.NewLogger(t), db.Pieces(), db.V0PieceInfo(), db.PieceExpirationDB(), db.PieceSpaceUsedDB()) testStore := pieces.StoreForTest{Store: store} - const numPieces = 1000 - const numPiecesToKeep = 990 + const numPieces = 100 + const numPiecesToKeep = 95 // pieces from numPiecesToKeep + numOldPieces to numPieces will // have a recent timestamp and thus should not be deleted const numOldPieces = 5 @@ -49,65 +52,58 @@ func TestRetainPieces(t *testing.T) { uplink := testidentity.MustPregeneratedSignedIdentity(3, storj.LatestIDVersion()) - recentTime := time.Now() - oldTime := recentTime.Add(-time.Duration(48) * time.Hour) - // keep pieceIDs[0 : numPiecesToKeep] (old + in filter) // delete pieceIDs[numPiecesToKeep : numPiecesToKeep+numOldPieces] (old + not in filter) // keep pieceIDs[numPiecesToKeep+numOldPieces : numPieces] (recent + not in filter) - var pieceCreation time.Time // add all pieces to the node pieces info DB - but only count piece ids in filter for index, id := range pieceIDs { + var formatVer storage.FormatVersion + if index%2 == 0 { + formatVer = filestore.FormatV0 + } else { + formatVer = filestore.FormatV1 + } + if index < numPiecesToKeep { filter.Add(id) } - if index < numPiecesToKeep+numOldPieces { - pieceCreation = oldTime - } else { - pieceCreation = recentTime + const size = 100 * memory.B + + // Write file for all satellites + for _, satelliteID := range []storj.NodeID{satellite0.ID, satellite1.ID} { + now := time.Now() + w, err := testStore.WriterForFormatVersion(ctx, satelliteID, id, formatVer) + require.NoError(t, err) + + _, err = w.Write(testrand.Bytes(size)) + require.NoError(t, err) + + require.NoError(t, w.Commit(ctx, &pb.PieceHeader{ + CreationTime: now, + })) + + piecehash, err := signing.SignPieceHash(ctx, + signing.SignerFromFullIdentity(uplink), + &pb.PieceHash{ + PieceId: id, + Hash: []byte{0, 2, 3, 4, 5}, + }) + require.NoError(t, err) + + if formatVer == filestore.FormatV0 { + v0db := testStore.GetV0PieceInfoDBForTest() + err = v0db.Add(ctx, &pieces.Info{ + SatelliteID: satelliteID, + PieceSize: 4, + PieceID: id, + PieceCreation: now, + UplinkPieceHash: piecehash, + OrderLimit: &pb.OrderLimit{}, + }) + require.NoError(t, err) + } } - - piecehash0, err := signing.SignPieceHash(ctx, - signing.SignerFromFullIdentity(uplink), - &pb.PieceHash{ - PieceId: id, - Hash: []byte{0, 2, 3, 4, 5}, - }) - require.NoError(t, err) - - piecehash1, err := signing.SignPieceHash(ctx, - signing.SignerFromFullIdentity(uplink), - &pb.PieceHash{ - PieceId: id, - Hash: []byte{0, 2, 3, 4, 5}, - }) - require.NoError(t, err) - - pieceinfo0 := pieces.Info{ - SatelliteID: satellite0.ID, - PieceSize: 4, - PieceID: id, - PieceCreation: pieceCreation, - UplinkPieceHash: piecehash0, - OrderLimit: &pb.OrderLimit{}, - } - pieceinfo1 := pieces.Info{ - SatelliteID: satellite1.ID, - PieceSize: 4, - PieceID: id, - PieceCreation: pieceCreation, - UplinkPieceHash: piecehash1, - OrderLimit: &pb.OrderLimit{}, - } - - v0db := testStore.GetV0PieceInfoDBForTest() - err = v0db.Add(ctx, &pieceinfo0) - require.NoError(t, err) - - err = v0db.Add(ctx, &pieceinfo1) - require.NoError(t, err) - } retainEnabled := retain.NewService(zaptest.NewLogger(t), store, retain.Config{ @@ -146,7 +142,7 @@ func TestRetainPieces(t *testing.T) { // expect that disabled and debug endpoints do not delete any pieces req := retain.Request{ SatelliteID: satellite0.ID, - CreatedBefore: recentTime, + CreatedBefore: time.Now(), Filter: filter, } queued := retainDisabled.Queue(req) @@ -157,11 +153,11 @@ func TestRetainPieces(t *testing.T) { require.True(t, queued) retainDebug.TestWaitUntilEmpty() - satellite1Pieces, err := getAllPieceIDs(ctx, store, satellite1.ID, recentTime.Add(5*time.Second)) + satellite1Pieces, err := getAllPieceIDs(ctx, store, satellite1.ID) require.NoError(t, err) require.Equal(t, numPieces, len(satellite1Pieces)) - satellite0Pieces, err := getAllPieceIDs(ctx, store, satellite0.ID, recentTime.Add(5*time.Second)) + satellite0Pieces, err := getAllPieceIDs(ctx, store, satellite0.ID) require.NoError(t, err) require.Equal(t, numPieces, len(satellite0Pieces)) @@ -171,13 +167,13 @@ func TestRetainPieces(t *testing.T) { retainEnabled.TestWaitUntilEmpty() // check we have deleted nothing for satellite1 - satellite1Pieces, err = getAllPieceIDs(ctx, store, satellite1.ID, recentTime.Add(5*time.Second)) + satellite1Pieces, err = getAllPieceIDs(ctx, store, satellite1.ID) require.NoError(t, err) require.Equal(t, numPieces, len(satellite1Pieces)) // check we did not delete recent pieces or retained pieces for satellite0 // also check that we deleted the correct pieces for satellite0 - satellite0Pieces, err = getAllPieceIDs(ctx, store, satellite0.ID, recentTime.Add(5*time.Second)) + satellite0Pieces, err = getAllPieceIDs(ctx, store, satellite0.ID) require.NoError(t, err) require.Equal(t, numPieces-numOldPieces, len(satellite0Pieces)) @@ -200,15 +196,8 @@ func TestRetainPieces(t *testing.T) { }) } -func getAllPieceIDs(ctx context.Context, store *pieces.Store, satellite storj.NodeID, createdBefore time.Time) (pieceIDs []storj.PieceID, err error) { +func getAllPieceIDs(ctx context.Context, store *pieces.Store, satellite storj.NodeID) (pieceIDs []storj.PieceID, err error) { err = store.WalkSatellitePieces(ctx, satellite, func(pieceAccess pieces.StoredPieceAccess) error { - mTime, err := pieceAccess.CreationTime(ctx) - if err != nil { - return err - } - if !mTime.Before(createdBefore) { - return nil - } pieceIDs = append(pieceIDs, pieceAccess.PieceID()) return nil }) diff --git a/storagenode/storagenodedb/database.go b/storagenode/storagenodedb/database.go index d73732509..f5c05555f 100644 --- a/storagenode/storagenodedb/database.go +++ b/storagenode/storagenodedb/database.go @@ -874,6 +874,17 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration { `DROP TABLE _satellites_old`, }, }, + { + DB: db.pieceExpirationDB, + Description: "Add Trash column to pieceExpirationDB", + Version: 26, + Action: migrate.SQL{ + `ALTER TABLE piece_expirations ADD COLUMN trash INTEGER NOT NULL DEFAULT 0`, + `CREATE INDEX idx_piece_expirations_trashed + ON piece_expirations(satellite_id, trash) + WHERE trash = 1`, + }, + }, }, } } diff --git a/storagenode/storagenodedb/pieceexpiration.go b/storagenode/storagenodedb/pieceexpiration.go index 02eb91313..2df6936e9 100644 --- a/storagenode/storagenodedb/pieceexpiration.go +++ b/storagenode/storagenodedb/pieceexpiration.go @@ -32,6 +32,7 @@ func (db *pieceExpirationDB) GetExpired(ctx context.Context, expiresBefore time. FROM piece_expirations WHERE piece_expiration < ? AND ((deletion_failed_at IS NULL) OR deletion_failed_at <> ?) + AND trash = 0 LIMIT ? `, expiresBefore.UTC(), expiresBefore.UTC(), limit) if err != nil { @@ -97,3 +98,29 @@ func (db *pieceExpirationDB) DeleteFailed(ctx context.Context, satelliteID storj `, when.UTC(), satelliteID, pieceID) return ErrPieceExpiration.Wrap(err) } + +// Trash marks a piece expiration as "trashed" +func (db *pieceExpirationDB) Trash(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) { + defer mon.Task()(&ctx)(&err) + + _, err = db.ExecContext(ctx, ` + UPDATE piece_expirations + SET trash = 1 + WHERE satellite_id = ? + AND piece_id = ? + `, satelliteID, pieceID) + return ErrPieceExpiration.Wrap(err) +} + +// Restore restores all trashed pieces +func (db *pieceExpirationDB) RestoreTrash(ctx context.Context, satelliteID storj.NodeID) (err error) { + defer mon.Task()(&ctx)(&err) + + _, err = db.ExecContext(ctx, ` + UPDATE piece_expirations + SET trash = 0 + WHERE satellite_id = ? + AND trash = 1 + `, satelliteID) + return ErrPieceExpiration.Wrap(err) +} diff --git a/storagenode/storagenodedb/testdata/multidbsnapshot.go b/storagenode/storagenodedb/testdata/multidbsnapshot.go index 5d72f1330..004eeea06 100644 --- a/storagenode/storagenodedb/testdata/multidbsnapshot.go +++ b/storagenode/storagenodedb/testdata/multidbsnapshot.go @@ -39,6 +39,7 @@ var States = MultiDBStates{ &v23, &v24, &v25, + &v26, }, } diff --git a/storagenode/storagenodedb/testdata/v26.go b/storagenode/storagenodedb/testdata/v26.go new file mode 100644 index 000000000..8eb46b348 --- /dev/null +++ b/storagenode/storagenodedb/testdata/v26.go @@ -0,0 +1,37 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package testdata + +import "storj.io/storj/storagenode/storagenodedb" + +var v26 = MultiDBState{ + Version: 26, + DBStates: DBStates{ + storagenodedb.UsedSerialsDBName: v25.DBStates[storagenodedb.UsedSerialsDBName], + storagenodedb.StorageUsageDBName: v25.DBStates[storagenodedb.StorageUsageDBName], + storagenodedb.ReputationDBName: v25.DBStates[storagenodedb.ReputationDBName], + storagenodedb.PieceSpaceUsedDBName: v25.DBStates[storagenodedb.PieceSpaceUsedDBName], + storagenodedb.PieceInfoDBName: v25.DBStates[storagenodedb.PieceInfoDBName], + storagenodedb.PieceExpirationDBName: &DBState{ + SQL: ` + -- table to hold expiration data (and only expirations. no other pieceinfo) + CREATE TABLE piece_expirations ( + satellite_id BLOB NOT NULL, + piece_id BLOB NOT NULL, + piece_expiration TIMESTAMP NOT NULL, -- date when it can be deleted + deletion_failed_at TIMESTAMP, + trash INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY ( satellite_id, piece_id ) + ); + CREATE INDEX idx_piece_expirations_piece_expiration ON piece_expirations(piece_expiration); + CREATE INDEX idx_piece_expirations_deletion_failed_at ON piece_expirations(deletion_failed_at); + CREATE INDEX idx_piece_expirations_trashed ON piece_expirations(satellite_id, trash) WHERE trash = 1; + `, + }, + storagenodedb.OrdersDBName: v25.DBStates[storagenodedb.OrdersDBName], + storagenodedb.BandwidthDBName: v25.DBStates[storagenodedb.BandwidthDBName], + storagenodedb.SatellitesDBName: v25.DBStates[storagenodedb.SatellitesDBName], + storagenodedb.DeprecatedInfoDBName: v25.DBStates[storagenodedb.DeprecatedInfoDBName], + }, +}