kvmetainfo: merge with storage/buckets (#2277)
This commit is contained in:
parent
568b000e9b
commit
aa25c4458f
@ -12,7 +12,6 @@ import (
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/encryption"
|
||||
"storj.io/storj/pkg/metainfo/kvmetainfo"
|
||||
"storj.io/storj/pkg/storage/buckets"
|
||||
ecclient "storj.io/storj/pkg/storage/ec"
|
||||
"storj.io/storj/pkg/storage/segments"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
@ -189,14 +188,12 @@ func (p *Project) OpenBucket(ctx context.Context, bucketName string, access *Enc
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bucketStore := buckets.NewStore(streamStore)
|
||||
|
||||
return &Bucket{
|
||||
BucketConfig: *cfg,
|
||||
Name: bucketInfo.Name,
|
||||
Created: bucketInfo.Created,
|
||||
bucket: bucketInfo,
|
||||
metainfo: kvmetainfo.New(p.metainfo, bucketStore, streamStore, segmentStore, &access.Key, encryptionScheme.BlockSize, rs, cfg.Volatile.SegmentsSize.Int64()),
|
||||
metainfo: kvmetainfo.New(p.metainfo, streamStore, segmentStore, &access.Key, encryptionScheme.BlockSize, rs, cfg.Volatile.SegmentsSize.Int64()),
|
||||
streams: streamStore,
|
||||
}, nil
|
||||
}
|
||||
|
@ -13,7 +13,6 @@ import (
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/metainfo/kvmetainfo"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/storage/buckets"
|
||||
"storj.io/storj/pkg/storage/segments"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
"storj.io/storj/pkg/storj"
|
||||
@ -178,7 +177,7 @@ func (u *Uplink) OpenProject(ctx context.Context, satelliteAddr string, apiKey A
|
||||
uplinkCfg: u.cfg,
|
||||
tc: u.tc,
|
||||
metainfo: metainfo,
|
||||
project: kvmetainfo.NewProject(buckets.NewStore(streams), memory.KiB.Int32(), rs, 64*memory.MiB.Int64()),
|
||||
project: kvmetainfo.NewProject(streams, memory.KiB.Int32(), rs, 64*memory.MiB.Int64()),
|
||||
maxInlineSize: u.cfg.Volatile.MaxInlineSize,
|
||||
encryptionKey: encryptionKey,
|
||||
}, nil
|
||||
|
@ -4,12 +4,19 @@
|
||||
package kvmetainfo
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/storage/buckets"
|
||||
"storj.io/storj/pkg/encryption"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storage/meta"
|
||||
"storj.io/storj/pkg/storage/objects"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
// CreateBucket creates a new bucket with the specified information
|
||||
@ -54,17 +61,33 @@ func (db *Project) CreateBucket(ctx context.Context, bucketName string, info *st
|
||||
return bucketInfo, err
|
||||
}
|
||||
|
||||
meta, err := db.buckets.Put(ctx, bucketName, buckets.Meta{
|
||||
PathEncryptionType: info.PathCipher,
|
||||
SegmentsSize: info.SegmentsSize,
|
||||
RedundancyScheme: info.RedundancyScheme,
|
||||
EncryptionScheme: info.EncryptionParameters.ToEncryptionScheme(),
|
||||
})
|
||||
if info.PathCipher < storj.Unencrypted || info.PathCipher > storj.SecretBox {
|
||||
return storj.Bucket{}, encryption.ErrInvalidConfig.New("encryption type %d is not supported", info.PathCipher)
|
||||
}
|
||||
|
||||
r := bytes.NewReader(nil)
|
||||
userMeta := map[string]string{
|
||||
"path-enc-type": strconv.Itoa(int(info.PathCipher)),
|
||||
"default-seg-size": strconv.FormatInt(info.SegmentsSize, 10),
|
||||
"default-enc-type": strconv.Itoa(int(info.EncryptionParameters.CipherSuite.ToCipher())),
|
||||
"default-enc-blksz": strconv.FormatInt(int64(info.EncryptionParameters.BlockSize), 10),
|
||||
"default-rs-algo": strconv.Itoa(int(info.RedundancyScheme.Algorithm)),
|
||||
"default-rs-sharsz": strconv.FormatInt(int64(info.RedundancyScheme.ShareSize), 10),
|
||||
"default-rs-reqd": strconv.Itoa(int(info.RedundancyScheme.RequiredShares)),
|
||||
"default-rs-repair": strconv.Itoa(int(info.RedundancyScheme.RepairShares)),
|
||||
"default-rs-optim": strconv.Itoa(int(info.RedundancyScheme.OptimalShares)),
|
||||
"default-rs-total": strconv.Itoa(int(info.RedundancyScheme.TotalShares)),
|
||||
}
|
||||
var exp time.Time
|
||||
m, err := db.buckets.Put(ctx, bucketName, r, pb.SerializableMeta{UserDefined: userMeta}, exp)
|
||||
if err != nil {
|
||||
return storj.Bucket{}, err
|
||||
}
|
||||
|
||||
return bucketFromMeta(bucketName, meta), nil
|
||||
rv := *info
|
||||
rv.Name = bucketName
|
||||
rv.Created = m.Modified
|
||||
return rv, nil
|
||||
}
|
||||
|
||||
// validateBlockSize confirms the encryption block size aligns with stripe size.
|
||||
@ -91,7 +114,13 @@ func (db *Project) DeleteBucket(ctx context.Context, bucketName string) (err err
|
||||
return storj.ErrNoBucket.New("")
|
||||
}
|
||||
|
||||
return db.buckets.Delete(ctx, bucketName)
|
||||
err = db.buckets.Delete(ctx, bucketName)
|
||||
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
err = storj.ErrBucketNotFound.Wrap(err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// GetBucket gets bucket information
|
||||
@ -102,12 +131,15 @@ func (db *Project) GetBucket(ctx context.Context, bucketName string) (bucketInfo
|
||||
return storj.Bucket{}, storj.ErrNoBucket.New("")
|
||||
}
|
||||
|
||||
meta, err := db.buckets.Get(ctx, bucketName)
|
||||
objMeta, err := db.buckets.Meta(ctx, bucketName)
|
||||
if err != nil {
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
err = storj.ErrBucketNotFound.Wrap(err)
|
||||
}
|
||||
return storj.Bucket{}, err
|
||||
}
|
||||
|
||||
return bucketFromMeta(bucketName, meta), nil
|
||||
return bucketFromMeta(ctx, bucketName, objMeta)
|
||||
}
|
||||
|
||||
// ListBuckets lists buckets
|
||||
@ -137,30 +169,67 @@ func (db *Project) ListBuckets(ctx context.Context, options storj.BucketListOpti
|
||||
endBefore = "\x7f\x7f\x7f\x7f\x7f\x7f\x7f"
|
||||
}
|
||||
|
||||
items, more, err := db.buckets.List(ctx, startAfter, endBefore, options.Limit)
|
||||
objItems, more, err := db.buckets.List(ctx, "", startAfter, endBefore, false, options.Limit, meta.Modified)
|
||||
if err != nil {
|
||||
return storj.BucketList{}, err
|
||||
}
|
||||
|
||||
list = storj.BucketList{
|
||||
More: more,
|
||||
Items: make([]storj.Bucket, 0, len(items)),
|
||||
Items: make([]storj.Bucket, 0, len(objItems)),
|
||||
}
|
||||
|
||||
for _, item := range items {
|
||||
list.Items = append(list.Items, bucketFromMeta(item.Bucket, item.Meta))
|
||||
for _, itm := range objItems {
|
||||
if itm.IsPrefix {
|
||||
continue
|
||||
}
|
||||
m, err := bucketFromMeta(ctx, itm.Path, itm.Meta)
|
||||
if err != nil {
|
||||
return storj.BucketList{}, err
|
||||
}
|
||||
list.Items = append(list.Items, m)
|
||||
}
|
||||
|
||||
return list, nil
|
||||
}
|
||||
|
||||
func bucketFromMeta(bucketName string, meta buckets.Meta) storj.Bucket {
|
||||
return storj.Bucket{
|
||||
Name: bucketName,
|
||||
Created: meta.Created,
|
||||
PathCipher: meta.PathEncryptionType,
|
||||
SegmentsSize: meta.SegmentsSize,
|
||||
RedundancyScheme: meta.RedundancyScheme,
|
||||
EncryptionParameters: meta.EncryptionScheme.ToEncryptionParameters(),
|
||||
func bucketFromMeta(ctx context.Context, bucketName string, m objects.Meta) (out storj.Bucket, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
out.Name = bucketName
|
||||
out.Created = m.Modified
|
||||
// backwards compatibility for old buckets
|
||||
out.PathCipher = storj.AESGCM
|
||||
out.EncryptionParameters.CipherSuite = storj.EncUnspecified
|
||||
|
||||
applySetting := func(nameInMap string, bits int, storeFunc func(val int64)) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if stringVal := m.UserDefined[nameInMap]; stringVal != "" {
|
||||
var intVal int64
|
||||
intVal, err = strconv.ParseInt(stringVal, 10, bits)
|
||||
if err != nil {
|
||||
err = errs.New("invalid metadata field for %s: %v", nameInMap, err)
|
||||
return
|
||||
}
|
||||
storeFunc(intVal)
|
||||
}
|
||||
}
|
||||
|
||||
es := &out.EncryptionParameters
|
||||
rs := &out.RedundancyScheme
|
||||
|
||||
applySetting("path-enc-type", 16, func(v int64) { out.PathCipher = storj.Cipher(v) })
|
||||
applySetting("default-seg-size", 64, func(v int64) { out.SegmentsSize = v })
|
||||
applySetting("default-enc-type", 32, func(v int64) { es.CipherSuite = storj.Cipher(v).ToCipherSuite() })
|
||||
applySetting("default-enc-blksz", 32, func(v int64) { es.BlockSize = int32(v) })
|
||||
applySetting("default-rs-algo", 32, func(v int64) { rs.Algorithm = storj.RedundancyAlgorithm(v) })
|
||||
applySetting("default-rs-sharsz", 32, func(v int64) { rs.ShareSize = int32(v) })
|
||||
applySetting("default-rs-reqd", 16, func(v int64) { rs.RequiredShares = int16(v) })
|
||||
applySetting("default-rs-repair", 16, func(v int64) { rs.RepairShares = int16(v) })
|
||||
applySetting("default-rs-optim", 16, func(v int64) { rs.OptimalShares = int16(v) })
|
||||
applySetting("default-rs-total", 16, func(v int64) { rs.TotalShares = int16(v) })
|
||||
|
||||
return out, err
|
||||
}
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/macaroon"
|
||||
"storj.io/storj/pkg/metainfo/kvmetainfo"
|
||||
"storj.io/storj/pkg/storage/buckets"
|
||||
ecclient "storj.io/storj/pkg/storage/ec"
|
||||
"storj.io/storj/pkg/storage/segments"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
@ -33,7 +32,7 @@ const (
|
||||
)
|
||||
|
||||
func TestBucketsBasic(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
// Create new bucket
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
||||
if assert.NoError(t, err) {
|
||||
@ -72,13 +71,15 @@ func TestBucketsBasic(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestBucketsReadNewWayWriteOldWay(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, bucketStore buckets.Store, streams streams.Store) {
|
||||
// (Old API) Create new bucket
|
||||
_, err := bucketStore.Put(ctx, TestBucket, buckets.Meta{PathEncryptionType: storj.AESGCM})
|
||||
assert.NoError(t, err)
|
||||
func TestBucketsReadWrite(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
// Create new bucket
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
||||
if assert.NoError(t, err) {
|
||||
assert.Equal(t, TestBucket, bucket.Name)
|
||||
}
|
||||
|
||||
// (New API) Check that bucket list include the new bucket
|
||||
// Check that bucket list include the new bucket
|
||||
bucketList, err := db.ListBuckets(ctx, storj.BucketListOptions{Direction: storj.After})
|
||||
if assert.NoError(t, err) {
|
||||
assert.False(t, bucketList.More)
|
||||
@ -86,71 +87,32 @@ func TestBucketsReadNewWayWriteOldWay(t *testing.T) {
|
||||
assert.Equal(t, TestBucket, bucketList.Items[0].Name)
|
||||
}
|
||||
|
||||
// (New API) Check that we can get the new bucket explicitly
|
||||
bucket, err := db.GetBucket(ctx, TestBucket)
|
||||
// Check that we can get the new bucket explicitly
|
||||
bucket, err = db.GetBucket(ctx, TestBucket)
|
||||
if assert.NoError(t, err) {
|
||||
assert.Equal(t, TestBucket, bucket.Name)
|
||||
assert.Equal(t, storj.AESGCM, bucket.PathCipher)
|
||||
}
|
||||
|
||||
// (Old API) Delete the bucket
|
||||
err = bucketStore.Delete(ctx, TestBucket)
|
||||
// Delete the bucket
|
||||
err = db.DeleteBucket(ctx, TestBucket)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// (New API) Check that the bucket list is empty
|
||||
// Check that the bucket list is empty
|
||||
bucketList, err = db.ListBuckets(ctx, storj.BucketListOptions{Direction: storj.After})
|
||||
if assert.NoError(t, err) {
|
||||
assert.False(t, bucketList.More)
|
||||
assert.Equal(t, 0, len(bucketList.Items))
|
||||
}
|
||||
|
||||
// (New API) Check that the bucket cannot be get explicitly
|
||||
// Check that the bucket cannot be get explicitly
|
||||
bucket, err = db.GetBucket(ctx, TestBucket)
|
||||
assert.True(t, storj.ErrBucketNotFound.Has(err))
|
||||
})
|
||||
}
|
||||
|
||||
func TestBucketsReadOldWayWriteNewWay(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
|
||||
// (New API) Create new bucket
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
||||
if assert.NoError(t, err) {
|
||||
assert.Equal(t, TestBucket, bucket.Name)
|
||||
}
|
||||
|
||||
// (Old API) Check that bucket list include the new bucket
|
||||
items, more, err := buckets.List(ctx, "", "", 0)
|
||||
if assert.NoError(t, err) {
|
||||
assert.False(t, more)
|
||||
assert.Equal(t, 1, len(items))
|
||||
assert.Equal(t, TestBucket, items[0].Bucket)
|
||||
}
|
||||
|
||||
// (Old API) Check that we can get the new bucket explicitly
|
||||
meta, err := buckets.Get(ctx, TestBucket)
|
||||
if assert.NoError(t, err) {
|
||||
assert.Equal(t, storj.AESGCM, meta.PathEncryptionType)
|
||||
}
|
||||
|
||||
// (New API) Delete the bucket
|
||||
err = db.DeleteBucket(ctx, TestBucket)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// (Old API) Check that the bucket list is empty
|
||||
items, more, err = buckets.List(ctx, "", "", 0)
|
||||
if assert.NoError(t, err) {
|
||||
assert.False(t, more)
|
||||
assert.Equal(t, 0, len(items))
|
||||
}
|
||||
|
||||
// (Old API) Check that the bucket cannot be get explicitly
|
||||
_, err = buckets.Get(ctx, TestBucket)
|
||||
assert.True(t, storj.ErrBucketNotFound.Has(err))
|
||||
})
|
||||
}
|
||||
|
||||
func TestErrNoBucket(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
_, err := db.CreateBucket(ctx, "", nil)
|
||||
assert.True(t, storj.ErrNoBucket.Has(err))
|
||||
|
||||
@ -163,7 +125,7 @@ func TestErrNoBucket(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBucketCreateCipher(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
forAllCiphers(func(cipher storj.Cipher) {
|
||||
bucket, err := db.CreateBucket(ctx, "test", &storj.Bucket{PathCipher: cipher})
|
||||
if assert.NoError(t, err) {
|
||||
@ -182,7 +144,7 @@ func TestBucketCreateCipher(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestListBucketsEmpty(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
_, err := db.ListBuckets(ctx, storj.BucketListOptions{})
|
||||
assert.EqualError(t, err, "kvmetainfo: invalid direction 0")
|
||||
|
||||
@ -202,7 +164,7 @@ func TestListBucketsEmpty(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestListBuckets(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
bucketNames := []string{"a", "aa", "b", "bb", "c"}
|
||||
|
||||
for _, name := range bucketNames {
|
||||
@ -308,30 +270,30 @@ func getBucketNames(bucketList storj.BucketList) []string {
|
||||
return names
|
||||
}
|
||||
|
||||
func runTest(t *testing.T, test func(context.Context, *testplanet.Planet, *kvmetainfo.DB, buckets.Store, streams.Store)) {
|
||||
func runTest(t *testing.T, test func(context.Context, *testplanet.Planet, *kvmetainfo.DB, streams.Store)) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
db, buckets, streams, err := newMetainfoParts(planet)
|
||||
db, streams, err := newMetainfoParts(planet)
|
||||
require.NoError(t, err)
|
||||
|
||||
test(ctx, planet, db, buckets, streams)
|
||||
test(ctx, planet, db, streams)
|
||||
})
|
||||
}
|
||||
|
||||
func newMetainfoParts(planet *testplanet.Planet) (*kvmetainfo.DB, buckets.Store, streams.Store, error) {
|
||||
func newMetainfoParts(planet *testplanet.Planet) (*kvmetainfo.DB, streams.Store, error) {
|
||||
// TODO(kaloyan): We should have a better way for configuring the Satellite's API Key
|
||||
// add project to satisfy constraint
|
||||
project, err := planet.Satellites[0].DB.Console().Projects().Insert(context.Background(), &console.Project{
|
||||
Name: "testProject",
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
apiKey, err := macaroon.NewAPIKey([]byte("testSecret"))
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
apiKeyInfo := console.APIKeyInfo{
|
||||
@ -343,23 +305,23 @@ func newMetainfoParts(planet *testplanet.Planet) (*kvmetainfo.DB, buckets.Store,
|
||||
// add api key to db
|
||||
_, err = planet.Satellites[0].DB.Console().APIKeys().Create(context.Background(), apiKey.Head(), apiKeyInfo)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
metainfo, err := planet.Uplinks[0].DialMetainfo(context.Background(), planet.Satellites[0], apiKey.Serialize())
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
ec := ecclient.NewClient(planet.Uplinks[0].Transport, 0)
|
||||
fc, err := infectious.NewFEC(2, 4)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, 1*memory.KiB.Int()), 0, 0)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
segments := segments.NewSegmentStore(metainfo, ec, rs, 8*memory.KiB.Int(), 8*memory.MiB.Int64())
|
||||
@ -372,12 +334,10 @@ func newMetainfoParts(planet *testplanet.Planet) (*kvmetainfo.DB, buckets.Store,
|
||||
inlineThreshold := 8 * memory.KiB.Int()
|
||||
streams, err := streams.NewStreamStore(segments, 64*memory.MiB.Int64(), key, blockSize, storj.AESGCM, inlineThreshold)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
buckets := buckets.NewStore(streams)
|
||||
|
||||
return kvmetainfo.New(metainfo, buckets, streams, segments, key, int32(blockSize), rs, 64*memory.MiB.Int64()), buckets, streams, nil
|
||||
return kvmetainfo.New(metainfo, streams, segments, key, int32(blockSize), rs, 64*memory.MiB.Int64()), streams, nil
|
||||
}
|
||||
|
||||
func forAllCiphers(test func(cipher storj.Cipher)) {
|
||||
|
@ -4,12 +4,13 @@
|
||||
package kvmetainfo
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/internal/memory"
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/storage/buckets"
|
||||
"storj.io/storj/pkg/storage/segments"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
"storj.io/storj/pkg/storj"
|
||||
@ -27,7 +28,7 @@ var _ storj.Metainfo = (*DB)(nil)
|
||||
|
||||
// DB implements metainfo database
|
||||
type DB struct {
|
||||
*Project
|
||||
project *Project
|
||||
|
||||
metainfo metainfo.Client
|
||||
|
||||
@ -38,9 +39,9 @@ type DB struct {
|
||||
}
|
||||
|
||||
// New creates a new metainfo database
|
||||
func New(metainfo metainfo.Client, buckets buckets.Store, streams streams.Store, segments segments.Store, rootKey *storj.Key, encryptedBlockSize int32, redundancy eestream.RedundancyStrategy, segmentsSize int64) *DB {
|
||||
func New(metainfo metainfo.Client, streams streams.Store, segments segments.Store, rootKey *storj.Key, encryptedBlockSize int32, redundancy eestream.RedundancyStrategy, segmentsSize int64) *DB {
|
||||
return &DB{
|
||||
Project: NewProject(buckets, encryptedBlockSize, redundancy, segmentsSize),
|
||||
project: NewProject(streams, encryptedBlockSize, redundancy, segmentsSize),
|
||||
metainfo: metainfo,
|
||||
streams: streams,
|
||||
segments: segments,
|
||||
@ -56,3 +57,23 @@ func (db *DB) Limits() (storj.MetainfoLimits, error) {
|
||||
MaximumInlineSegmentSize: memory.MiB.Int64(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CreateBucket creates a new bucket with the specified information
|
||||
func (db *DB) CreateBucket(ctx context.Context, bucketName string, info *storj.Bucket) (bucketInfo storj.Bucket, err error) {
|
||||
return db.project.CreateBucket(ctx, bucketName, info)
|
||||
}
|
||||
|
||||
// DeleteBucket deletes bucket
|
||||
func (db *DB) DeleteBucket(ctx context.Context, bucketName string) (err error) {
|
||||
return db.project.DeleteBucket(ctx, bucketName)
|
||||
}
|
||||
|
||||
// GetBucket gets bucket information
|
||||
func (db *DB) GetBucket(ctx context.Context, bucketName string) (bucketInfo storj.Bucket, err error) {
|
||||
return db.project.GetBucket(ctx, bucketName)
|
||||
}
|
||||
|
||||
// ListBuckets lists buckets
|
||||
func (db *DB) ListBuckets(ctx context.Context, options storj.BucketListOptions) (list storj.BucketList, err error) {
|
||||
return db.project.ListBuckets(ctx, options)
|
||||
}
|
||||
|
@ -139,12 +139,18 @@ func (db *DB) ModifyObject(ctx context.Context, bucket string, path storj.Path)
|
||||
func (db *DB) DeleteObject(ctx context.Context, bucket string, path storj.Path) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
store, err := db.buckets.GetObjectStore(ctx, bucket)
|
||||
bucketInfo, err := db.GetBucket(ctx, bucket)
|
||||
if err != nil {
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
err = storj.ErrBucketNotFound.Wrap(err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return store.Delete(ctx, path)
|
||||
prefixed := prefixedObjStore{
|
||||
store: objects.NewStore(db.streams, bucketInfo.PathCipher),
|
||||
prefix: bucket,
|
||||
}
|
||||
return prefixed.Delete(ctx, path)
|
||||
}
|
||||
|
||||
// ModifyPendingObject creates an interface for updating a partially uploaded object
|
||||
@ -168,9 +174,9 @@ func (db *DB) ListObjects(ctx context.Context, bucket string, options storj.List
|
||||
return storj.ObjectList{}, err
|
||||
}
|
||||
|
||||
objects, err := db.buckets.GetObjectStore(ctx, bucket)
|
||||
if err != nil {
|
||||
return storj.ObjectList{}, err
|
||||
objects := prefixedObjStore{
|
||||
store: objects.NewStore(db.streams, bucketInfo.PathCipher),
|
||||
prefix: bucket,
|
||||
}
|
||||
|
||||
var startAfter, endBefore string
|
||||
|
@ -16,7 +16,6 @@ import (
|
||||
"storj.io/storj/internal/memory"
|
||||
"storj.io/storj/internal/testplanet"
|
||||
"storj.io/storj/pkg/metainfo/kvmetainfo"
|
||||
"storj.io/storj/pkg/storage/buckets"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/stream"
|
||||
@ -40,7 +39,7 @@ func TestCreateObject(t *testing.T) {
|
||||
BlockSize: stripesPerBlock * customRS.StripeSize(),
|
||||
}
|
||||
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -88,7 +87,7 @@ func TestCreateObject(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGetObject(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
||||
require.NoError(t, err)
|
||||
upload(ctx, t, db, streams, bucket, TestFile, nil)
|
||||
@ -115,7 +114,7 @@ func TestGetObject(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGetObjectStream(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
data := make([]byte, 32*memory.KiB)
|
||||
_, err := rand.Read(data)
|
||||
require.NoError(t, err)
|
||||
@ -244,7 +243,7 @@ func assertRemoteSegment(t *testing.T, segment storj.Segment) {
|
||||
}
|
||||
|
||||
func TestDeleteObject(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
||||
if !assert.NoError(t, err) {
|
||||
return
|
||||
@ -270,7 +269,7 @@ func TestDeleteObject(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestListObjectsEmpty(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -296,7 +295,7 @@ func TestListObjectsEmpty(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestListObjects(t *testing.T) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
|
||||
runTest(t, func(ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
|
||||
bucket, err := db.CreateBucket(ctx, TestBucket, &storj.Bucket{PathCipher: storj.Unencrypted})
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package buckets
|
||||
package kvmetainfo
|
||||
|
||||
import (
|
||||
"context"
|
@ -5,21 +5,25 @@ package kvmetainfo
|
||||
|
||||
import (
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/storage/buckets"
|
||||
"storj.io/storj/pkg/storage/objects"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
// Project implements project management operations
|
||||
type Project struct {
|
||||
buckets buckets.Store
|
||||
buckets objects.Store
|
||||
streams streams.Store
|
||||
encryptedBlockSize int32
|
||||
redundancy eestream.RedundancyStrategy
|
||||
segmentsSize int64
|
||||
}
|
||||
|
||||
// NewProject constructs a *Project
|
||||
func NewProject(buckets buckets.Store, encryptedBlockSize int32, redundancy eestream.RedundancyStrategy, segmentsSize int64) *Project {
|
||||
func NewProject(streams streams.Store, encryptedBlockSize int32, redundancy eestream.RedundancyStrategy, segmentsSize int64) *Project {
|
||||
return &Project{
|
||||
buckets: buckets,
|
||||
buckets: objects.NewStore(streams, storj.Unencrypted),
|
||||
streams: streams,
|
||||
encryptedBlockSize: encryptedBlockSize,
|
||||
redundancy: redundancy,
|
||||
segmentsSize: segmentsSize,
|
||||
|
@ -26,7 +26,6 @@ import (
|
||||
"storj.io/storj/pkg/macaroon"
|
||||
"storj.io/storj/pkg/metainfo/kvmetainfo"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storage/buckets"
|
||||
ecclient "storj.io/storj/pkg/storage/ec"
|
||||
"storj.io/storj/pkg/storage/segments"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
@ -712,9 +711,7 @@ func initEnv(ctx context.Context, planet *testplanet.Planet) (minio.ObjectLayer,
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
buckets := buckets.NewStore(streams)
|
||||
|
||||
kvmetainfo := kvmetainfo.New(metainfo, buckets, streams, segments, encKey, int32(blockSize), rs, 64*memory.MiB.Int64())
|
||||
kvmetainfo := kvmetainfo.New(metainfo, streams, segments, encKey, int32(blockSize), rs, 64*memory.MiB.Int64())
|
||||
|
||||
cfg := libuplink.Config{}
|
||||
cfg.Volatile.TLS = struct {
|
||||
|
@ -1,224 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package buckets
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/pkg/encryption"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storage/meta"
|
||||
"storj.io/storj/pkg/storage/objects"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
|
||||
// Store creates an interface for interacting with buckets
|
||||
type Store interface {
|
||||
Get(ctx context.Context, bucket string) (meta Meta, err error)
|
||||
Put(ctx context.Context, bucket string, inMeta Meta) (meta Meta, err error)
|
||||
Delete(ctx context.Context, bucket string) (err error)
|
||||
List(ctx context.Context, startAfter, endBefore string, limit int) (items []ListItem, more bool, err error)
|
||||
GetObjectStore(ctx context.Context, bucketName string) (store objects.Store, err error)
|
||||
}
|
||||
|
||||
// ListItem is a single item in a listing
|
||||
type ListItem struct {
|
||||
Bucket string
|
||||
Meta Meta
|
||||
}
|
||||
|
||||
// BucketStore contains objects store
|
||||
type BucketStore struct {
|
||||
store objects.Store
|
||||
stream streams.Store
|
||||
}
|
||||
|
||||
// Meta is the bucket metadata struct
|
||||
type Meta struct {
|
||||
Created time.Time
|
||||
PathEncryptionType storj.Cipher
|
||||
SegmentsSize int64
|
||||
RedundancyScheme storj.RedundancyScheme
|
||||
EncryptionScheme storj.EncryptionScheme
|
||||
}
|
||||
|
||||
// NewStore instantiates BucketStore
|
||||
func NewStore(stream streams.Store) Store {
|
||||
// root object store for storing the buckets with unencrypted names
|
||||
store := objects.NewStore(stream, storj.Unencrypted)
|
||||
return &BucketStore{store: store, stream: stream}
|
||||
}
|
||||
|
||||
// GetObjectStore returns an implementation of objects.Store
|
||||
func (b *BucketStore) GetObjectStore(ctx context.Context, bucket string) (_ objects.Store, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if bucket == "" {
|
||||
return nil, storj.ErrNoBucket.New("")
|
||||
}
|
||||
|
||||
m, err := b.Get(ctx, bucket)
|
||||
if err != nil {
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
err = storj.ErrBucketNotFound.Wrap(err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
prefixed := prefixedObjStore{
|
||||
store: objects.NewStore(b.stream, m.PathEncryptionType),
|
||||
prefix: bucket,
|
||||
}
|
||||
return &prefixed, nil
|
||||
}
|
||||
|
||||
// Get calls objects store Get
|
||||
func (b *BucketStore) Get(ctx context.Context, bucket string) (meta Meta, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if bucket == "" {
|
||||
return Meta{}, storj.ErrNoBucket.New("")
|
||||
}
|
||||
|
||||
objMeta, err := b.store.Meta(ctx, bucket)
|
||||
if err != nil {
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
err = storj.ErrBucketNotFound.Wrap(err)
|
||||
}
|
||||
return Meta{}, err
|
||||
}
|
||||
|
||||
return convertMeta(ctx, objMeta)
|
||||
}
|
||||
|
||||
// Put calls objects store Put and fills in some specific metadata to be used
|
||||
// in the bucket's object Pointer. Note that the Meta.Created field is ignored.
|
||||
func (b *BucketStore) Put(ctx context.Context, bucketName string, inMeta Meta) (meta Meta, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if bucketName == "" {
|
||||
return Meta{}, storj.ErrNoBucket.New("")
|
||||
}
|
||||
|
||||
pathCipher := inMeta.PathEncryptionType
|
||||
if pathCipher < storj.Unencrypted || pathCipher > storj.SecretBox {
|
||||
return Meta{}, encryption.ErrInvalidConfig.New("encryption type %d is not supported", pathCipher)
|
||||
}
|
||||
|
||||
r := bytes.NewReader(nil)
|
||||
userMeta := map[string]string{
|
||||
"path-enc-type": strconv.Itoa(int(pathCipher)),
|
||||
"default-seg-size": strconv.FormatInt(inMeta.SegmentsSize, 10),
|
||||
"default-enc-type": strconv.Itoa(int(inMeta.EncryptionScheme.Cipher)),
|
||||
"default-enc-blksz": strconv.FormatInt(int64(inMeta.EncryptionScheme.BlockSize), 10),
|
||||
"default-rs-algo": strconv.Itoa(int(inMeta.RedundancyScheme.Algorithm)),
|
||||
"default-rs-sharsz": strconv.FormatInt(int64(inMeta.RedundancyScheme.ShareSize), 10),
|
||||
"default-rs-reqd": strconv.Itoa(int(inMeta.RedundancyScheme.RequiredShares)),
|
||||
"default-rs-repair": strconv.Itoa(int(inMeta.RedundancyScheme.RepairShares)),
|
||||
"default-rs-optim": strconv.Itoa(int(inMeta.RedundancyScheme.OptimalShares)),
|
||||
"default-rs-total": strconv.Itoa(int(inMeta.RedundancyScheme.TotalShares)),
|
||||
}
|
||||
var exp time.Time
|
||||
m, err := b.store.Put(ctx, bucketName, r, pb.SerializableMeta{UserDefined: userMeta}, exp)
|
||||
if err != nil {
|
||||
return Meta{}, err
|
||||
}
|
||||
// we could use convertMeta() here, but that's a lot of int-parsing
|
||||
// just to get back to what should be the same contents we already
|
||||
// have. the only change ought to be the modified time.
|
||||
inMeta.Created = m.Modified
|
||||
return inMeta, nil
|
||||
}
|
||||
|
||||
// Delete calls objects store Delete
|
||||
func (b *BucketStore) Delete(ctx context.Context, bucket string) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if bucket == "" {
|
||||
return storj.ErrNoBucket.New("")
|
||||
}
|
||||
|
||||
err = b.store.Delete(ctx, bucket)
|
||||
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
err = storj.ErrBucketNotFound.Wrap(err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// List calls objects store List
|
||||
func (b *BucketStore) List(ctx context.Context, startAfter, endBefore string, limit int) (items []ListItem, more bool, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
objItems, more, err := b.store.List(ctx, "", startAfter, endBefore, false, limit, meta.Modified)
|
||||
if err != nil {
|
||||
return items, more, err
|
||||
}
|
||||
|
||||
items = make([]ListItem, 0, len(objItems))
|
||||
for _, itm := range objItems {
|
||||
if itm.IsPrefix {
|
||||
continue
|
||||
}
|
||||
m, err := convertMeta(ctx, itm.Meta)
|
||||
if err != nil {
|
||||
return items, more, err
|
||||
}
|
||||
items = append(items, ListItem{
|
||||
Bucket: itm.Path,
|
||||
Meta: m,
|
||||
})
|
||||
}
|
||||
return items, more, nil
|
||||
}
|
||||
|
||||
// convertMeta converts stream metadata to object metadata
|
||||
func convertMeta(ctx context.Context, m objects.Meta) (out Meta, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
out.Created = m.Modified
|
||||
// backwards compatibility for old buckets
|
||||
out.PathEncryptionType = storj.AESGCM
|
||||
out.EncryptionScheme.Cipher = storj.Invalid
|
||||
|
||||
applySetting := func(nameInMap string, bits int, storeFunc func(val int64)) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if stringVal := m.UserDefined[nameInMap]; stringVal != "" {
|
||||
var intVal int64
|
||||
intVal, err = strconv.ParseInt(stringVal, 10, bits)
|
||||
if err != nil {
|
||||
err = errs.New("invalid metadata field for %s: %v", nameInMap, err)
|
||||
return
|
||||
}
|
||||
storeFunc(intVal)
|
||||
}
|
||||
}
|
||||
|
||||
es := &out.EncryptionScheme
|
||||
rs := &out.RedundancyScheme
|
||||
|
||||
applySetting("path-enc-type", 16, func(v int64) { out.PathEncryptionType = storj.Cipher(v) })
|
||||
applySetting("default-seg-size", 64, func(v int64) { out.SegmentsSize = v })
|
||||
applySetting("default-enc-type", 32, func(v int64) { es.Cipher = storj.Cipher(v) })
|
||||
applySetting("default-enc-blksz", 32, func(v int64) { es.BlockSize = int32(v) })
|
||||
applySetting("default-rs-algo", 32, func(v int64) { rs.Algorithm = storj.RedundancyAlgorithm(v) })
|
||||
applySetting("default-rs-sharsz", 32, func(v int64) { rs.ShareSize = int32(v) })
|
||||
applySetting("default-rs-reqd", 16, func(v int64) { rs.RequiredShares = int16(v) })
|
||||
applySetting("default-rs-repair", 16, func(v int64) { rs.RepairShares = int16(v) })
|
||||
applySetting("default-rs-optim", 16, func(v int64) { rs.OptimalShares = int16(v) })
|
||||
applySetting("default-rs-total", 16, func(v int64) { rs.TotalShares = int16(v) })
|
||||
|
||||
return out, err
|
||||
}
|
@ -19,7 +19,6 @@ import (
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/metainfo/kvmetainfo"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/storage/buckets"
|
||||
ecclient "storj.io/storj/pkg/storage/ec"
|
||||
"storj.io/storj/pkg/storage/segments"
|
||||
"storj.io/storj/pkg/storage/streams"
|
||||
@ -133,9 +132,7 @@ func (c Config) GetMetainfo(ctx context.Context, identity *identity.FullIdentity
|
||||
return nil, nil, Error.New("failed to create stream store: %v", err)
|
||||
}
|
||||
|
||||
buckets := buckets.NewStore(streams)
|
||||
|
||||
return kvmetainfo.New(metainfo, buckets, streams, segments, key, blockSize, rs, c.Client.SegmentSize.Int64()), streams, nil
|
||||
return kvmetainfo.New(metainfo, streams, segments, key, blockSize, rs, c.Client.SegmentSize.Int64()), streams, nil
|
||||
}
|
||||
|
||||
// GetRedundancyScheme returns the configured redundancy scheme for new uploads
|
||||
|
Loading…
Reference in New Issue
Block a user