Clean up last segment handling (#408)

* Clean up last segment handling

* Fix increment for AES-GCM nonce

* Fix stream size calculation

* Adapt stream store tests

* Fix Delete method

* Rename info callback to segmentInfo

* Clearer calculation for offset in Nonce.AESGCMNonce()

* Adapt to the new little-endian nonce increment
This commit is contained in:
Kaloyan Raev 2018-10-03 16:05:40 +03:00 committed by GitHub
parent f9545c40aa
commit e2d745fe8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 241 additions and 204 deletions

View File

@ -90,7 +90,7 @@ type aesgcmDecrypter struct {
// through with key. See the comments for NewAESGCMEncrypter about
// startingNonce.
func NewAESGCMDecrypter(key *Key, startingNonce *AESGCMNonce, encryptedBlockSize int) (Transformer, error) {
block, err := aes.NewCipher((*key)[:])
block, err := aes.NewCipher(key[:])
if err != nil {
return nil, err
}
@ -127,8 +127,8 @@ func (s *aesgcmDecrypter) Transform(out, in []byte, blockNum int64) ([]byte, err
}
// EncryptAESGCM encrypts byte data with a key and nonce. The cipher data is returned
func EncryptAESGCM(data, key, nonce []byte) (cipherData []byte, err error) {
block, err := aes.NewCipher(key)
func EncryptAESGCM(data []byte, key *Key, nonce *AESGCMNonce) (cipherData []byte, err error) {
block, err := aes.NewCipher(key[:])
if err != nil {
return []byte{}, errs.Wrap(err)
}
@ -136,16 +136,16 @@ func EncryptAESGCM(data, key, nonce []byte) (cipherData []byte, err error) {
if err != nil {
return []byte{}, errs.Wrap(err)
}
cipherData = aesgcm.Seal(nil, nonce, data, nil)
cipherData = aesgcm.Seal(nil, nonce[:], data, nil)
return cipherData, nil
}
// DecryptAESGCM decrypts byte data with a key and nonce. The plain data is returned
func DecryptAESGCM(cipherData, key, nonce []byte) (data []byte, err error) {
func DecryptAESGCM(cipherData []byte, key *Key, nonce *AESGCMNonce) (data []byte, err error) {
if len(cipherData) == 0 {
return []byte{}, errs.New("empty cipher data")
}
block, err := aes.NewCipher(key)
block, err := aes.NewCipher(key[:])
if err != nil {
return []byte{}, errs.Wrap(err)
}
@ -153,7 +153,7 @@ func DecryptAESGCM(cipherData, key, nonce []byte) (data []byte, err error) {
if err != nil {
return []byte{}, errs.Wrap(err)
}
decrypted, err := aesgcm.Open(nil, nonce, cipherData, nil)
decrypted, err := aesgcm.Open(nil, nonce[:], cipherData, nil)
if err != nil {
return []byte{}, errs.Wrap(err)
}

View File

@ -48,7 +48,7 @@ func (nonce *Nonce) Increment(amount int64) (truncated bool, err error) {
// AESGCMNonce returns the nonce as a AES-GCM nonce
func (nonce *Nonce) AESGCMNonce() *AESGCMNonce {
aes := new(AESGCMNonce)
copy((*aes)[:], (*nonce)[:AESGCMNonceSize])
copy((*aes)[:], nonce[:AESGCMNonceSize])
return aes
}
@ -67,7 +67,7 @@ func (cipher Cipher) Encrypt(data []byte, key *Key, nonce *Nonce) (cipherData []
case None:
return data, nil
case AESGCM:
return EncryptAESGCM(data, key[:], nonce[:AESGCMNonceSize])
return EncryptAESGCM(data, key, nonce.AESGCMNonce())
case SecretBox:
return EncryptSecretBox(data, key, nonce)
default:
@ -82,7 +82,7 @@ func (cipher Cipher) Decrypt(cipherData []byte, key *Key, nonce *Nonce) (data []
case None:
return cipherData, nil
case AESGCM:
return DecryptAESGCM(cipherData, key[:], nonce[:AESGCMNonceSize])
return DecryptAESGCM(cipherData, key, nonce.AESGCMNonce())
case SecretBox:
return DecryptSecretBox(cipherData, key, nonce)
default:

View File

@ -19,22 +19,23 @@ var _ = math.Inf
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type MetaStreamInfo struct {
NumberOfSegments int64 `protobuf:"varint,1,opt,name=number_of_segments,json=numberOfSegments,proto3" json:"number_of_segments,omitempty"`
SegmentsSize int64 `protobuf:"varint,2,opt,name=segments_size,json=segmentsSize,proto3" json:"segments_size,omitempty"`
LastSegmentSize int64 `protobuf:"varint,3,opt,name=last_segment_size,json=lastSegmentSize,proto3" json:"last_segment_size,omitempty"`
Metadata []byte `protobuf:"bytes,4,opt,name=metadata,proto3" json:"metadata,omitempty"`
EncryptionType int32 `protobuf:"varint,5,opt,name=encryption_type,json=encryptionType,proto3" json:"encryption_type,omitempty"`
EncryptionBlockSize int32 `protobuf:"varint,6,opt,name=encryption_block_size,json=encryptionBlockSize,proto3" json:"encryption_block_size,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
NumberOfSegments int64 `protobuf:"varint,1,opt,name=number_of_segments,json=numberOfSegments,proto3" json:"number_of_segments,omitempty"`
SegmentsSize int64 `protobuf:"varint,2,opt,name=segments_size,json=segmentsSize,proto3" json:"segments_size,omitempty"`
LastSegmentSize int64 `protobuf:"varint,3,opt,name=last_segment_size,json=lastSegmentSize,proto3" json:"last_segment_size,omitempty"`
Metadata []byte `protobuf:"bytes,4,opt,name=metadata,proto3" json:"metadata,omitempty"`
EncryptionType int32 `protobuf:"varint,5,opt,name=encryption_type,json=encryptionType,proto3" json:"encryption_type,omitempty"`
EncryptionBlockSize int32 `protobuf:"varint,6,opt,name=encryption_block_size,json=encryptionBlockSize,proto3" json:"encryption_block_size,omitempty"`
LastSegmentEncryptionKey []byte `protobuf:"bytes,7,opt,name=last_segment_encryption_key,json=lastSegmentEncryptionKey,proto3" json:"last_segment_encryption_key,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MetaStreamInfo) Reset() { *m = MetaStreamInfo{} }
func (m *MetaStreamInfo) String() string { return proto.CompactTextString(m) }
func (*MetaStreamInfo) ProtoMessage() {}
func (*MetaStreamInfo) Descriptor() ([]byte, []int) {
return fileDescriptor_meta_20ddc39f331d9ae2, []int{0}
return fileDescriptor_meta_026f5a060e38d7ef, []int{0}
}
func (m *MetaStreamInfo) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_MetaStreamInfo.Unmarshal(m, b)
@ -96,27 +97,35 @@ func (m *MetaStreamInfo) GetEncryptionBlockSize() int32 {
return 0
}
func (m *MetaStreamInfo) GetLastSegmentEncryptionKey() []byte {
if m != nil {
return m.LastSegmentEncryptionKey
}
return nil
}
func init() {
proto.RegisterType((*MetaStreamInfo)(nil), "streams.MetaStreamInfo")
}
func init() { proto.RegisterFile("meta.proto", fileDescriptor_meta_20ddc39f331d9ae2) }
func init() { proto.RegisterFile("meta.proto", fileDescriptor_meta_026f5a060e38d7ef) }
var fileDescriptor_meta_20ddc39f331d9ae2 = []byte{
// 228 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x90, 0xcd, 0x4a, 0x03, 0x31,
0x14, 0x85, 0xc9, 0xf4, 0x47, 0xb9, 0xd4, 0x56, 0x23, 0x42, 0x70, 0x35, 0xe8, 0xc2, 0x41, 0xc4,
0x85, 0xbe, 0x41, 0x77, 0x2e, 0x44, 0x98, 0x71, 0xe5, 0x26, 0x24, 0xf5, 0x8e, 0x0c, 0x36, 0x3f,
0x24, 0xd7, 0xc5, 0xf4, 0x09, 0x7c, 0x6c, 0x49, 0xa6, 0x63, 0xbb, 0xbc, 0xe7, 0xfb, 0x38, 0x27,
0x04, 0xc0, 0x20, 0xa9, 0x47, 0x1f, 0x1c, 0x39, 0x7e, 0x12, 0x29, 0xa0, 0x32, 0xf1, 0xe6, 0xb7,
0x80, 0xe5, 0x2b, 0x92, 0x6a, 0xf2, 0xfd, 0x62, 0x5b, 0xc7, 0x1f, 0x80, 0xdb, 0x1f, 0xa3, 0x31,
0x48, 0xd7, 0xca, 0x88, 0x5f, 0x06, 0x2d, 0x45, 0xc1, 0x4a, 0x56, 0x4d, 0xea, 0xf3, 0x81, 0xbc,
0xb5, 0xcd, 0x3e, 0xe7, 0xb7, 0x70, 0x36, 0x3a, 0x32, 0x76, 0x3b, 0x14, 0x45, 0x16, 0x17, 0x63,
0xd8, 0x74, 0x3b, 0xe4, 0xf7, 0x70, 0xb1, 0x55, 0x91, 0xc6, 0xb6, 0x41, 0x9c, 0x64, 0x71, 0x95,
0xc0, 0xbe, 0x2d, 0xbb, 0xd7, 0x70, 0x9a, 0x1e, 0xfa, 0xa9, 0x48, 0x89, 0x69, 0xc9, 0xaa, 0x45,
0xfd, 0x7f, 0xf3, 0x3b, 0x58, 0xa1, 0xdd, 0x84, 0xde, 0x53, 0xe7, 0xac, 0xa4, 0xde, 0xa3, 0x98,
0x95, 0xac, 0x9a, 0xd5, 0xcb, 0x43, 0xfc, 0xde, 0x7b, 0xe4, 0x4f, 0x70, 0x75, 0x24, 0xea, 0xad,
0xdb, 0x7c, 0x0f, 0xa3, 0xf3, 0xac, 0x5f, 0x1e, 0xe0, 0x3a, 0xb1, 0x34, 0xbc, 0x9e, 0x7e, 0x14,
0x5e, 0xeb, 0x79, 0xfe, 0xa0, 0xe7, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xcf, 0xfc, 0x52, 0x0a,
0x2e, 0x01, 0x00, 0x00,
var fileDescriptor_meta_026f5a060e38d7ef = []byte{
// 250 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0x4f, 0x4b, 0xc3, 0x40,
0x10, 0xc5, 0x49, 0xfa, 0x4f, 0x86, 0xda, 0xea, 0x8a, 0xb0, 0xe8, 0x25, 0xe8, 0xc1, 0x20, 0xe2,
0x41, 0xcf, 0x5e, 0x0a, 0x1e, 0x44, 0x44, 0x48, 0x3c, 0x79, 0x09, 0x9b, 0x3a, 0x91, 0xd0, 0x66,
0x77, 0xc9, 0x8e, 0x87, 0xed, 0x17, 0xf4, 0x6b, 0x49, 0x26, 0x4d, 0x13, 0x8f, 0xf3, 0xde, 0x8f,
0xf7, 0x1e, 0x03, 0x50, 0x21, 0xa9, 0x7b, 0x5b, 0x1b, 0x32, 0x62, 0xe6, 0xa8, 0x46, 0x55, 0xb9,
0xab, 0xdf, 0x10, 0x16, 0x6f, 0x48, 0x2a, 0xe5, 0xfb, 0x45, 0x17, 0x46, 0xdc, 0x81, 0xd0, 0x3f,
0x55, 0x8e, 0x75, 0x66, 0x8a, 0xcc, 0xe1, 0x77, 0x85, 0x9a, 0x9c, 0x0c, 0xa2, 0x20, 0x1e, 0x25,
0x27, 0xad, 0xf3, 0x5e, 0xa4, 0x7b, 0x5d, 0x5c, 0xc3, 0x71, 0xc7, 0x64, 0xae, 0xdc, 0xa1, 0x0c,
0x19, 0x9c, 0x77, 0x62, 0x5a, 0xee, 0x50, 0xdc, 0xc2, 0xe9, 0x56, 0x39, 0xea, 0xd2, 0x5a, 0x70,
0xc4, 0xe0, 0xb2, 0x31, 0xf6, 0x69, 0xcc, 0x5e, 0xc0, 0x51, 0x33, 0xf4, 0x4b, 0x91, 0x92, 0xe3,
0x28, 0x88, 0xe7, 0xc9, 0xe1, 0x16, 0x37, 0xb0, 0x44, 0xbd, 0xae, 0xbd, 0xa5, 0xd2, 0xe8, 0x8c,
0xbc, 0x45, 0x39, 0x89, 0x82, 0x78, 0x92, 0x2c, 0x7a, 0xf9, 0xc3, 0x5b, 0x14, 0x0f, 0x70, 0x3e,
0x00, 0xf3, 0xad, 0x59, 0x6f, 0xda, 0xd2, 0x29, 0xe3, 0x67, 0xbd, 0xb9, 0x6a, 0x3c, 0x2e, 0x7e,
0x82, 0xcb, 0x7f, 0x23, 0x07, 0x01, 0x1b, 0xf4, 0x72, 0xc6, 0x5b, 0xe4, 0x60, 0xee, 0xf3, 0x01,
0x78, 0x45, 0xbf, 0x1a, 0x7f, 0x86, 0x36, 0xcf, 0xa7, 0xfc, 0xdf, 0xc7, 0xbf, 0x00, 0x00, 0x00,
0xff, 0xff, 0x9e, 0xf7, 0xce, 0x8a, 0x6d, 0x01, 0x00, 0x00,
}

View File

@ -13,4 +13,5 @@ message MetaStreamInfo {
bytes metadata = 4;
int32 encryption_type = 5;
int32 encryption_block_size = 6;
bytes last_segment_encryption_key = 7;
}

View File

@ -38,22 +38,21 @@ func (m *MockStore) EXPECT() *MockStoreMockRecorder {
return m.recorder
}
// Meta mocks base method
func (m *MockStore) Meta(ctx context.Context, path paths.Path) (Meta, error) {
ret := m.ctrl.Call(m, "Meta", ctx, path)
ret0, _ := ret[0].(Meta)
ret1, _ := ret[1].(error)
return ret0, ret1
// Delete mocks base method
func (m *MockStore) Delete(arg0 context.Context, arg1 paths.Path) error {
ret := m.ctrl.Call(m, "Delete", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// Meta indicates an expected call of Meta
func (mr *MockStoreMockRecorder) Meta(ctx, path interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Meta", reflect.TypeOf((*MockStore)(nil).Meta), ctx, path)
// Delete indicates an expected call of Delete
func (mr *MockStoreMockRecorder) Delete(arg0, arg1 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockStore)(nil).Delete), arg0, arg1)
}
// Get mocks base method
func (m *MockStore) Get(ctx context.Context, path paths.Path) (ranger.Ranger, Meta, error) {
ret := m.ctrl.Call(m, "Get", ctx, path)
func (m *MockStore) Get(arg0 context.Context, arg1 paths.Path) (ranger.Ranger, Meta, error) {
ret := m.ctrl.Call(m, "Get", arg0, arg1)
ret0, _ := ret[0].(ranger.Ranger)
ret1, _ := ret[1].(Meta)
ret2, _ := ret[2].(error)
@ -61,38 +60,13 @@ func (m *MockStore) Get(ctx context.Context, path paths.Path) (ranger.Ranger, Me
}
// Get indicates an expected call of Get
func (mr *MockStoreMockRecorder) Get(ctx, path interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockStore)(nil).Get), ctx, path)
}
// Put mocks base method
func (m *MockStore) Put(ctx context.Context, path paths.Path, data io.Reader, metadata []byte, expiration time.Time) (Meta, error) {
ret := m.ctrl.Call(m, "Put", ctx, path, data, metadata, expiration)
ret0, _ := ret[0].(Meta)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Put indicates an expected call of Put
func (mr *MockStoreMockRecorder) Put(ctx, path, data, metadata, expiration interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockStore)(nil).Put), ctx, path, data, metadata, expiration)
}
// Delete mocks base method
func (m *MockStore) Delete(ctx context.Context, path paths.Path) error {
ret := m.ctrl.Call(m, "Delete", ctx, path)
ret0, _ := ret[0].(error)
return ret0
}
// Delete indicates an expected call of Delete
func (mr *MockStoreMockRecorder) Delete(ctx, path interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockStore)(nil).Delete), ctx, path)
func (mr *MockStoreMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockStore)(nil).Get), arg0, arg1)
}
// List mocks base method
func (m *MockStore) List(ctx context.Context, prefix, startAfter, endBefore paths.Path, recursive bool, limit int, metaFlags uint32) ([]ListItem, bool, error) {
ret := m.ctrl.Call(m, "List", ctx, prefix, startAfter, endBefore, recursive, limit, metaFlags)
func (m *MockStore) List(arg0 context.Context, arg1, arg2, arg3 paths.Path, arg4 bool, arg5 int, arg6 uint32) ([]ListItem, bool, error) {
ret := m.ctrl.Call(m, "List", arg0, arg1, arg2, arg3, arg4, arg5, arg6)
ret0, _ := ret[0].([]ListItem)
ret1, _ := ret[1].(bool)
ret2, _ := ret[2].(error)
@ -100,6 +74,32 @@ func (m *MockStore) List(ctx context.Context, prefix, startAfter, endBefore path
}
// List indicates an expected call of List
func (mr *MockStoreMockRecorder) List(ctx, prefix, startAfter, endBefore, recursive, limit, metaFlags interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockStore)(nil).List), ctx, prefix, startAfter, endBefore, recursive, limit, metaFlags)
func (mr *MockStoreMockRecorder) List(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockStore)(nil).List), arg0, arg1, arg2, arg3, arg4, arg5, arg6)
}
// Meta mocks base method
func (m *MockStore) Meta(arg0 context.Context, arg1 paths.Path) (Meta, error) {
ret := m.ctrl.Call(m, "Meta", arg0, arg1)
ret0, _ := ret[0].(Meta)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Meta indicates an expected call of Meta
func (mr *MockStoreMockRecorder) Meta(arg0, arg1 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Meta", reflect.TypeOf((*MockStore)(nil).Meta), arg0, arg1)
}
// Put mocks base method
func (m *MockStore) Put(arg0 context.Context, arg1 io.Reader, arg2 time.Time, arg3 func() (paths.Path, []byte, error)) (Meta, error) {
ret := m.ctrl.Call(m, "Put", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(Meta)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Put indicates an expected call of Put
func (mr *MockStoreMockRecorder) Put(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockStore)(nil).Put), arg0, arg1, arg2, arg3)
}

View File

@ -48,14 +48,10 @@ type ListItem struct {
// Store for segments
type Store interface {
Meta(ctx context.Context, path paths.Path) (meta Meta, err error)
Get(ctx context.Context, path paths.Path) (rr ranger.Ranger,
meta Meta, err error)
Put(ctx context.Context, path paths.Path, data io.Reader, metadata []byte,
expiration time.Time) (meta Meta, err error)
Get(ctx context.Context, path paths.Path) (rr ranger.Ranger, meta Meta, err error)
Put(ctx context.Context, data io.Reader, expiration time.Time, segmentInfo func() (paths.Path, []byte, error)) (meta Meta, err error)
Delete(ctx context.Context, path paths.Path) (err error)
List(ctx context.Context, prefix, startAfter, endBefore paths.Path,
recursive bool, limit int, metaFlags uint32) (items []ListItem,
more bool, err error)
List(ctx context.Context, prefix, startAfter, endBefore paths.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
}
type segmentStore struct {
@ -86,12 +82,9 @@ func (s *segmentStore) Meta(ctx context.Context, path paths.Path) (meta Meta,
}
// Put uploads a segment to an erasure code client
func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader,
metadata []byte, expiration time.Time) (meta Meta, err error) {
func (s *segmentStore) Put(ctx context.Context, data io.Reader, expiration time.Time, segmentInfo func() (paths.Path, []byte, error)) (meta Meta, err error) {
defer mon.Task()(&ctx)(&err)
var p *pb.Pointer
exp, err := ptypes.TimestampProto(expiration)
if err != nil {
return Meta{}, Error.Wrap(err)
@ -102,8 +95,17 @@ func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader,
if err != nil {
return Meta{}, err
}
var path paths.Path
var pointer *pb.Pointer
if !remoteSized {
p = &pb.Pointer{
p, metadata, err := segmentInfo()
if err != nil {
return Meta{}, Error.Wrap(err)
}
path = p
pointer = &pb.Pointer{
Type: pb.Pointer_INLINE,
InlineSegment: peekReader.thresholdBuf,
Size: int64(len(peekReader.thresholdBuf)),
@ -124,14 +126,21 @@ func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader,
if err != nil {
return Meta{}, Error.Wrap(err)
}
p, err = s.makeRemotePointer(successfulNodes, pieceID, sizedReader.Size(), exp, metadata)
p, metadata, err := segmentInfo()
if err != nil {
return Meta{}, Error.Wrap(err)
}
path = p
pointer, err = s.makeRemotePointer(successfulNodes, pieceID, sizedReader.Size(), exp, metadata)
if err != nil {
return Meta{}, err
}
}
// puts pointer to pointerDB
err = s.pdb.Put(ctx, path, p)
err = s.pdb.Put(ctx, path, pointer)
if err != nil {
return Meta{}, Error.Wrap(err)
}

View File

@ -132,7 +132,9 @@ func TestSegmentStorePutRemote(t *testing.T) {
}
gomock.InOrder(calls...)
_, err := ss.Put(ctx, p, r, tt.mdInput, tt.expiration)
_, err := ss.Put(ctx, r, tt.expiration, func() (paths.Path, []byte, error) {
return p, tt.mdInput, nil
})
assert.NoError(t, err, tt.name)
}
}
@ -175,7 +177,9 @@ func TestSegmentStorePutInline(t *testing.T) {
}
gomock.InOrder(calls...)
_, err := ss.Put(ctx, p, r, tt.mdInput, tt.expiration)
_, err := ss.Put(ctx, r, tt.expiration, func() (paths.Path, []byte, error) {
return p, tt.mdInput, nil
})
assert.NoError(t, err, tt.name)
}
}

View File

@ -35,16 +35,16 @@ type Meta struct {
}
// convertMeta converts segment metadata to stream metadata
func convertMeta(segmentMeta segments.Meta) (Meta, error) {
func convertMeta(lastSegmentMeta segments.Meta) (Meta, error) {
msi := pb.MetaStreamInfo{}
err := proto.Unmarshal(segmentMeta.Data, &msi)
err := proto.Unmarshal(lastSegmentMeta.Data, &msi)
if err != nil {
return Meta{}, err
}
return Meta{
Modified: segmentMeta.Modified,
Expiration: segmentMeta.Expiration,
Modified: lastSegmentMeta.Modified,
Expiration: lastSegmentMeta.Expiration,
Size: ((msi.NumberOfSegments - 1) * msi.SegmentsSize) + msi.LastSegmentSize,
Data: msi.Metadata,
}, nil
@ -54,12 +54,9 @@ func convertMeta(segmentMeta segments.Meta) (Meta, error) {
type Store interface {
Meta(ctx context.Context, path paths.Path) (Meta, error)
Get(ctx context.Context, path paths.Path) (ranger.Ranger, Meta, error)
Put(ctx context.Context, path paths.Path, data io.Reader,
metadata []byte, expiration time.Time) (Meta, error)
Put(ctx context.Context, path paths.Path, data io.Reader, metadata []byte, expiration time.Time) (Meta, error)
Delete(ctx context.Context, path paths.Path) error
List(ctx context.Context, prefix, startAfter, endBefore paths.Path,
recursive bool, limit int, metaFlags uint32) (items []ListItem,
more bool, err error)
List(ctx context.Context, prefix, startAfter, endBefore paths.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
}
// streamStore is a store for streams
@ -96,13 +93,12 @@ func NewStreamStore(segments segments.Store, segmentSize int64, rootKey string,
// store the first piece at s0/<path>, second piece at s1/<path>, and the
// *last* piece at l/<path>. Store the given metadata, along with the number
// of segments, in a new protobuf, in the metadata of l/<path>.
func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader,
metadata []byte, expiration time.Time) (m Meta, err error) {
func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader, metadata []byte, expiration time.Time) (m Meta, err error) {
defer mon.Task()(&ctx)(&err)
var totalSegments int64
var totalSize int64
var lastSegmentSize int64
var currentSegment int64
var streamSize int64
var putMeta segments.Meta
derivedKey, err := path.DeriveContentKey(s.rootKey)
if err != nil {
@ -121,7 +117,7 @@ func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader,
}
var nonce eestream.Nonce
_, err := nonce.Increment(totalSegments)
_, err := nonce.Increment(currentSegment)
if err != nil {
return Meta{}, err
}
@ -137,7 +133,6 @@ func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader,
}
sizeReader := NewSizeReader(eofReader)
segmentPath := getSegmentPath(path, totalSegments)
segmentReader := io.LimitReader(sizeReader, s.segmentSize)
peekReader := segments.NewPeekThresholdReader(segmentReader)
largeData, err := peekReader.IsLargerThan(encrypter.InBlockSize())
@ -160,52 +155,50 @@ func (s *streamStore) Put(ctx context.Context, path paths.Path, data io.Reader,
transformedReader = bytes.NewReader(cipherData)
}
_, err = s.segments.Put(ctx, segmentPath, transformedReader, encryptedEncKey, expiration)
putMeta, err = s.segments.Put(ctx, transformedReader, expiration, func() (paths.Path, []byte, error) {
if !eofReader.isEOF() {
segmentPath := getSegmentPath(path, currentSegment)
return segmentPath, encryptedEncKey, nil
}
lastSegmentPath := path.Prepend("l")
msi := pb.MetaStreamInfo{
NumberOfSegments: currentSegment + 1,
SegmentsSize: s.segmentSize,
LastSegmentSize: sizeReader.Size(),
Metadata: metadata,
EncryptionType: int32(s.encType),
EncryptionBlockSize: int32(s.encBlockSize),
LastSegmentEncryptionKey: encryptedEncKey,
}
lastSegmentMeta, err := proto.Marshal(&msi)
if err != nil {
return nil, nil, err
}
return lastSegmentPath, lastSegmentMeta, nil
})
if err != nil {
return Meta{}, err
}
lastSegmentSize = sizeReader.Size()
totalSize = totalSize + lastSegmentSize
totalSegments = totalSegments + 1
currentSegment++
streamSize += sizeReader.Size()
}
if eofReader.hasError() {
return Meta{}, eofReader.err
}
lastSegmentPath := path.Prepend("l")
md := pb.MetaStreamInfo{
NumberOfSegments: totalSegments,
SegmentsSize: s.segmentSize,
LastSegmentSize: lastSegmentSize,
Metadata: metadata,
EncryptionType: int32(s.encType),
EncryptionBlockSize: int32(s.encBlockSize),
}
lastSegmentMetadata, err := proto.Marshal(&md)
if err != nil {
return Meta{}, err
}
putMeta, err := s.segments.Put(ctx, lastSegmentPath, data,
lastSegmentMetadata, expiration)
if err != nil {
return Meta{}, err
}
totalSize = totalSize + putMeta.Size
resultMeta := Meta{
Modified: putMeta.Modified,
Expiration: expiration,
Size: totalSize,
Size: streamSize,
Data: metadata,
}
return resultMeta, nil
}
// GetSegmentPath returns the unique path for a particular segment
// getSegmentPath returns the unique path for a particular segment
func getSegmentPath(p paths.Path, segNum int64) paths.Path {
return p.Prepend(fmt.Sprintf("s%d", segNum))
}
@ -213,11 +206,10 @@ func getSegmentPath(p paths.Path, segNum int64) paths.Path {
// Get returns a ranger that knows what the overall size is (from l/<path>)
// and then returns the appropriate data from segments s0/<path>, s1/<path>,
// ..., l/<path>.
func (s *streamStore) Get(ctx context.Context, path paths.Path) (
rr ranger.Ranger, meta Meta, err error) {
func (s *streamStore) Get(ctx context.Context, path paths.Path) (rr ranger.Ranger, meta Meta, err error) {
defer mon.Task()(&ctx)(&err)
lastSegmentMeta, err := s.segments.Meta(ctx, path.Prepend("l"))
lastSegmentRanger, lastSegmentMeta, err := s.segments.Get(ctx, path.Prepend("l"))
if err != nil {
return nil, Meta{}, err
}
@ -228,7 +220,7 @@ func (s *streamStore) Get(ctx context.Context, path paths.Path) (
return nil, Meta{}, err
}
newMeta, err := convertMeta(lastSegmentMeta)
streamMeta, err := convertMeta(lastSegmentMeta)
if err != nil {
return nil, Meta{}, err
}
@ -239,12 +231,9 @@ func (s *streamStore) Get(ctx context.Context, path paths.Path) (
}
var rangers []ranger.Ranger
for i := int64(0); i < msi.NumberOfSegments; i++ {
for i := int64(0); i < msi.NumberOfSegments-1; i++ {
currentPath := getSegmentPath(path, i)
size := msi.SegmentsSize
if i == msi.NumberOfSegments-1 {
size = msi.LastSegmentSize
}
var nonce eestream.Nonce
_, err := nonce.Increment(i)
if err != nil {
@ -262,24 +251,44 @@ func (s *streamStore) Get(ctx context.Context, path paths.Path) (
rangers = append(rangers, rr)
}
var nonce eestream.Nonce
_, err = nonce.Increment(msi.NumberOfSegments - 1)
if err != nil {
return nil, Meta{}, err
}
decryptedLastSegmentRanger, err := decryptRanger(
ctx,
lastSegmentRanger,
msi.LastSegmentSize,
eestream.Cipher(msi.EncryptionType),
msi.LastSegmentEncryptionKey,
(*eestream.Key)(derivedKey),
&nonce,
int(msi.EncryptionBlockSize),
)
if err != nil {
return nil, Meta{}, err
}
rangers = append(rangers, decryptedLastSegmentRanger)
catRangers := ranger.Concat(rangers...)
return catRangers, newMeta, nil
return catRangers, streamMeta, nil
}
// Meta implements Store.Meta
func (s *streamStore) Meta(ctx context.Context, path paths.Path) (Meta, error) {
segmentMeta, err := s.segments.Meta(ctx, path.Prepend("l"))
lastSegmentMeta, err := s.segments.Meta(ctx, path.Prepend("l"))
if err != nil {
return Meta{}, err
}
meta, err := convertMeta(segmentMeta)
streamMeta, err := convertMeta(lastSegmentMeta)
if err != nil {
return Meta{}, err
}
return meta, nil
return streamMeta, nil
}
// Delete all the segments, with the last one last
@ -297,7 +306,7 @@ func (s *streamStore) Delete(ctx context.Context, path paths.Path) (err error) {
return err
}
for i := 0; i < int(msi.NumberOfSegments); i++ {
for i := 0; i < int(msi.NumberOfSegments-1); i++ {
currentPath := getSegmentPath(path, int64(i))
err := s.segments.Delete(ctx, currentPath)
if err != nil {
@ -316,9 +325,7 @@ type ListItem struct {
}
// List all the paths inside l/, stripping off the l/ prefix
func (s *streamStore) List(ctx context.Context, prefix, startAfter, endBefore paths.Path,
recursive bool, limit int, metaFlags uint32) (items []ListItem,
more bool, err error) {
func (s *streamStore) List(ctx context.Context, prefix, startAfter, endBefore paths.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) {
defer mon.Task()(&ctx)(&err)
if metaFlags&meta.Size != 0 {
@ -362,54 +369,52 @@ func (lr *lazySegmentRanger) Size() int64 {
// Range implements Ranger.Range to be lazily connected
func (lr *lazySegmentRanger) Range(ctx context.Context, offset, length int64) (io.ReadCloser, error) {
cipher := lr.encType
if lr.ranger == nil {
rr, m, err := lr.segments.Get(ctx, lr.path)
if err != nil {
return nil, err
}
encryptedEncKey := m.Data
e, err := cipher.Decrypt(encryptedEncKey, lr.derivedKey, lr.startingNonce)
lr.ranger, err = decryptRanger(ctx, rr, lr.size, lr.encType, m.Data, lr.derivedKey, lr.startingNonce, lr.encBlockSize)
if err != nil {
return nil, err
}
var encKey eestream.Key
copy(encKey[:], e)
decrypter, err := cipher.NewDecrypter(&encKey, lr.startingNonce, lr.encBlockSize)
if err != nil {
return nil, err
}
var rd ranger.Ranger
if rr.Size()%int64(decrypter.InBlockSize()) != 0 {
reader, err := rr.Range(ctx, 0, rr.Size())
if err != nil {
return nil, err
}
cipherData, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
}
data, err := cipher.Decrypt(cipherData, &encKey, lr.startingNonce)
if err != nil {
return nil, err
}
rd = ranger.ByteRanger(data)
lr.ranger = rd
} else {
rd, err = eestream.Transform(rr, decrypter)
if err != nil {
return nil, err
}
paddedSize := rd.Size()
rc, err := eestream.Unpad(rd, int(paddedSize-lr.Size()))
if err != nil {
return nil, err
}
lr.ranger = rc
}
}
return lr.ranger.Range(ctx, offset, length)
}
// decryptRanger returns a decrypted ranger of the given rr ranger
func decryptRanger(ctx context.Context, rr ranger.Ranger, decryptedSize int64, cipher eestream.Cipher, encryptedEncKey []byte, derivedKey *eestream.Key, startingNonce *eestream.Nonce, encBlockSize int) (ranger.Ranger, error) {
e, err := cipher.Decrypt(encryptedEncKey, derivedKey, startingNonce)
if err != nil {
return nil, err
}
var encKey eestream.Key
copy(encKey[:], e)
decrypter, err := cipher.NewDecrypter(&encKey, startingNonce, encBlockSize)
if err != nil {
return nil, err
}
var rd ranger.Ranger
if rr.Size()%int64(decrypter.InBlockSize()) != 0 {
reader, err := rr.Range(ctx, 0, rr.Size())
if err != nil {
return nil, err
}
cipherData, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
}
data, err := cipher.Decrypt(cipherData, &encKey, startingNonce)
if err != nil {
return nil, err
}
return ranger.ByteRanger(data), nil
}
rd, err = eestream.Transform(rr, decrypter)
if err != nil {
return nil, err
}
return eestream.Unpad(rd, int(rd.Size()-decryptedSize))
}

View File

@ -102,7 +102,7 @@ func TestStreamStorePut(t *testing.T) {
streamMeta := Meta{
Modified: segmentMeta.Modified,
Expiration: segmentMeta.Expiration,
Size: 14,
Size: 4,
Data: []byte("metadata"),
}
@ -124,10 +124,9 @@ func TestStreamStorePut(t *testing.T) {
errTag := fmt.Sprintf("Test case #%d", i)
mockSegmentStore.EXPECT().
Put(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Put(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(test.segmentMeta, test.segmentError).
Times(2).
Do(func(ctx context.Context, path paths.Path, data io.Reader, metadata []byte, expiration time.Time) {
Do(func(ctx context.Context, data io.Reader, expiration time.Time, info func() (paths.Path, []byte, error)) {
for {
buf := make([]byte, 4)
_, err := data.Read(buf)
@ -181,11 +180,21 @@ func TestStreamStoreGet(t *testing.T) {
closer: readCloserStub{},
}
msi := pb.MetaStreamInfo{
NumberOfSegments: 1,
SegmentsSize: 10,
LastSegmentSize: 0,
}
lastSegmentMeta, err := proto.Marshal(&msi)
if err != nil {
t.Fatal(err)
}
segmentMeta := segments.Meta{
Modified: staticTime,
Expiration: staticTime,
Size: 10,
Data: []byte{},
Data: lastSegmentMeta,
}
streamRanger := ranger.ByteRanger(nil)
@ -215,8 +224,8 @@ func TestStreamStoreGet(t *testing.T) {
calls := []*gomock.Call{
mockSegmentStore.EXPECT().
Meta(gomock.Any(), gomock.Any()).
Return(test.segmentMeta, test.segmentError),
Get(gomock.Any(), gomock.Any()).
Return(test.segmentRanger, test.segmentMeta, test.segmentError),
}
gomock.InOrder(calls...)
@ -231,7 +240,7 @@ func TestStreamStoreGet(t *testing.T) {
t.Fatal(err)
}
assert.Equal(t, test.streamRanger, ranger, errTag)
assert.Equal(t, test.streamRanger.Size(), ranger.Size(), errTag)
assert.Equal(t, test.streamMeta, meta, errTag)
}
}