From e2d745fe8f409a7545ad3219d6db9e45eb9a8faf Mon Sep 17 00:00:00 2001 From: Kaloyan Raev Date: Wed, 3 Oct 2018 16:05:40 +0300 Subject: [PATCH] Clean up last segment handling (#408) * Clean up last segment handling * Fix increment for AES-GCM nonce * Fix stream size calculation * Adapt stream store tests * Fix Delete method * Rename info callback to segmentInfo * Clearer calculation for offset in Nonce.AESGCMNonce() * Adapt to the new little-endian nonce increment --- pkg/eestream/aesgcm.go | 14 +- pkg/eestream/encryption.go | 6 +- pkg/pb/meta.pb.go | 65 +++++---- pkg/pb/meta.proto | 1 + pkg/storage/segments/mock_store.go | 84 ++++++------ pkg/storage/segments/store.go | 37 ++++-- pkg/storage/segments/store_test.go | 8 +- pkg/storage/streams/store.go | 205 +++++++++++++++-------------- pkg/storage/streams/store_test.go | 25 ++-- 9 files changed, 241 insertions(+), 204 deletions(-) diff --git a/pkg/eestream/aesgcm.go b/pkg/eestream/aesgcm.go index 246a64c98..032a09972 100644 --- a/pkg/eestream/aesgcm.go +++ b/pkg/eestream/aesgcm.go @@ -90,7 +90,7 @@ type aesgcmDecrypter struct { // through with key. See the comments for NewAESGCMEncrypter about // startingNonce. func NewAESGCMDecrypter(key *Key, startingNonce *AESGCMNonce, encryptedBlockSize int) (Transformer, error) { - block, err := aes.NewCipher((*key)[:]) + block, err := aes.NewCipher(key[:]) if err != nil { return nil, err } @@ -127,8 +127,8 @@ func (s *aesgcmDecrypter) Transform(out, in []byte, blockNum int64) ([]byte, err } // EncryptAESGCM encrypts byte data with a key and nonce. The cipher data is returned -func EncryptAESGCM(data, key, nonce []byte) (cipherData []byte, err error) { - block, err := aes.NewCipher(key) +func EncryptAESGCM(data []byte, key *Key, nonce *AESGCMNonce) (cipherData []byte, err error) { + block, err := aes.NewCipher(key[:]) if err != nil { return []byte{}, errs.Wrap(err) } @@ -136,16 +136,16 @@ func EncryptAESGCM(data, key, nonce []byte) (cipherData []byte, err error) { if err != nil { return []byte{}, errs.Wrap(err) } - cipherData = aesgcm.Seal(nil, nonce, data, nil) + cipherData = aesgcm.Seal(nil, nonce[:], data, nil) return cipherData, nil } // DecryptAESGCM decrypts byte data with a key and nonce. 
The plain data is returned -func DecryptAESGCM(cipherData, key, nonce []byte) (data []byte, err error) { +func DecryptAESGCM(cipherData []byte, key *Key, nonce *AESGCMNonce) (data []byte, err error) { if len(cipherData) == 0 { return []byte{}, errs.New("empty cipher data") } - block, err := aes.NewCipher(key) + block, err := aes.NewCipher(key[:]) if err != nil { return []byte{}, errs.Wrap(err) } @@ -153,7 +153,7 @@ func DecryptAESGCM(cipherData, key, nonce []byte) (data []byte, err error) { if err != nil { return []byte{}, errs.Wrap(err) } - decrypted, err := aesgcm.Open(nil, nonce, cipherData, nil) + decrypted, err := aesgcm.Open(nil, nonce[:], cipherData, nil) if err != nil { return []byte{}, errs.Wrap(err) } diff --git a/pkg/eestream/encryption.go b/pkg/eestream/encryption.go index eff39884a..04dbe2f4a 100644 --- a/pkg/eestream/encryption.go +++ b/pkg/eestream/encryption.go @@ -48,7 +48,7 @@ func (nonce *Nonce) Increment(amount int64) (truncated bool, err error) { // AESGCMNonce returns the nonce as a AES-GCM nonce func (nonce *Nonce) AESGCMNonce() *AESGCMNonce { aes := new(AESGCMNonce) - copy((*aes)[:], (*nonce)[:AESGCMNonceSize]) + copy((*aes)[:], nonce[:AESGCMNonceSize]) return aes } @@ -67,7 +67,7 @@ func (cipher Cipher) Encrypt(data []byte, key *Key, nonce *Nonce) (cipherData [] case None: return data, nil case AESGCM: - return EncryptAESGCM(data, key[:], nonce[:AESGCMNonceSize]) + return EncryptAESGCM(data, key, nonce.AESGCMNonce()) case SecretBox: return EncryptSecretBox(data, key, nonce) default: @@ -82,7 +82,7 @@ func (cipher Cipher) Decrypt(cipherData []byte, key *Key, nonce *Nonce) (data [] case None: return cipherData, nil case AESGCM: - return DecryptAESGCM(cipherData, key[:], nonce[:AESGCMNonceSize]) + return DecryptAESGCM(cipherData, key, nonce.AESGCMNonce()) case SecretBox: return DecryptSecretBox(cipherData, key, nonce) default: diff --git a/pkg/pb/meta.pb.go b/pkg/pb/meta.pb.go index dd96c56d6..85354984f 100644 --- a/pkg/pb/meta.pb.go +++ b/pkg/pb/meta.pb.go @@ -19,22 +19,23 @@ var _ = math.Inf const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type MetaStreamInfo struct { - NumberOfSegments int64 `protobuf:"varint,1,opt,name=number_of_segments,json=numberOfSegments,proto3" json:"number_of_segments,omitempty"` - SegmentsSize int64 `protobuf:"varint,2,opt,name=segments_size,json=segmentsSize,proto3" json:"segments_size,omitempty"` - LastSegmentSize int64 `protobuf:"varint,3,opt,name=last_segment_size,json=lastSegmentSize,proto3" json:"last_segment_size,omitempty"` - Metadata []byte `protobuf:"bytes,4,opt,name=metadata,proto3" json:"metadata,omitempty"` - EncryptionType int32 `protobuf:"varint,5,opt,name=encryption_type,json=encryptionType,proto3" json:"encryption_type,omitempty"` - EncryptionBlockSize int32 `protobuf:"varint,6,opt,name=encryption_block_size,json=encryptionBlockSize,proto3" json:"encryption_block_size,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + NumberOfSegments int64 `protobuf:"varint,1,opt,name=number_of_segments,json=numberOfSegments,proto3" json:"number_of_segments,omitempty"` + SegmentsSize int64 `protobuf:"varint,2,opt,name=segments_size,json=segmentsSize,proto3" json:"segments_size,omitempty"` + LastSegmentSize int64 `protobuf:"varint,3,opt,name=last_segment_size,json=lastSegmentSize,proto3" json:"last_segment_size,omitempty"` + Metadata []byte `protobuf:"bytes,4,opt,name=metadata,proto3" json:"metadata,omitempty"` + EncryptionType int32 
`protobuf:"varint,5,opt,name=encryption_type,json=encryptionType,proto3" json:"encryption_type,omitempty"` + EncryptionBlockSize int32 `protobuf:"varint,6,opt,name=encryption_block_size,json=encryptionBlockSize,proto3" json:"encryption_block_size,omitempty"` + LastSegmentEncryptionKey []byte `protobuf:"bytes,7,opt,name=last_segment_encryption_key,json=lastSegmentEncryptionKey,proto3" json:"last_segment_encryption_key,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *MetaStreamInfo) Reset() { *m = MetaStreamInfo{} } func (m *MetaStreamInfo) String() string { return proto.CompactTextString(m) } func (*MetaStreamInfo) ProtoMessage() {} func (*MetaStreamInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_meta_20ddc39f331d9ae2, []int{0} + return fileDescriptor_meta_026f5a060e38d7ef, []int{0} } func (m *MetaStreamInfo) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MetaStreamInfo.Unmarshal(m, b) @@ -96,27 +97,35 @@ func (m *MetaStreamInfo) GetEncryptionBlockSize() int32 { return 0 } +func (m *MetaStreamInfo) GetLastSegmentEncryptionKey() []byte { + if m != nil { + return m.LastSegmentEncryptionKey + } + return nil +} + func init() { proto.RegisterType((*MetaStreamInfo)(nil), "streams.MetaStreamInfo") } -func init() { proto.RegisterFile("meta.proto", fileDescriptor_meta_20ddc39f331d9ae2) } +func init() { proto.RegisterFile("meta.proto", fileDescriptor_meta_026f5a060e38d7ef) } -var fileDescriptor_meta_20ddc39f331d9ae2 = []byte{ - // 228 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x90, 0xcd, 0x4a, 0x03, 0x31, - 0x14, 0x85, 0xc9, 0xf4, 0x47, 0xb9, 0xd4, 0x56, 0x23, 0x42, 0x70, 0x35, 0xe8, 0xc2, 0x41, 0xc4, - 0x85, 0xbe, 0x41, 0x77, 0x2e, 0x44, 0x98, 0x71, 0xe5, 0x26, 0x24, 0xf5, 0x8e, 0x0c, 0x36, 0x3f, - 0x24, 0xd7, 0xc5, 0xf4, 0x09, 0x7c, 0x6c, 0x49, 0xa6, 0x63, 0xbb, 0xbc, 0xe7, 0xfb, 0x38, 0x27, - 0x04, 0xc0, 0x20, 0xa9, 0x47, 0x1f, 0x1c, 0x39, 0x7e, 0x12, 0x29, 0xa0, 0x32, 0xf1, 0xe6, 0xb7, - 0x80, 0xe5, 0x2b, 0x92, 0x6a, 0xf2, 0xfd, 0x62, 0x5b, 0xc7, 0x1f, 0x80, 0xdb, 0x1f, 0xa3, 0x31, - 0x48, 0xd7, 0xca, 0x88, 0x5f, 0x06, 0x2d, 0x45, 0xc1, 0x4a, 0x56, 0x4d, 0xea, 0xf3, 0x81, 0xbc, - 0xb5, 0xcd, 0x3e, 0xe7, 0xb7, 0x70, 0x36, 0x3a, 0x32, 0x76, 0x3b, 0x14, 0x45, 0x16, 0x17, 0x63, - 0xd8, 0x74, 0x3b, 0xe4, 0xf7, 0x70, 0xb1, 0x55, 0x91, 0xc6, 0xb6, 0x41, 0x9c, 0x64, 0x71, 0x95, - 0xc0, 0xbe, 0x2d, 0xbb, 0xd7, 0x70, 0x9a, 0x1e, 0xfa, 0xa9, 0x48, 0x89, 0x69, 0xc9, 0xaa, 0x45, - 0xfd, 0x7f, 0xf3, 0x3b, 0x58, 0xa1, 0xdd, 0x84, 0xde, 0x53, 0xe7, 0xac, 0xa4, 0xde, 0xa3, 0x98, - 0x95, 0xac, 0x9a, 0xd5, 0xcb, 0x43, 0xfc, 0xde, 0x7b, 0xe4, 0x4f, 0x70, 0x75, 0x24, 0xea, 0xad, - 0xdb, 0x7c, 0x0f, 0xa3, 0xf3, 0xac, 0x5f, 0x1e, 0xe0, 0x3a, 0xb1, 0x34, 0xbc, 0x9e, 0x7e, 0x14, - 0x5e, 0xeb, 0x79, 0xfe, 0xa0, 0xe7, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xcf, 0xfc, 0x52, 0x0a, - 0x2e, 0x01, 0x00, 0x00, +var fileDescriptor_meta_026f5a060e38d7ef = []byte{ + // 250 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0x4f, 0x4b, 0xc3, 0x40, + 0x10, 0xc5, 0x49, 0xfa, 0x4f, 0x86, 0xda, 0xea, 0x8a, 0xb0, 0xe8, 0x25, 0xe8, 0xc1, 0x20, 0xe2, + 0x41, 0xcf, 0x5e, 0x0a, 0x1e, 0x44, 0x44, 0x48, 0x3c, 0x79, 0x09, 0x9b, 0x3a, 0x91, 0xd0, 0x66, + 0x77, 0xc9, 0x8e, 0x87, 0xed, 0x17, 0xf4, 0x6b, 0x49, 0x26, 0x4d, 0x13, 0x8f, 0xf3, 0xde, 0x8f, + 0xf7, 0x1e, 0x03, 0x50, 0x21, 0xa9, 0x7b, 0x5b, 0x1b, 0x32, 0x62, 
0xe6, 0xa8, 0x46, 0x55, 0xb9, + 0xab, 0xdf, 0x10, 0x16, 0x6f, 0x48, 0x2a, 0xe5, 0xfb, 0x45, 0x17, 0x46, 0xdc, 0x81, 0xd0, 0x3f, + 0x55, 0x8e, 0x75, 0x66, 0x8a, 0xcc, 0xe1, 0x77, 0x85, 0x9a, 0x9c, 0x0c, 0xa2, 0x20, 0x1e, 0x25, + 0x27, 0xad, 0xf3, 0x5e, 0xa4, 0x7b, 0x5d, 0x5c, 0xc3, 0x71, 0xc7, 0x64, 0xae, 0xdc, 0xa1, 0x0c, + 0x19, 0x9c, 0x77, 0x62, 0x5a, 0xee, 0x50, 0xdc, 0xc2, 0xe9, 0x56, 0x39, 0xea, 0xd2, 0x5a, 0x70, + 0xc4, 0xe0, 0xb2, 0x31, 0xf6, 0x69, 0xcc, 0x5e, 0xc0, 0x51, 0x33, 0xf4, 0x4b, 0x91, 0x92, 0xe3, + 0x28, 0x88, 0xe7, 0xc9, 0xe1, 0x16, 0x37, 0xb0, 0x44, 0xbd, 0xae, 0xbd, 0xa5, 0xd2, 0xe8, 0x8c, + 0xbc, 0x45, 0x39, 0x89, 0x82, 0x78, 0x92, 0x2c, 0x7a, 0xf9, 0xc3, 0x5b, 0x14, 0x0f, 0x70, 0x3e, + 0x00, 0xf3, 0xad, 0x59, 0x6f, 0xda, 0xd2, 0x29, 0xe3, 0x67, 0xbd, 0xb9, 0x6a, 0x3c, 0x2e, 0x7e, + 0x82, 0xcb, 0x7f, 0x23, 0x07, 0x01, 0x1b, 0xf4, 0x72, 0xc6, 0x5b, 0xe4, 0x60, 0xee, 0xf3, 0x01, + 0x78, 0x45, 0xbf, 0x1a, 0x7f, 0x86, 0x36, 0xcf, 0xa7, 0xfc, 0xdf, 0xc7, 0xbf, 0x00, 0x00, 0x00, + 0xff, 0xff, 0x9e, 0xf7, 0xce, 0x8a, 0x6d, 0x01, 0x00, 0x00, } diff --git a/pkg/pb/meta.proto b/pkg/pb/meta.proto index d959327ef..a4ff2470d 100644 --- a/pkg/pb/meta.proto +++ b/pkg/pb/meta.proto @@ -13,4 +13,5 @@ message MetaStreamInfo { bytes metadata = 4; int32 encryption_type = 5; int32 encryption_block_size = 6; + bytes last_segment_encryption_key = 7; } \ No newline at end of file diff --git a/pkg/storage/segments/mock_store.go b/pkg/storage/segments/mock_store.go index 4dacd5d83..3381e39e8 100644 --- a/pkg/storage/segments/mock_store.go +++ b/pkg/storage/segments/mock_store.go @@ -38,22 +38,21 @@ func (m *MockStore) EXPECT() *MockStoreMockRecorder { return m.recorder } -// Meta mocks base method -func (m *MockStore) Meta(ctx context.Context, path paths.Path) (Meta, error) { - ret := m.ctrl.Call(m, "Meta", ctx, path) - ret0, _ := ret[0].(Meta) - ret1, _ := ret[1].(error) - return ret0, ret1 +// Delete mocks base method +func (m *MockStore) Delete(arg0 context.Context, arg1 paths.Path) error { + ret := m.ctrl.Call(m, "Delete", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 } -// Meta indicates an expected call of Meta -func (mr *MockStoreMockRecorder) Meta(ctx, path interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Meta", reflect.TypeOf((*MockStore)(nil).Meta), ctx, path) +// Delete indicates an expected call of Delete +func (mr *MockStoreMockRecorder) Delete(arg0, arg1 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockStore)(nil).Delete), arg0, arg1) } // Get mocks base method -func (m *MockStore) Get(ctx context.Context, path paths.Path) (ranger.Ranger, Meta, error) { - ret := m.ctrl.Call(m, "Get", ctx, path) +func (m *MockStore) Get(arg0 context.Context, arg1 paths.Path) (ranger.Ranger, Meta, error) { + ret := m.ctrl.Call(m, "Get", arg0, arg1) ret0, _ := ret[0].(ranger.Ranger) ret1, _ := ret[1].(Meta) ret2, _ := ret[2].(error) @@ -61,38 +60,13 @@ func (m *MockStore) Get(ctx context.Context, path paths.Path) (ranger.Ranger, Me } // Get indicates an expected call of Get -func (mr *MockStoreMockRecorder) Get(ctx, path interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockStore)(nil).Get), ctx, path) -} - -// Put mocks base method -func (m *MockStore) Put(ctx context.Context, path paths.Path, data io.Reader, metadata []byte, expiration time.Time) (Meta, error) { - ret := m.ctrl.Call(m, "Put", ctx, path, data, metadata, expiration) - 
ret0, _ := ret[0].(Meta) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Put indicates an expected call of Put -func (mr *MockStoreMockRecorder) Put(ctx, path, data, metadata, expiration interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockStore)(nil).Put), ctx, path, data, metadata, expiration) -} - -// Delete mocks base method -func (m *MockStore) Delete(ctx context.Context, path paths.Path) error { - ret := m.ctrl.Call(m, "Delete", ctx, path) - ret0, _ := ret[0].(error) - return ret0 -} - -// Delete indicates an expected call of Delete -func (mr *MockStoreMockRecorder) Delete(ctx, path interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockStore)(nil).Delete), ctx, path) +func (mr *MockStoreMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockStore)(nil).Get), arg0, arg1) } // List mocks base method -func (m *MockStore) List(ctx context.Context, prefix, startAfter, endBefore paths.Path, recursive bool, limit int, metaFlags uint32) ([]ListItem, bool, error) { - ret := m.ctrl.Call(m, "List", ctx, prefix, startAfter, endBefore, recursive, limit, metaFlags) +func (m *MockStore) List(arg0 context.Context, arg1, arg2, arg3 paths.Path, arg4 bool, arg5 int, arg6 uint32) ([]ListItem, bool, error) { + ret := m.ctrl.Call(m, "List", arg0, arg1, arg2, arg3, arg4, arg5, arg6) ret0, _ := ret[0].([]ListItem) ret1, _ := ret[1].(bool) ret2, _ := ret[2].(error) @@ -100,6 +74,32 @@ func (m *MockStore) List(ctx context.Context, prefix, startAfter, endBefore path } // List indicates an expected call of List -func (mr *MockStoreMockRecorder) List(ctx, prefix, startAfter, endBefore, recursive, limit, metaFlags interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockStore)(nil).List), ctx, prefix, startAfter, endBefore, recursive, limit, metaFlags) +func (mr *MockStoreMockRecorder) List(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockStore)(nil).List), arg0, arg1, arg2, arg3, arg4, arg5, arg6) +} + +// Meta mocks base method +func (m *MockStore) Meta(arg0 context.Context, arg1 paths.Path) (Meta, error) { + ret := m.ctrl.Call(m, "Meta", arg0, arg1) + ret0, _ := ret[0].(Meta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Meta indicates an expected call of Meta +func (mr *MockStoreMockRecorder) Meta(arg0, arg1 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Meta", reflect.TypeOf((*MockStore)(nil).Meta), arg0, arg1) +} + +// Put mocks base method +func (m *MockStore) Put(arg0 context.Context, arg1 io.Reader, arg2 time.Time, arg3 func() (paths.Path, []byte, error)) (Meta, error) { + ret := m.ctrl.Call(m, "Put", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(Meta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Put indicates an expected call of Put +func (mr *MockStoreMockRecorder) Put(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockStore)(nil).Put), arg0, arg1, arg2, arg3) } diff --git a/pkg/storage/segments/store.go b/pkg/storage/segments/store.go index e3c6e6b0f..85c202140 100644 --- a/pkg/storage/segments/store.go +++ b/pkg/storage/segments/store.go @@ -48,14 +48,10 @@ type ListItem struct { // Store 
for segments type Store interface { Meta(ctx context.Context, path paths.Path) (meta Meta, err error) - Get(ctx context.Context, path paths.Path) (rr ranger.Ranger, - meta Meta, err error) - Put(ctx context.Context, path paths.Path, data io.Reader, metadata []byte, - expiration time.Time) (meta Meta, err error) + Get(ctx context.Context, path paths.Path) (rr ranger.Ranger, meta Meta, err error) + Put(ctx context.Context, data io.Reader, expiration time.Time, segmentInfo func() (paths.Path, []byte, error)) (meta Meta, err error) Delete(ctx context.Context, path paths.Path) (err error) - List(ctx context.Context, prefix, startAfter, endBefore paths.Path, - recursive bool, limit int, metaFlags uint32) (items []ListItem, - more bool, err error) + List(ctx context.Context, prefix, startAfter, endBefore paths.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) } type segmentStore struct { @@ -86,12 +82,9 @@ func (s *segmentStore) Meta(ctx context.Context, path paths.Path) (meta Meta, } // Put uploads a segment to an erasure code client -func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader, - metadata []byte, expiration time.Time) (meta Meta, err error) { +func (s *segmentStore) Put(ctx context.Context, data io.Reader, expiration time.Time, segmentInfo func() (paths.Path, []byte, error)) (meta Meta, err error) { defer mon.Task()(&ctx)(&err) - var p *pb.Pointer - exp, err := ptypes.TimestampProto(expiration) if err != nil { return Meta{}, Error.Wrap(err) @@ -102,8 +95,17 @@ func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader, if err != nil { return Meta{}, err } + + var path paths.Path + var pointer *pb.Pointer if !remoteSized { - p = &pb.Pointer{ + p, metadata, err := segmentInfo() + if err != nil { + return Meta{}, Error.Wrap(err) + } + path = p + + pointer = &pb.Pointer{ Type: pb.Pointer_INLINE, InlineSegment: peekReader.thresholdBuf, Size: int64(len(peekReader.thresholdBuf)), @@ -124,14 +126,21 @@ func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader, if err != nil { return Meta{}, Error.Wrap(err) } - p, err = s.makeRemotePointer(successfulNodes, pieceID, sizedReader.Size(), exp, metadata) + + p, metadata, err := segmentInfo() + if err != nil { + return Meta{}, Error.Wrap(err) + } + path = p + + pointer, err = s.makeRemotePointer(successfulNodes, pieceID, sizedReader.Size(), exp, metadata) if err != nil { return Meta{}, err } } // puts pointer to pointerDB - err = s.pdb.Put(ctx, path, p) + err = s.pdb.Put(ctx, path, pointer) if err != nil { return Meta{}, Error.Wrap(err) } diff --git a/pkg/storage/segments/store_test.go b/pkg/storage/segments/store_test.go index a70466b86..c32c183e0 100644 --- a/pkg/storage/segments/store_test.go +++ b/pkg/storage/segments/store_test.go @@ -132,7 +132,9 @@ func TestSegmentStorePutRemote(t *testing.T) { } gomock.InOrder(calls...) - _, err := ss.Put(ctx, p, r, tt.mdInput, tt.expiration) + _, err := ss.Put(ctx, r, tt.expiration, func() (paths.Path, []byte, error) { + return p, tt.mdInput, nil + }) assert.NoError(t, err, tt.name) } } @@ -175,7 +177,9 @@ func TestSegmentStorePutInline(t *testing.T) { } gomock.InOrder(calls...) 
- _, err := ss.Put(ctx, p, r, tt.mdInput, tt.expiration) + _, err := ss.Put(ctx, r, tt.expiration, func() (paths.Path, []byte, error) { + return p, tt.mdInput, nil + }) assert.NoError(t, err, tt.name) } } diff --git a/pkg/storage/streams/store.go b/pkg/storage/streams/store.go index b72ad8d69..682d06b5d 100644 --- a/pkg/storage/streams/store.go +++ b/pkg/storage/streams/store.go @@ -35,16 +35,16 @@ type Meta struct { } // convertMeta converts segment metadata to stream metadata -func convertMeta(segmentMeta segments.Meta) (Meta, error) { +func convertMeta(lastSegmentMeta segments.Meta) (Meta, error) { msi := pb.MetaStreamInfo{} - err := proto.Unmarshal(segmentMeta.Data, &msi) + err := proto.Unmarshal(lastSegmentMeta.Data, &msi) if err != nil { return Meta{}, err } return Meta{ - Modified: segmentMeta.Modified, - Expiration: segmentMeta.Expiration, + Modified: lastSegmentMeta.Modified, + Expiration: lastSegmentMeta.Expiration, Size: ((msi.NumberOfSegments - 1) * msi.SegmentsSize) + msi.LastSegmentSize, Data: msi.Metadata, }, nil @@ -54,12 +54,9 @@ func convertMeta(segmentMeta segments.Meta) (Meta, error) { type Store interface { Meta(ctx context.Context, path paths.Path) (Meta, error) Get(ctx context.Context, path paths.Path) (ranger.Ranger, Meta, error) - Put(ctx context.Context, path paths.Path, data io.Reader, - metadata []byte, expiration time.Time) (Meta, error) + Put(ctx context.Context, path paths.Path, data io.Reader, metadata []byte, expiration time.Time) (Meta, error) Delete(ctx context.Context, path paths.Path) error - List(ctx context.Context, prefix, startAfter, endBefore paths.Path, - recursive bool, limit int, metaFlags uint32) (items []ListItem, - more bool, err error) + List(ctx context.Context, prefix, startAfter, endBefore paths.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) } // streamStore is a store for streams @@ -96,13 +93,12 @@ func NewStreamStore(segments segments.Store, segmentSize int64, rootKey string, // store the first piece at s0/, second piece at s1/, and the // *last* piece at l/. Store the given metadata, along with the number // of segments, in a new protobuf, in the metadata of l/. 
-func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader, - metadata []byte, expiration time.Time) (m Meta, err error) { +func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader, metadata []byte, expiration time.Time) (m Meta, err error) { defer mon.Task()(&ctx)(&err) - var totalSegments int64 - var totalSize int64 - var lastSegmentSize int64 + var currentSegment int64 + var streamSize int64 + var putMeta segments.Meta derivedKey, err := path.DeriveContentKey(s.rootKey) if err != nil { @@ -121,7 +117,7 @@ func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader, } var nonce eestream.Nonce - _, err := nonce.Increment(totalSegments) + _, err := nonce.Increment(currentSegment) if err != nil { return Meta{}, err } @@ -137,7 +133,6 @@ func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader, } sizeReader := NewSizeReader(eofReader) - segmentPath := getSegmentPath(path, totalSegments) segmentReader := io.LimitReader(sizeReader, s.segmentSize) peekReader := segments.NewPeekThresholdReader(segmentReader) largeData, err := peekReader.IsLargerThan(encrypter.InBlockSize()) @@ -160,52 +155,50 @@ func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader, transformedReader = bytes.NewReader(cipherData) } - _, err = s.segments.Put(ctx, segmentPath, transformedReader, encryptedEncKey, expiration) + putMeta, err = s.segments.Put(ctx, transformedReader, expiration, func() (paths.Path, []byte, error) { + if !eofReader.isEOF() { + segmentPath := getSegmentPath(path, currentSegment) + return segmentPath, encryptedEncKey, nil + } + + lastSegmentPath := path.Prepend("l") + msi := pb.MetaStreamInfo{ + NumberOfSegments: currentSegment + 1, + SegmentsSize: s.segmentSize, + LastSegmentSize: sizeReader.Size(), + Metadata: metadata, + EncryptionType: int32(s.encType), + EncryptionBlockSize: int32(s.encBlockSize), + LastSegmentEncryptionKey: encryptedEncKey, + } + lastSegmentMeta, err := proto.Marshal(&msi) + if err != nil { + return nil, nil, err + } + return lastSegmentPath, lastSegmentMeta, nil + }) if err != nil { return Meta{}, err } - lastSegmentSize = sizeReader.Size() - totalSize = totalSize + lastSegmentSize - totalSegments = totalSegments + 1 + currentSegment++ + streamSize += sizeReader.Size() } if eofReader.hasError() { return Meta{}, eofReader.err } - lastSegmentPath := path.Prepend("l") - - md := pb.MetaStreamInfo{ - NumberOfSegments: totalSegments, - SegmentsSize: s.segmentSize, - LastSegmentSize: lastSegmentSize, - Metadata: metadata, - EncryptionType: int32(s.encType), - EncryptionBlockSize: int32(s.encBlockSize), - } - lastSegmentMetadata, err := proto.Marshal(&md) - if err != nil { - return Meta{}, err - } - - putMeta, err := s.segments.Put(ctx, lastSegmentPath, data, - lastSegmentMetadata, expiration) - if err != nil { - return Meta{}, err - } - totalSize = totalSize + putMeta.Size - resultMeta := Meta{ Modified: putMeta.Modified, Expiration: expiration, - Size: totalSize, + Size: streamSize, Data: metadata, } return resultMeta, nil } -// GetSegmentPath returns the unique path for a particular segment +// getSegmentPath returns the unique path for a particular segment func getSegmentPath(p paths.Path, segNum int64) paths.Path { return p.Prepend(fmt.Sprintf("s%d", segNum)) } @@ -213,11 +206,10 @@ func getSegmentPath(p paths.Path, segNum int64) paths.Path { // Get returns a ranger that knows what the overall size is (from l/) // and then returns the appropriate data from segments s0/, s1/, // 
..., l/. -func (s *streamStore) Get(ctx context.Context, path paths.Path) ( - rr ranger.Ranger, meta Meta, err error) { +func (s *streamStore) Get(ctx context.Context, path paths.Path) (rr ranger.Ranger, meta Meta, err error) { defer mon.Task()(&ctx)(&err) - lastSegmentMeta, err := s.segments.Meta(ctx, path.Prepend("l")) + lastSegmentRanger, lastSegmentMeta, err := s.segments.Get(ctx, path.Prepend("l")) if err != nil { return nil, Meta{}, err } @@ -228,7 +220,7 @@ func (s *streamStore) Get(ctx context.Context, path paths.Path) ( return nil, Meta{}, err } - newMeta, err := convertMeta(lastSegmentMeta) + streamMeta, err := convertMeta(lastSegmentMeta) if err != nil { return nil, Meta{}, err } @@ -239,12 +231,9 @@ func (s *streamStore) Get(ctx context.Context, path paths.Path) ( } var rangers []ranger.Ranger - for i := int64(0); i < msi.NumberOfSegments; i++ { + for i := int64(0); i < msi.NumberOfSegments-1; i++ { currentPath := getSegmentPath(path, i) size := msi.SegmentsSize - if i == msi.NumberOfSegments-1 { - size = msi.LastSegmentSize - } var nonce eestream.Nonce _, err := nonce.Increment(i) if err != nil { @@ -262,24 +251,44 @@ func (s *streamStore) Get(ctx context.Context, path paths.Path) ( rangers = append(rangers, rr) } + var nonce eestream.Nonce + _, err = nonce.Increment(msi.NumberOfSegments - 1) + if err != nil { + return nil, Meta{}, err + } + decryptedLastSegmentRanger, err := decryptRanger( + ctx, + lastSegmentRanger, + msi.LastSegmentSize, + eestream.Cipher(msi.EncryptionType), + msi.LastSegmentEncryptionKey, + (*eestream.Key)(derivedKey), + &nonce, + int(msi.EncryptionBlockSize), + ) + if err != nil { + return nil, Meta{}, err + } + rangers = append(rangers, decryptedLastSegmentRanger) + catRangers := ranger.Concat(rangers...) - return catRangers, newMeta, nil + return catRangers, streamMeta, nil } // Meta implements Store.Meta func (s *streamStore) Meta(ctx context.Context, path paths.Path) (Meta, error) { - segmentMeta, err := s.segments.Meta(ctx, path.Prepend("l")) + lastSegmentMeta, err := s.segments.Meta(ctx, path.Prepend("l")) if err != nil { return Meta{}, err } - meta, err := convertMeta(segmentMeta) + streamMeta, err := convertMeta(lastSegmentMeta) if err != nil { return Meta{}, err } - return meta, nil + return streamMeta, nil } // Delete all the segments, with the last one last @@ -297,7 +306,7 @@ func (s *streamStore) Delete(ctx context.Context, path paths.Path) (err error) { return err } - for i := 0; i < int(msi.NumberOfSegments); i++ { + for i := 0; i < int(msi.NumberOfSegments-1); i++ { currentPath := getSegmentPath(path, int64(i)) err := s.segments.Delete(ctx, currentPath) if err != nil { @@ -316,9 +325,7 @@ type ListItem struct { } // List all the paths inside l/, stripping off the l/ prefix -func (s *streamStore) List(ctx context.Context, prefix, startAfter, endBefore paths.Path, - recursive bool, limit int, metaFlags uint32) (items []ListItem, - more bool, err error) { +func (s *streamStore) List(ctx context.Context, prefix, startAfter, endBefore paths.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) { defer mon.Task()(&ctx)(&err) if metaFlags&meta.Size != 0 { @@ -362,54 +369,52 @@ func (lr *lazySegmentRanger) Size() int64 { // Range implements Ranger.Range to be lazily connected func (lr *lazySegmentRanger) Range(ctx context.Context, offset, length int64) (io.ReadCloser, error) { - cipher := lr.encType - if lr.ranger == nil { rr, m, err := lr.segments.Get(ctx, lr.path) if err != nil { return nil, err } - 
encryptedEncKey := m.Data - e, err := cipher.Decrypt(encryptedEncKey, lr.derivedKey, lr.startingNonce) + lr.ranger, err = decryptRanger(ctx, rr, lr.size, lr.encType, m.Data, lr.derivedKey, lr.startingNonce, lr.encBlockSize) if err != nil { return nil, err } - var encKey eestream.Key - copy(encKey[:], e) - decrypter, err := cipher.NewDecrypter(&encKey, lr.startingNonce, lr.encBlockSize) - if err != nil { - return nil, err - } - - var rd ranger.Ranger - if rr.Size()%int64(decrypter.InBlockSize()) != 0 { - reader, err := rr.Range(ctx, 0, rr.Size()) - if err != nil { - return nil, err - } - cipherData, err := ioutil.ReadAll(reader) - if err != nil { - return nil, err - } - data, err := cipher.Decrypt(cipherData, &encKey, lr.startingNonce) - if err != nil { - return nil, err - } - rd = ranger.ByteRanger(data) - lr.ranger = rd - } else { - rd, err = eestream.Transform(rr, decrypter) - if err != nil { - return nil, err - } - - paddedSize := rd.Size() - rc, err := eestream.Unpad(rd, int(paddedSize-lr.Size())) - if err != nil { - return nil, err - } - lr.ranger = rc - } } return lr.ranger.Range(ctx, offset, length) } + +// decryptRanger returns a decrypted ranger of the given rr ranger +func decryptRanger(ctx context.Context, rr ranger.Ranger, decryptedSize int64, cipher eestream.Cipher, encryptedEncKey []byte, derivedKey *eestream.Key, startingNonce *eestream.Nonce, encBlockSize int) (ranger.Ranger, error) { + e, err := cipher.Decrypt(encryptedEncKey, derivedKey, startingNonce) + if err != nil { + return nil, err + } + var encKey eestream.Key + copy(encKey[:], e) + decrypter, err := cipher.NewDecrypter(&encKey, startingNonce, encBlockSize) + if err != nil { + return nil, err + } + + var rd ranger.Ranger + if rr.Size()%int64(decrypter.InBlockSize()) != 0 { + reader, err := rr.Range(ctx, 0, rr.Size()) + if err != nil { + return nil, err + } + cipherData, err := ioutil.ReadAll(reader) + if err != nil { + return nil, err + } + data, err := cipher.Decrypt(cipherData, &encKey, startingNonce) + if err != nil { + return nil, err + } + return ranger.ByteRanger(data), nil + } + + rd, err = eestream.Transform(rr, decrypter) + if err != nil { + return nil, err + } + return eestream.Unpad(rd, int(rd.Size()-decryptedSize)) +} diff --git a/pkg/storage/streams/store_test.go b/pkg/storage/streams/store_test.go index 1d2609659..30ce42e95 100644 --- a/pkg/storage/streams/store_test.go +++ b/pkg/storage/streams/store_test.go @@ -102,7 +102,7 @@ func TestStreamStorePut(t *testing.T) { streamMeta := Meta{ Modified: segmentMeta.Modified, Expiration: segmentMeta.Expiration, - Size: 14, + Size: 4, Data: []byte("metadata"), } @@ -124,10 +124,9 @@ func TestStreamStorePut(t *testing.T) { errTag := fmt.Sprintf("Test case #%d", i) mockSegmentStore.EXPECT(). - Put(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Put(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(test.segmentMeta, test.segmentError). - Times(2). 
- Do(func(ctx context.Context, path paths.Path, data io.Reader, metadata []byte, expiration time.Time) { + Do(func(ctx context.Context, data io.Reader, expiration time.Time, info func() (paths.Path, []byte, error)) { for { buf := make([]byte, 4) _, err := data.Read(buf) @@ -181,11 +180,21 @@ func TestStreamStoreGet(t *testing.T) { closer: readCloserStub{}, } + msi := pb.MetaStreamInfo{ + NumberOfSegments: 1, + SegmentsSize: 10, + LastSegmentSize: 0, + } + lastSegmentMeta, err := proto.Marshal(&msi) + if err != nil { + t.Fatal(err) + } + segmentMeta := segments.Meta{ Modified: staticTime, Expiration: staticTime, Size: 10, - Data: []byte{}, + Data: lastSegmentMeta, } streamRanger := ranger.ByteRanger(nil) @@ -215,8 +224,8 @@ func TestStreamStoreGet(t *testing.T) { calls := []*gomock.Call{ mockSegmentStore.EXPECT(). - Meta(gomock.Any(), gomock.Any()). - Return(test.segmentMeta, test.segmentError), + Get(gomock.Any(), gomock.Any()). + Return(test.segmentRanger, test.segmentMeta, test.segmentError), } gomock.InOrder(calls...) @@ -231,7 +240,7 @@ func TestStreamStoreGet(t *testing.T) { t.Fatal(err) } - assert.Equal(t, test.streamRanger, ranger, errTag) + assert.Equal(t, test.streamRanger.Size(), ranger.Size(), errTag) assert.Equal(t, test.streamMeta, meta, errTag) } }