storj/pkg/storage/streams/store.go
Kaloyan Raev 99640225fd
Refactor Path type (#522)
The old paths.Path type is now replaced with the new storj.Path.

storj.Path is simply an alias for the built-in string type. As such, it can be used just like any string, which greatly simplifies working with paths: no more conversions via paths.New and path.String().

As an alias, storj.Path does not define any methods. However, any function that applies to strings (such as those in the strings package) applies to storj.Path as well. In addition, a few more functions are defined:

    storj.SplitPath
    storj.JoinPaths
    encryption.EncryptPath
    encryption.DecryptPath
    encryption.DerivePathKey
    encryption.DeriveContentKey

All code in master is migrated to the new storj.Path type.

The Path example is also updated and is good for reference: /pkg/encryption/examples_test.go

This PR also resolves a nonce misuse issue in path encryption: https://storjlabs.atlassian.net/browse/V3-545
2018-10-25 23:28:16 +03:00

690 lines
19 KiB
Go

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package streams
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"strings"
"time"
"github.com/golang/protobuf/proto"
"github.com/zeebo/errs"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/eestream"
"storj.io/storj/pkg/encryption"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/ranger"
"storj.io/storj/pkg/storage/meta"
"storj.io/storj/pkg/storage/segments"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
)
// mon is the monkit scope for this package; the store methods below use it
// (via mon.Task) to instrument each call and record its error result.
var mon = monkit.Package()
// Meta is the metadata of a whole stream, as returned by Store. It is built
// from the metadata of the stream's last segment (see convertMeta).
type Meta struct {
	// Modified is taken from the last segment's modification time.
	Modified time.Time
	// Expiration is the stream's expiration time.
	Expiration time.Time
	// Size is the total size of the stream's data, computed from the number
	// of segments, the regular segment size and the last segment's size.
	Size int64
	// Data is the user-defined metadata passed to Store.Put.
	Data []byte
}
// convertMeta builds stream metadata from the metadata of a stream's last
// segment. lastSegmentMeta.Data must hold a serialized (already decrypted)
// pb.StreamInfo; an unmarshalling failure is returned as-is.
//
// The stream size is every full segment (all but the last) at SegmentsSize,
// plus the size of the last segment.
func convertMeta(lastSegmentMeta segments.Meta) (Meta, error) {
	var info pb.StreamInfo
	if err := proto.Unmarshal(lastSegmentMeta.Data, &info); err != nil {
		return Meta{}, err
	}

	fullSegments := info.NumberOfSegments - 1

	return Meta{
		Modified:   lastSegmentMeta.Modified,
		Expiration: lastSegmentMeta.Expiration,
		Size:       fullSegments*info.SegmentsSize + info.LastSegmentSize,
		Data:       info.Metadata,
	}, nil
}
// Store is the set of operations a stream store provides. Streams are
// addressed by unencrypted storj paths.
type Store interface {
	// Put stores the contents of data as a stream at path, together with the
	// user-defined metadata and an expiration time.
	Put(ctx context.Context, path storj.Path, data io.Reader, metadata []byte, expiration time.Time) (Meta, error)

	// Get returns a ranger over the stream's contents and the stream's metadata.
	Get(ctx context.Context, path storj.Path) (ranger.Ranger, Meta, error)

	// Meta returns only the stream's metadata.
	Meta(ctx context.Context, path storj.Path) (Meta, error)

	// List lists streams under prefix between the startAfter and endBefore
	// markers, returning at most limit items; more reports whether further
	// items remain. metaFlags selects which metadata is fetched.
	List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)

	// Delete removes the stream at path.
	Delete(ctx context.Context, path storj.Path) error
}
// streamStore is the Store implementation that splits streams into segments
// and stores them, encrypted, in a segments.Store.
type streamStore struct {
	// segments is the underlying store the individual segments are written to.
	segments segments.Store
	// segmentSize is the maximum number of bytes read into one segment on upload.
	segmentSize int64
	// rootKey is the key from which path keys and content keys are derived.
	rootKey *storj.Key
	// encBlockSize is the encryption block size used for new uploads.
	encBlockSize int
	// cipher is the cipher used for new uploads.
	cipher storj.Cipher
}
// NewStreamStore creates a Store that stores streams in the given segment
// store. Uploads are cut into segments of at most segmentSize bytes and
// encrypted with cipher, using encBlockSize as the encryption block size and
// keys derived from rootKey.
//
// It returns an error if segmentSize or encBlockSize is not positive, or if
// rootKey is nil.
func NewStreamStore(segments segments.Store, segmentSize int64, rootKey *storj.Key, encBlockSize int, cipher storj.Cipher) (Store, error) {
	switch {
	case segmentSize <= 0:
		return nil, errs.New("segment size must be larger than 0")
	case rootKey == nil:
		return nil, errs.New("encryption key must not be empty")
	case encBlockSize <= 0:
		return nil, errs.New("encryption block size must be larger than 0")
	}

	store := &streamStore{
		segments:     segments,
		segmentSize:  segmentSize,
		rootKey:      rootKey,
		encBlockSize: encBlockSize,
		cipher:       cipher,
	}
	return store, nil
}
// Put stores data as a stream at path, replacing any stream already there.
//
// The data is cut into s.segmentSize-sized pieces. Every piece except the
// last is stored at s<index>/<encrypted path>. The last piece is stored at
// l/<encrypted path>, and its metadata carries the encrypted stream info
// (segment count, sizes and the given metadata). Here <encrypted path> is path
// with everything after the bucket encrypted.
//
// If the upload fails, the segments stored so far are deleted on a best-effort
// basis before the error is returned.
func (s *streamStore) Put(ctx context.Context, path storj.Path, data io.Reader, metadata []byte, expiration time.Time) (m Meta, err error) {
	defer mon.Task()(&ctx)(&err)

	// Remove a previously uploaded stream with the same path. Not finding one
	// is the normal case for new uploads.
	if delErr := s.Delete(ctx, path); delErr != nil && !storage.ErrKeyNotFound.Has(delErr) {
		return Meta{}, delErr
	}

	var uploaded int64
	m, uploaded, err = s.upload(ctx, path, data, metadata, expiration)
	if err != nil {
		// Use a fresh context: ctx may be the very thing that was cancelled.
		s.cancelHandler(context.Background(), uploaded, path)
	}
	return m, err
}
// upload reads data segment by segment (at most s.segmentSize bytes each) and
// stores every segment through s.segments.Put. Intermediate segments are
// stored at s<index>/<encrypted path>. The final segment is stored at
// l/<encrypted path>, and its metadata holds the stream info (segment count,
// segment size, last segment size and metadata), encrypted with that
// segment's content key and the zero nonce.
//
// Each segment is encrypted with a fresh random content key. That key is
// encrypted with the key derived from path and stored in the segment's
// metadata. When s.cipher is storj.Unencrypted, this key metadata is omitted.
//
// It returns the stream Meta and the number of segments stored so far
// (currentSegment). Put uses that count to clean up after a failure.
func (s *streamStore) upload(ctx context.Context, path storj.Path, data io.Reader, metadata []byte, expiration time.Time) (m Meta, lastSegment int64, err error) {
	defer mon.Task()(&ctx)(&err)

	var currentSegment int64
	var streamSize int64
	var putMeta segments.Meta

	// If ctx is done by the time upload returns, delete s0 ... s<currentSegment-1>.
	// NOTE(review): after a fully successful upload currentSegment also counts
	// the last segment, which lives at l/<path>; a cancellation observed here
	// would then delete the s<i> segments but leave l/<path> in place — confirm
	// whether that is intended.
	defer func() {
		select {
		case <-ctx.Done():
			s.cancelHandler(context.Background(), currentSegment, path)
		default:
		}
	}()

	// derivedKey encrypts each segment's random content key.
	derivedKey, err := encryption.DeriveContentKey(path, s.rootKey)
	if err != nil {
		return Meta{}, currentSegment, err
	}

	eofReader := NewEOFReader(data)

	for !eofReader.isEOF() && !eofReader.hasError() {
		// generate random key for encrypting the segment's content
		var contentKey storj.Key
		_, err = rand.Read(contentKey[:])
		if err != nil {
			return Meta{}, currentSegment, err
		}

		// Initialize the content nonce with the segment's index incremented by 1.
		// The increment by 1 is to avoid nonce reuse with the metadata encryption,
		// which is encrypted with the zero nonce.
		var contentNonce storj.Nonce
		// NOTE: this := declares a loop-scoped err that shadows the named result;
		// every error path below returns it explicitly.
		_, err := encryption.Increment(&contentNonce, currentSegment+1)
		if err != nil {
			return Meta{}, currentSegment, err
		}

		encrypter, err := encryption.NewEncrypter(s.cipher, &contentKey, &contentNonce, s.encBlockSize)
		if err != nil {
			return Meta{}, currentSegment, err
		}

		// generate random nonce for encrypting the content key
		var keyNonce storj.Nonce
		_, err = rand.Read(keyNonce[:])
		if err != nil {
			return Meta{}, currentSegment, err
		}

		encryptedKey, err := encryption.EncryptKey(&contentKey, s.cipher, derivedKey, &keyNonce)
		if err != nil {
			return Meta{}, currentSegment, err
		}

		// sizeReader tracks how much of this segment is read (its Size() becomes
		// the segment's size); LimitReader caps the segment at s.segmentSize.
		sizeReader := NewSizeReader(eofReader)
		segmentReader := io.LimitReader(sizeReader, s.segmentSize)
		peekReader := segments.NewPeekThresholdReader(segmentReader)
		largeData, err := peekReader.IsLargerThan(encrypter.InBlockSize())
		if err != nil {
			return Meta{}, currentSegment, err
		}

		var transformedReader io.Reader
		if largeData {
			// More than one encryption block: pad to the block size and encrypt
			// while streaming.
			paddedReader := eestream.PadReader(ioutil.NopCloser(peekReader), encrypter.InBlockSize())
			transformedReader = encryption.TransformReader(paddedReader, encrypter, 0)
		} else {
			// Small segment: read it fully and encrypt it in one call, without
			// eestream.PadReader. (This `data` shadows the data parameter.)
			data, err := ioutil.ReadAll(peekReader)
			if err != nil {
				return Meta{}, currentSegment, err
			}
			cipherData, err := encryption.Encrypt(data, s.cipher, &contentKey, &contentNonce)
			if err != nil {
				return Meta{}, currentSegment, err
			}
			transformedReader = bytes.NewReader(cipherData)
		}

		// The callback chooses the segment's path and metadata. It presumably runs
		// after segments.Put has consumed the segment data, so eofReader.isEOF()
		// tells whether this is the final segment — TODO confirm in segments.Put.
		putMeta, err = s.segments.Put(ctx, transformedReader, expiration, func() (storj.Path, []byte, error) {
			encPath, err := encryptAfterBucket(path, s.rootKey)
			if err != nil {
				return "", nil, err
			}

			// Intermediate segment: s<index>/<encPath> with only the key metadata.
			if !eofReader.isEOF() {
				segmentPath := getSegmentPath(encPath, currentSegment)

				if s.cipher == storj.Unencrypted {
					return segmentPath, nil, nil
				}

				segmentMeta, err := proto.Marshal(&pb.SegmentMeta{
					EncryptedKey: encryptedKey,
					KeyNonce:     keyNonce[:],
				})
				if err != nil {
					return "", nil, err
				}

				return segmentPath, segmentMeta, nil
			}

			// Final segment: l/<encPath> carrying the encrypted stream info.
			lastSegmentPath := storj.JoinPaths("l", encPath)

			streamInfo, err := proto.Marshal(&pb.StreamInfo{
				NumberOfSegments: currentSegment + 1,
				SegmentsSize:     s.segmentSize,
				LastSegmentSize:  sizeReader.Size(),
				Metadata:         metadata,
			})
			if err != nil {
				return "", nil, err
			}

			// encrypt metadata with the content encryption key and zero nonce
			encryptedStreamInfo, err := encryption.Encrypt(streamInfo, s.cipher, &contentKey, &storj.Nonce{})
			if err != nil {
				return "", nil, err
			}

			streamMeta := pb.StreamMeta{
				EncryptedStreamInfo: encryptedStreamInfo,
				EncryptionType:      int32(s.cipher),
				EncryptionBlockSize: int32(s.encBlockSize),
			}

			if s.cipher != storj.Unencrypted {
				streamMeta.LastSegmentMeta = &pb.SegmentMeta{
					EncryptedKey: encryptedKey,
					KeyNonce:     keyNonce[:],
				}
			}

			lastSegmentMeta, err := proto.Marshal(&streamMeta)
			if err != nil {
				return "", nil, err
			}

			return lastSegmentPath, lastSegmentMeta, nil
		})
		if err != nil {
			return Meta{}, currentSegment, err
		}

		currentSegment++
		streamSize += sizeReader.Size()
	}

	// The loop also stops on a read error from data; surface it.
	if eofReader.hasError() {
		return Meta{}, currentSegment, eofReader.err
	}

	resultMeta := Meta{
		Modified:   putMeta.Modified,
		Expiration: expiration,
		Size:       streamSize,
		Data:       metadata,
	}

	return resultMeta, currentSegment, nil
}
// getSegmentPath returns the path of the segment with index segNum of the
// stream stored under path: "s<segNum>" followed by path.
func getSegmentPath(path storj.Path, segNum int64) storj.Path {
	segmentDir := fmt.Sprintf("s%d", segNum)
	return storj.JoinPaths(segmentDir, path)
}
// Get returns a ranger over the decrypted contents of the stream at path,
// together with the stream's metadata.
//
// The last segment (l/<encrypted path>) is fetched immediately because its
// metadata holds the stream info: segment count and sizes. The other segments
// (s0/..., s1/, ...) are wrapped in lazySegmentRangers, which fetch them only
// when they are first read. Segment i is decrypted starting from nonce i+1.
func (s *streamStore) Get(ctx context.Context, path storj.Path) (rr ranger.Ranger, meta Meta, err error) {
	defer mon.Task()(&ctx)(&err)

	encPath, err := encryptAfterBucket(path, s.rootKey)
	if err != nil {
		return nil, Meta{}, err
	}

	lastRanger, lastMeta, err := s.segments.Get(ctx, storj.JoinPaths("l", encPath))
	if err != nil {
		return nil, Meta{}, err
	}

	infoBytes, err := decryptStreamInfo(ctx, lastMeta, path, s.rootKey)
	if err != nil {
		return nil, Meta{}, err
	}

	var info pb.StreamInfo
	if err = proto.Unmarshal(infoBytes, &info); err != nil {
		return nil, Meta{}, err
	}

	var sm pb.StreamMeta
	if err = proto.Unmarshal(lastMeta.Data, &sm); err != nil {
		return nil, Meta{}, err
	}

	derivedKey, err := encryption.DeriveContentKey(path, s.rootKey)
	if err != nil {
		return nil, Meta{}, err
	}

	cipher := storj.Cipher(sm.EncryptionType)
	blockSize := int(sm.EncryptionBlockSize)

	// nonceFor returns the starting content nonce of the segment at index:
	// index+1, since the zero nonce is used for the stream info.
	nonceFor := func(index int64) (*storj.Nonce, error) {
		nonce := new(storj.Nonce)
		if _, err := encryption.Increment(nonce, index+1); err != nil {
			return nil, err
		}
		return nonce, nil
	}

	lastIndex := info.NumberOfSegments - 1

	var rangers []ranger.Ranger
	for i := int64(0); i < lastIndex; i++ {
		nonce, err := nonceFor(i)
		if err != nil {
			return nil, Meta{}, err
		}
		rangers = append(rangers, &lazySegmentRanger{
			segments:      s.segments,
			path:          getSegmentPath(encPath, i),
			size:          info.SegmentsSize,
			derivedKey:    derivedKey,
			startingNonce: nonce,
			encBlockSize:  blockSize,
			cipher:        cipher,
		})
	}

	lastNonce, err := nonceFor(lastIndex)
	if err != nil {
		return nil, Meta{}, err
	}

	encryptedKey, keyNonce := getEncryptedKeyAndNonce(sm.LastSegmentMeta)
	lastDecrypted, err := decryptRanger(ctx, lastRanger, info.LastSegmentSize, cipher, derivedKey, encryptedKey, keyNonce, lastNonce, blockSize)
	if err != nil {
		return nil, Meta{}, err
	}
	rangers = append(rangers, lastDecrypted)

	combined := ranger.Concat(rangers...)

	lastMeta.Data = infoBytes
	meta, err = convertMeta(lastMeta)
	if err != nil {
		return nil, Meta{}, err
	}

	return combined, meta, nil
}
// Meta implements Store.Meta. It reads only the metadata of the stream's last
// segment (l/<encrypted path>), decrypts the stream info stored there, and
// converts it to stream metadata.
func (s *streamStore) Meta(ctx context.Context, path storj.Path) (meta Meta, err error) {
	defer mon.Task()(&ctx)(&err)

	encPath, err := encryptAfterBucket(path, s.rootKey)
	if err != nil {
		return Meta{}, err
	}

	lastMeta, err := s.segments.Meta(ctx, storj.JoinPaths("l", encPath))
	if err != nil {
		return Meta{}, err
	}

	info, err := decryptStreamInfo(ctx, lastMeta, path, s.rootKey)
	if err != nil {
		return Meta{}, err
	}

	// convertMeta expects the decrypted stream info in Data.
	lastMeta.Data = info
	return convertMeta(lastMeta)
}
// Delete removes every segment of the stream at path: the numbered segments
// s0 ... s<n-2> first, then the last segment l/<encrypted path>.
//
// The last segment carries the stream info used to find the other segments,
// so it is removed last. If an earlier deletion fails, the stream can still be
// located and the deletion retried. The first error encountered is returned.
func (s *streamStore) Delete(ctx context.Context, path storj.Path) (err error) {
	defer mon.Task()(&ctx)(&err)

	// encPath is the same lookup key Get/Meta use for this path, so it is
	// computed once and reused for every segment below.
	encPath, err := encryptAfterBucket(path, s.rootKey)
	if err != nil {
		return err
	}

	lastSegmentMeta, err := s.segments.Meta(ctx, storj.JoinPaths("l", encPath))
	if err != nil {
		return err
	}

	streamInfo, err := decryptStreamInfo(ctx, lastSegmentMeta, path, s.rootKey)
	if err != nil {
		return err
	}

	stream := pb.StreamInfo{}
	if err = proto.Unmarshal(streamInfo, &stream); err != nil {
		return err
	}

	// Iterate in int64 to match NumberOfSegments and avoid truncation on
	// platforms where int is 32 bits.
	for i := int64(0); i < stream.NumberOfSegments-1; i++ {
		if err = s.segments.Delete(ctx, getSegmentPath(encPath, i)); err != nil {
			return err
		}
	}

	return s.segments.Delete(ctx, storj.JoinPaths("l", encPath))
}
// ListItem is a single item in a listing returned by Store.List.
type ListItem struct {
	// Path is the decrypted path of the item, relative to the listed prefix.
	Path storj.Path
	// Meta is the stream metadata decoded from the item's last segment.
	Meta Meta
	// IsPrefix is copied from the segment store listing; presumably true for
	// non-recursive "directory" entries — confirm against segments.List.
	IsPrefix bool
}
// List lists the streams under prefix by listing the last segments stored
// under l/<encrypted prefix> in the segment store.
//
// startAfter and endBefore are unencrypted markers. They are encrypted with
// the key derived for prefix before being passed down. Every returned path is
// decrypted with the same key; that relative path is then joined with prefix
// to decrypt the item's stream info. more is passed through from the segment
// store.
func (s *streamStore) List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) {
	defer mon.Task()(&ctx)(&err)

	// The stream size is computed from the stream info kept in the
	// user-defined metadata, so requesting the size implies requesting it too.
	if metaFlags&meta.Size != 0 {
		metaFlags |= meta.UserDefined
	}

	prefix = strings.TrimSuffix(prefix, "/")

	encPrefix, err := encryptAfterBucket(prefix, s.rootKey)
	if err != nil {
		return nil, false, err
	}

	depth := len(storj.SplitPath(prefix))
	prefixKey, err := encryption.DerivePathKey(prefix, s.rootKey, depth)
	if err != nil {
		return nil, false, err
	}

	encStartAfter, err := s.encryptMarker(startAfter, prefixKey)
	if err != nil {
		return nil, false, err
	}

	encEndBefore, err := s.encryptMarker(endBefore, prefixKey)
	if err != nil {
		return nil, false, err
	}

	listed, more, err := s.segments.List(ctx, storj.JoinPaths("l", encPrefix), encStartAfter, encEndBefore, recursive, limit, metaFlags)
	if err != nil {
		return nil, false, err
	}

	items = make([]ListItem, 0, len(listed))
	for _, entry := range listed {
		relPath, err := s.decryptMarker(entry.Path, prefixKey)
		if err != nil {
			return nil, false, err
		}

		info, err := decryptStreamInfo(ctx, entry.Meta, storj.JoinPaths(prefix, relPath), s.rootKey)
		if err != nil {
			return nil, false, err
		}

		segMeta := entry.Meta
		segMeta.Data = info

		streamMeta, err := convertMeta(segMeta)
		if err != nil {
			return nil, false, err
		}

		items = append(items, ListItem{Path: relPath, Meta: streamMeta, IsPrefix: entry.IsPrefix})
	}

	return items, more, nil
}
// encryptMarker encrypts a startAfter/endBefore listing marker.
//
// If prefixKey is identical to the root key (the case the original code
// labels "empty prefix"), the marker is encrypted like a full path, with its
// first component left in the clear. Otherwise it is encrypted with
// prefixKey.
func (s *streamStore) encryptMarker(marker storj.Path, prefixKey *storj.Key) (storj.Path, error) {
	usesRootKey := bytes.Equal(s.rootKey[:], prefixKey[:])
	if !usesRootKey {
		return encryption.EncryptPath(marker, prefixKey)
	}
	return encryptAfterBucket(marker, s.rootKey)
}
// decryptMarker decrypts a path returned by the segment store listing. It is
// the inverse of encryptMarker: with a prefix key equal to the root key
// ("empty prefix"), everything after the first component is decrypted;
// otherwise the whole marker is decrypted with prefixKey.
func (s *streamStore) decryptMarker(marker storj.Path, prefixKey *storj.Key) (storj.Path, error) {
	usesRootKey := bytes.Equal(s.rootKey[:], prefixKey[:])
	if !usesRootKey {
		return encryption.DecryptPath(marker, prefixKey)
	}
	return decryptAfterBucket(marker, s.rootKey)
}
// lazySegmentRanger is a ranger.Ranger over one non-last stream segment. It
// does not fetch the segment until the first call to Range. The decrypted
// ranger built on that call is cached in the ranger field and reused later.
// No synchronization is visible here, so concurrent Range calls are presumably
// not supported — confirm with callers.
type lazySegmentRanger struct {
	ranger        ranger.Ranger  // cached decrypted ranger; nil until the first successful Range
	segments      segments.Store // store the segment is fetched from
	path          storj.Path     // encrypted segment path (s<index>/...)
	size          int64          // decrypted segment size, reported by Size
	derivedKey    *storj.Key     // key that decrypts the segment's content key
	startingNonce *storj.Nonce   // starting content nonce handed to the decrypter
	encBlockSize  int            // encryption block size of the stream
	cipher        storj.Cipher   // cipher of the stream
}

