private/testuplink: move tests to uplink
Tests will be deleted from the storj repo and added to uplink.

Change-Id: I298d852325c8eb0df07df38fd7e1345623addd8d
parent ee9bb0d689
commit 21518bcc92
@@ -1,340 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package testuplink_test

import (
	"context"
	"fmt"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/vivint/infectious"

	"storj.io/common/encryption"
	"storj.io/common/macaroon"
	"storj.io/common/memory"
	"storj.io/common/storj"
	"storj.io/common/testcontext"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite/console"
	"storj.io/uplink/private/ecclient"
	"storj.io/uplink/private/eestream"
	"storj.io/uplink/private/metainfo/kvmetainfo"
	"storj.io/uplink/private/storage/segments"
	"storj.io/uplink/private/storage/streams"
)

const (
	TestEncKey = "test-encryption-key"
	TestBucket = "test-bucket"
)

var (
	defaultRS = storj.RedundancyScheme{
		RequiredShares: 2,
		RepairShares:   2,
		OptimalShares:  3,
		TotalShares:    4,
		ShareSize:      256 * memory.B.Int32(),
	}

	defaultEP = storj.EncryptionParameters{
		CipherSuite: storj.EncAESGCM,
		BlockSize:   defaultRS.StripeSize(),
	}

	defaultBucket = storj.Bucket{
		PathCipher:                  storj.EncAESGCM,
		DefaultRedundancyScheme:     defaultRS,
		DefaultEncryptionParameters: defaultEP,
	}
)
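
// Note (assumption, not stated in this diff): storj.RedundancyScheme.StripeSize()
// is typically RequiredShares * ShareSize, so defaultEP.BlockSize above would
// work out to 2 * 256 B = 512 B, i.e. one encryption block per erasure stripe.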

func TestBucketsBasic(t *testing.T) {
	runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
		// Create new bucket
		bucket, err := db.CreateBucket(ctx, TestBucket, &defaultBucket)
		if assert.NoError(t, err) {
			assert.Equal(t, TestBucket, bucket.Name)
		}

		// Check that the bucket list includes the new bucket
		bucketList, err := db.ListBuckets(ctx, storj.BucketListOptions{Direction: storj.After})
		if assert.NoError(t, err) {
			assert.False(t, bucketList.More)
			assert.Equal(t, 1, len(bucketList.Items))
			assert.Equal(t, TestBucket, bucketList.Items[0].Name)
		}

		// Check that we can get the new bucket explicitly
		bucket, err = db.GetBucket(ctx, TestBucket)
		if assert.NoError(t, err) {
			assert.Equal(t, TestBucket, bucket.Name)
			assert.Equal(t, storj.EncAESGCM, bucket.PathCipher)
		}

		// Delete the bucket
		bucket, err = db.DeleteBucket(ctx, TestBucket)
		if assert.NoError(t, err) {
			assert.Equal(t, TestBucket, bucket.Name)
			assert.Equal(t, storj.EncAESGCM, bucket.PathCipher)
		}

		// Check that the bucket list is empty
		bucketList, err = db.ListBuckets(ctx, storj.BucketListOptions{Direction: storj.After})
		if assert.NoError(t, err) {
			assert.False(t, bucketList.More)
			assert.Equal(t, 0, len(bucketList.Items))
		}

		// Check that the bucket can no longer be fetched explicitly
		bucket, err = db.GetBucket(ctx, TestBucket)
		assert.True(t, storj.ErrBucketNotFound.Has(err))
	})
}

func TestBucketsReadWrite(t *testing.T) {
	runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
		// Create new bucket
		bucket, err := db.CreateBucket(ctx, TestBucket, &defaultBucket)
		if assert.NoError(t, err) {
			assert.Equal(t, TestBucket, bucket.Name)
		}

		// Check that the bucket list includes the new bucket
		bucketList, err := db.ListBuckets(ctx, storj.BucketListOptions{Direction: storj.After})
		if assert.NoError(t, err) {
			assert.False(t, bucketList.More)
			assert.Equal(t, 1, len(bucketList.Items))
			assert.Equal(t, TestBucket, bucketList.Items[0].Name)
		}

		// Check that we can get the new bucket explicitly
		bucket, err = db.GetBucket(ctx, TestBucket)
		if assert.NoError(t, err) {
			assert.Equal(t, TestBucket, bucket.Name)
			assert.Equal(t, storj.EncAESGCM, bucket.PathCipher)
		}

		// Delete the bucket
		bucket, err = db.DeleteBucket(ctx, TestBucket)
		if assert.NoError(t, err) {
			assert.Equal(t, TestBucket, bucket.Name)
			assert.Equal(t, storj.EncAESGCM, bucket.PathCipher)
		}

		// Check that the bucket list is empty
		bucketList, err = db.ListBuckets(ctx, storj.BucketListOptions{Direction: storj.After})
		if assert.NoError(t, err) {
			assert.False(t, bucketList.More)
			assert.Equal(t, 0, len(bucketList.Items))
		}

		// Check that the bucket can no longer be fetched explicitly
		bucket, err = db.GetBucket(ctx, TestBucket)
		assert.True(t, storj.ErrBucketNotFound.Has(err))
	})
}

func TestErrNoBucket(t *testing.T) {
	runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
		_, err := db.CreateBucket(ctx, "", nil)
		assert.True(t, storj.ErrNoBucket.Has(err))

		_, err = db.GetBucket(ctx, "")
		assert.True(t, storj.ErrNoBucket.Has(err))

		_, err = db.DeleteBucket(ctx, "")
		assert.True(t, storj.ErrNoBucket.Has(err))
	})
}

func TestBucketCreateCipher(t *testing.T) {
	runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
		forAllCiphers(func(cipher storj.CipherSuite) {
			bucket, err := db.CreateBucket(ctx, "test", &storj.Bucket{
				PathCipher:                  cipher,
				DefaultRedundancyScheme:     defaultRS,
				DefaultEncryptionParameters: defaultEP,
			})
			if assert.NoError(t, err) {
				assert.Equal(t, cipher, bucket.PathCipher)
			}

			bucket, err = db.GetBucket(ctx, "test")
			if assert.NoError(t, err) {
				assert.Equal(t, cipher, bucket.PathCipher)
			}

			bucket, err = db.DeleteBucket(ctx, "test")
			if assert.NoError(t, err) {
				assert.Equal(t, cipher, bucket.PathCipher)
			}
		})
	})
}

func TestListBucketsEmpty(t *testing.T) {
	runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
		bucketList, err := db.ListBuckets(ctx, storj.BucketListOptions{Direction: storj.Forward})
		if assert.NoError(t, err) {
			assert.False(t, bucketList.More)
			assert.Equal(t, 0, len(bucketList.Items))
		}
	})
}

func TestListBuckets(t *testing.T) {
	runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
		bucketNames := []string{"a00", "aa0", "b00", "bb0", "c00"}

		for _, name := range bucketNames {
			_, err := db.CreateBucket(ctx, name, &defaultBucket)
			require.NoError(t, err)
		}

		for i, tt := range []struct {
			cursor string
			dir    storj.ListDirection
			limit  int
			more   bool
			result []string
		}{
			{cursor: "", dir: storj.Forward, limit: 0, more: false, result: []string{"a00", "aa0", "b00", "bb0", "c00"}},
			{cursor: "`", dir: storj.Forward, limit: 0, more: false, result: []string{"a00", "aa0", "b00", "bb0", "c00"}},
			{cursor: "b00", dir: storj.Forward, limit: 0, more: false, result: []string{"b00", "bb0", "c00"}},
			{cursor: "c00", dir: storj.Forward, limit: 0, more: false, result: []string{"c00"}},
			{cursor: "ca", dir: storj.Forward, limit: 0, more: false, result: []string{}},
			{cursor: "", dir: storj.Forward, limit: 1, more: true, result: []string{"a00"}},
			{cursor: "`", dir: storj.Forward, limit: 1, more: true, result: []string{"a00"}},
			{cursor: "aa0", dir: storj.Forward, limit: 1, more: true, result: []string{"aa0"}},
			{cursor: "c00", dir: storj.Forward, limit: 1, more: false, result: []string{"c00"}},
			{cursor: "ca", dir: storj.Forward, limit: 1, more: false, result: []string{}},
			{cursor: "", dir: storj.Forward, limit: 2, more: true, result: []string{"a00", "aa0"}},
			{cursor: "`", dir: storj.Forward, limit: 2, more: true, result: []string{"a00", "aa0"}},
			{cursor: "aa0", dir: storj.Forward, limit: 2, more: true, result: []string{"aa0", "b00"}},
			{cursor: "bb0", dir: storj.Forward, limit: 2, more: false, result: []string{"bb0", "c00"}},
			{cursor: "c00", dir: storj.Forward, limit: 2, more: false, result: []string{"c00"}},
			{cursor: "ca", dir: storj.Forward, limit: 2, more: false, result: []string{}},
		} {
			errTag := fmt.Sprintf("%d. %+v", i, tt)

			bucketList, err := db.ListBuckets(ctx, storj.BucketListOptions{
				Cursor:    tt.cursor,
				Direction: tt.dir,
				Limit:     tt.limit,
			})

			if assert.NoError(t, err, errTag) {
				assert.Equal(t, tt.more, bucketList.More, errTag)
				assert.Equal(t, tt.result, getBucketNames(bucketList), errTag)
			}
		}
	})
}

func getBucketNames(bucketList storj.BucketList) []string {
	names := make([]string, len(bucketList.Items))

	for i, item := range bucketList.Items {
		names[i] = item.Name
	}

	return names
}

func runTest(t *testing.T, test func(*testing.T, context.Context, *testplanet.Planet, *kvmetainfo.DB, streams.Store)) {
	runTestWithPathCipher(t, storj.EncAESGCM, test)
}

func runTestWithPathCipher(t *testing.T, pathCipher storj.CipherSuite, test func(*testing.T, context.Context, *testplanet.Planet, *kvmetainfo.DB, streams.Store)) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		encAccess := newTestEncStore(TestEncKey)
		encAccess.SetDefaultPathCipher(pathCipher)
		db, streams, err := newMetainfoParts(planet, encAccess)
		require.NoError(t, err)

		test(t, ctx, planet, db, streams)
	})
}

func newTestEncStore(keyStr string) *encryption.Store {
	key := new(storj.Key)
	copy(key[:], keyStr)

	store := encryption.NewStore()
	store.SetDefaultKey(key)

	return store
}
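
// Note: copy(key[:], keyStr) above copies at most len(key) bytes and leaves
// the remainder of the fixed-size key zeroed, so short test keys such as
// TestEncKey are effectively zero-padded.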

func newMetainfoParts(planet *testplanet.Planet, encStore *encryption.Store) (*kvmetainfo.DB, streams.Store, error) {
	// TODO(kaloyan): We should have a better way for configuring the Satellite's API Key
	// Add a project to satisfy the constraint.
	project, err := planet.Satellites[0].DB.Console().Projects().Insert(context.Background(), &console.Project{
		Name: "testProject",
	})
	if err != nil {
		return nil, nil, err
	}

	apiKey, err := macaroon.NewAPIKey([]byte("testSecret"))
	if err != nil {
		return nil, nil, err
	}

	apiKeyInfo := console.APIKeyInfo{
		ProjectID: project.ID,
		Name:      "testKey",
		Secret:    []byte("testSecret"),
	}

	// Add the API key to the database.
	_, err = planet.Satellites[0].DB.Console().APIKeys().Create(context.Background(), apiKey.Head(), apiKeyInfo)
	if err != nil {
		return nil, nil, err
	}

	metainfo, err := planet.Uplinks[0].DialMetainfo(context.Background(), planet.Satellites[0], apiKey)
	if err != nil {
		return nil, nil, err
	}
	// TODO(leak): call metainfo.Close somehow

	ec := ecclient.NewClient(planet.Uplinks[0].Log.Named("ecclient"), planet.Uplinks[0].Dialer, 0)
	fc, err := infectious.NewFEC(2, 4)
	if err != nil {
		return nil, nil, err
	}

	rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, 1*memory.KiB.Int()), 0, 0)
	if err != nil {
		return nil, nil, err
	}

	segments := segments.NewSegmentStore(metainfo, ec)

	const stripesPerBlock = 2
	blockSize := stripesPerBlock * rs.StripeSize()
	inlineThreshold := 8 * memory.KiB.Int()
	streams, err := streams.NewStreamStore(metainfo, segments, 64*memory.MiB.Int64(), encStore, blockSize, storj.EncAESGCM, inlineThreshold, 8*memory.MiB.Int64())
	if err != nil {
		return nil, nil, err
	}

	proj := kvmetainfo.NewProject(streams, int32(blockSize), 64*memory.MiB.Int64(), *metainfo)
	return kvmetainfo.New(proj, metainfo, streams, segments, encStore), streams, nil
}
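
// A rough sketch of the sizes configured above, assuming eestream's
// RSScheme.StripeSize() is the erasure-share size times the required count:
// with 1 KiB shares and a (2 required, 4 total) FEC, a stripe is 2 KiB, so
// blockSize = stripesPerBlock * 2 KiB = 4 KiB. Segments up to the 8 KiB
// inline threshold are stored inline; larger ones are erasure-coded out to
// storage nodes.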

func forAllCiphers(test func(cipher storj.CipherSuite)) {
	for _, cipher := range []storj.CipherSuite{
		storj.EncNull,
		storj.EncAESGCM,
		storj.EncSecretBox,
	} {
		test(cipher)
	}
}
@@ -1,546 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package testuplink_test

import (
	"context"
	"encoding/base64"
	"fmt"
	"io"
	"sort"
	"strings"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"storj.io/common/encryption"
	"storj.io/common/memory"
	"storj.io/common/paths"
	"storj.io/common/storj"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/storj/private/testplanet"
	"storj.io/uplink/private/metainfo/kvmetainfo"
	"storj.io/uplink/private/storage/streams"
	"storj.io/uplink/private/stream"
)

const TestFile = "test-file"

func TestCreateObject(t *testing.T) {
	customRS := storj.RedundancyScheme{
		Algorithm:      storj.ReedSolomon,
		RequiredShares: 29,
		RepairShares:   35,
		OptimalShares:  80,
		TotalShares:    95,
		ShareSize:      2 * memory.KiB.Int32(),
	}

	const stripesPerBlock = 2
	customEP := storj.EncryptionParameters{
		CipherSuite: storj.EncNull,
		BlockSize:   stripesPerBlock * customRS.StripeSize(),
	}

	runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
		bucket, err := db.CreateBucket(ctx, TestBucket, &defaultBucket)
		require.NoError(t, err)

		for i, tt := range []struct {
			create     *kvmetainfo.CreateObject
			expectedRS storj.RedundancyScheme
			expectedEP storj.EncryptionParameters
		}{
			{
				create:     nil,
				expectedRS: kvmetainfo.DefaultRS,
				expectedEP: kvmetainfo.DefaultES,
			},
			{
				create:     &kvmetainfo.CreateObject{RedundancyScheme: customRS, EncryptionParameters: customEP},
				expectedRS: customRS,
				expectedEP: customEP,
			},
			{
				create:     &kvmetainfo.CreateObject{RedundancyScheme: customRS},
				expectedRS: customRS,
				expectedEP: storj.EncryptionParameters{CipherSuite: kvmetainfo.DefaultES.CipherSuite, BlockSize: kvmetainfo.DefaultES.BlockSize},
			},
			{
				create:     &kvmetainfo.CreateObject{EncryptionParameters: customEP},
				expectedRS: kvmetainfo.DefaultRS,
				expectedEP: storj.EncryptionParameters{CipherSuite: customEP.CipherSuite, BlockSize: kvmetainfo.DefaultES.BlockSize},
			},
		} {
			errTag := fmt.Sprintf("%d. %+v", i, tt)

			obj, err := db.CreateObject(ctx, bucket, TestFile, tt.create)
			require.NoError(t, err)

			info := obj.Info()

			assert.Equal(t, TestBucket, info.Bucket.Name, errTag)
			assert.Equal(t, storj.EncAESGCM, info.Bucket.PathCipher, errTag)
			assert.Equal(t, TestFile, info.Path, errTag)
			assert.EqualValues(t, 0, info.Size, errTag)
			assert.Equal(t, tt.expectedRS, info.RedundancyScheme, errTag)
			assert.Equal(t, tt.expectedEP, info.EncryptionParameters, errTag)
		}
	})
}

func TestGetObject(t *testing.T) {
	runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
		bucket, err := db.CreateBucket(ctx, TestBucket, &defaultBucket)
		require.NoError(t, err)
		upload(ctx, t, db, streams, bucket, TestFile, nil)

		_, err = db.GetObject(ctx, storj.Bucket{}, "")
		assert.True(t, storj.ErrNoBucket.Has(err))

		_, err = db.GetObject(ctx, bucket, "")
		assert.True(t, storj.ErrNoPath.Has(err))

		nonExistingBucket := storj.Bucket{
			Name:       "non-existing-bucket",
			PathCipher: storj.EncNull,
		}
		_, err = db.GetObject(ctx, nonExistingBucket, TestFile)
		assert.True(t, storj.ErrObjectNotFound.Has(err))

		_, err = db.GetObject(ctx, bucket, "non-existing-file")
		assert.True(t, storj.ErrObjectNotFound.Has(err))

		object, err := db.GetObject(ctx, bucket, TestFile)
		if assert.NoError(t, err) {
			assert.Equal(t, TestFile, object.Path)
			assert.Equal(t, TestBucket, object.Bucket.Name)
			assert.Equal(t, storj.EncAESGCM, object.Bucket.PathCipher)
		}
	})
}

func TestGetObjectStream(t *testing.T) {
	runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
		data := testrand.Bytes(32 * memory.KiB)

		bucket, err := db.CreateBucket(ctx, TestBucket, &defaultBucket)
		require.NoError(t, err)

		emptyFile := upload(ctx, t, db, streams, bucket, "empty-file", nil)
		smallFile := upload(ctx, t, db, streams, bucket, "small-file", []byte("test"))
		largeFile := upload(ctx, t, db, streams, bucket, "large-file", data)

		emptyBucket := storj.Bucket{
			PathCipher: storj.EncNull,
		}
		_, err = db.GetObjectStream(ctx, emptyBucket, storj.Object{})
		assert.True(t, storj.ErrNoBucket.Has(err))

		_, err = db.GetObjectStream(ctx, bucket, storj.Object{})
		assert.True(t, storj.ErrNoPath.Has(err))

		nonExistingBucket := storj.Bucket{
			Name:       "non-existing-bucket",
			PathCipher: storj.EncNull,
		}

		// No error is expected here, as this method does not contact the satellite.
		_, err = db.GetObjectStream(ctx, nonExistingBucket, smallFile)
		assert.NoError(t, err)

		// No error is expected here, as this method does not contact the satellite.
		_, err = db.GetObjectStream(ctx, bucket, storj.Object{
			Path: "non-existing-file",
		})
		assert.NoError(t, err)

		assertStream(ctx, t, db, streams, bucket, emptyFile, []byte{})
		assertStream(ctx, t, db, streams, bucket, smallFile, []byte("test"))
		assertStream(ctx, t, db, streams, bucket, largeFile, data)

		/* TODO: Stopping nodes is disabled due to flakiness.
		// Stop randomly half of the storage nodes and remove them from satellite's overlay
		perm := mathrand.Perm(len(planet.StorageNodes))
		for _, i := range perm[:(len(perm) / 2)] {
			assert.NoError(t, planet.StopPeer(planet.StorageNodes[i]))
			_, err := planet.Satellites[0].Overlay.Service.UpdateUptime(ctx, planet.StorageNodes[i].ID(), false)
			assert.NoError(t, err)
		}

		// try downloading the large file again
		assertStream(ctx, t, db, streams, bucket, "large-file", 32*memory.KiB.Int64(), data)
		*/
	})
}

func upload(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams streams.Store, bucket storj.Bucket, path storj.Path, data []byte) storj.Object {
	obj, err := db.CreateObject(ctx, bucket, path, nil)
	require.NoError(t, err)

	str, err := obj.CreateStream(ctx)
	require.NoError(t, err)

	upload := stream.NewUpload(ctx, str, streams)

	_, err = upload.Write(data)
	require.NoError(t, err)

	err = upload.Close()
	require.NoError(t, err)

	err = obj.Commit(ctx)
	require.NoError(t, err)

	return obj.Info()
}
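
// The helper above exercises the whole write path: CreateObject registers the
// object, CreateStream plus stream.NewUpload write the content, Close flushes
// the upload, and Commit finalizes the object's metadata.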

func assertStream(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams streams.Store, bucket storj.Bucket, object storj.Object, content []byte) {
	readOnly, err := db.GetObjectStream(ctx, bucket, object)
	require.NoError(t, err)

	assert.Equal(t, object.Path, readOnly.Info().Path)
	assert.Equal(t, TestBucket, readOnly.Info().Bucket.Name)
	assert.Equal(t, storj.EncAESGCM, readOnly.Info().Bucket.PathCipher)

	segments, more, err := readOnly.Segments(ctx, 0, 0)
	require.NoError(t, err)

	assert.False(t, more)
	if !assert.Equal(t, 1, len(segments)) {
		return
	}

	assert.EqualValues(t, 0, segments[0].Index)
	assert.EqualValues(t, len(content), segments[0].Size)
	// The test data is either tiny (inline) or 32 KiB (remote); 4 KiB is a
	// convenient dividing line between the two, below the 8 KiB inline
	// threshold configured in newMetainfoParts.
	if segments[0].Size > 4*memory.KiB.Int64() {
		assertRemoteSegment(t, segments[0])
	} else {
		assertInlineSegment(t, segments[0], content)
	}

	download := stream.NewDownload(ctx, readOnly, streams)
	defer func() {
		err = download.Close()
		assert.NoError(t, err)
	}()

	data := make([]byte, len(content))
	n, err := io.ReadFull(download, data)
	require.NoError(t, err)

	assert.Equal(t, len(content), n)
	assert.Equal(t, content, data)
}

func assertInlineSegment(t *testing.T, segment storj.Segment, content []byte) {
	assert.Equal(t, content, segment.Inline)
	assert.True(t, segment.PieceID.IsZero())
	assert.Equal(t, 0, len(segment.Pieces))
}

func assertRemoteSegment(t *testing.T, segment storj.Segment) {
	assert.Nil(t, segment.Inline)
	assert.NotNil(t, segment.PieceID)

	// check that piece numbers and nodes are unique
	nums := make(map[byte]struct{})
	nodes := make(map[string]struct{})
	for _, piece := range segment.Pieces {
		if _, ok := nums[piece.Number]; ok {
			t.Fatalf("piece number %d is not unique", piece.Number)
		}
		nums[piece.Number] = struct{}{}

		id := piece.Location.String()
		if _, ok := nodes[id]; ok {
			t.Fatalf("node id %s is not unique", id)
		}
		nodes[id] = struct{}{}
	}
}

func TestDeleteObject(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		encStore := newTestEncStore(TestEncKey)
		encStore.SetDefaultPathCipher(storj.EncAESGCM)
		db, streams, err := newMetainfoParts(planet, encStore)
		require.NoError(t, err)

		bucket, err := db.CreateBucket(ctx, TestBucket, &defaultBucket)
		if !assert.NoError(t, err) {
			return
		}

		unencryptedPath := paths.NewUnencrypted(TestFile)
		encryptedPath, err := encryption.EncryptPathWithStoreCipher(bucket.Name, unencryptedPath, encStore)
		require.NoError(t, err)

		for i, path := range []string{unencryptedPath.String(), encryptedPath.String()} {
			upload(ctx, t, db, streams, bucket, path, nil)

			if i < 0 {
				// Enable encryption bypass.
				// NB: as written this branch never executes (i is never
				// negative), so bypass stays disabled for both paths.
				encStore.EncryptionBypass = true
			}

			_, err = db.DeleteObject(ctx, storj.Bucket{}, "")
			assert.True(t, storj.ErrNoBucket.Has(err))

			_, err = db.DeleteObject(ctx, bucket, "")
			assert.True(t, storj.ErrNoPath.Has(err))

			{
				nonExistingBucket := storj.Bucket{
					Name:       bucket.Name + "-not-exist",
					PathCipher: bucket.PathCipher,
				}
				_, err = db.DeleteObject(ctx, nonExistingBucket, TestFile)
				assert.True(t, storj.ErrObjectNotFound.Has(err))
			}

			_, err = db.DeleteObject(ctx, bucket, "non-existing-file")
			assert.True(t, storj.ErrObjectNotFound.Has(err))

			object, err := db.DeleteObject(ctx, bucket, path)
			if assert.NoError(t, err) {
				assert.Equal(t, path, object.Path)
			}
		}
	})
}

func TestListObjectsEmpty(t *testing.T) {
	runTest(t, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
		testBucketInfo, err := db.CreateBucket(ctx, TestBucket, &defaultBucket)
		require.NoError(t, err)

		_, err = db.ListObjects(ctx, storj.Bucket{}, storj.ListOptions{})
		assert.True(t, storj.ErrNoBucket.Has(err))

		_, err = db.ListObjects(ctx, testBucketInfo, storj.ListOptions{})
		assert.EqualError(t, err, "kvmetainfo: invalid direction 0")

		// TODO: for now only storj.After is supported
		for _, direction := range []storj.ListDirection{
			// storj.Forward,
			storj.After,
		} {
			list, err := db.ListObjects(ctx, testBucketInfo, storj.ListOptions{Direction: direction})
			if assert.NoError(t, err) {
				assert.False(t, list.More)
				assert.Equal(t, 0, len(list.Items))
			}
		}
	})
}

func TestListObjects_EncryptionBypass(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		encStore := newTestEncStore(TestEncKey)
		encStore.SetDefaultPathCipher(storj.EncAESGCM)
		db, streams, err := newMetainfoParts(planet, encStore)
		require.NoError(t, err)

		bucket, err := db.CreateBucket(ctx, TestBucket, &defaultBucket)
		require.NoError(t, err)

		filePaths := []string{
			"a", "aa", "b", "bb", "c",
			"a/xa", "a/xaa", "a/xb", "a/xbb", "a/xc",
			"b/ya", "b/yaa", "b/yb", "b/ybb", "b/yc",
		}

		for _, path := range filePaths {
			upload(ctx, t, db, streams, bucket, path, nil)
		}
		sort.Strings(filePaths)

		// Enable encryption bypass
		encStore.EncryptionBypass = true

		opts := options("", "", 0)
		opts.Recursive = true
		encodedList, err := db.ListObjects(ctx, bucket, opts)
		require.NoError(t, err)
		require.Equal(t, len(filePaths), len(encodedList.Items))

		seenPaths := make(map[string]struct{})
		for _, item := range encodedList.Items {
			iter := paths.NewUnencrypted(item.Path).Iterator()
			var decoded, next string
			for !iter.Done() {
				next = iter.Next()

				decodedNextBytes, err := base64.URLEncoding.DecodeString(next)
				require.NoError(t, err)

				decoded += string(decodedNextBytes) + "/"
			}
			decoded = strings.TrimRight(decoded, "/")
			encryptedPath := paths.NewEncrypted(decoded)

			decryptedPath, err := encryption.DecryptPathWithStoreCipher(bucket.Name, encryptedPath, encStore)
			require.NoError(t, err)

			// NB: require that the decrypted path is a member of `filePaths`.
			result := sort.Search(len(filePaths), func(i int) bool {
				return !paths.NewUnencrypted(filePaths[i]).Less(decryptedPath)
			})
			require.NotEqual(t, len(filePaths), result)

			// NB: ensure each path is only seen once.
			_, ok := seenPaths[decryptedPath.String()]
			require.False(t, ok)

			seenPaths[decryptedPath.String()] = struct{}{}
		}
	})
}
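
// With EncryptionBypass enabled, listing returns each path segment as the
// base64 (URL-safe alphabet) encoding of its encrypted form, which is why the
// loop above base64-decodes every segment before decrypting the reassembled
// path.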

func TestListObjects(t *testing.T) {
	runTestWithPathCipher(t, storj.EncNull, func(t *testing.T, ctx context.Context, planet *testplanet.Planet, db *kvmetainfo.DB, streams streams.Store) {
		bucket, err := db.CreateBucket(ctx, TestBucket, &storj.Bucket{
			PathCipher:                  storj.EncNull,
			DefaultRedundancyScheme:     defaultRS,
			DefaultEncryptionParameters: defaultEP,
		})
		require.NoError(t, err)

		filePaths := []string{
			"a", "aa", "b", "bb", "c",
			"a/xa", "a/xaa", "a/xb", "a/xbb", "a/xc",
			"b/ya", "b/yaa", "b/yb", "b/ybb", "b/yc",
		}

		for _, path := range filePaths {
			upload(ctx, t, db, streams, bucket, path, nil)
		}

		otherBucket, err := db.CreateBucket(ctx, "otherbucket", &defaultBucket)
		require.NoError(t, err)

		upload(ctx, t, db, streams, otherBucket, "file-in-other-bucket", nil)

		for i, tt := range []struct {
			options storj.ListOptions
			more    bool
			result  []string
		}{
			{
				options: options("", "", 0),
				result:  []string{"a", "a/", "aa", "b", "b/", "bb", "c"},
			}, {
				options: options("", "`", 0),
				result:  []string{"a", "a/", "aa", "b", "b/", "bb", "c"},
			}, {
				options: options("", "b", 0),
				result:  []string{"b/", "bb", "c"},
			}, {
				options: options("", "c", 0),
				result:  []string{},
			}, {
				options: options("", "ca", 0),
				result:  []string{},
			}, {
				options: options("", "", 1),
				more:    true,
				result:  []string{"a"},
			}, {
				options: options("", "`", 1),
				more:    true,
				result:  []string{"a"},
			}, {
				options: options("", "aa", 1),
				more:    true,
				result:  []string{"b"},
			}, {
				options: options("", "c", 1),
				result:  []string{},
			}, {
				options: options("", "ca", 1),
				result:  []string{},
			}, {
				options: options("", "", 2),
				more:    true,
				result:  []string{"a", "a/"},
			}, {
				options: options("", "`", 2),
				more:    true,
				result:  []string{"a", "a/"},
			}, {
				options: options("", "aa", 2),
				more:    true,
				result:  []string{"b", "b/"},
			}, {
				options: options("", "bb", 2),
				result:  []string{"c"},
			}, {
				options: options("", "c", 2),
				result:  []string{},
			}, {
				options: options("", "ca", 2),
				result:  []string{},
			}, {
				options: optionsRecursive("", "", 0),
				result:  []string{"a", "a/xa", "a/xaa", "a/xb", "a/xbb", "a/xc", "aa", "b", "b/ya", "b/yaa", "b/yb", "b/ybb", "b/yc", "bb", "c"},
			}, {
				options: options("a", "", 0),
				result:  []string{"xa", "xaa", "xb", "xbb", "xc"},
			}, {
				options: options("a/", "", 0),
				result:  []string{"xa", "xaa", "xb", "xbb", "xc"},
			}, {
				options: options("a/", "xb", 0),
				result:  []string{"xbb", "xc"},
			}, {
				options: optionsRecursive("", "a/xbb", 5),
				more:    true,
				result:  []string{"a/xc", "aa", "b", "b/ya", "b/yaa"},
			}, {
				options: options("a/", "xaa", 2),
				more:    true,
				result:  []string{"xb", "xbb"},
			},
		} {
			errTag := fmt.Sprintf("%d. %+v", i, tt)

			list, err := db.ListObjects(ctx, bucket, tt.options)

			if assert.NoError(t, err, errTag) {
				assert.Equal(t, tt.more, list.More, errTag)
				for i, item := range list.Items {
					assert.Equal(t, tt.result[i], item.Path, errTag)
					assert.Equal(t, TestBucket, item.Bucket.Name, errTag)
					assert.Equal(t, storj.EncNull, item.Bucket.PathCipher, errTag)
				}
			}
		}
	})
}

func options(prefix, cursor string, limit int) storj.ListOptions {
	return storj.ListOptions{
		Prefix:    prefix,
		Cursor:    cursor,
		Direction: storj.After,
		Limit:     limit,
	}
}

func optionsRecursive(prefix, cursor string, limit int) storj.ListOptions {
	return storj.ListOptions{
		Prefix:    prefix,
		Cursor:    cursor,
		Direction: storj.After,
		Limit:     limit,
		Recursive: true,
	}
}