Create and use an encryption.Store (#2293)
* add path implementation This commit adds a pkg/paths package which contains two types, Encrypted and Unencrypted, to statically enforce what is contained in a path. It's part of a refactoring of the code base to be more clear about what is contained in a storj.Path at all the layers. Change-Id: Ifc4d4932da26a97ea99749b8356b4543496a8864 * add encryption store This change adds an encryption.Store type to keep a collection of root keys for arbitrary locations in some buckets. It allows one to look up all of the necessary information to encrypt paths, decrypt paths and decrypt list operations. It adds some exported functions to perform encryption on paths using a Store. Change-Id: I1a3d230c521d65f0ede727f93e1cb389f8be9497 * add shim around streams store This commit changes no functionality, but just reorganizes the code so that changes can be made directly to the streams store implementation without affecting callers. It also adds a Path type that will be used at the interface boundary for the streams store so that it can be sure that it's getting well formed paths that it expects. Change-Id: I50bd682995b185beb653b00562fab62ef11f1ab5 * refactor streams to use encryption store This commit changes the streams store to use the path type as well as the encryption store to handle all of it's encryption and decryption. Some changes were made to how the default key is returned in the encryption store to have it include the case when the bucket exists but no paths matched. The path iterator could also be simplified to not report if a consume was valid: that information is no longer necessary. The kvmetainfo tests were changed to appropriately pass the subtests *testing.T rather than having the closure it executes use the parent one. The test framework now correctly reports which test did the failing. There are still some latent issues with listing in that listing for "a/" and listing for "a" are not the same operation, but we treat them as such. I suspect that there are also issues with paths like "/" or "//foo", but that's for another time. Change-Id: I81cad4ba2850c3d14ba7e632777c4cac93db9472 * use an encryption store at the upper layers Change-Id: Id9b4dd5f27b3ecac863de586e9ae076f4f927f6f * fix linting failures Change-Id: Ifb8378879ad308d4d047a0483850156371a41280 * fix linting in encryption test Change-Id: Ia35647dfe18b0f20fe13763b28e53294f75c38fa * get rid of kvmetainfo rootKey Change-Id: Id795ca03d9417e3fe9634365a121430eb678d6d5 * Fix linting failure for return with else Change-Id: I0b9ffd92be42ffcd8fef7ea735c5fc114a55d3b5 * fix some bugs adding enc store to kvmetainfo Change-Id: I8e765970ba817289c65ec62971ae3bfa2c53a1ba * respond to review feedback Change-Id: I43e2ce29ce2fb6677b1cd6b9469838d80ec92c86
This commit is contained in:
parent
eb1b1c434d
commit
30f790a040
@ -182,7 +182,11 @@ func (p *Project) OpenBucket(ctx context.Context, bucketName string, access *Enc
|
||||
}
|
||||
segmentStore := segments.NewSegmentStore(p.metainfo, ec, rs, p.maxInlineSize.Int(), maxEncryptedSegmentSize)
|
||||
|
||||
streamStore, err := streams.NewStreamStore(segmentStore, cfg.Volatile.SegmentsSize.Int64(), &access.Key, int(encryptionScheme.BlockSize), encryptionScheme.Cipher, p.maxInlineSize.Int())
|
||||
// TODO(jeff): this is where we would load scope information in.
|
||||
encStore := encryption.NewStore()
|
||||
encStore.SetDefaultKey(&access.Key)
|
||||
|
||||
streamStore, err := streams.NewStreamStore(segmentStore, cfg.Volatile.SegmentsSize.Int64(), encStore, int(encryptionScheme.BlockSize), encryptionScheme.Cipher, p.maxInlineSize.Int())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -192,7 +196,7 @@ func (p *Project) OpenBucket(ctx context.Context, bucketName string, access *Enc
|
||||
Name: bucketInfo.Name,
|
||||
Created: bucketInfo.Created,
|
||||
bucket: bucketInfo,
|
||||
metainfo: kvmetainfo.New(p.project, p.metainfo, streamStore, segmentStore, &access.Key),
|
||||
metainfo: kvmetainfo.New(p.project, p.metainfo, streamStore, segmentStore, encStore),
|
||||
streams: streamStore,
|
||||
}, nil
|
||||
}
|
||||
|
300
pkg/encryption/path_new.go
Normal file
300
pkg/encryption/path_new.go
Normal file
@ -0,0 +1,300 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package encryption
|
||||
|
||||
import (
|
||||
"crypto/hmac"
|
||||
"crypto/sha512"
|
||||
"encoding/base64"
|
||||
"strings"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/paths"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
// StoreEncryptPath encrypts the path using the provided cipher and looking up
|
||||
// keys from the provided store and bucket.
|
||||
func StoreEncryptPath(bucket string, path paths.Unencrypted, cipher storj.Cipher, store *Store) (
|
||||
encPath paths.Encrypted, err error) {
|
||||
|
||||
// Invalid paths map to invalid paths
|
||||
if !path.Valid() {
|
||||
return paths.Encrypted{}, nil
|
||||
}
|
||||
|
||||
if cipher == storj.Unencrypted {
|
||||
return paths.NewEncrypted(path.Raw()), nil
|
||||
}
|
||||
|
||||
_, consumed, base := store.LookupUnencrypted(bucket, path)
|
||||
if base == nil {
|
||||
return paths.Encrypted{}, errs.New("unable to find encryption base for: %s/%q", bucket, path)
|
||||
}
|
||||
|
||||
remaining, ok := path.Consume(consumed)
|
||||
if !ok {
|
||||
return paths.Encrypted{}, errs.New("unable to encrypt bucket path: %s/%q", bucket, path)
|
||||
}
|
||||
|
||||
// if we didn't consume any path, we're at the root of the bucket, and so we have
|
||||
// to fold the bucket name into the key.
|
||||
key := &base.Key
|
||||
if !consumed.Valid() {
|
||||
key, err = derivePathKeyComponent(key, bucket)
|
||||
if err != nil {
|
||||
return paths.Encrypted{}, errs.Wrap(err)
|
||||
}
|
||||
}
|
||||
|
||||
encrypted, err := EncryptPathRaw(remaining.Raw(), cipher, key)
|
||||
if err != nil {
|
||||
return paths.Encrypted{}, errs.Wrap(err)
|
||||
}
|
||||
|
||||
var builder strings.Builder
|
||||
builder.WriteString(base.Encrypted.Raw())
|
||||
|
||||
if len(encrypted) > 0 {
|
||||
if builder.Len() > 0 {
|
||||
builder.WriteByte('/')
|
||||
}
|
||||
builder.WriteString(encrypted)
|
||||
}
|
||||
|
||||
return paths.NewEncrypted(builder.String()), nil
|
||||
}
|
||||
|
||||
// EncryptPathRaw encrypts the path using the provided key directly. EncryptPath should be
|
||||
// preferred if possible.
|
||||
func EncryptPathRaw(raw string, cipher storj.Cipher, key *storj.Key) (string, error) {
|
||||
if cipher == storj.Unencrypted {
|
||||
return raw, nil
|
||||
}
|
||||
|
||||
var builder strings.Builder
|
||||
for iter, i := paths.NewIterator(raw), 0; !iter.Done(); i++ {
|
||||
component := iter.Next()
|
||||
encComponent, err := storeEncryptPathComponent(component, cipher, key)
|
||||
if err != nil {
|
||||
return "", errs.Wrap(err)
|
||||
}
|
||||
key, err = derivePathKeyComponent(key, component)
|
||||
if err != nil {
|
||||
return "", errs.Wrap(err)
|
||||
}
|
||||
if i > 0 {
|
||||
builder.WriteByte('/')
|
||||
}
|
||||
builder.WriteString(encComponent)
|
||||
}
|
||||
return builder.String(), nil
|
||||
}
|
||||
|
||||
// StoreDecryptPath decrypts the path using the provided cipher and looking up
|
||||
// keys from the provided store and bucket.
|
||||
func StoreDecryptPath(bucket string, path paths.Encrypted, cipher storj.Cipher, store *Store) (
|
||||
unencPath paths.Unencrypted, err error) {
|
||||
|
||||
// Invalid paths map to invalid paths
|
||||
if !path.Valid() {
|
||||
return paths.Unencrypted{}, nil
|
||||
}
|
||||
|
||||
if cipher == storj.Unencrypted {
|
||||
return paths.NewUnencrypted(path.Raw()), nil
|
||||
}
|
||||
|
||||
_, consumed, base := store.LookupEncrypted(bucket, path)
|
||||
if base == nil {
|
||||
return paths.Unencrypted{}, errs.New("unable to find decryption base for: %q", path)
|
||||
}
|
||||
|
||||
remaining, ok := path.Consume(consumed)
|
||||
if !ok {
|
||||
return paths.Unencrypted{}, errs.New("unable to decrypt bucket path: %q", path)
|
||||
}
|
||||
|
||||
// if we didn't consume any path, we're at the root of the bucket, and so we have
|
||||
// to fold the bucket name into the key.
|
||||
key := &base.Key
|
||||
if !consumed.Valid() {
|
||||
key, err = derivePathKeyComponent(key, bucket)
|
||||
if err != nil {
|
||||
return paths.Unencrypted{}, errs.Wrap(err)
|
||||
}
|
||||
}
|
||||
|
||||
decrypted, err := DecryptPathRaw(remaining.Raw(), cipher, key)
|
||||
if err != nil {
|
||||
return paths.Unencrypted{}, errs.Wrap(err)
|
||||
}
|
||||
|
||||
var builder strings.Builder
|
||||
builder.WriteString(base.Unencrypted.Raw())
|
||||
|
||||
if len(decrypted) > 0 {
|
||||
if builder.Len() > 0 {
|
||||
builder.WriteByte('/')
|
||||
}
|
||||
builder.WriteString(decrypted)
|
||||
}
|
||||
|
||||
return paths.NewUnencrypted(builder.String()), nil
|
||||
}
|
||||
|
||||
// DecryptPathRaw decrypts the path using the provided key directly. DecryptPath should be
|
||||
// preferred if possible.
|
||||
func DecryptPathRaw(raw string, cipher storj.Cipher, key *storj.Key) (string, error) {
|
||||
if cipher == storj.Unencrypted {
|
||||
return raw, nil
|
||||
}
|
||||
|
||||
var builder strings.Builder
|
||||
for iter, i := paths.NewIterator(raw), 0; !iter.Done(); i++ {
|
||||
component := iter.Next()
|
||||
unencComponent, err := storeDecryptPathComponent(component, cipher, key)
|
||||
if err != nil {
|
||||
return "", errs.Wrap(err)
|
||||
}
|
||||
key, err = derivePathKeyComponent(key, unencComponent)
|
||||
if err != nil {
|
||||
return "", errs.Wrap(err)
|
||||
}
|
||||
if i > 0 {
|
||||
builder.WriteByte('/')
|
||||
}
|
||||
builder.WriteString(unencComponent)
|
||||
}
|
||||
return builder.String(), nil
|
||||
}
|
||||
|
||||
// StoreDeriveContentKey returns the content key for the passed in path by looking up
|
||||
// the appropriate base key from the store and bucket and deriving the rest.
|
||||
func StoreDeriveContentKey(bucket string, path paths.Unencrypted, store *Store) (key *storj.Key, err error) {
|
||||
key, err = StoreDerivePathKey(bucket, path, store)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
key, err = DeriveKey(key, "content")
|
||||
return key, errs.Wrap(err)
|
||||
}
|
||||
|
||||
// StoreDerivePathKey returns the path key for the passed in path by looking up the
|
||||
// appropriate base key from the store and bucket and deriving the rest.
|
||||
func StoreDerivePathKey(bucket string, path paths.Unencrypted, store *Store) (key *storj.Key, err error) {
|
||||
_, consumed, base := store.LookupUnencrypted(bucket, path)
|
||||
if base == nil {
|
||||
return nil, errs.New("unable to find encryption base for: %s/%q", bucket, path)
|
||||
}
|
||||
|
||||
// If asking for the key at the bucket, do that and return.
|
||||
if !path.Valid() {
|
||||
key, err = derivePathKeyComponent(&base.Key, bucket)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
return key, nil
|
||||
}
|
||||
|
||||
remaining, ok := path.Consume(consumed)
|
||||
if !ok {
|
||||
return nil, errs.New("unable to derive path key for: %s/%q", bucket, path)
|
||||
}
|
||||
|
||||
// if we didn't consume any path, we're at the root of the bucket, and so we have
|
||||
// to fold the bucket name into the key.
|
||||
key = &base.Key
|
||||
if !consumed.Valid() {
|
||||
key, err = derivePathKeyComponent(key, bucket)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
}
|
||||
|
||||
for iter := remaining.Iterator(); !iter.Done(); {
|
||||
key, err = derivePathKeyComponent(key, iter.Next())
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
}
|
||||
return key, nil
|
||||
}
|
||||
|
||||
// derivePathKeyComponent derives a new key from the provided one using the component. It
|
||||
// should be preferred over DeriveKey when adding path components as it performs the
|
||||
// necessary transformation to the component.
|
||||
func derivePathKeyComponent(key *storj.Key, component string) (*storj.Key, error) {
|
||||
return DeriveKey(key, "path:"+component)
|
||||
}
|
||||
|
||||
// storeEncryptPathComponent encrypts a single path component with the provided cipher and key.
|
||||
func storeEncryptPathComponent(comp string, cipher storj.Cipher, key *storj.Key) (string, error) {
|
||||
if comp == "" {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// derive the key for the next path component. this is so that
|
||||
// every encrypted component has a unique nonce.
|
||||
derivedKey, err := derivePathKeyComponent(key, comp)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// use the derived key to derive the nonce
|
||||
mac := hmac.New(sha512.New, derivedKey[:])
|
||||
_, err = mac.Write([]byte("nonce"))
|
||||
if err != nil {
|
||||
return "", Error.Wrap(err)
|
||||
}
|
||||
|
||||
nonce := new(storj.Nonce)
|
||||
copy(nonce[:], mac.Sum(nil))
|
||||
|
||||
// encrypt the path components with the parent's key and the derived nonce
|
||||
cipherText, err := Encrypt([]byte(comp), cipher, key, nonce)
|
||||
if err != nil {
|
||||
return "", Error.Wrap(err)
|
||||
}
|
||||
|
||||
nonceSize := storj.NonceSize
|
||||
if cipher == storj.AESGCM {
|
||||
nonceSize = AESGCMNonceSize
|
||||
}
|
||||
|
||||
// keep the nonce together with the cipher text
|
||||
return base64.RawURLEncoding.EncodeToString(append(nonce[:nonceSize], cipherText...)), nil
|
||||
}
|
||||
|
||||
// storeDecryptPathComponent decrypts a single path component with the provided cipher and key.
|
||||
func storeDecryptPathComponent(comp string, cipher storj.Cipher, key *storj.Key) (string, error) {
|
||||
if comp == "" {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
data, err := base64.RawURLEncoding.DecodeString(comp)
|
||||
if err != nil {
|
||||
return "", Error.Wrap(err)
|
||||
}
|
||||
|
||||
nonceSize := storj.NonceSize
|
||||
if cipher == storj.AESGCM {
|
||||
nonceSize = AESGCMNonceSize
|
||||
}
|
||||
if len(data) < nonceSize || nonceSize < 0 {
|
||||
return "", errs.New("component did not contain enough nonce bytes")
|
||||
}
|
||||
|
||||
// extract the nonce from the cipher text
|
||||
nonce := new(storj.Nonce)
|
||||
copy(nonce[:], data[:nonceSize])
|
||||
|
||||
decrypted, err := Decrypt(data[nonceSize:], cipher, key, nonce)
|
||||
if err != nil {
|
||||
return "", Error.Wrap(err)
|
||||
}
|
||||
|
||||
return string(decrypted), nil
|
||||
}
|
56
pkg/encryption/path_new_test.go
Normal file
56
pkg/encryption/path_new_test.go
Normal file
@ -0,0 +1,56 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package encryption
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"storj.io/storj/pkg/paths"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
func newStore(key storj.Key) *Store {
|
||||
store := NewStore()
|
||||
if err := store.Add("bucket", paths.Unencrypted{}, paths.Encrypted{}, key); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return store
|
||||
}
|
||||
|
||||
func TestStoreEncryption(t *testing.T) {
|
||||
forAllCiphers(func(cipher storj.Cipher) {
|
||||
for i, rawPath := range []string{
|
||||
"",
|
||||
"/",
|
||||
"//",
|
||||
"file.txt",
|
||||
"file.txt/",
|
||||
"fold1/file.txt",
|
||||
"fold1/fold2/file.txt",
|
||||
"/fold1/fold2/fold3/file.txt",
|
||||
} {
|
||||
errTag := fmt.Sprintf("test:%d path:%q cipher:%v", i, rawPath, cipher)
|
||||
|
||||
var key storj.Key
|
||||
copy(key[:], randData(storj.KeySize))
|
||||
store := newStore(key)
|
||||
path := paths.NewUnencrypted(rawPath)
|
||||
|
||||
encPath, err := StoreEncryptPath("bucket", path, cipher, store)
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue
|
||||
}
|
||||
|
||||
decPath, err := StoreDecryptPath("bucket", encPath, cipher, store)
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue
|
||||
}
|
||||
|
||||
assert.Equal(t, rawPath, decPath.Raw(), errTag)
|
||||
}
|
||||
})
|
||||
}
|
215
pkg/encryption/store.go
Normal file
215
pkg/encryption/store.go
Normal file
@ -0,0 +1,215 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package encryption
|
||||
|
||||
import (
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/paths"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
// The Store allows one to find the matching most encrypted key and path for
|
||||
// some unencrypted path. It also reports a mapping of encrypted to unencrypted paths
|
||||
// at the searched for unencrypted path.
|
||||
//
|
||||
// For example, if the Store contains the mappings
|
||||
//
|
||||
// b1, u1/u2/u3 => <e1/e2/e3, k3>
|
||||
// b1, u1/u2/u3/u4 => <e1/e2/e3/e4, k4>
|
||||
// b1, u1/u5 => <e1/e5, k5>
|
||||
// b1, u6 => <e6, k6>
|
||||
// b1, u6/u7/u8 => <e6/e7/e8, k8>
|
||||
// b2, u1 => <e1', k1'>
|
||||
//
|
||||
// Then the following lookups have outputs
|
||||
//
|
||||
// b1, u1 => <{e2:u2, e5:u5}, u1, nil>
|
||||
// b1, u1/u2/u3 => <{e4:u4}, u1/u2/u3, <u1/u2/u3, e1/e2/e3, k3>>
|
||||
// b1, u1/u2/u3/u6 => <{}, u1/u2/u3/, <u1/u2/u3, e1/e2/e3, k3>>
|
||||
// b1, u1/u2/u3/u4 => <{}, u1/u2/u3/u4, <u1/u2/u3/u4, e1/e2/e3/e4, k4>>
|
||||
// b1, u6/u7 => <{e8:u8}, u6/, <u6, e6, k6>>
|
||||
// b2, u1 => <{}, u1, <u1, e1', k1'>>
|
||||
type Store struct {
|
||||
roots map[string]*node
|
||||
defaultKey *storj.Key
|
||||
}
|
||||
|
||||
// node is a node in the Store graph. It may contain an encryption key and encrypted path,
|
||||
// a list of children nodes, and data to ensure a bijection between encrypted and unencrypted
|
||||
// path entries.
|
||||
type node struct {
|
||||
unenc map[string]*node // unenc => node
|
||||
unencMap map[string]string // unenc => enc
|
||||
enc map[string]*node // enc => node
|
||||
encMap map[string]string // enc => unenc
|
||||
base *Base
|
||||
}
|
||||
|
||||
// Base represents a key with which to derive further keys at some encrypted/unencrypted path.
|
||||
type Base struct {
|
||||
Unencrypted paths.Unencrypted
|
||||
Encrypted paths.Encrypted
|
||||
Key storj.Key
|
||||
}
|
||||
|
||||
// clone returns a copy of the Base. The implementation can be simple because the
|
||||
// types of its fields do not contain any references.
|
||||
func (b *Base) clone() *Base {
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
bc := *b
|
||||
return &bc
|
||||
}
|
||||
|
||||
// NewStore constructs a Store.
|
||||
func NewStore() *Store {
|
||||
return &Store{roots: make(map[string]*node)}
|
||||
}
|
||||
|
||||
// newNode constructs a node.
|
||||
func newNode() *node {
|
||||
return &node{
|
||||
unenc: make(map[string]*node),
|
||||
unencMap: make(map[string]string),
|
||||
enc: make(map[string]*node),
|
||||
encMap: make(map[string]string),
|
||||
}
|
||||
}
|
||||
|
||||
// SetDefaultKey adds a default key to be returned for any lookup that does not match a bucket.
|
||||
func (s *Store) SetDefaultKey(defaultKey *storj.Key) {
|
||||
s.defaultKey = defaultKey
|
||||
}
|
||||
|
||||
// Add creates a mapping from the unencrypted path to the encrypted path and key.
|
||||
func (s *Store) Add(bucket string, unenc paths.Unencrypted, enc paths.Encrypted, key storj.Key) error {
|
||||
root, ok := s.roots[bucket]
|
||||
if !ok {
|
||||
root = newNode()
|
||||
}
|
||||
|
||||
// Perform the addition starting at the root node.
|
||||
if err := root.add(unenc.Iterator(), enc.Iterator(), &Base{
|
||||
Unencrypted: unenc,
|
||||
Encrypted: enc,
|
||||
Key: key,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// only update the root for the bucket if the add was successful.
|
||||
s.roots[bucket] = root
|
||||
return nil
|
||||
}
|
||||
|
||||
// add places the paths and base into the node tree structure.
|
||||
func (n *node) add(unenc, enc paths.Iterator, base *Base) error {
|
||||
if unenc.Done() != enc.Done() {
|
||||
return errs.New("encrypted and unencrypted paths had different number of components")
|
||||
}
|
||||
|
||||
// If we're done walking the paths, this node must have the provided base.
|
||||
if unenc.Done() {
|
||||
n.base = base
|
||||
return nil
|
||||
}
|
||||
|
||||
// Walk to the next parts and ensure they're consistent with previous additions.
|
||||
unencPart, encPart := unenc.Next(), enc.Next()
|
||||
if exUnencPart, ok := n.encMap[encPart]; ok && exUnencPart != unencPart {
|
||||
return errs.New("conflicting encrypted parts for unencrypted path")
|
||||
}
|
||||
if exEncPart, ok := n.unencMap[unencPart]; ok && exEncPart != encPart {
|
||||
return errs.New("conflicting encrypted parts for unencrypted path")
|
||||
}
|
||||
|
||||
// Look up the child node. Since we're sure the unenc and enc mappings are
|
||||
// consistent, we can look it up in one map and unconditionally insert it
|
||||
// into both maps if necessary.
|
||||
child, ok := n.unenc[unencPart]
|
||||
if !ok {
|
||||
child = newNode()
|
||||
}
|
||||
|
||||
// Recurse to the next node in the tree.
|
||||
if err := child.add(unenc, enc, base); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Only add to the maps if the child add was successful.
|
||||
n.unencMap[unencPart] = encPart
|
||||
n.encMap[encPart] = unencPart
|
||||
n.unenc[unencPart] = child
|
||||
n.enc[encPart] = child
|
||||
return nil
|
||||
}
|
||||
|
||||
// LookupUnencrypted finds the matching most unencrypted path added to the Store, reports how
|
||||
// much of the path matched, any known unencrypted paths at the requested path, and if a key
|
||||
// and encrypted path exists for some prefix of the unencrypted path.
|
||||
func (s *Store) LookupUnencrypted(bucket string, path paths.Unencrypted) (
|
||||
revealed map[string]string, consumed paths.Unencrypted, base *Base) {
|
||||
|
||||
root, ok := s.roots[bucket]
|
||||
if ok {
|
||||
var rawConsumed string
|
||||
revealed, rawConsumed, base = root.lookup(path.Iterator(), "", nil, true)
|
||||
consumed = paths.NewUnencrypted(rawConsumed)
|
||||
}
|
||||
if base == nil && s.defaultKey != nil {
|
||||
return nil, paths.Unencrypted{}, &Base{Key: *s.defaultKey}
|
||||
}
|
||||
return revealed, consumed, base.clone()
|
||||
}
|
||||
|
||||
// LookupEncrypted finds the matching most encrypted path added to the Store, reports how
|
||||
// much of the path matched, any known encrypted paths at the requested path, and if a key
|
||||
// an encrypted path exists for some prefix of the encrypted path.
|
||||
func (s *Store) LookupEncrypted(bucket string, path paths.Encrypted) (
|
||||
revealed map[string]string, consumed paths.Encrypted, base *Base) {
|
||||
|
||||
root, ok := s.roots[bucket]
|
||||
if ok {
|
||||
var rawConsumed string
|
||||
revealed, rawConsumed, base = root.lookup(path.Iterator(), "", nil, false)
|
||||
consumed = paths.NewEncrypted(rawConsumed)
|
||||
}
|
||||
if base == nil && s.defaultKey != nil {
|
||||
return nil, paths.Encrypted{}, &Base{Key: *s.defaultKey}
|
||||
}
|
||||
return revealed, consumed, base.clone()
|
||||
}
|
||||
|
||||
// lookup searches for the path in the node tree structure.
|
||||
func (n *node) lookup(path paths.Iterator, bestConsumed string, bestBase *Base, unenc bool) (
|
||||
map[string]string, string, *Base) {
|
||||
|
||||
// Keep track of the best match so far.
|
||||
if n.base != nil || bestBase == nil {
|
||||
bestBase, bestConsumed = n.base, path.Consumed()
|
||||
}
|
||||
|
||||
// Pick the tree we're walking down based on the unenc bool.
|
||||
revealed, children := n.unencMap, n.enc
|
||||
if unenc {
|
||||
revealed, children = n.encMap, n.unenc
|
||||
}
|
||||
|
||||
// If we're done walking the path, then return our best match along with the
|
||||
// revealed paths at this node.
|
||||
if path.Done() {
|
||||
return revealed, bestConsumed, bestBase
|
||||
}
|
||||
|
||||
// Walk to the next node in the tree. If there is no node, then report our best match.
|
||||
child, ok := children[path.Next()]
|
||||
if !ok {
|
||||
return nil, bestConsumed, bestBase
|
||||
}
|
||||
|
||||
// Recurse to the next node in the tree.
|
||||
return child.lookup(path, bestConsumed, bestBase, unenc)
|
||||
}
|
128
pkg/encryption/store_test.go
Normal file
128
pkg/encryption/store_test.go
Normal file
@ -0,0 +1,128 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package encryption
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"storj.io/storj/pkg/paths"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
func printLookup(revealed map[string]string, consumed interface{ Raw() string }, base *Base) {
|
||||
if base == nil {
|
||||
fmt.Printf("<%q, %q, nil>\n", revealed, consumed.Raw())
|
||||
} else {
|
||||
fmt.Printf("<%q, %q, <%q, %q, %q>>\n",
|
||||
revealed, consumed, base.Unencrypted, base.Encrypted, base.Key[:2])
|
||||
}
|
||||
}
|
||||
|
||||
func toKey(val string) (out storj.Key) {
|
||||
copy(out[:], val)
|
||||
return out
|
||||
}
|
||||
|
||||
func abortIfError(err error) {
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("%+v", err))
|
||||
}
|
||||
}
|
||||
|
||||
func ExampleStore() {
|
||||
s := NewStore()
|
||||
ep := paths.NewEncrypted
|
||||
up := paths.NewUnencrypted
|
||||
|
||||
// Add a fairly complicated tree to the store.
|
||||
abortIfError(s.Add("b1", up("u1/u2/u3"), ep("e1/e2/e3"), toKey("k3")))
|
||||
abortIfError(s.Add("b1", up("u1/u2/u3/u4"), ep("e1/e2/e3/e4"), toKey("k4")))
|
||||
abortIfError(s.Add("b1", up("u1/u5"), ep("e1/e5"), toKey("k5")))
|
||||
abortIfError(s.Add("b1", up("u6"), ep("e6"), toKey("k6")))
|
||||
abortIfError(s.Add("b1", up("u6/u7/u8"), ep("e6/e7/e8"), toKey("k8")))
|
||||
abortIfError(s.Add("b2", up("u1"), ep("e1'"), toKey("k1")))
|
||||
abortIfError(s.Add("b3", paths.Unencrypted{}, paths.Encrypted{}, toKey("m1")))
|
||||
|
||||
// Look up some complicated queries by the unencrypted path.
|
||||
printLookup(s.LookupUnencrypted("b1", up("u1")))
|
||||
printLookup(s.LookupUnencrypted("b1", up("u1/u2/u3")))
|
||||
printLookup(s.LookupUnencrypted("b1", up("u1/u2/u3/u6")))
|
||||
printLookup(s.LookupUnencrypted("b1", up("u1/u2/u3/u4")))
|
||||
printLookup(s.LookupUnencrypted("b1", up("u6/u7")))
|
||||
printLookup(s.LookupUnencrypted("b2", up("u1")))
|
||||
printLookup(s.LookupUnencrypted("b3", paths.Unencrypted{}))
|
||||
printLookup(s.LookupUnencrypted("b3", up("z1")))
|
||||
|
||||
fmt.Println()
|
||||
|
||||
// Look up some complicated queries by the encrypted path.
|
||||
printLookup(s.LookupEncrypted("b1", ep("e1")))
|
||||
printLookup(s.LookupEncrypted("b1", ep("e1/e2/e3")))
|
||||
printLookup(s.LookupEncrypted("b1", ep("e1/e2/e3/e6")))
|
||||
printLookup(s.LookupEncrypted("b1", ep("e1/e2/e3/e4")))
|
||||
printLookup(s.LookupEncrypted("b1", ep("e6/e7")))
|
||||
printLookup(s.LookupEncrypted("b2", ep("e1'")))
|
||||
printLookup(s.LookupEncrypted("b3", paths.Encrypted{}))
|
||||
printLookup(s.LookupEncrypted("b3", ep("z1")))
|
||||
|
||||
// output:
|
||||
//
|
||||
// <map["e2":"u2" "e5":"u5"], "u1", nil>
|
||||
// <map["e4":"u4"], "u1/u2/u3", <"u1/u2/u3", "e1/e2/e3", "k3">>
|
||||
// <map[], "u1/u2/u3/", <"u1/u2/u3", "e1/e2/e3", "k3">>
|
||||
// <map[], "u1/u2/u3/u4", <"u1/u2/u3/u4", "e1/e2/e3/e4", "k4">>
|
||||
// <map["e8":"u8"], "u6/", <"u6", "e6", "k6">>
|
||||
// <map[], "u1", <"u1", "e1'", "k1">>
|
||||
// <map[], "", <"", "", "m1">>
|
||||
// <map[], "", <"", "", "m1">>
|
||||
//
|
||||
// <map["u2":"e2" "u5":"e5"], "e1", nil>
|
||||
// <map["u4":"e4"], "e1/e2/e3", <"u1/u2/u3", "e1/e2/e3", "k3">>
|
||||
// <map[], "e1/e2/e3/", <"u1/u2/u3", "e1/e2/e3", "k3">>
|
||||
// <map[], "e1/e2/e3/e4", <"u1/u2/u3/u4", "e1/e2/e3/e4", "k4">>
|
||||
// <map["u8":"e8"], "e6/", <"u6", "e6", "k6">>
|
||||
// <map[], "e1'", <"u1", "e1'", "k1">>
|
||||
// <map[], "", <"", "", "m1">>
|
||||
// <map[], "", <"", "", "m1">>
|
||||
}
|
||||
|
||||
func TestStoreErrors(t *testing.T) {
|
||||
s := NewStore()
|
||||
ep := paths.NewEncrypted
|
||||
up := paths.NewUnencrypted
|
||||
|
||||
// Too many encrypted parts
|
||||
require.Error(t, s.Add("b1", up("u1"), ep("e1/e2/e3"), storj.Key{}))
|
||||
|
||||
// Too many unencrypted parts
|
||||
require.Error(t, s.Add("b1", up("u1/u2/u3"), ep("e1"), storj.Key{}))
|
||||
|
||||
// Mismatches
|
||||
require.NoError(t, s.Add("b1", up("u1"), ep("e1"), storj.Key{}))
|
||||
require.Error(t, s.Add("b1", up("u2"), ep("e1"), storj.Key{}))
|
||||
require.Error(t, s.Add("b1", up("u1"), ep("f1"), storj.Key{}))
|
||||
}
|
||||
|
||||
func TestStoreErrorState(t *testing.T) {
|
||||
s := NewStore()
|
||||
ep := paths.NewEncrypted
|
||||
up := paths.NewUnencrypted
|
||||
|
||||
// Do an empty lookup.
|
||||
revealed1, consumed1, base1 := s.LookupUnencrypted("b1", up("u1/u2"))
|
||||
|
||||
// Attempt to do an addition that fails.
|
||||
require.Error(t, s.Add("b1", up("u1/u2"), ep("e1/e2/e3"), storj.Key{}))
|
||||
|
||||
// Ensure that we get the same results as before
|
||||
revealed2, consumed2, base2 := s.LookupUnencrypted("b1", up("u1/u2"))
|
||||
|
||||
assert.Equal(t, revealed1, revealed2)
|
||||
assert.Equal(t, consumed1, consumed2)
|
||||
assert.Equal(t, base1, base2)
|
||||
}
|
@ -18,6 +18,7 @@ import (
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testplanet"
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/encryption"
|
||||
"storj.io/storj/pkg/macaroon"
|
||||
"storj.io/storj/pkg/metainfo/kvmetainfo"
|
||||
ecclient "storj.io/storj/pkg/storage/ec"
|
||||
@ -32,7 +33,7 @@ const (
|
||||
)
|
||||
|
||||
func TestBucketsBasic(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
// Create new bucket
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
||||
if assert.NoError(t, err) {
|
||||
@ -72,7 +73,7 @@ func TestBucketsBasic(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBucketsReadWrite(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
// Create new bucket
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
||||
if assert.NoError(t, err) {
|
||||
@ -112,7 +113,7 @@ func TestBucketsReadWrite(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestErrNoBucket(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
_, err := db.CreateBucket(ctx, "", nil)
|
||||
assert.True(t, storj.ErrNoBucket.Has(err))
|
||||
|
||||
@ -125,7 +126,7 @@ func TestErrNoBucket(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBucketCreateCipher(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
forAllCiphers(func(cipher storj.Cipher) {
|
||||
bucket, err := db.CreateBucket(ctx, "test", &storj.Bucket{PathCipher: cipher})
|
||||
if assert.NoError(t, err) {
|
||||
@ -144,7 +145,7 @@ func TestBucketCreateCipher(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestListBucketsEmpty(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
_, err := db.ListBuckets(ctx, storj.BucketListOptions{})
|
||||
assert.EqualError(t, err, "kvmetainfo: invalid direction 0")
|
||||
|
||||
@ -164,7 +165,7 @@ func TestListBucketsEmpty(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestListBuckets(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
bucketNames := []string{"a00", "aa0", "b00", "bb0", "c00"}
|
||||
|
||||
for _, name := range bucketNames {
|
||||
@ -270,14 +271,14 @@ func getBucketNames(bucketList storj.BucketList) []string {
|
||||
return names
|
||||
}
|
||||
|
||||
func runTest(t *testing.T, test func(context.Context, *testplanet.Planet, *kvmetainfo.DB, streams.Store)) {
|
||||
func runTest(t *testing.T, test func(*testing.T, context.Context, *testplanet.Planet, *kvmetainfo.DB, streams.Store)) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
db, streams, err := newMetainfoParts(planet)
|
||||
require.NoError(t, err)
|
||||
|
||||
test(ctx, planet, db, streams)
|
||||
test(t, ctx, planet, db, streams)
|
||||
})
|
||||
}
|
||||
|
||||
@ -329,15 +330,18 @@ func newMetainfoParts(planet *testplanet.Planet) (*kvmetainfo.DB, streams.Store,
|
||||
key := new(storj.Key)
|
||||
copy(key[:], TestEncKey)
|
||||
|
||||
encStore := encryption.NewStore()
|
||||
encStore.SetDefaultKey(key)
|
||||
|
||||
const stripesPerBlock = 2
|
||||
blockSize := stripesPerBlock * rs.StripeSize()
|
||||
inlineThreshold := 8 * memory.KiB.Int()
|
||||
streams, err := streams.NewStreamStore(segments, 64*memory.MiB.Int64(), key, blockSize, storj.AESGCM, inlineThreshold)
|
||||
streams, err := streams.NewStreamStore(segments, 64*memory.MiB.Int64(), encStore, blockSize, storj.AESGCM, inlineThreshold)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
proj := kvmetainfo.NewProject(streams, int32(blockSize), rs, 64*memory.MiB.Int64())
|
||||
return kvmetainfo.New(proj, metainfo, streams, segments, key), streams, nil
|
||||
return kvmetainfo.New(proj, metainfo, streams, segments, encStore), streams, nil
|
||||
}
|
||||
|
||||
func forAllCiphers(test func(cipher storj.Cipher)) {
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/internal/memory"
|
||||
"storj.io/storj/pkg/encryption"
|
||||
"storj.io/storj/pkg/storage/segments"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
"storj.io/storj/pkg/storj"
|
||||
@ -34,17 +35,17 @@ type DB struct {
|
||||
streams streams.Store
|
||||
segments segments.Store
|
||||
|
||||
rootKey *storj.Key
|
||||
encStore *encryption.Store
|
||||
}
|
||||
|
||||
// New creates a new metainfo database
|
||||
func New(project *Project, metainfo metainfo.Client, streams streams.Store, segments segments.Store, rootKey *storj.Key) *DB {
|
||||
func New(project *Project, metainfo metainfo.Client, streams streams.Store, segments segments.Store, encStore *encryption.Store) *DB {
|
||||
return &DB{
|
||||
project: project,
|
||||
metainfo: metainfo,
|
||||
streams: streams,
|
||||
segments: segments,
|
||||
rootKey: rootKey,
|
||||
encStore: encStore,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
|
||||
"storj.io/storj/internal/memory"
|
||||
"storj.io/storj/pkg/encryption"
|
||||
"storj.io/storj/pkg/paths"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storage/meta"
|
||||
"storj.io/storj/pkg/storage/objects"
|
||||
@ -24,11 +25,6 @@ import (
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
const (
|
||||
// commitedPrefix is prefix where completed object info is stored
|
||||
committedPrefix = "l/"
|
||||
)
|
||||
|
||||
// DefaultRS default values for RedundancyScheme
|
||||
var DefaultRS = storj.RedundancyScheme{
|
||||
Algorithm: storj.ReedSolomon,
|
||||
@ -50,7 +46,7 @@ var DefaultES = storj.EncryptionScheme{
|
||||
func (db *DB) GetObject(ctx context.Context, bucket string, path storj.Path) (info storj.Object, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
_, info, err = db.getInfo(ctx, committedPrefix, bucket, path)
|
||||
_, info, err = db.getInfo(ctx, bucket, path)
|
||||
|
||||
return info, err
|
||||
}
|
||||
@ -59,21 +55,22 @@ func (db *DB) GetObject(ctx context.Context, bucket string, path storj.Path) (in
|
||||
func (db *DB) GetObjectStream(ctx context.Context, bucket string, path storj.Path) (stream storj.ReadOnlyStream, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
meta, info, err := db.getInfo(ctx, committedPrefix, bucket, path)
|
||||
meta, info, err := db.getInfo(ctx, bucket, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
streamKey, err := encryption.DeriveContentKey(meta.fullpath, db.rootKey)
|
||||
streamKey, err := encryption.StoreDeriveContentKey(bucket, meta.fullpath.UnencryptedPath(), db.encStore)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &readonlyStream{
|
||||
db: db,
|
||||
info: info,
|
||||
encryptedPath: meta.encryptedPath,
|
||||
streamKey: streamKey,
|
||||
db: db,
|
||||
info: info,
|
||||
bucket: meta.bucket,
|
||||
encPath: meta.encPath.Raw(),
|
||||
streamKey: streamKey,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -222,16 +219,18 @@ func (db *DB) ListObjects(ctx context.Context, bucket string, options storj.List
|
||||
}
|
||||
|
||||
type object struct {
|
||||
fullpath string
|
||||
encryptedPath string
|
||||
fullpath streams.Path
|
||||
bucket string
|
||||
encPath paths.Encrypted
|
||||
lastSegmentMeta segments.Meta
|
||||
streamInfo pb.StreamInfo
|
||||
streamMeta pb.StreamMeta
|
||||
}
|
||||
|
||||
func (db *DB) getInfo(ctx context.Context, prefix string, bucket string, path storj.Path) (obj object, info storj.Object, err error) {
|
||||
func (db *DB) getInfo(ctx context.Context, bucket string, path storj.Path) (obj object, info storj.Object, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// TODO: we shouldn't need to go load the bucket metadata every time we get object info
|
||||
bucketInfo, err := db.GetBucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return object{}, storj.Object{}, err
|
||||
@ -241,14 +240,14 @@ func (db *DB) getInfo(ctx context.Context, prefix string, bucket string, path st
|
||||
return object{}, storj.Object{}, storj.ErrNoPath.New("")
|
||||
}
|
||||
|
||||
fullpath := bucket + "/" + path
|
||||
fullpath := streams.CreatePath(bucket, paths.NewUnencrypted(path))
|
||||
|
||||
encryptedPath, err := streams.EncryptAfterBucket(ctx, fullpath, bucketInfo.PathCipher, db.rootKey)
|
||||
encPath, err := encryption.StoreEncryptPath(bucket, paths.NewUnencrypted(path), bucketInfo.PathCipher, db.encStore)
|
||||
if err != nil {
|
||||
return object{}, storj.Object{}, err
|
||||
}
|
||||
|
||||
pointer, err := db.metainfo.SegmentInfo(ctx, bucket, storj.JoinPaths(storj.SplitPath(encryptedPath)[1:]...), -1)
|
||||
pointer, err := db.metainfo.SegmentInfo(ctx, bucket, encPath.Raw(), -1)
|
||||
if err != nil {
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
err = storj.ErrObjectNotFound.Wrap(err)
|
||||
@ -278,7 +277,7 @@ func (db *DB) getInfo(ctx context.Context, prefix string, bucket string, path st
|
||||
Data: pointer.GetMetadata(),
|
||||
}
|
||||
|
||||
streamInfoData, streamMeta, err := streams.DecryptStreamInfo(ctx, lastSegmentMeta.Data, fullpath, db.rootKey)
|
||||
streamInfoData, streamMeta, err := streams.TypedDecryptStreamInfo(ctx, lastSegmentMeta.Data, fullpath, db.encStore)
|
||||
if err != nil {
|
||||
return object{}, storj.Object{}, err
|
||||
}
|
||||
@ -296,7 +295,8 @@ func (db *DB) getInfo(ctx context.Context, prefix string, bucket string, path st
|
||||
|
||||
return object{
|
||||
fullpath: fullpath,
|
||||
encryptedPath: encryptedPath,
|
||||
bucket: bucket,
|
||||
encPath: encPath,
|
||||
lastSegmentMeta: lastSegmentMeta,
|
||||
streamInfo: streamInfo,
|
||||
streamMeta: streamMeta,
|
||||
@ -418,7 +418,7 @@ func (object *mutableObject) DeleteStream(ctx context.Context) (err error) {
|
||||
|
||||
func (object *mutableObject) Commit(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
_, info, err := object.db.getInfo(ctx, committedPrefix, object.info.Bucket.Name, object.info.Path)
|
||||
_, info, err := object.db.getInfo(ctx, object.info.Bucket.Name, object.info.Path)
|
||||
object.info = info
|
||||
return err
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ func TestCreateObject(t *testing.T) {
|
||||
BlockSize: stripesPerBlock * customRS.StripeSize(),
|
||||
}
|
||||
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -87,7 +87,7 @@ func TestCreateObject(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGetObject(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
||||
require.NoError(t, err)
|
||||
upload(ctx, t, db, streams, bucket, TestFile, nil)
|
||||
@ -114,7 +114,7 @@ func TestGetObject(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGetObjectStream(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
data := make([]byte, 32*memory.KiB)
|
||||
_, err := rand.Read(data)
|
||||
require.NoError(t, err)
|
||||
@ -243,7 +243,7 @@ func assertRemoteSegment(t *testing.T, segment storj.Segment) {
|
||||
}
|
||||
|
||||
func TestDeleteObject(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
||||
if !assert.NoError(t, err) {
|
||||
return
|
||||
@ -269,7 +269,7 @@ func TestDeleteObject(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestListObjectsEmpty(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -295,7 +295,7 @@ func TestListObjectsEmpty(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestListObjects(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, &storj.Bucket{PathCipher: storj.Unencrypted})
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -19,9 +19,10 @@ var _ storj.ReadOnlyStream = (*readonlyStream)(nil)
|
||||
type readonlyStream struct {
|
||||
db *DB
|
||||
|
||||
info storj.Object
|
||||
encryptedPath storj.Path
|
||||
streamKey *storj.Key // lazySegmentReader derivedKey
|
||||
info storj.Object
|
||||
bucket string
|
||||
encPath storj.Path
|
||||
streamKey *storj.Key // lazySegmentReader derivedKey
|
||||
}
|
||||
|
||||
func (stream *readonlyStream) Info() storj.Object { return stream.info }
|
||||
@ -44,10 +45,9 @@ func (stream *readonlyStream) segment(ctx context.Context, index int64) (segment
|
||||
Index: index,
|
||||
}
|
||||
|
||||
var segmentPath storj.Path
|
||||
isLastSegment := segment.Index+1 == stream.info.SegmentCount
|
||||
if !isLastSegment {
|
||||
segmentPath = getSegmentPath(stream.encryptedPath, index)
|
||||
segmentPath := getSegmentPath(storj.JoinPaths(stream.bucket, stream.encPath), index)
|
||||
_, meta, err := stream.db.segments.Get(ctx, segmentPath)
|
||||
if err != nil {
|
||||
return segment, err
|
||||
@ -79,15 +79,11 @@ func (stream *readonlyStream) segment(ctx context.Context, index int64) (segment
|
||||
return segment, err
|
||||
}
|
||||
|
||||
pathComponents := storj.SplitPath(stream.encryptedPath)
|
||||
bucket := pathComponents[0]
|
||||
segmentPath = storj.JoinPaths(pathComponents[1:]...)
|
||||
|
||||
if isLastSegment {
|
||||
index = -1
|
||||
}
|
||||
|
||||
pointer, err := stream.db.metainfo.SegmentInfo(ctx, bucket, segmentPath, index)
|
||||
pointer, err := stream.db.metainfo.SegmentInfo(ctx, stream.bucket, stream.encPath, index)
|
||||
if err != nil {
|
||||
return segment, err
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
|
||||
"storj.io/storj/internal/memory"
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/encryption"
|
||||
"storj.io/storj/pkg/storage/segments"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
"storj.io/storj/pkg/storj"
|
||||
@ -34,10 +35,13 @@ func SetupProject(m metainfo.Client) (*Project, error) {
|
||||
maxBucketMetaSize := 10 * memory.MiB
|
||||
segment := segments.NewSegmentStore(m, nil, rs, maxBucketMetaSize.Int(), maxBucketMetaSize.Int64())
|
||||
|
||||
// volatile warning: we're setting an encryption key of all zeros when one isn't provided.
|
||||
// TODO: fix before the final alpha network wipe
|
||||
encryptionKey := new(storj.Key)
|
||||
strms, err := streams.NewStreamStore(segment, maxBucketMetaSize.Int64(), encryptionKey, memory.KiB.Int(), storj.AESGCM, maxBucketMetaSize.Int())
|
||||
// volatile warning: we're setting an encryption key of all zeros for bucket
|
||||
// metadata, when really the bucket metadata should be stored in a different
|
||||
// system altogether.
|
||||
// TODO: https://storjlabs.atlassian.net/browse/V3-1967
|
||||
encStore := encryption.NewStore()
|
||||
encStore.SetDefaultKey(new(storj.Key))
|
||||
strms, err := streams.NewStreamStore(segment, maxBucketMetaSize.Int64(), encStore, memory.KiB.Int(), storj.AESGCM, maxBucketMetaSize.Int())
|
||||
if err != nil {
|
||||
return nil, Error.New("failed to create streams: %v", err)
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"storj.io/storj/internal/testplanet"
|
||||
libuplink "storj.io/storj/lib/uplink"
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/encryption"
|
||||
"storj.io/storj/pkg/macaroon"
|
||||
"storj.io/storj/pkg/metainfo/kvmetainfo"
|
||||
"storj.io/storj/pkg/pb"
|
||||
@ -704,9 +705,12 @@ func initEnv(ctx context.Context, planet *testplanet.Planet) (minio.ObjectLayer,
|
||||
encKey := new(storj.Key)
|
||||
copy(encKey[:], TestEncKey)
|
||||
|
||||
encStore := encryption.NewStore()
|
||||
encStore.SetDefaultKey(encKey)
|
||||
|
||||
blockSize := rs.StripeSize()
|
||||
inlineThreshold := 4 * memory.KiB.Int()
|
||||
strms, err := streams.NewStreamStore(segments, 64*memory.MiB.Int64(), encKey, blockSize, storj.AESGCM, inlineThreshold)
|
||||
strms, err := streams.NewStreamStore(segments, 64*memory.MiB.Int64(), encStore, blockSize, storj.AESGCM, inlineThreshold)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
@ -715,7 +719,7 @@ func initEnv(ctx context.Context, planet *testplanet.Planet) (minio.ObjectLayer,
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
kvm := kvmetainfo.New(p, m, strms, segments, encKey)
|
||||
kvm := kvmetainfo.New(p, m, strms, segments, encStore)
|
||||
|
||||
cfg := libuplink.Config{}
|
||||
cfg.Volatile.TLS = struct {
|
||||
|
153
pkg/paths/path.go
Normal file
153
pkg/paths/path.go
Normal file
@ -0,0 +1,153 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package paths
|
||||
|
||||
import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
//
|
||||
// To avoid confusion about when paths are encrypted, unencrypted, empty or
|
||||
// non existent, we create some wrapper types so that the compiler will complain
|
||||
// if someone attempts to use one in the wrong context.
|
||||
//
|
||||
|
||||
// Unencrypted is an opaque type representing an unencrypted path.
|
||||
type Unencrypted struct {
|
||||
raw string
|
||||
}
|
||||
|
||||
// Encrypted is an opaque type representing an encrypted path.
|
||||
type Encrypted struct {
|
||||
raw string
|
||||
}
|
||||
|
||||
//
|
||||
// unencrypted paths
|
||||
//
|
||||
|
||||
// NewUnencrypted takes a raw unencrypted path and returns it wrapped.
|
||||
func NewUnencrypted(raw string) Unencrypted {
|
||||
return Unencrypted{raw: raw}
|
||||
}
|
||||
|
||||
// Valid returns if the unencrypted path is valid, which is the same as not being empty.
|
||||
func (path Unencrypted) Valid() bool {
|
||||
return path.raw != ""
|
||||
}
|
||||
|
||||
// Raw returns the original raw path for the Unencrypted.
|
||||
func (path Unencrypted) Raw() string {
|
||||
return path.raw
|
||||
}
|
||||
|
||||
// String returns a human readable form of the Unencrypted.
|
||||
func (path Unencrypted) String() string {
|
||||
return path.Raw()
|
||||
}
|
||||
|
||||
// Consume attempts to remove the prefix from the Unencrypted path and
|
||||
// reports a boolean indicating if it was able to do so.
|
||||
func (path Unencrypted) Consume(prefix Unencrypted) (Unencrypted, bool) {
|
||||
if len(path.raw) >= len(prefix.raw) && path.raw[:len(prefix.raw)] == prefix.raw {
|
||||
return NewUnencrypted(path.raw[len(prefix.raw):]), true
|
||||
}
|
||||
return Unencrypted{}, false
|
||||
}
|
||||
|
||||
// Iterator returns an iterator over the components of the Unencrypted.
|
||||
func (path Unencrypted) Iterator() Iterator {
|
||||
return NewIterator(path.raw)
|
||||
}
|
||||
|
||||
// Less returns true if 'path' should be sorted earlier than 'other'
|
||||
func (path Unencrypted) Less(other Unencrypted) bool {
|
||||
return path.raw < other.raw
|
||||
}
|
||||
|
||||
//
|
||||
// encrypted path
|
||||
//
|
||||
|
||||
// NewEncrypted takes a raw encrypted path and returns it wrapped.
|
||||
func NewEncrypted(raw string) Encrypted {
|
||||
return Encrypted{raw: raw}
|
||||
}
|
||||
|
||||
// Valid returns if the encrypted path is valid, which is the same as not being empty.
|
||||
func (path Encrypted) Valid() bool {
|
||||
return path.raw != ""
|
||||
}
|
||||
|
||||
// Raw returns the original path for the Encrypted.
|
||||
func (path Encrypted) Raw() string {
|
||||
return path.raw
|
||||
}
|
||||
|
||||
// String returns a human readable form of the Encrypted.
|
||||
func (path Encrypted) String() string {
|
||||
return path.Raw()
|
||||
}
|
||||
|
||||
// Consume attempts to remove the prefix from the Encrypted path and
|
||||
// reports a boolean indicating if it was able to do so.
|
||||
func (path Encrypted) Consume(prefix Encrypted) (Encrypted, bool) {
|
||||
if len(path.raw) >= len(prefix.raw) && path.raw[:len(prefix.raw)] == prefix.raw {
|
||||
return NewEncrypted(path.raw[len(prefix.raw):]), true
|
||||
}
|
||||
return Encrypted{}, false
|
||||
}
|
||||
|
||||
// Iterator returns an iterator over the components of the Encrypted.
|
||||
func (path Encrypted) Iterator() Iterator {
|
||||
return NewIterator(path.raw)
|
||||
}
|
||||
|
||||
// Less returns true if 'path' should be sorted earlier than 'other'
|
||||
func (path Encrypted) Less(other Encrypted) bool {
|
||||
return path.raw < other.raw
|
||||
}
|
||||
|
||||
//
|
||||
// path component iteration
|
||||
//
|
||||
|
||||
// Iterator allows one to efficiently iterate over components of a path.
|
||||
type Iterator struct {
|
||||
raw string
|
||||
consumed int
|
||||
lastEmpty bool
|
||||
}
|
||||
|
||||
// NewIterator returns an Iterator for components of the provided raw path.
|
||||
func NewIterator(raw string) Iterator {
|
||||
return Iterator{raw: raw, lastEmpty: raw != ""}
|
||||
}
|
||||
|
||||
// Consumed reports how much of the path has been consumed (if any).
|
||||
func (pi Iterator) Consumed() string { return pi.raw[:pi.consumed] }
|
||||
|
||||
// Remaining reports how much of the path is remaining.
|
||||
func (pi Iterator) Remaining() string { return pi.raw[pi.consumed:] }
|
||||
|
||||
// Done reports if the path has been fully consumed.
|
||||
func (pi Iterator) Done() bool { return len(pi.raw) == pi.consumed && !pi.lastEmpty }
|
||||
|
||||
// Next returns the first component of the path, consuming it.
|
||||
func (pi *Iterator) Next() string {
|
||||
if pi.Done() {
|
||||
return ""
|
||||
}
|
||||
|
||||
rem := pi.Remaining()
|
||||
index := strings.IndexByte(rem, '/')
|
||||
if index == -1 {
|
||||
pi.consumed += len(rem)
|
||||
pi.lastEmpty = false
|
||||
return rem
|
||||
}
|
||||
pi.consumed += index + 1
|
||||
pi.lastEmpty = index == len(rem)-1
|
||||
return rem[:index]
|
||||
}
|
96
pkg/paths/path_test.go
Normal file
96
pkg/paths/path_test.go
Normal file
@ -0,0 +1,96 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package paths
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestUnencrypted(t *testing.T) {
|
||||
it := NewUnencrypted("foo").Iterator()
|
||||
assert.Equal(t, it.Consumed(), "")
|
||||
assert.False(t, it.Done())
|
||||
assert.Equal(t, it.Next(), "foo")
|
||||
assert.Equal(t, it.Consumed(), "foo")
|
||||
assert.True(t, it.Done())
|
||||
|
||||
it = NewUnencrypted("").Iterator()
|
||||
assert.Equal(t, it.Consumed(), "")
|
||||
assert.True(t, it.Done())
|
||||
assert.Equal(t, it.Next(), "")
|
||||
assert.Equal(t, it.Consumed(), "")
|
||||
|
||||
it = NewUnencrypted("foo/").Iterator()
|
||||
assert.Equal(t, it.Consumed(), "")
|
||||
assert.False(t, it.Done())
|
||||
assert.Equal(t, it.Next(), "foo")
|
||||
assert.Equal(t, it.Consumed(), "foo/")
|
||||
assert.False(t, it.Done())
|
||||
assert.Equal(t, it.Next(), "")
|
||||
assert.Equal(t, it.Consumed(), "foo/")
|
||||
assert.True(t, it.Done())
|
||||
|
||||
it = Unencrypted{}.Iterator()
|
||||
assert.Equal(t, it.Consumed(), "")
|
||||
assert.True(t, it.Done())
|
||||
assert.Equal(t, it.Next(), "")
|
||||
assert.Equal(t, it.Consumed(), "")
|
||||
}
|
||||
|
||||
func TestEncrypted(t *testing.T) {
|
||||
it := NewEncrypted("foo").Iterator()
|
||||
assert.Equal(t, it.Consumed(), "")
|
||||
assert.False(t, it.Done())
|
||||
assert.Equal(t, it.Next(), "foo")
|
||||
assert.Equal(t, it.Consumed(), "foo")
|
||||
assert.True(t, it.Done())
|
||||
|
||||
it = NewEncrypted("").Iterator()
|
||||
assert.Equal(t, it.Consumed(), "")
|
||||
assert.True(t, it.Done())
|
||||
assert.Equal(t, it.Next(), "")
|
||||
assert.Equal(t, it.Consumed(), "")
|
||||
|
||||
it = NewEncrypted("foo/").Iterator()
|
||||
assert.Equal(t, it.Consumed(), "")
|
||||
assert.False(t, it.Done())
|
||||
assert.Equal(t, it.Next(), "foo")
|
||||
assert.Equal(t, it.Consumed(), "foo/")
|
||||
assert.False(t, it.Done())
|
||||
assert.Equal(t, it.Next(), "")
|
||||
assert.Equal(t, it.Consumed(), "foo/")
|
||||
assert.True(t, it.Done())
|
||||
|
||||
it = Encrypted{}.Iterator()
|
||||
assert.Equal(t, it.Consumed(), "")
|
||||
assert.True(t, it.Done())
|
||||
assert.Equal(t, it.Next(), "")
|
||||
assert.Equal(t, it.Consumed(), "")
|
||||
}
|
||||
|
||||
func TestIterator(t *testing.T) {
|
||||
for i, tt := range []struct {
|
||||
path string
|
||||
comps []string
|
||||
}{
|
||||
{"", []string{}},
|
||||
{"/", []string{"", ""}},
|
||||
{"//", []string{"", "", ""}},
|
||||
{" ", []string{" "}},
|
||||
{"a", []string{"a"}},
|
||||
{"/a/", []string{"", "a", ""}},
|
||||
{"a/b/c/d", []string{"a", "b", "c", "d"}},
|
||||
{"///a//b////c/d///", []string{"", "", "", "a", "", "b", "", "", "", "c", "d", "", "", ""}},
|
||||
} {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
iter, got := NewIterator(tt.path), make([]string, 0, len(tt.comps))
|
||||
for !iter.Done() {
|
||||
got = append(got, iter.Next())
|
||||
}
|
||||
assert.Equal(t, tt.comps, got, errTag)
|
||||
}
|
||||
}
|
60
pkg/storage/streams/path.go
Normal file
60
pkg/storage/streams/path.go
Normal file
@ -0,0 +1,60 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package streams
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
"storj.io/storj/pkg/paths"
|
||||
)
|
||||
|
||||
// Path is a representation of an object path within a bucket
|
||||
type Path struct {
|
||||
bucket string
|
||||
unencPath paths.Unencrypted
|
||||
raw []byte
|
||||
}
|
||||
|
||||
// Bucket returns the bucket part of the path.
|
||||
func (p Path) Bucket() string { return p.bucket }
|
||||
|
||||
// UnencryptedPath returns the unencrypted path part of the path.
|
||||
func (p Path) UnencryptedPath() paths.Unencrypted { return p.unencPath }
|
||||
|
||||
// Raw returns the raw data in the path.
|
||||
func (p Path) Raw() []byte { return append([]byte(nil), p.raw...) }
|
||||
|
||||
// String returns the string form of the raw data in the path.
|
||||
func (p Path) String() string { return string(p.raw) }
|
||||
|
||||
// ParsePath returns a new Path with the given raw bytes.
|
||||
func ParsePath(ctx context.Context, raw []byte) (path Path, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// A path must contain a bucket and maybe an unencrypted path.
|
||||
parts := bytes.SplitN(raw, []byte("/"), 2)
|
||||
|
||||
path.raw = raw
|
||||
path.bucket = string(parts[0])
|
||||
if len(parts) > 1 {
|
||||
path.unencPath = paths.NewUnencrypted(string(parts[1]))
|
||||
}
|
||||
|
||||
return path, nil
|
||||
}
|
||||
|
||||
// CreatePath will create a Path for the provided information.
|
||||
func CreatePath(bucket string, unencPath paths.Unencrypted) (path Path) {
|
||||
path.bucket = bucket
|
||||
path.unencPath = unencPath
|
||||
|
||||
path.raw = append(path.raw, bucket...)
|
||||
if unencPath.Valid() {
|
||||
path.raw = append(path.raw, '/')
|
||||
path.raw = append(path.raw, unencPath.Raw()...)
|
||||
}
|
||||
|
||||
return path
|
||||
}
|
130
pkg/storage/streams/shim.go
Normal file
130
pkg/storage/streams/shim.go
Normal file
@ -0,0 +1,130 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package streams
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"storj.io/storj/pkg/encryption"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/ranger"
|
||||
"storj.io/storj/pkg/storage/segments"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
// Store interface methods for streams to satisfy to be a store
|
||||
type Store interface {
|
||||
Meta(ctx context.Context, path storj.Path, pathCipher storj.Cipher) (Meta, error)
|
||||
Get(ctx context.Context, path storj.Path, pathCipher storj.Cipher) (ranger.Ranger, Meta, error)
|
||||
Put(ctx context.Context, path storj.Path, pathCipher storj.Cipher, data io.Reader, metadata []byte, expiration time.Time) (Meta, error)
|
||||
Delete(ctx context.Context, path storj.Path, pathCipher storj.Cipher) error
|
||||
List(ctx context.Context, prefix, startAfter, endBefore storj.Path, pathCipher storj.Cipher, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
|
||||
}
|
||||
|
||||
type shimStore struct {
|
||||
store typedStore
|
||||
}
|
||||
|
||||
// NewStreamStore constructs a Store.
|
||||
func NewStreamStore(segments segments.Store, segmentSize int64, encStore *encryption.Store, encBlockSize int, cipher storj.Cipher, inlineThreshold int) (Store, error) {
|
||||
typedStore, err := newTypedStreamStore(segments, segmentSize, encStore, encBlockSize, cipher, inlineThreshold)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &shimStore{store: typedStore}, nil
|
||||
}
|
||||
|
||||
// Meta parses the passed in path and dispatches to the typed store.
|
||||
func (s *shimStore) Meta(ctx context.Context, path storj.Path, pathCipher storj.Cipher) (_ Meta, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
streamsPath, err := ParsePath(ctx, []byte(path))
|
||||
if err != nil {
|
||||
return Meta{}, err
|
||||
}
|
||||
return s.store.Meta(ctx, streamsPath, pathCipher)
|
||||
}
|
||||
|
||||
// Get parses the passed in path and dispatches to the typed store.
|
||||
func (s *shimStore) Get(ctx context.Context, path storj.Path, pathCipher storj.Cipher) (_ ranger.Ranger, _ Meta, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
streamsPath, err := ParsePath(ctx, []byte(path))
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
return s.store.Get(ctx, streamsPath, pathCipher)
|
||||
}
|
||||
|
||||
// Put parses the passed in path and dispatches to the typed store.
|
||||
func (s *shimStore) Put(ctx context.Context, path storj.Path, pathCipher storj.Cipher, data io.Reader, metadata []byte, expiration time.Time) (_ Meta, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
streamsPath, err := ParsePath(ctx, []byte(path))
|
||||
if err != nil {
|
||||
return Meta{}, err
|
||||
}
|
||||
return s.store.Put(ctx, streamsPath, pathCipher, data, metadata, expiration)
|
||||
}
|
||||
|
||||
// Delete parses the passed in path and dispatches to the typed store.
|
||||
func (s *shimStore) Delete(ctx context.Context, path storj.Path, pathCipher storj.Cipher) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
streamsPath, err := ParsePath(ctx, []byte(path))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.store.Delete(ctx, streamsPath, pathCipher)
|
||||
}
|
||||
|
||||
// List parses the passed in path and dispatches to the typed store.
|
||||
func (s *shimStore) List(ctx context.Context, prefix storj.Path, startAfter storj.Path, endBefore storj.Path, pathCipher storj.Cipher, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// TODO: list is maybe wrong?
|
||||
streamsPrefix, err := ParsePath(ctx, []byte(prefix))
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
return s.store.List(ctx, streamsPrefix, startAfter, endBefore, pathCipher, recursive, limit, metaFlags)
|
||||
}
|
||||
|
||||
// EncryptAfterBucket encrypts a path without encrypting its first element. This is a legacy function
|
||||
// that should no longer be needed after the typed path refactoring.
|
||||
func EncryptAfterBucket(ctx context.Context, path storj.Path, cipher storj.Cipher, key *storj.Key) (encrypted storj.Path, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
comps := storj.SplitPath(path)
|
||||
if len(comps) <= 1 {
|
||||
return path, nil
|
||||
}
|
||||
|
||||
encrypted, err = encryption.EncryptPath(path, cipher, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// replace the first path component with the unencrypted bucket name
|
||||
return storj.JoinPaths(comps[0], storj.JoinPaths(storj.SplitPath(encrypted)[1:]...)), nil
|
||||
}
|
||||
|
||||
// DecryptStreamInfo decrypts stream info. This is a legacy function that should no longer
|
||||
// be needed after the typed path refactoring.
|
||||
func DecryptStreamInfo(ctx context.Context, streamMetaBytes []byte, path storj.Path, rootKey *storj.Key) (
|
||||
streamInfo []byte, streamMeta pb.StreamMeta, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
streamsPath, err := ParsePath(ctx, []byte(path))
|
||||
if err != nil {
|
||||
return nil, pb.StreamMeta{}, err
|
||||
}
|
||||
|
||||
store := encryption.NewStore()
|
||||
store.SetDefaultKey(rootKey)
|
||||
|
||||
return TypedDecryptStreamInfo(ctx, streamMetaBytes, streamsPath, store)
|
||||
}
|
@ -7,10 +7,9 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
@ -20,6 +19,7 @@ import (
|
||||
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/encryption"
|
||||
"storj.io/storj/pkg/paths"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/ranger"
|
||||
"storj.io/storj/pkg/storage/meta"
|
||||
@ -49,32 +49,30 @@ func convertMeta(lastSegmentMeta segments.Meta, stream pb.StreamInfo, streamMeta
|
||||
}
|
||||
|
||||
// Store interface methods for streams to satisfy to be a store
|
||||
type Store interface {
|
||||
Meta(ctx context.Context, path storj.Path, pathCipher storj.Cipher) (Meta, error)
|
||||
Get(ctx context.Context, path storj.Path, pathCipher storj.Cipher) (ranger.Ranger, Meta, error)
|
||||
Put(ctx context.Context, path storj.Path, pathCipher storj.Cipher, data io.Reader, metadata []byte, expiration time.Time) (Meta, error)
|
||||
Delete(ctx context.Context, path storj.Path, pathCipher storj.Cipher) error
|
||||
List(ctx context.Context, prefix, startAfter, endBefore storj.Path, pathCipher storj.Cipher, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
|
||||
type typedStore interface {
|
||||
Meta(ctx context.Context, path Path, pathCipher storj.Cipher) (Meta, error)
|
||||
Get(ctx context.Context, path Path, pathCipher storj.Cipher) (ranger.Ranger, Meta, error)
|
||||
Put(ctx context.Context, path Path, pathCipher storj.Cipher, data io.Reader, metadata []byte, expiration time.Time) (Meta, error)
|
||||
Delete(ctx context.Context, path Path, pathCipher storj.Cipher) error
|
||||
List(ctx context.Context, prefix Path, startAfter, endBefore string, pathCipher storj.Cipher, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
|
||||
}
|
||||
|
||||
// streamStore is a store for streams
|
||||
// streamStore is a store for streams. It implements typedStore as part of an ongoing migration
|
||||
// to use typed paths. See the shim for the store that the rest of the world interacts with.
|
||||
type streamStore struct {
|
||||
segments segments.Store
|
||||
segmentSize int64
|
||||
rootKey *storj.Key
|
||||
encStore *encryption.Store
|
||||
encBlockSize int
|
||||
cipher storj.Cipher
|
||||
inlineThreshold int
|
||||
}
|
||||
|
||||
// NewStreamStore stuff
|
||||
func NewStreamStore(segments segments.Store, segmentSize int64, rootKey *storj.Key, encBlockSize int, cipher storj.Cipher, inlineThreshold int) (Store, error) {
|
||||
// newTypedStreamStore constructs a typedStore backed by a streamStore.
|
||||
func newTypedStreamStore(segments segments.Store, segmentSize int64, encStore *encryption.Store, encBlockSize int, cipher storj.Cipher, inlineThreshold int) (typedStore, error) {
|
||||
if segmentSize <= 0 {
|
||||
return nil, errs.New("segment size must be larger than 0")
|
||||
}
|
||||
if rootKey == nil {
|
||||
return nil, errs.New("encryption key must not be empty")
|
||||
}
|
||||
if encBlockSize <= 0 {
|
||||
return nil, errs.New("encryption block size must be larger than 0")
|
||||
}
|
||||
@ -82,7 +80,7 @@ func NewStreamStore(segments segments.Store, segmentSize int64, rootKey *storj.K
|
||||
return &streamStore{
|
||||
segments: segments,
|
||||
segmentSize: segmentSize,
|
||||
rootKey: rootKey,
|
||||
encStore: encStore,
|
||||
encBlockSize: encBlockSize,
|
||||
cipher: cipher,
|
||||
inlineThreshold: inlineThreshold,
|
||||
@ -93,13 +91,14 @@ func NewStreamStore(segments segments.Store, segmentSize int64, rootKey *storj.K
|
||||
// store the first piece at s0/<path>, second piece at s1/<path>, and the
|
||||
// *last* piece at l/<path>. Store the given metadata, along with the number
|
||||
// of segments, in a new protobuf, in the metadata of l/<path>.
|
||||
func (s *streamStore) Put(ctx context.Context, path storj.Path, pathCipher storj.Cipher, data io.Reader, metadata []byte, expiration time.Time) (m Meta, err error) {
|
||||
func (s *streamStore) Put(ctx context.Context, path Path, pathCipher storj.Cipher, data io.Reader, metadata []byte, expiration time.Time) (m Meta, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// previously file uploaded?
|
||||
err = s.Delete(ctx, path, pathCipher)
|
||||
if err != nil && !storage.ErrKeyNotFound.Has(err) {
|
||||
//something wrong happened checking for an existing
|
||||
//file with the same name
|
||||
// something wrong happened checking for an existing
|
||||
// file with the same name
|
||||
return Meta{}, err
|
||||
}
|
||||
|
||||
@ -111,7 +110,7 @@ func (s *streamStore) Put(ctx context.Context, path storj.Path, pathCipher storj
|
||||
return m, err
|
||||
}
|
||||
|
||||
func (s *streamStore) upload(ctx context.Context, path storj.Path, pathCipher storj.Cipher, data io.Reader, metadata []byte, expiration time.Time) (m Meta, lastSegment int64, err error) {
|
||||
func (s *streamStore) upload(ctx context.Context, path Path, pathCipher storj.Cipher, data io.Reader, metadata []byte, expiration time.Time) (m Meta, lastSegment int64, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var currentSegment int64
|
||||
@ -126,7 +125,11 @@ func (s *streamStore) upload(ctx context.Context, path storj.Path, pathCipher st
|
||||
}
|
||||
}()
|
||||
|
||||
derivedKey, err := encryption.DeriveContentKey(path, s.rootKey)
|
||||
derivedKey, err := encryption.StoreDeriveContentKey(path.Bucket(), path.UnencryptedPath(), s.encStore)
|
||||
if err != nil {
|
||||
return Meta{}, currentSegment, err
|
||||
}
|
||||
encPath, err := encryption.StoreEncryptPath(path.Bucket(), path.UnencryptedPath(), pathCipher, s.encStore)
|
||||
if err != nil {
|
||||
return Meta{}, currentSegment, err
|
||||
}
|
||||
@ -192,13 +195,11 @@ func (s *streamStore) upload(ctx context.Context, path storj.Path, pathCipher st
|
||||
}
|
||||
|
||||
putMeta, err = s.segments.Put(ctx, transformedReader, expiration, func() (storj.Path, []byte, error) {
|
||||
encPath, err := EncryptAfterBucket(ctx, path, pathCipher, s.rootKey)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
if !eofReader.isEOF() {
|
||||
segmentPath := getSegmentPath(encPath, currentSegment)
|
||||
segmentPath, err := createSegmentPath(ctx, currentSegment, path.Bucket(), encPath)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
if s.cipher == storj.Unencrypted {
|
||||
return segmentPath, nil, nil
|
||||
@ -215,7 +216,10 @@ func (s *streamStore) upload(ctx context.Context, path storj.Path, pathCipher st
|
||||
return segmentPath, segmentMeta, nil
|
||||
}
|
||||
|
||||
lastSegmentPath := storj.JoinPaths("l", encPath)
|
||||
lastSegmentPath, err := createSegmentPath(ctx, -1, path.Bucket(), encPath)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
streamInfo, err := proto.Marshal(&pb.StreamInfo{
|
||||
NumberOfSegments: currentSegment + 1,
|
||||
@ -275,28 +279,28 @@ func (s *streamStore) upload(ctx context.Context, path storj.Path, pathCipher st
|
||||
return resultMeta, currentSegment, nil
|
||||
}
|
||||
|
||||
// getSegmentPath returns the unique path for a particular segment
|
||||
func getSegmentPath(path storj.Path, segNum int64) storj.Path {
|
||||
return storj.JoinPaths(fmt.Sprintf("s%d", segNum), path)
|
||||
}
|
||||
|
||||
// Get returns a ranger that knows what the overall size is (from l/<path>)
|
||||
// and then returns the appropriate data from segments s0/<path>, s1/<path>,
|
||||
// ..., l/<path>.
|
||||
func (s *streamStore) Get(ctx context.Context, path storj.Path, pathCipher storj.Cipher) (rr ranger.Ranger, meta Meta, err error) {
|
||||
func (s *streamStore) Get(ctx context.Context, path Path, pathCipher storj.Cipher) (rr ranger.Ranger, meta Meta, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
encPath, err := EncryptAfterBucket(ctx, path, pathCipher, s.rootKey)
|
||||
encPath, err := encryption.StoreEncryptPath(path.Bucket(), path.UnencryptedPath(), pathCipher, s.encStore)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
|
||||
lastSegmentRanger, lastSegmentMeta, err := s.segments.Get(ctx, storj.JoinPaths("l", encPath))
|
||||
segmentPath, err := createSegmentPath(ctx, -1, path.Bucket(), encPath)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
|
||||
streamInfo, streamMeta, err := DecryptStreamInfo(ctx, lastSegmentMeta.Data, path, s.rootKey)
|
||||
lastSegmentRanger, lastSegmentMeta, err := s.segments.Get(ctx, segmentPath)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
|
||||
streamInfo, streamMeta, err := TypedDecryptStreamInfo(ctx, lastSegmentMeta.Data, path, s.encStore)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
@ -307,30 +311,33 @@ func (s *streamStore) Get(ctx context.Context, path storj.Path, pathCipher storj
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
|
||||
derivedKey, err := encryption.DeriveContentKey(path, s.rootKey)
|
||||
derivedKey, err := encryption.StoreDeriveContentKey(path.Bucket(), path.UnencryptedPath(), s.encStore)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
|
||||
var rangers []ranger.Ranger
|
||||
for i := int64(0); i < stream.NumberOfSegments-1; i++ {
|
||||
currentPath := getSegmentPath(encPath, i)
|
||||
size := stream.SegmentsSize
|
||||
var contentNonce storj.Nonce
|
||||
_, err := encryption.Increment(&contentNonce, i+1)
|
||||
currentPath, err := createSegmentPath(ctx, i, path.Bucket(), encPath)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
rr := &lazySegmentRanger{
|
||||
|
||||
var contentNonce storj.Nonce
|
||||
_, err = encryption.Increment(&contentNonce, i+1)
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
|
||||
rangers = append(rangers, &lazySegmentRanger{
|
||||
segments: s.segments,
|
||||
path: currentPath,
|
||||
size: size,
|
||||
size: stream.SegmentsSize,
|
||||
derivedKey: derivedKey,
|
||||
startingNonce: &contentNonce,
|
||||
encBlockSize: int(streamMeta.EncryptionBlockSize),
|
||||
cipher: storj.Cipher(streamMeta.EncryptionType),
|
||||
}
|
||||
rangers = append(rangers, rr)
|
||||
})
|
||||
}
|
||||
|
||||
var contentNonce storj.Nonce
|
||||
@ -338,6 +345,7 @@ func (s *streamStore) Get(ctx context.Context, path storj.Path, pathCipher storj
|
||||
if err != nil {
|
||||
return nil, Meta{}, err
|
||||
}
|
||||
|
||||
encryptedKey, keyNonce := getEncryptedKeyAndNonce(streamMeta.LastSegmentMeta)
|
||||
decryptedLastSegmentRanger, err := decryptRanger(
|
||||
ctx,
|
||||
@ -361,23 +369,29 @@ func (s *streamStore) Get(ctx context.Context, path storj.Path, pathCipher storj
|
||||
}
|
||||
|
||||
// Meta implements Store.Meta
|
||||
func (s *streamStore) Meta(ctx context.Context, path storj.Path, pathCipher storj.Cipher) (meta Meta, err error) {
|
||||
func (s *streamStore) Meta(ctx context.Context, path Path, pathCipher storj.Cipher) (meta Meta, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
encPath, err := EncryptAfterBucket(ctx, path, pathCipher, s.rootKey)
|
||||
encPath, err := encryption.StoreEncryptPath(path.Bucket(), path.UnencryptedPath(), pathCipher, s.encStore)
|
||||
if err != nil {
|
||||
return Meta{}, err
|
||||
}
|
||||
|
||||
lastSegmentMeta, err := s.segments.Meta(ctx, storj.JoinPaths("l", encPath))
|
||||
segmentPath, err := createSegmentPath(ctx, -1, path.Bucket(), encPath)
|
||||
if err != nil {
|
||||
return Meta{}, err
|
||||
}
|
||||
|
||||
streamInfo, streamMeta, err := DecryptStreamInfo(ctx, lastSegmentMeta.Data, path, s.rootKey)
|
||||
lastSegmentMeta, err := s.segments.Meta(ctx, segmentPath)
|
||||
if err != nil {
|
||||
return Meta{}, err
|
||||
}
|
||||
|
||||
streamInfo, streamMeta, err := TypedDecryptStreamInfo(ctx, lastSegmentMeta.Data, path, s.encStore)
|
||||
if err != nil {
|
||||
return Meta{}, err
|
||||
}
|
||||
|
||||
var stream pb.StreamInfo
|
||||
if err := proto.Unmarshal(streamInfo, &stream); err != nil {
|
||||
return Meta{}, err
|
||||
@ -387,19 +401,25 @@ func (s *streamStore) Meta(ctx context.Context, path storj.Path, pathCipher stor
|
||||
}
|
||||
|
||||
// Delete all the segments, with the last one last
|
||||
func (s *streamStore) Delete(ctx context.Context, path storj.Path, pathCipher storj.Cipher) (err error) {
|
||||
func (s *streamStore) Delete(ctx context.Context, path Path, pathCipher storj.Cipher) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
encPath, err := EncryptAfterBucket(ctx, path, pathCipher, s.rootKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lastSegmentMeta, err := s.segments.Meta(ctx, storj.JoinPaths("l", encPath))
|
||||
encPath, err := encryption.StoreEncryptPath(path.Bucket(), path.UnencryptedPath(), pathCipher, s.encStore)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
streamInfo, _, err := DecryptStreamInfo(ctx, lastSegmentMeta.Data, path, s.rootKey)
|
||||
lastSegmentPath, err := createSegmentPath(ctx, -1, path.Bucket(), encPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
lastSegmentMeta, err := s.segments.Meta(ctx, lastSegmentPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
streamInfo, _, err := TypedDecryptStreamInfo(ctx, lastSegmentMeta.Data, path, s.encStore)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -409,29 +429,29 @@ func (s *streamStore) Delete(ctx context.Context, path storj.Path, pathCipher st
|
||||
}
|
||||
|
||||
for i := 0; i < int(stream.NumberOfSegments-1); i++ {
|
||||
encPath, err = EncryptAfterBucket(ctx, path, pathCipher, s.rootKey)
|
||||
currentPath, err := createSegmentPath(ctx, int64(i), path.Bucket(), encPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
currentPath := getSegmentPath(encPath, int64(i))
|
||||
err := s.segments.Delete(ctx, currentPath)
|
||||
|
||||
err = s.segments.Delete(ctx, currentPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return s.segments.Delete(ctx, storj.JoinPaths("l", encPath))
|
||||
return s.segments.Delete(ctx, lastSegmentPath)
|
||||
}
|
||||
|
||||
// ListItem is a single item in a listing
|
||||
type ListItem struct {
|
||||
Path storj.Path
|
||||
Path string
|
||||
Meta Meta
|
||||
IsPrefix bool
|
||||
}
|
||||
|
||||
// List all the paths inside l/, stripping off the l/ prefix
|
||||
func (s *streamStore) List(ctx context.Context, prefix, startAfter, endBefore storj.Path, pathCipher storj.Cipher, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) {
|
||||
func (s *streamStore) List(ctx context.Context, prefix Path, startAfter, endBefore string, pathCipher storj.Cipher, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if metaFlags&meta.Size != 0 {
|
||||
@ -440,74 +460,88 @@ func (s *streamStore) List(ctx context.Context, prefix, startAfter, endBefore st
|
||||
metaFlags |= meta.UserDefined
|
||||
}
|
||||
|
||||
prefix = strings.TrimSuffix(prefix, "/")
|
||||
|
||||
encPrefix, err := EncryptAfterBucket(ctx, prefix, pathCipher, s.rootKey)
|
||||
prefixKey, err := encryption.StoreDerivePathKey(prefix.Bucket(), prefix.UnencryptedPath(), s.encStore)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
prefixKey, err := encryption.DerivePathKey(prefix, s.rootKey, len(storj.SplitPath(prefix)))
|
||||
encPrefix, err := encryption.StoreEncryptPath(prefix.Bucket(), prefix.UnencryptedPath(), pathCipher, s.encStore)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
encStartAfter, err := s.encryptMarker(ctx, startAfter, pathCipher, prefixKey)
|
||||
// We have to encrypt startAfter and endBefore but only if they don't contain a bucket.
|
||||
// They contain a bucket if and only if the prefix has no bucket. This is why they are raw
|
||||
// strings instead of a typed string: it's either a bucket or an unencrypted path component
|
||||
// and that isn't known at compile time.
|
||||
needsEncryption := prefix.Bucket() != ""
|
||||
if needsEncryption {
|
||||
startAfter, err = encryption.EncryptPathRaw(startAfter, pathCipher, prefixKey)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
endBefore, err = encryption.EncryptPathRaw(endBefore, pathCipher, prefixKey)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
}
|
||||
|
||||
segmentPrefix, err := createSegmentPath(ctx, -1, prefix.Bucket(), encPrefix)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
encEndBefore, err := s.encryptMarker(ctx, endBefore, pathCipher, prefixKey)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
segments, more, err := s.segments.List(ctx, storj.JoinPaths("l", encPrefix), encStartAfter, encEndBefore, recursive, limit, metaFlags)
|
||||
segments, more, err := s.segments.List(ctx, segmentPrefix, startAfter, endBefore, recursive, limit, metaFlags)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
items = make([]ListItem, len(segments))
|
||||
for i, item := range segments {
|
||||
path, err := s.decryptMarker(ctx, item.Path, pathCipher, prefixKey)
|
||||
var path Path
|
||||
var itemPath string
|
||||
|
||||
if needsEncryption {
|
||||
itemPath, err = encryption.DecryptPathRaw(item.Path, pathCipher, prefixKey)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// TODO(jeff): this shouldn't be necessary if we handled trailing slashes
|
||||
// appropriately. there's some issues with list.
|
||||
fullPath := prefix.UnencryptedPath().Raw()
|
||||
if len(fullPath) > 0 && fullPath[len(fullPath)-1] != '/' {
|
||||
fullPath += "/"
|
||||
}
|
||||
fullPath += itemPath
|
||||
|
||||
path = CreatePath(prefix.Bucket(), paths.NewUnencrypted(fullPath))
|
||||
} else {
|
||||
itemPath = item.Path
|
||||
path = CreatePath(item.Path, paths.Unencrypted{})
|
||||
}
|
||||
|
||||
streamInfo, streamMeta, err := TypedDecryptStreamInfo(ctx, item.Meta.Data, path, s.encStore)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
streamInfo, streamMeta, err := DecryptStreamInfo(ctx, item.Meta.Data, storj.JoinPaths(prefix, path), s.rootKey)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
var stream pb.StreamInfo
|
||||
if err := proto.Unmarshal(streamInfo, &stream); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
newMeta := convertMeta(item.Meta, stream, streamMeta)
|
||||
items[i] = ListItem{Path: path, Meta: newMeta, IsPrefix: item.IsPrefix}
|
||||
items[i] = ListItem{
|
||||
Path: itemPath,
|
||||
Meta: newMeta,
|
||||
IsPrefix: item.IsPrefix,
|
||||
}
|
||||
}
|
||||
|
||||
return items, more, nil
|
||||
}
|
||||
|
||||
// encryptMarker is a helper method for encrypting startAfter and endBefore markers
|
||||
func (s *streamStore) encryptMarker(ctx context.Context, marker storj.Path, pathCipher storj.Cipher, prefixKey *storj.Key) (_ storj.Path, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if bytes.Equal(s.rootKey[:], prefixKey[:]) { // empty prefix
|
||||
return EncryptAfterBucket(ctx, marker, pathCipher, s.rootKey)
|
||||
}
|
||||
return encryption.EncryptPath(marker, pathCipher, prefixKey)
|
||||
}
|
||||
|
||||
// decryptMarker is a helper method for decrypting listed path markers
|
||||
func (s *streamStore) decryptMarker(ctx context.Context, marker storj.Path, pathCipher storj.Cipher, prefixKey *storj.Key) (_ storj.Path, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if bytes.Equal(s.rootKey[:], prefixKey[:]) { // empty prefix
|
||||
return DecryptAfterBucket(ctx, marker, pathCipher, s.rootKey)
|
||||
}
|
||||
return encryption.DecryptPath(marker, pathCipher, prefixKey)
|
||||
}
|
||||
|
||||
type lazySegmentRanger struct {
|
||||
ranger ranger.Ranger
|
||||
segments segments.Store
|
||||
@ -584,60 +618,27 @@ func decryptRanger(ctx context.Context, rr ranger.Ranger, decryptedSize int64, c
|
||||
return eestream.Unpad(rd, int(rd.Size()-decryptedSize))
|
||||
}
|
||||
|
||||
// EncryptAfterBucket encrypts a path without encrypting its first element
|
||||
func EncryptAfterBucket(ctx context.Context, path storj.Path, cipher storj.Cipher, key *storj.Key) (encrypted storj.Path, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
comps := storj.SplitPath(path)
|
||||
if len(comps) <= 1 {
|
||||
return path, nil
|
||||
}
|
||||
|
||||
encrypted, err = encryption.EncryptPath(path, cipher, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// replace the first path component with the unencrypted bucket name
|
||||
return storj.JoinPaths(comps[0], storj.JoinPaths(storj.SplitPath(encrypted)[1:]...)), nil
|
||||
}
|
||||
|
||||
// DecryptAfterBucket decrypts a path without modifying its first element
|
||||
func DecryptAfterBucket(ctx context.Context, path storj.Path, cipher storj.Cipher, key *storj.Key) (decrypted storj.Path, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
comps := storj.SplitPath(path)
|
||||
if len(comps) <= 1 {
|
||||
return path, nil
|
||||
}
|
||||
|
||||
bucket := comps[0]
|
||||
toDecrypt := storj.JoinPaths(comps[1:]...)
|
||||
|
||||
bucketKey, err := encryption.DerivePathKey(path, key, 1)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
decPath, err := encryption.DecryptPath(toDecrypt, cipher, bucketKey)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return storj.JoinPaths(bucket, decPath), nil
|
||||
}
|
||||
|
||||
// CancelHandler handles clean up of segments on receiving CTRL+C
|
||||
func (s *streamStore) cancelHandler(ctx context.Context, totalSegments int64, path storj.Path, pathCipher storj.Cipher) {
|
||||
func (s *streamStore) cancelHandler(ctx context.Context, totalSegments int64, path Path, pathCipher storj.Cipher) {
|
||||
defer mon.Task()(&ctx)(nil)
|
||||
|
||||
encPath, err := encryption.StoreEncryptPath(path.Bucket(), path.UnencryptedPath(), pathCipher, s.encStore)
|
||||
if err != nil {
|
||||
zap.S().Warnf("Failed deleting segments: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
for i := int64(0); i < totalSegments; i++ {
|
||||
encPath, err := EncryptAfterBucket(ctx, path, pathCipher, s.rootKey)
|
||||
currentPath, err := createSegmentPath(ctx, i, path.Bucket(), encPath)
|
||||
if err != nil {
|
||||
zap.S().Warnf("Failed deleting a segment due to encryption path %v %v", i, err)
|
||||
zap.S().Warnf("Failed deleting segment %d: %v", i, err)
|
||||
continue
|
||||
}
|
||||
|
||||
currentPath := getSegmentPath(encPath, i)
|
||||
err = s.segments.Delete(ctx, currentPath)
|
||||
if err != nil {
|
||||
zap.S().Warnf("Failed deleting a segment %v %v", currentPath, err)
|
||||
zap.S().Warnf("Failed deleting segment %v: %v", currentPath, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -653,16 +654,17 @@ func getEncryptedKeyAndNonce(m *pb.SegmentMeta) (storj.EncryptedPrivateKey, *sto
|
||||
return m.EncryptedKey, &nonce
|
||||
}
|
||||
|
||||
// DecryptStreamInfo decrypts stream info
|
||||
func DecryptStreamInfo(ctx context.Context, streamMetaBytes []byte, path storj.Path, rootKey *storj.Key) (
|
||||
// TypedDecryptStreamInfo decrypts stream info
|
||||
func TypedDecryptStreamInfo(ctx context.Context, streamMetaBytes []byte, path Path, encStore *encryption.Store) (
|
||||
streamInfo []byte, streamMeta pb.StreamMeta, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
err = proto.Unmarshal(streamMetaBytes, &streamMeta)
|
||||
if err != nil {
|
||||
return nil, pb.StreamMeta{}, err
|
||||
}
|
||||
|
||||
derivedKey, err := encryption.DeriveContentKey(path, rootKey)
|
||||
derivedKey, err := encryption.StoreDeriveContentKey(path.Bucket(), path.UnencryptedPath(), encStore)
|
||||
if err != nil {
|
||||
return nil, pb.StreamMeta{}, err
|
||||
}
|
||||
@ -678,3 +680,33 @@ func DecryptStreamInfo(ctx context.Context, streamMetaBytes []byte, path storj.P
|
||||
streamInfo, err = encryption.Decrypt(streamMeta.EncryptedStreamInfo, cipher, contentKey, &storj.Nonce{})
|
||||
return streamInfo, streamMeta, err
|
||||
}
|
||||
|
||||
// createSegmentPath will create a storj.Path that the segment store expects.
|
||||
func createSegmentPath(ctx context.Context, segmentIndex int64, bucket string, encPath paths.Encrypted) (path storj.Path, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if segmentIndex < -1 {
|
||||
return "", errs.New("invalid segment index")
|
||||
}
|
||||
|
||||
var raw []byte
|
||||
if segmentIndex > -1 {
|
||||
raw = append(raw, 's')
|
||||
raw = append(raw, strconv.FormatInt(segmentIndex, 10)...)
|
||||
} else {
|
||||
raw = append(raw, 'l')
|
||||
}
|
||||
raw = append(raw, '/')
|
||||
|
||||
if len(bucket) > 0 {
|
||||
raw = append(raw, bucket...)
|
||||
raw = append(raw, '/')
|
||||
|
||||
if encPath.Valid() {
|
||||
raw = append(raw, encPath.Raw()...)
|
||||
raw = append(raw, '/')
|
||||
}
|
||||
}
|
||||
|
||||
return storj.Path(raw[:len(raw)-1]), nil
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"storj.io/storj/pkg/encryption"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/ranger"
|
||||
"storj.io/storj/pkg/storage/segments"
|
||||
@ -25,6 +26,12 @@ var (
|
||||
ctx = context.Background()
|
||||
)
|
||||
|
||||
func newStore() *encryption.Store {
|
||||
store := encryption.NewStore()
|
||||
store.SetDefaultKey(new(storj.Key))
|
||||
return store
|
||||
}
|
||||
|
||||
func TestStreamStoreMeta(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
@ -89,7 +96,7 @@ func TestStreamStoreMeta(t *testing.T) {
|
||||
Meta(gomock.Any(), gomock.Any()).
|
||||
Return(test.segmentMeta, test.segmentError)
|
||||
|
||||
streamStore, err := NewStreamStore(mockSegmentStore, 10, new(storj.Key), 10, storj.AESGCM, 4)
|
||||
streamStore, err := NewStreamStore(mockSegmentStore, 10, newStore(), 10, storj.AESGCM, 4)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -169,7 +176,7 @@ func TestStreamStorePut(t *testing.T) {
|
||||
Delete(gomock.Any(), gomock.Any()).
|
||||
Return(test.segmentError)
|
||||
|
||||
streamStore, err := NewStreamStore(mockSegmentStore, segSize, new(storj.Key), encBlockSize, dataCipher, inlineSize)
|
||||
streamStore, err := NewStreamStore(mockSegmentStore, segSize, newStore(), encBlockSize, dataCipher, inlineSize)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -279,7 +286,7 @@ func TestStreamStoreGet(t *testing.T) {
|
||||
|
||||
gomock.InOrder(calls...)
|
||||
|
||||
streamStore, err := NewStreamStore(mockSegmentStore, segSize, new(storj.Key), encBlockSize, dataCipher, inlineSize)
|
||||
streamStore, err := NewStreamStore(mockSegmentStore, segSize, newStore(), encBlockSize, dataCipher, inlineSize)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -328,7 +335,7 @@ func TestStreamStoreDelete(t *testing.T) {
|
||||
Delete(gomock.Any(), gomock.Any()).
|
||||
Return(test.segmentError)
|
||||
|
||||
streamStore, err := NewStreamStore(mockSegmentStore, 10, new(storj.Key), 10, 0, 0)
|
||||
streamStore, err := NewStreamStore(mockSegmentStore, 10, newStore(), 10, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -372,7 +379,7 @@ func TestStreamStoreList(t *testing.T) {
|
||||
List(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
|
||||
Return(test.segments, test.segmentMore, test.segmentError)
|
||||
|
||||
streamStore, err := NewStreamStore(mockSegmentStore, 10, new(storj.Key), 10, 0, 0)
|
||||
streamStore, err := NewStreamStore(mockSegmentStore, 10, newStore(), 10, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -130,14 +130,16 @@ func (c Config) GetMetainfo(ctx context.Context, identity *identity.FullIdentity
|
||||
return nil, nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
strms, err := streams.NewStreamStore(segment, c.Client.SegmentSize.Int64(), key,
|
||||
encStore := encryption.NewStore()
|
||||
encStore.SetDefaultKey(key)
|
||||
strms, err := streams.NewStreamStore(segment, c.Client.SegmentSize.Int64(), encStore,
|
||||
int(blockSize), storj.Cipher(c.Enc.DataType), c.Client.MaxInlineSize.Int(),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, Error.New("failed to create stream store: %v", err)
|
||||
}
|
||||
|
||||
return kvmetainfo.New(project, m, strms, segment, key), strms, nil
|
||||
return kvmetainfo.New(project, m, strms, segment, encStore), strms, nil
|
||||
}
|
||||
|
||||
// GetRedundancyScheme returns the configured redundancy scheme for new uploads
|
||||
|
Loading…
Reference in New Issue
Block a user