// Size implements ranger.Ranger. It never contacts the segment store.
func (lr *lazySegmentRanger) Size() int64 {
	return lr.size
}

// Range implements ranger.Ranger. The first call fetches the segment and its
// metadata, recovers the content key, and builds (and caches) a decrypting
// ranger. Every call then delegates to the cached ranger.
func (lr *lazySegmentRanger) Range(ctx context.Context, offset, length int64) (io.ReadCloser, error) {
	if lr.ranger != nil {
		return lr.ranger.Range(ctx, offset, length)
	}

	encrypted, segMeta, err := lr.segments.Get(ctx, lr.path)
	if err != nil {
		return nil, err
	}

	var info pb.SegmentMeta
	if err := proto.Unmarshal(segMeta.Data, &info); err != nil {
		return nil, err
	}

	encryptedKey, keyNonce := getEncryptedKeyAndNonce(&info)
	if lr.ranger, err = decryptRanger(ctx, encrypted, lr.size, lr.cipher, lr.derivedKey, encryptedKey, keyNonce, lr.startingNonce, lr.encBlockSize); err != nil {
		return nil, err
	}

	return lr.ranger.Range(ctx, offset, length)
}
// decryptRanger returns a ranger serving the decrypted contents of rr.
//
// The content key is recovered by decrypting encryptedKey with derivedKey and
// encryptedKeyNonce. Decryption of the content starts at startingNonce.
// decryptedSize is the plaintext size; for block-streamed segments it is used
// to strip the padding after decryption.
//
// If rr's size is not a multiple of the decrypter's input block size, rr is
// read fully and decrypted in memory. This presumably matches the unpadded,
// one-shot encryption used by upload for small segments. Otherwise rr is
// decrypted lazily block by block and then unpadded.
func decryptRanger(ctx context.Context, rr ranger.Ranger, decryptedSize int64, cipher storj.Cipher, derivedKey *storj.Key, encryptedKey storj.EncryptedPrivateKey, encryptedKeyNonce, startingNonce *storj.Nonce, encBlockSize int) (ranger.Ranger, error) {
	contentKey, err := encryption.DecryptKey(encryptedKey, cipher, derivedKey, encryptedKeyNonce)
	if err != nil {
		return nil, err
	}

	decrypter, err := encryption.NewDecrypter(cipher, contentKey, startingNonce, encBlockSize)
	if err != nil {
		return nil, err
	}

	if rr.Size()%int64(decrypter.InBlockSize()) != 0 {
		reader, err := rr.Range(ctx, 0, rr.Size())
		if err != nil {
			return nil, err
		}
		cipherData, err := ioutil.ReadAll(reader)
		// Always release the reader (it was previously leaked). A close
		// failure is reported only when the read itself succeeded.
		if closeErr := reader.Close(); err == nil {
			err = closeErr
		}
		if err != nil {
			return nil, err
		}

		data, err := encryption.Decrypt(cipherData, cipher, contentKey, startingNonce)
		if err != nil {
			return nil, err
		}
		return ranger.ByteRanger(data), nil
	}

	rd, err := encryption.Transform(rr, decrypter)
	if err != nil {
		return nil, err
	}
	return eestream.Unpad(rd, int(rd.Size()-decryptedSize))
}
// encryptAfterBucket encrypts path but keeps its first component (the bucket
// name) unencrypted. A path with a single component is returned unchanged.
func encryptAfterBucket(path storj.Path, key *storj.Key) (encrypted storj.Path, err error) {
	comps := storj.SplitPath(path)
	if len(comps) < 2 {
		return path, nil
	}

	if encrypted, err = encryption.EncryptPath(path, key); err != nil {
		return "", err
	}

	// Swap the encrypted first component back for the plaintext bucket name.
	encryptedRest := storj.SplitPath(encrypted)[1:]
	return storj.JoinPaths(comps[0], storj.JoinPaths(encryptedRest...)), nil
}
// decryptAfterBucket is the inverse of encryptAfterBucket. It decrypts
// everything after the first path component, which is kept as-is. A path with
// a single component is returned unchanged.
func decryptAfterBucket(path storj.Path, key *storj.Key) (decrypted storj.Path, err error) {
	comps := storj.SplitPath(path)
	if len(comps) < 2 {
		return path, nil
	}

	// Derive the key at depth 1, presumably from the plaintext bucket
	// component only.
	bucketKey, err := encryption.DerivePathKey(path, key, 1)
	if err != nil {
		return "", err
	}

	plainRest, err := encryption.DecryptPath(storj.JoinPaths(comps[1:]...), bucketKey)
	if err != nil {
		return "", err
	}

	return storj.JoinPaths(comps[0], plainRest), nil
}
// cancelHandler makes a best-effort attempt to delete the numbered segments
// s0 ... s<totalSegments-1> of the stream at path. It is used after an
// interrupted or failed upload. The last segment (l/...) is not touched.
// Failures are logged as warnings and never returned.
func (s *streamStore) cancelHandler(ctx context.Context, totalSegments int64, path storj.Path) {
	if totalSegments <= 0 {
		return
	}

	encPath, err := encryptAfterBucket(path, s.rootKey)
	if err != nil {
		// Without the encrypted path no segment can be addressed. Continuing
		// would issue deletes for "s<i>/" paths built from an empty string.
		zap.S().Warnf("Failed deleting segments due to path encryption error %v", err)
		return
	}

	for i := int64(0); i < totalSegments; i++ {
		currentPath := getSegmentPath(encPath, i)
		if err := s.segments.Delete(ctx, currentPath); err != nil {
			zap.S().Warnf("Failed deleting a segment %v %v", currentPath, err)
		}
	}
}
// getEncryptedKeyAndNonce extracts the encrypted content key and its nonce
// from segment metadata. The nonce is a fresh copy of m.KeyNonce. A nil m
// yields (nil, nil).
func getEncryptedKeyAndNonce(m *pb.SegmentMeta) (storj.EncryptedPrivateKey, *storj.Nonce) {
	if m != nil {
		nonce := new(storj.Nonce)
		copy(nonce[:], m.KeyNonce)
		return m.EncryptedKey, nonce
	}
	return nil, nil
}
// decryptStreamInfo returns the serialized pb.StreamInfo stored, encrypted,
// in the metadata of a stream's last segment.
//
// item.Data is parsed as a pb.StreamMeta. The segment's content key is
// recovered with the key derived from path and rootKey, and the stream info is
// then decrypted with that content key and the zero nonce. ctx is currently
// unused.
func decryptStreamInfo(ctx context.Context, item segments.Meta, path storj.Path, rootKey *storj.Key) (streamInfo []byte, err error) {
	var sm pb.StreamMeta
	if err = proto.Unmarshal(item.Data, &sm); err != nil {
		return nil, err
	}

	derivedKey, err := encryption.DeriveContentKey(path, rootKey)
	if err != nil {
		return nil, err
	}

	cipher := storj.Cipher(sm.EncryptionType)
	encryptedKey, keyNonce := getEncryptedKeyAndNonce(sm.LastSegmentMeta)

	contentKey, err := encryption.DecryptKey(encryptedKey, cipher, derivedKey, keyNonce)
	if err != nil {
		return nil, err
	}

	// the stream info is always encrypted with the zero nonce
	return encryption.Decrypt(sm.EncryptedStreamInfo, cipher, contentKey, &storj.Nonce{})
}