uplink/metainfo: move listing to kvmetainfo

https://storjlabs.atlassian.net/browse/V3-517

Change-Id: Ie23017f7cb4994aec835ff8c8e10c917b79966a6
This commit is contained in:
Michal Niewrzal 2019-12-30 16:36:06 +01:00
parent 027e3d4f62
commit 6607dacb1c
8 changed files with 122 additions and 362 deletions

View File

@ -6,6 +6,7 @@ package kvmetainfo
import (
"context"
"errors"
"strings"
"github.com/gogo/protobuf/proto"
@ -15,7 +16,6 @@ import (
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/uplink/metainfo"
"storj.io/storj/uplink/storage/meta"
"storj.io/storj/uplink/storage/objects"
"storj.io/storj/uplink/storage/segments"
"storj.io/storj/uplink/storage/streams"
@ -164,11 +164,6 @@ func (db *DB) ListObjects(ctx context.Context, bucket storj.Bucket, options stor
return storj.ObjectList{}, storj.ErrNoBucket.New("")
}
objects := prefixedObjStore{
store: objects.NewStore(db.streams, db.pathCipher(bucket)),
prefix: bucket.Name,
}
var startAfter string
switch options.Direction {
// TODO for now we are supporting only storj.After
@ -183,25 +178,105 @@ func (db *DB) ListObjects(ctx context.Context, bucket storj.Bucket, options stor
}
// TODO: we should let libuplink users be able to determine what metadata fields they request as well
metaFlags := meta.All
if db.pathCipher(bucket) == storj.EncNull || db.pathCipher(bucket) == storj.EncNullBase64URL {
metaFlags = meta.None
// metaFlags := meta.All
// if db.pathCipher(bucket) == storj.EncNull || db.pathCipher(bucket) == storj.EncNullBase64URL {
// metaFlags = meta.None
// }
// TODO use flags with listing
// if metaFlags&meta.Size != 0 {
// Calculating the stream's size require also the user-defined metadata,
// where stream store keeps info about the number of segments and their size.
// metaFlags |= meta.UserDefined
// }
pathCipher := db.pathCipher(bucket)
prefix := streams.ParsePath(storj.JoinPaths(bucket.Name, options.Prefix))
prefixKey, err := encryption.DerivePathKey(prefix.Bucket(), streams.PathForKey(prefix.UnencryptedPath().Raw()), db.encStore)
if err != nil {
return storj.ObjectList{}, errClass.Wrap(err)
}
items, more, err := objects.List(ctx, options.Prefix, startAfter, options.Recursive, options.Limit, metaFlags)
encPrefix, err := encryption.EncryptPath(prefix.Bucket(), prefix.UnencryptedPath(), pathCipher, db.encStore)
if err != nil {
return storj.ObjectList{}, err
return storj.ObjectList{}, errClass.Wrap(err)
}
// If the raw unencrypted path ends in a `/` we need to remove the final
// section of the encrypted path. For example, if we are listing the path
// `/bob/`, the encrypted path results in `enc("")/enc("bob")/enc("")`. This
// is an incorrect list prefix, what we really want is `enc("")/enc("bob")`
if strings.HasSuffix(prefix.UnencryptedPath().Raw(), "/") {
lastSlashIdx := strings.LastIndex(encPrefix.Raw(), "/")
encPrefix = paths.NewEncrypted(encPrefix.Raw()[:lastSlashIdx])
}
// We have to encrypt startAfter but only if it doesn't contain a bucket.
// It contains a bucket if and only if the prefix has no bucket. This is why it is a raw
// string instead of a typed string: it's either a bucket or an unencrypted path component
// and that isn't known at compile time.
needsEncryption := prefix.Bucket() != ""
if needsEncryption {
startAfter, err = encryption.EncryptPathRaw(startAfter, pathCipher, prefixKey)
if err != nil {
return storj.ObjectList{}, errClass.Wrap(err)
}
}
items, more, err := db.metainfo.ListObjects(ctx, metainfo.ListObjectsParams{
Bucket: []byte(bucket.Name),
EncryptedPrefix: []byte(encPrefix.Raw()),
EncryptedCursor: []byte(startAfter),
Limit: int32(options.Limit),
Recursive: options.Recursive,
})
if err != nil {
return storj.ObjectList{}, errClass.Wrap(err)
}
list = storj.ObjectList{
Bucket: bucket.Name,
Prefix: options.Prefix,
More: more,
Items: make([]storj.Object, 0, len(items)),
Items: make([]storj.Object, len(items)),
}
for _, item := range items {
list.Items = append(list.Items, objectFromMeta(bucket, item.Path, item.IsPrefix, item.Meta))
for i, item := range items {
var path streams.Path
var itemPath string
if needsEncryption {
itemPath, err = encryption.DecryptPathRaw(string(item.EncryptedPath), pathCipher, prefixKey)
if err != nil {
return storj.ObjectList{}, errClass.Wrap(err)
}
// TODO(jeff): this shouldn't be necessary if we handled trailing slashes
// appropriately. there's some issues with list.
fullPath := prefix.UnencryptedPath().Raw()
if len(fullPath) > 0 && fullPath[len(fullPath)-1] != '/' {
fullPath += "/"
}
fullPath += itemPath
path = streams.CreatePath(prefix.Bucket(), paths.NewUnencrypted(fullPath))
} else {
itemPath = string(item.EncryptedPath)
path = streams.CreatePath(string(item.EncryptedPath), paths.Unencrypted{})
}
stream, streamMeta, err := streams.TypedDecryptStreamInfo(ctx, item.EncryptedMetadata, path, db.encStore)
if err != nil {
return storj.ObjectList{}, errClass.Wrap(err)
}
object, err := objectFromMeta(bucket, itemPath, item, stream, &streamMeta)
if err != nil {
return storj.ObjectList{}, errClass.Wrap(err)
}
list.Items[i] = object
}
return list, nil
@ -271,25 +346,30 @@ func (db *DB) getInfo(ctx context.Context, bucket storj.Bucket, path storj.Path)
}, info, nil
}
func objectFromMeta(bucket storj.Bucket, path storj.Path, isPrefix bool, meta objects.Meta) storj.Object {
return storj.Object{
func objectFromMeta(bucket storj.Bucket, path storj.Path, listItem storj.ObjectListItem, stream *pb.StreamInfo, streamMeta *pb.StreamMeta) (storj.Object, error) {
object := storj.Object{
Version: 0, // TODO:
Bucket: bucket,
Path: path,
IsPrefix: isPrefix,
IsPrefix: listItem.IsPrefix,
Metadata: meta.UserDefined,
ContentType: meta.ContentType,
Created: meta.Modified, // TODO: use correct field
Modified: meta.Modified, // TODO: use correct field
Expires: meta.Expiration,
Stream: storj.Stream{
Size: meta.Size,
Checksum: []byte(meta.Checksum),
},
Created: listItem.CreatedAt, // TODO: use correct field
Modified: listItem.CreatedAt, // TODO: use correct field
Expires: listItem.ExpiresAt,
}
if stream != nil {
serializableMeta := pb.SerializableMeta{}
err := proto.Unmarshal(stream.Metadata, &serializableMeta)
if err != nil {
return storj.Object{}, err
}
object.Metadata = serializableMeta.UserDefined
object.ContentType = serializableMeta.ContentType
object.Stream.Size = ((numberOfSegments(stream, streamMeta) - 1) * stream.SegmentsSize) + stream.LastSegmentSize
}
return object, nil
}
func objectStreamFromMeta(bucket storj.Bucket, path storj.Path, streamID storj.StreamID, lastSegment segments.Meta, stream *pb.StreamInfo, streamMeta pb.StreamMeta, redundancyScheme storj.RedundancyScheme) (storj.Object, error) {
@ -380,3 +460,10 @@ func (object *mutableObject) Commit(ctx context.Context) (err error) {
object.info = info
return err
}
func numberOfSegments(stream *pb.StreamInfo, streamMeta *pb.StreamMeta) int64 {
if streamMeta.NumberOfSegments > 0 {
return streamMeta.NumberOfSegments
}
return stream.DeprecatedNumberOfSegments
}

View File

@ -48,9 +48,3 @@ func (o *prefixedObjStore) Delete(ctx context.Context, path storj.Path) (err err
return o.store.Delete(ctx, storj.JoinPaths(o.prefix, path))
}
func (o *prefixedObjStore) List(ctx context.Context, prefix, startAfter storj.Path, recursive bool, limit int, metaFlags uint32) (items []objects.ListItem, more bool, err error) {
defer mon.Task()(&ctx)(&err)
return o.store.List(ctx, storj.JoinPaths(o.prefix, prefix), startAfter, recursive, limit, metaFlags)
}

View File

@ -29,19 +29,11 @@ type Meta struct {
Checksum string
}
// ListItem is a single item in a listing
type ListItem struct {
Path storj.Path
Meta Meta
IsPrefix bool
}
// Store for objects
type Store interface {
Get(ctx context.Context, path storj.Path, object storj.Object) (rr ranger.Ranger, err error)
Put(ctx context.Context, path storj.Path, data io.Reader, metadata pb.SerializableMeta, expiration time.Time) (meta Meta, err error)
Delete(ctx context.Context, path storj.Path) (err error)
List(ctx context.Context, prefix, startAfter storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
}
type objStore struct {
@ -94,27 +86,6 @@ func (o *objStore) Delete(ctx context.Context, path storj.Path) (err error) {
return o.store.Delete(ctx, path, o.pathCipher)
}
func (o *objStore) List(ctx context.Context, prefix, startAfter storj.Path, recursive bool, limit int, metaFlags uint32) (
items []ListItem, more bool, err error) {
defer mon.Task()(&ctx)(&err)
strItems, more, err := o.store.List(ctx, prefix, startAfter, o.pathCipher, recursive, limit, metaFlags)
if err != nil {
return nil, false, err
}
items = make([]ListItem, len(strItems))
for i, itm := range strItems {
items[i] = ListItem{
Path: itm.Path,
Meta: convertMeta(itm.Meta),
IsPrefix: itm.IsPrefix,
}
}
return items, more, nil
}
// convertMeta converts stream metadata to object metadata
func convertMeta(m streams.Meta) Meta {
ser := pb.SerializableMeta{}

View File

@ -33,13 +33,6 @@ type Meta struct {
Data []byte
}
// ListItem is a single item in a listing
type ListItem struct {
Path storj.Path
Meta Meta
IsPrefix bool
}
// Store for segments
type Store interface {
// Ranger creates a ranger for downloading erasure codes from piece store nodes.

View File

@ -54,3 +54,10 @@ func CreatePath(bucket string, unencPath paths.Unencrypted) (path Path) {
return path
}
// PathForKey removes the trailing `/` from the raw path, which is required so
// the derived key matches the final list path (which also has the trailing
// encrypted `/` part of the path removed).
func PathForKey(raw string) paths.Unencrypted {
return paths.NewUnencrypted(strings.TrimSuffix(raw, "/"))
}

View File

@ -20,7 +20,6 @@ type Store interface {
Get(ctx context.Context, path storj.Path, object storj.Object, pathCipher storj.CipherSuite) (ranger.Ranger, error)
Put(ctx context.Context, path storj.Path, pathCipher storj.CipherSuite, data io.Reader, metadata []byte, expiration time.Time) (Meta, error)
Delete(ctx context.Context, path storj.Path, pathCipher storj.CipherSuite) error
List(ctx context.Context, prefix, startAfter storj.Path, pathCipher storj.CipherSuite, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
}
type shimStore struct {
@ -56,10 +55,3 @@ func (s *shimStore) Delete(ctx context.Context, path storj.Path, pathCipher stor
return s.store.Delete(ctx, ParsePath(path), pathCipher)
}
// List parses the passed in path and dispatches to the typed store.
func (s *shimStore) List(ctx context.Context, prefix storj.Path, startAfter storj.Path, pathCipher storj.CipherSuite, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) {
defer mon.Task()(&ctx)(&err)
return s.store.List(ctx, ParsePath(prefix), startAfter, pathCipher, recursive, limit, metaFlags)
}

View File

@ -8,7 +8,6 @@ import (
"crypto/rand"
"io"
"io/ioutil"
"strings"
"time"
"github.com/gogo/protobuf/proto"
@ -17,7 +16,6 @@ import (
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/common/encryption"
"storj.io/common/paths"
"storj.io/common/pb"
"storj.io/common/ranger"
"storj.io/common/storj"
@ -35,35 +33,11 @@ type Meta struct {
Data []byte
}
func numberOfSegments(stream *pb.StreamInfo, streamMeta *pb.StreamMeta) (count int64, ok bool) {
if streamMeta.NumberOfSegments > 0 {
return streamMeta.NumberOfSegments, true
}
if stream != nil {
return stream.DeprecatedNumberOfSegments, true
}
return 0, false
}
// convertMeta converts segment metadata to stream metadata
func convertMeta(modified, expiration time.Time, stream *pb.StreamInfo, streamMeta pb.StreamMeta) (rv Meta) {
rv.Modified = modified
rv.Expiration = expiration
if stream != nil {
if segmentCount, ok := numberOfSegments(stream, &streamMeta); ok {
rv.Size = (segmentCount-1)*stream.SegmentsSize + stream.LastSegmentSize
}
rv.Data = stream.Metadata
}
return rv
}
// Store interface methods for streams to satisfy to be a store
type typedStore interface {
Get(ctx context.Context, path Path, object storj.Object, pathCipher storj.CipherSuite) (ranger.Ranger, error)
Put(ctx context.Context, path Path, pathCipher storj.CipherSuite, data io.Reader, metadata []byte, expiration time.Time) (Meta, error)
Delete(ctx context.Context, path Path, pathCipher storj.CipherSuite) error
List(ctx context.Context, prefix Path, startAfter string, pathCipher storj.CipherSuite, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
}
// streamStore is a store for streams. It implements typedStore as part of an ongoing migration
@ -517,107 +491,6 @@ type ListItem struct {
IsPrefix bool
}
// pathForKey removes the trailing `/` from the raw path, which is required so
// the derived key matches the final list path (which also has the trailing
// encrypted `/` part of the path removed)
func pathForKey(raw string) paths.Unencrypted {
return paths.NewUnencrypted(strings.TrimSuffix(raw, "/"))
}
// List all the paths inside l/, stripping off the l/ prefix
func (s *streamStore) List(ctx context.Context, prefix Path, startAfter string, pathCipher storj.CipherSuite, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) {
defer mon.Task()(&ctx)(&err)
// TODO use flags with listing
// if metaFlags&meta.Size != 0 {
// Calculating the stream's size require also the user-defined metadata,
// where stream store keeps info about the number of segments and their size.
// metaFlags |= meta.UserDefined
// }
prefixKey, err := encryption.DerivePathKey(prefix.Bucket(), pathForKey(prefix.UnencryptedPath().Raw()), s.encStore)
if err != nil {
return nil, false, err
}
encPrefix, err := encryption.EncryptPath(prefix.Bucket(), prefix.UnencryptedPath(), pathCipher, s.encStore)
if err != nil {
return nil, false, err
}
// If the raw unencrypted path ends in a `/` we need to remove the final
// section of the encrypted path. For example, if we are listing the path
// `/bob/`, the encrypted path results in `enc("")/enc("bob")/enc("")`. This
// is an incorrect list prefix, what we really want is `enc("")/enc("bob")`
if strings.HasSuffix(prefix.UnencryptedPath().Raw(), "/") {
lastSlashIdx := strings.LastIndex(encPrefix.Raw(), "/")
encPrefix = paths.NewEncrypted(encPrefix.Raw()[:lastSlashIdx])
}
// We have to encrypt startAfter but only if it doesn't contain a bucket.
// It contains a bucket if and only if the prefix has no bucket. This is why it is a raw
// string instead of a typed string: it's either a bucket or an unencrypted path component
// and that isn't known at compile time.
needsEncryption := prefix.Bucket() != ""
if needsEncryption {
startAfter, err = encryption.EncryptPathRaw(startAfter, pathCipher, prefixKey)
if err != nil {
return nil, false, err
}
}
objects, more, err := s.metainfo.ListObjects(ctx, metainfo.ListObjectsParams{
Bucket: []byte(prefix.Bucket()),
EncryptedPrefix: []byte(encPrefix.Raw()),
EncryptedCursor: []byte(startAfter),
Limit: int32(limit),
Recursive: recursive,
})
if err != nil {
return nil, false, err
}
items = make([]ListItem, len(objects))
for i, item := range objects {
var path Path
var itemPath string
if needsEncryption {
itemPath, err = encryption.DecryptPathRaw(string(item.EncryptedPath), pathCipher, prefixKey)
if err != nil {
return nil, false, err
}
// TODO(jeff): this shouldn't be necessary if we handled trailing slashes
// appropriately. there's some issues with list.
fullPath := prefix.UnencryptedPath().Raw()
if len(fullPath) > 0 && fullPath[len(fullPath)-1] != '/' {
fullPath += "/"
}
fullPath += itemPath
path = CreatePath(prefix.Bucket(), paths.NewUnencrypted(fullPath))
} else {
itemPath = string(item.EncryptedPath)
path = CreatePath(string(item.EncryptedPath), paths.Unencrypted{})
}
stream, streamMeta, err := TypedDecryptStreamInfo(ctx, item.EncryptedMetadata, path, s.encStore)
if err != nil {
return nil, false, err
}
newMeta := convertMeta(item.CreatedAt, item.ExpiresAt, stream, streamMeta)
items[i] = ListItem{
Path: itemPath,
Meta: newMeta,
IsPrefix: item.IsPrefix,
}
}
return items, more, nil
}
type lazySegmentRanger struct {
ranger ranger.Ranger
metainfo *metainfo.Client

View File

@ -1,157 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package streams_test
import (
"bytes"
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/encryption"
"storj.io/common/macaroon"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/console"
"storj.io/storj/uplink/ecclient"
"storj.io/storj/uplink/eestream"
"storj.io/storj/uplink/metainfo"
"storj.io/storj/uplink/storage/meta"
"storj.io/storj/uplink/storage/segments"
"storj.io/storj/uplink/storage/streams"
)
const (
TestEncKey = "test-encryption-key"
)
func TestStreamStoreList(t *testing.T) {
runTest(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, streamStore streams.Store) {
expiration := time.Now().Add(10 * 24 * time.Hour)
bucketName := "bucket-name"
err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], bucketName)
require.NoError(t, err)
objects := []struct {
path string
content []byte
}{
{"aaaa/afile1", []byte("content")},
{"aaaa/bfile2", []byte("content")},
{"bbbb/afile1", []byte("content")},
{"bbbb/bfile2", []byte("content")},
{"bbbb/bfolder/file1", []byte("content")},
}
for _, test := range objects {
test := test
data := bytes.NewReader(test.content)
path := storj.JoinPaths(bucketName, test.path)
_, err := streamStore.Put(ctx, path, storj.EncNull, data, []byte{}, expiration)
require.NoError(t, err)
}
prefix := bucketName
// should list all
items, more, err := streamStore.List(ctx, prefix, "", storj.EncNull, true, 10, meta.None)
require.NoError(t, err)
require.False(t, more)
require.Equal(t, len(objects), len(items))
// should list first two and more = true
items, more, err = streamStore.List(ctx, prefix, "", storj.EncNull, true, 2, meta.None)
require.NoError(t, err)
require.True(t, more)
require.Equal(t, 2, len(items))
// should list only prefixes
items, more, err = streamStore.List(ctx, prefix, "", storj.EncNull, false, 10, meta.None)
require.NoError(t, err)
require.False(t, more)
require.Equal(t, 2, len(items))
// should list only BBBB bucket
prefix = storj.JoinPaths(bucketName, "bbbb")
items, more, err = streamStore.List(ctx, prefix, "", storj.EncNull, false, 10, meta.None)
require.NoError(t, err)
require.False(t, more)
require.Equal(t, 3, len(items))
// should list only BBBB bucket after afile
items, more, err = streamStore.List(ctx, prefix, "afile1", storj.EncNull, false, 10, meta.None)
require.NoError(t, err)
require.False(t, more)
require.Equal(t, 2, len(items))
// should list nothing
prefix = storj.JoinPaths(bucketName, "cccc")
items, more, err = streamStore.List(ctx, prefix, "", storj.EncNull, true, 10, meta.None)
require.NoError(t, err)
require.False(t, more)
require.Equal(t, 0, len(items))
})
}
func runTest(t *testing.T, test func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, streamsStore streams.Store)) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
metainfo, _, streamStore := storeTestSetup(t, ctx, planet, 64*memory.MiB.Int64())
defer ctx.Check(metainfo.Close)
test(t, ctx, planet, streamStore)
})
}
func storeTestSetup(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, segmentSize int64) (*metainfo.Client, segments.Store, streams.Store) {
// TODO move apikey creation to testplanet
projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx)
require.NoError(t, err)
apiKey, err := macaroon.NewAPIKey([]byte("testSecret"))
require.NoError(t, err)
apiKeyInfo := console.APIKeyInfo{
ProjectID: projects[0].ID,
Name: "testKey",
Secret: []byte("testSecret"),
}
// add api key to db
_, err = planet.Satellites[0].DB.Console().APIKeys().Create(context.Background(), apiKey.Head(), apiKeyInfo)
require.NoError(t, err)
TestAPIKey := apiKey
metainfo, err := planet.Uplinks[0].DialMetainfo(context.Background(), planet.Satellites[0], TestAPIKey)
require.NoError(t, err)
ec := ecclient.NewClient(planet.Uplinks[0].Log.Named("ecclient"), planet.Uplinks[0].Dialer, 0)
cfg := planet.Uplinks[0].GetConfig(planet.Satellites[0])
rs, err := eestream.NewRedundancyStrategyFromStorj(cfg.GetRedundancyScheme())
require.NoError(t, err)
segmentStore := segments.NewSegmentStore(metainfo, ec, rs)
assert.NotNil(t, segmentStore)
key := new(storj.Key)
copy(key[:], TestEncKey)
encStore := encryption.NewStore()
encStore.SetDefaultKey(key)
const stripesPerBlock = 2
blockSize := stripesPerBlock * rs.StripeSize()
inlineThreshold := 8 * memory.KiB.Int()
streamStore, err := streams.NewStreamStore(metainfo, segmentStore, segmentSize, encStore, blockSize, storj.EncNull, inlineThreshold, 8*memory.MiB.Int64())
require.NoError(t, err)
return metainfo, segmentStore, streamStore
}