d9d82b0336
* uplink/metainfo: Return classified Not Found error. The metainfo Client Batch method must return the Storj Not Found error class when the RPC server responds with a not-found status code, as any other metainfo Client method does. Also, if the error isn't a Not Found one, it must wrap the error. * uplink/storage/streams: Use Batch request in Delete. Replace the 2 individual metainfo Client calls that the streamStore Delete method makes with a single Batch one.
701 lines
21 KiB
Go
701 lines
21 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package streams
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
"io"
|
|
"io/ioutil"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
|
|
|
"storj.io/storj/pkg/encryption"
|
|
"storj.io/storj/pkg/paths"
|
|
"storj.io/storj/pkg/pb"
|
|
"storj.io/storj/pkg/ranger"
|
|
"storj.io/storj/pkg/storj"
|
|
"storj.io/storj/uplink/eestream"
|
|
"storj.io/storj/uplink/metainfo"
|
|
"storj.io/storj/uplink/storage/segments"
|
|
)
|
|
|
|
var mon = monkit.Package()
|
|
|
|
// Meta describes a stream as seen by callers of the stream store.
type Meta struct {
	// Modified is the modification time reported for the stream.
	Modified time.Time
	// Expiration is the expiration time associated with the stream.
	Expiration time.Time
	// Size is the total size of the stream content in bytes.
	Size int64
	// Data is the caller-supplied metadata stored alongside the stream.
	Data []byte
}
|
|
|
|
func numberOfSegments(stream *pb.StreamInfo, streamMeta *pb.StreamMeta) int64 {
|
|
if streamMeta.NumberOfSegments > 0 {
|
|
return streamMeta.NumberOfSegments
|
|
}
|
|
return stream.DeprecatedNumberOfSegments
|
|
}
|
|
|
|
// convertMeta converts segment metadata to stream metadata
|
|
func convertMeta(modified, expiration time.Time, stream pb.StreamInfo, streamMeta pb.StreamMeta) Meta {
|
|
return Meta{
|
|
Modified: modified,
|
|
Expiration: expiration,
|
|
Size: ((numberOfSegments(&stream, &streamMeta) - 1) * stream.SegmentsSize) + stream.LastSegmentSize,
|
|
Data: stream.Metadata,
|
|
}
|
|
}
|
|
|
|
// Store interface methods for streams to satisfy to be a store
|
|
type typedStore interface {
|
|
Meta(ctx context.Context, path Path, pathCipher storj.CipherSuite) (Meta, error)
|
|
Get(ctx context.Context, path Path, pathCipher storj.CipherSuite) (ranger.Ranger, Meta, error)
|
|
Put(ctx context.Context, path Path, pathCipher storj.CipherSuite, data io.Reader, metadata []byte, expiration time.Time) (Meta, error)
|
|
Delete(ctx context.Context, path Path, pathCipher storj.CipherSuite) error
|
|
List(ctx context.Context, prefix Path, startAfter, endBefore string, pathCipher storj.CipherSuite, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
|
|
}
|
|
|
|
// streamStore is a store for streams. It implements typedStore as part of an ongoing migration
|
|
// to use typed paths. See the shim for the store that the rest of the world interacts with.
|
|
type streamStore struct {
|
|
metainfo *metainfo.Client
|
|
segments segments.Store
|
|
segmentSize int64
|
|
encStore *encryption.Store
|
|
encBlockSize int
|
|
cipher storj.CipherSuite
|
|
inlineThreshold int
|
|
}
|
|
|
|
// newTypedStreamStore constructs a typedStore backed by a streamStore.
|
|
func newTypedStreamStore(metainfo *metainfo.Client, segments segments.Store, segmentSize int64, encStore *encryption.Store, encBlockSize int, cipher storj.CipherSuite, inlineThreshold int) (typedStore, error) {
|
|
if segmentSize <= 0 {
|
|
return nil, errs.New("segment size must be larger than 0")
|
|
}
|
|
if encBlockSize <= 0 {
|
|
return nil, errs.New("encryption block size must be larger than 0")
|
|
}
|
|
|
|
return &streamStore{
|
|
metainfo: metainfo,
|
|
segments: segments,
|
|
segmentSize: segmentSize,
|
|
encStore: encStore,
|
|
encBlockSize: encBlockSize,
|
|
cipher: cipher,
|
|
inlineThreshold: inlineThreshold,
|
|
}, nil
|
|
}
|
|
|
|
// Put breaks up data as it comes in into s.segmentSize length pieces, then
|
|
// store the first piece at s0/<path>, second piece at s1/<path>, and the
|
|
// *last* piece at l/<path>. Store the given metadata, along with the number
|
|
// of segments, in a new protobuf, in the metadata of l/<path>.
|
|
func (s *streamStore) Put(ctx context.Context, path Path, pathCipher storj.CipherSuite, data io.Reader, metadata []byte, expiration time.Time) (m Meta, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
// previously file uploaded?
|
|
err = s.Delete(ctx, path, pathCipher)
|
|
if err != nil && !storj.ErrObjectNotFound.Has(err) {
|
|
// something wrong happened checking for an existing
|
|
// file with the same name
|
|
return Meta{}, err
|
|
}
|
|
|
|
m, lastSegment, streamID, err := s.upload(ctx, path, pathCipher, data, metadata, expiration)
|
|
if err != nil {
|
|
s.cancelHandler(context.Background(), streamID, lastSegment, path, pathCipher)
|
|
}
|
|
|
|
return m, err
|
|
}
|
|
|
|
func (s *streamStore) upload(ctx context.Context, path Path, pathCipher storj.CipherSuite, data io.Reader, metadata []byte, expiration time.Time) (m Meta, lastSegment int64, streamID storj.StreamID, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
var currentSegment int64
|
|
var streamSize int64
|
|
var putMeta segments.Meta
|
|
var objectMetadata []byte
|
|
|
|
derivedKey, err := encryption.DeriveContentKey(path.Bucket(), path.UnencryptedPath(), s.encStore)
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
encPath, err := encryption.EncryptPath(path.Bucket(), path.UnencryptedPath(), pathCipher, s.encStore)
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
|
|
streamID, err = s.metainfo.BeginObject(ctx, metainfo.BeginObjectParams{
|
|
Bucket: []byte(path.Bucket()),
|
|
EncryptedPath: []byte(encPath.Raw()),
|
|
ExpiresAt: expiration,
|
|
})
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
|
|
defer func() {
|
|
select {
|
|
case <-ctx.Done():
|
|
s.cancelHandler(context.Background(), streamID, currentSegment, path, pathCipher)
|
|
default:
|
|
}
|
|
}()
|
|
|
|
eofReader := NewEOFReader(data)
|
|
|
|
for !eofReader.isEOF() && !eofReader.hasError() {
|
|
// generate random key for encrypting the segment's content
|
|
var contentKey storj.Key
|
|
_, err = rand.Read(contentKey[:])
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
|
|
// Initialize the content nonce with the segment's index incremented by 1.
|
|
// The increment by 1 is to avoid nonce reuse with the metadata encryption,
|
|
// which is encrypted with the zero nonce.
|
|
var contentNonce storj.Nonce
|
|
_, err := encryption.Increment(&contentNonce, currentSegment+1)
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
|
|
encrypter, err := encryption.NewEncrypter(s.cipher, &contentKey, &contentNonce, s.encBlockSize)
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
|
|
// generate random nonce for encrypting the content key
|
|
var keyNonce storj.Nonce
|
|
_, err = rand.Read(keyNonce[:])
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
|
|
encryptedKey, err := encryption.EncryptKey(&contentKey, s.cipher, derivedKey, &keyNonce)
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
|
|
sizeReader := NewSizeReader(eofReader)
|
|
segmentReader := io.LimitReader(sizeReader, s.segmentSize)
|
|
peekReader := segments.NewPeekThresholdReader(segmentReader)
|
|
// If the data is larger than the inline threshold size, then it will be a remote segment
|
|
isRemote, err := peekReader.IsLargerThan(s.inlineThreshold)
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
var transformedReader io.Reader
|
|
if isRemote {
|
|
paddedReader := eestream.PadReader(ioutil.NopCloser(peekReader), encrypter.InBlockSize())
|
|
transformedReader = encryption.TransformReader(paddedReader, encrypter, 0)
|
|
} else {
|
|
data, err := ioutil.ReadAll(peekReader)
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
cipherData, err := encryption.Encrypt(data, s.cipher, &contentKey, &contentNonce)
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
transformedReader = bytes.NewReader(cipherData)
|
|
}
|
|
|
|
putMeta, err = s.segments.Put(ctx, streamID, transformedReader, expiration, func() (_ int64, segmentEncryption storj.SegmentEncryption, err error) {
|
|
if s.cipher != storj.EncNull {
|
|
segmentEncryption = storj.SegmentEncryption{
|
|
EncryptedKey: encryptedKey,
|
|
EncryptedKeyNonce: keyNonce,
|
|
}
|
|
}
|
|
return currentSegment, segmentEncryption, nil
|
|
})
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
|
|
streamInfo, err := proto.Marshal(&pb.StreamInfo{
|
|
DeprecatedNumberOfSegments: currentSegment + 1,
|
|
SegmentsSize: s.segmentSize,
|
|
LastSegmentSize: sizeReader.Size(),
|
|
Metadata: metadata,
|
|
})
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
|
|
// encrypt metadata with the content encryption key and zero nonce
|
|
encryptedStreamInfo, err := encryption.Encrypt(streamInfo, s.cipher, &contentKey, &storj.Nonce{})
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
|
|
streamMeta := pb.StreamMeta{
|
|
NumberOfSegments: currentSegment + 1,
|
|
EncryptedStreamInfo: encryptedStreamInfo,
|
|
EncryptionType: int32(s.cipher),
|
|
EncryptionBlockSize: int32(s.encBlockSize),
|
|
}
|
|
|
|
if s.cipher != storj.EncNull {
|
|
streamMeta.LastSegmentMeta = &pb.SegmentMeta{
|
|
EncryptedKey: encryptedKey,
|
|
KeyNonce: keyNonce[:],
|
|
}
|
|
|
|
}
|
|
|
|
objectMetadata, err = proto.Marshal(&streamMeta)
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
|
|
currentSegment++
|
|
streamSize += sizeReader.Size()
|
|
}
|
|
|
|
if eofReader.hasError() {
|
|
return Meta{}, currentSegment, streamID, eofReader.err
|
|
}
|
|
|
|
err = s.metainfo.CommitObject(ctx, metainfo.CommitObjectParams{
|
|
StreamID: streamID,
|
|
EncryptedMetadata: objectMetadata,
|
|
})
|
|
if err != nil {
|
|
return Meta{}, currentSegment, streamID, err
|
|
}
|
|
|
|
resultMeta := Meta{
|
|
Modified: putMeta.Modified,
|
|
Expiration: expiration,
|
|
Size: streamSize,
|
|
Data: metadata,
|
|
}
|
|
|
|
return resultMeta, currentSegment, streamID, nil
|
|
}
|
|
|
|
// Get returns a ranger that knows what the overall size is (from l/<path>)
|
|
// and then returns the appropriate data from segments s0/<path>, s1/<path>,
|
|
// ..., l/<path>.
|
|
func (s *streamStore) Get(ctx context.Context, path Path, pathCipher storj.CipherSuite) (rr ranger.Ranger, meta Meta, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
encPath, err := encryption.EncryptPath(path.Bucket(), path.UnencryptedPath(), pathCipher, s.encStore)
|
|
if err != nil {
|
|
return nil, Meta{}, err
|
|
}
|
|
|
|
object, err := s.metainfo.GetObject(ctx, metainfo.GetObjectParams{
|
|
Bucket: []byte(path.Bucket()),
|
|
EncryptedPath: []byte(encPath.Raw()),
|
|
})
|
|
if err != nil {
|
|
return nil, Meta{}, err
|
|
}
|
|
|
|
lastSegmentRanger, _, err := s.segments.Get(ctx, object.StreamID, -1, object.RedundancyScheme)
|
|
if err != nil {
|
|
return nil, Meta{}, err
|
|
}
|
|
|
|
streamInfo, streamMeta, err := TypedDecryptStreamInfo(ctx, object.Metadata, path, s.encStore)
|
|
if err != nil {
|
|
return nil, Meta{}, err
|
|
}
|
|
|
|
stream := pb.StreamInfo{}
|
|
err = proto.Unmarshal(streamInfo, &stream)
|
|
if err != nil {
|
|
return nil, Meta{}, err
|
|
}
|
|
|
|
derivedKey, err := encryption.DeriveContentKey(path.Bucket(), path.UnencryptedPath(), s.encStore)
|
|
if err != nil {
|
|
return nil, Meta{}, err
|
|
}
|
|
|
|
var rangers []ranger.Ranger
|
|
for i := int64(0); i < numberOfSegments(&stream, &streamMeta)-1; i++ {
|
|
var contentNonce storj.Nonce
|
|
_, err = encryption.Increment(&contentNonce, i+1)
|
|
if err != nil {
|
|
return nil, Meta{}, err
|
|
}
|
|
|
|
rangers = append(rangers, &lazySegmentRanger{
|
|
segments: s.segments,
|
|
streamID: object.StreamID,
|
|
segmentIndex: int32(i),
|
|
rs: object.RedundancyScheme,
|
|
m: streamMeta.LastSegmentMeta,
|
|
size: stream.SegmentsSize,
|
|
derivedKey: derivedKey,
|
|
startingNonce: &contentNonce,
|
|
encBlockSize: int(streamMeta.EncryptionBlockSize),
|
|
cipher: storj.CipherSuite(streamMeta.EncryptionType),
|
|
})
|
|
}
|
|
|
|
var contentNonce storj.Nonce
|
|
_, err = encryption.Increment(&contentNonce, numberOfSegments(&stream, &streamMeta))
|
|
if err != nil {
|
|
return nil, Meta{}, err
|
|
}
|
|
|
|
encryptedKey, keyNonce := getEncryptedKeyAndNonce(streamMeta.LastSegmentMeta)
|
|
decryptedLastSegmentRanger, err := decryptRanger(
|
|
ctx,
|
|
lastSegmentRanger,
|
|
stream.LastSegmentSize,
|
|
storj.CipherSuite(streamMeta.EncryptionType),
|
|
derivedKey,
|
|
encryptedKey,
|
|
keyNonce,
|
|
&contentNonce,
|
|
int(streamMeta.EncryptionBlockSize),
|
|
)
|
|
if err != nil {
|
|
return nil, Meta{}, err
|
|
}
|
|
|
|
rangers = append(rangers, decryptedLastSegmentRanger)
|
|
catRangers := ranger.Concat(rangers...)
|
|
meta = convertMeta(object.Modified, object.Expires, stream, streamMeta)
|
|
return catRangers, meta, nil
|
|
}
|
|
|
|
// Meta implements Store.Meta
|
|
func (s *streamStore) Meta(ctx context.Context, path Path, pathCipher storj.CipherSuite) (meta Meta, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
encPath, err := encryption.EncryptPath(path.Bucket(), path.UnencryptedPath(), pathCipher, s.encStore)
|
|
if err != nil {
|
|
return Meta{}, err
|
|
}
|
|
|
|
object, err := s.metainfo.GetObject(ctx, metainfo.GetObjectParams{
|
|
Bucket: []byte(path.Bucket()),
|
|
EncryptedPath: []byte(encPath.Raw()),
|
|
})
|
|
|
|
streamInfo, streamMeta, err := TypedDecryptStreamInfo(ctx, object.Metadata, path, s.encStore)
|
|
if err != nil {
|
|
return Meta{}, err
|
|
}
|
|
|
|
var stream pb.StreamInfo
|
|
if err := proto.Unmarshal(streamInfo, &stream); err != nil {
|
|
return Meta{}, err
|
|
}
|
|
|
|
return convertMeta(object.Modified, object.Expires, stream, streamMeta), nil
|
|
}
|
|
|
|
// Delete all the segments, with the last one last
|
|
func (s *streamStore) Delete(ctx context.Context, path Path, pathCipher storj.CipherSuite) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
encPath, err := encryption.EncryptPath(path.Bucket(), path.UnencryptedPath(), pathCipher, s.encStore)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
batchItems := []metainfo.BatchItem{
|
|
&metainfo.BeginDeleteObjectParams{
|
|
Bucket: []byte(path.Bucket()),
|
|
EncryptedPath: []byte(encPath.Raw()),
|
|
},
|
|
&metainfo.ListSegmentsParams{
|
|
CursorPosition: storj.SegmentPosition{
|
|
Index: 0,
|
|
},
|
|
},
|
|
}
|
|
|
|
resps, err := s.metainfo.Batch(ctx, batchItems...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(resps) != 2 {
|
|
return errs.New(
|
|
"metainfo.Batch request returned an unexpected number of responses. Want: 2, got: %d", len(resps),
|
|
)
|
|
}
|
|
|
|
delResp, err := resps[0].BeginDeleteObject()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
listResp, err := resps[1].ListSegment()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// TODO handle listResp.More
|
|
|
|
var errlist errs.Group
|
|
for _, item := range listResp.Items {
|
|
err = s.segments.Delete(ctx, delResp.StreamID, item.Position.Index)
|
|
if err != nil {
|
|
errlist.Add(err)
|
|
continue
|
|
}
|
|
}
|
|
|
|
return errlist.Err()
|
|
}
|
|
|
|
// ListItem is a single item in a listing
|
|
type ListItem struct {
|
|
Path string
|
|
Meta Meta
|
|
IsPrefix bool
|
|
}
|
|
|
|
// pathForKey removes the trailing `/` from the raw path, which is required so
|
|
// the derived key matches the final list path (which also has the trailing
|
|
// encrypted `/` part of the path removed)
|
|
func pathForKey(raw string) paths.Unencrypted {
|
|
return paths.NewUnencrypted(strings.TrimSuffix(raw, "/"))
|
|
}
|
|
|
|
// List all the paths inside l/, stripping off the l/ prefix
|
|
func (s *streamStore) List(ctx context.Context, prefix Path, startAfter, endBefore string, pathCipher storj.CipherSuite, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
// TODO use flags with listing
|
|
// if metaFlags&meta.Size != 0 {
|
|
// Calculating the stream's size require also the user-defined metadata,
|
|
// where stream store keeps info about the number of segments and their size.
|
|
// metaFlags |= meta.UserDefined
|
|
// }
|
|
|
|
prefixKey, err := encryption.DerivePathKey(prefix.Bucket(), pathForKey(prefix.UnencryptedPath().Raw()), s.encStore)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
encPrefix, err := encryption.EncryptPath(prefix.Bucket(), prefix.UnencryptedPath(), pathCipher, s.encStore)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
// If the raw unencrypted path ends in a `/` we need to remove the final
|
|
// section of the encrypted path. For example, if we are listing the path
|
|
// `/bob/`, the encrypted path results in `enc("")/enc("bob")/enc("")`. This
|
|
// is an incorrect list prefix, what we really want is `enc("")/enc("bob")`
|
|
if strings.HasSuffix(prefix.UnencryptedPath().Raw(), "/") {
|
|
lastSlashIdx := strings.LastIndex(encPrefix.Raw(), "/")
|
|
encPrefix = paths.NewEncrypted(encPrefix.Raw()[:lastSlashIdx])
|
|
}
|
|
|
|
// We have to encrypt startAfter and endBefore but only if they don't contain a bucket.
|
|
// They contain a bucket if and only if the prefix has no bucket. This is why they are raw
|
|
// strings instead of a typed string: it's either a bucket or an unencrypted path component
|
|
// and that isn't known at compile time.
|
|
needsEncryption := prefix.Bucket() != ""
|
|
if needsEncryption {
|
|
startAfter, err = encryption.EncryptPathRaw(startAfter, pathCipher, prefixKey)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
}
|
|
|
|
objects, more, err := s.metainfo.ListObjects(ctx, metainfo.ListObjectsParams{
|
|
Bucket: []byte(prefix.Bucket()),
|
|
EncryptedPrefix: []byte(encPrefix.Raw()),
|
|
EncryptedCursor: []byte(startAfter),
|
|
Limit: int32(limit),
|
|
Recursive: recursive,
|
|
})
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
items = make([]ListItem, len(objects))
|
|
for i, item := range objects {
|
|
var path Path
|
|
var itemPath string
|
|
|
|
if needsEncryption {
|
|
itemPath, err = encryption.DecryptPathRaw(string(item.EncryptedPath), pathCipher, prefixKey)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
// TODO(jeff): this shouldn't be necessary if we handled trailing slashes
|
|
// appropriately. there's some issues with list.
|
|
fullPath := prefix.UnencryptedPath().Raw()
|
|
if len(fullPath) > 0 && fullPath[len(fullPath)-1] != '/' {
|
|
fullPath += "/"
|
|
}
|
|
fullPath += itemPath
|
|
|
|
path = CreatePath(prefix.Bucket(), paths.NewUnencrypted(fullPath))
|
|
} else {
|
|
itemPath = string(item.EncryptedPath)
|
|
path = CreatePath(string(item.EncryptedPath), paths.Unencrypted{})
|
|
}
|
|
|
|
streamInfo, streamMeta, err := TypedDecryptStreamInfo(ctx, item.EncryptedMetadata, path, s.encStore)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
var stream pb.StreamInfo
|
|
if err := proto.Unmarshal(streamInfo, &stream); err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
newMeta := convertMeta(item.CreatedAt, item.ExpiresAt, stream, streamMeta)
|
|
items[i] = ListItem{
|
|
Path: itemPath,
|
|
Meta: newMeta,
|
|
IsPrefix: item.IsPrefix,
|
|
}
|
|
}
|
|
|
|
return items, more, nil
|
|
}
|
|
|
|
type lazySegmentRanger struct {
|
|
ranger ranger.Ranger
|
|
segments segments.Store
|
|
streamID storj.StreamID
|
|
segmentIndex int32
|
|
rs storj.RedundancyScheme
|
|
m *pb.SegmentMeta
|
|
size int64
|
|
derivedKey *storj.Key
|
|
startingNonce *storj.Nonce
|
|
encBlockSize int
|
|
cipher storj.CipherSuite
|
|
}
|
|
|
|
// Size implements Ranger.Size
|
|
func (lr *lazySegmentRanger) Size() int64 {
|
|
return lr.size
|
|
}
|
|
|
|
// Range implements Ranger.Range to be lazily connected
|
|
func (lr *lazySegmentRanger) Range(ctx context.Context, offset, length int64) (_ io.ReadCloser, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
if lr.ranger == nil {
|
|
rr, encryption, err := lr.segments.Get(ctx, lr.streamID, lr.segmentIndex, lr.rs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
encryptedKey, keyNonce := encryption.EncryptedKey, encryption.EncryptedKeyNonce
|
|
lr.ranger, err = decryptRanger(ctx, rr, lr.size, lr.cipher, lr.derivedKey, encryptedKey, &keyNonce, lr.startingNonce, lr.encBlockSize)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return lr.ranger.Range(ctx, offset, length)
|
|
}
|
|
|
|
// decryptRanger returns a decrypted ranger of the given rr ranger
|
|
func decryptRanger(ctx context.Context, rr ranger.Ranger, decryptedSize int64, cipher storj.CipherSuite, derivedKey *storj.Key, encryptedKey storj.EncryptedPrivateKey, encryptedKeyNonce, startingNonce *storj.Nonce, encBlockSize int) (decrypted ranger.Ranger, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
contentKey, err := encryption.DecryptKey(encryptedKey, cipher, derivedKey, encryptedKeyNonce)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
decrypter, err := encryption.NewDecrypter(cipher, contentKey, startingNonce, encBlockSize)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var rd ranger.Ranger
|
|
if rr.Size()%int64(decrypter.InBlockSize()) != 0 {
|
|
reader, err := rr.Range(ctx, 0, rr.Size())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() { err = errs.Combine(err, reader.Close()) }()
|
|
cipherData, err := ioutil.ReadAll(reader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
data, err := encryption.Decrypt(cipherData, cipher, contentKey, startingNonce)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return ranger.ByteRanger(data), nil
|
|
}
|
|
|
|
rd, err = encryption.Transform(rr, decrypter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return eestream.Unpad(rd, int(rd.Size()-decryptedSize))
|
|
}
|
|
|
|
// CancelHandler handles clean up of segments on receiving CTRL+C
|
|
func (s *streamStore) cancelHandler(ctx context.Context, streamID storj.StreamID, totalSegments int64, path Path, pathCipher storj.CipherSuite) {
|
|
defer mon.Task()(&ctx)(nil)
|
|
|
|
for i := int64(0); i < totalSegments; i++ {
|
|
err := s.segments.Delete(ctx, streamID, int32(i))
|
|
if err != nil {
|
|
zap.L().Warn("Failed deleting segment", zap.String("path", path.String()), zap.Int64("segmentIndex", i), zap.Error(err))
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
func getEncryptedKeyAndNonce(m *pb.SegmentMeta) (storj.EncryptedPrivateKey, *storj.Nonce) {
|
|
if m == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
var nonce storj.Nonce
|
|
copy(nonce[:], m.KeyNonce)
|
|
|
|
return m.EncryptedKey, &nonce
|
|
}
|
|
|
|
// TypedDecryptStreamInfo decrypts stream info
|
|
func TypedDecryptStreamInfo(ctx context.Context, streamMetaBytes []byte, path Path, encStore *encryption.Store) (
|
|
streamInfo []byte, streamMeta pb.StreamMeta, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
err = proto.Unmarshal(streamMetaBytes, &streamMeta)
|
|
if err != nil {
|
|
return nil, pb.StreamMeta{}, err
|
|
}
|
|
|
|
derivedKey, err := encryption.DeriveContentKey(path.Bucket(), path.UnencryptedPath(), encStore)
|
|
if err != nil {
|
|
return nil, pb.StreamMeta{}, err
|
|
}
|
|
|
|
cipher := storj.CipherSuite(streamMeta.EncryptionType)
|
|
encryptedKey, keyNonce := getEncryptedKeyAndNonce(streamMeta.LastSegmentMeta)
|
|
contentKey, err := encryption.DecryptKey(encryptedKey, cipher, derivedKey, keyNonce)
|
|
if err != nil {
|
|
return nil, pb.StreamMeta{}, err
|
|
}
|
|
|
|
// decrypt metadata with the content encryption key and zero nonce
|
|
streamInfo, err = encryption.Decrypt(streamMeta.EncryptedStreamInfo, cipher, contentKey, &storj.Nonce{})
|
|
return streamInfo, streamMeta, err
|
|
}
|