Upload/download for testplanet Uplink (#1183)

* Upload/download for testplanet Uplink

* check error in tests

* cleanup

* refactor node -> uplink

* add missing test file

* rest of refactoring

* workaround to resolve cycles in tests

* rename method

* add missing comments

* review comments

* use KiB
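
For context, the resulting test API reads roughly like this (a minimal sketch condensed from the test added in this commit; error checks trimmed, and the argument order of testplanet.New — satellites, storage nodes, uplinks — is inferred from the config fields used below):

    planet, err := testplanet.New(t, 1, 10, 1) // assumed: 1 satellite, 10 storage nodes, 1 uplink
    defer ctx.Check(planet.Shutdown)
    planet.Start(ctx)

    // upload through the uplink to the satellite, then read the object back
    err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "test/bucket", "test/path", expectedData)
    data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "test/bucket", "test/path")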
Michal Niewrzal, 2019-02-04 17:56:10 +01:00 (committed by GitHub)
parent 383cb05e8e
commit 2f8b3da1af
7 changed files with 372 additions and 158 deletions

View File

@@ -1,94 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information
package testplanet
import (
"context"
"go.uber.org/zap"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/pointerdb/pdbclient"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
)
// Node is a general purpose node used in tests
type Node struct {
Log *zap.Logger
Info pb.Node
Identity *identity.FullIdentity
Transport transport.Client
}
// newUplink creates a new uplink
func (planet *Planet) newUplink(name string) (*Node, error) {
identity, err := planet.NewIdentity()
if err != nil {
return nil, err
}
node := &Node{
Log: planet.log.Named(name),
Identity: identity,
}
node.Log.Debug("id=" + identity.ID.String())
node.Transport = transport.NewClient(identity)
node.Info = pb.Node{
Id: node.Identity.ID,
Type: pb.NodeType_UPLINK,
Address: &pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: "",
},
}
planet.nodes = append(planet.nodes, node)
return node, nil
}
// ID returns node id
func (node *Node) ID() storj.NodeID { return node.Info.Id }
// Addr returns node address
func (node *Node) Addr() string { return node.Info.Address.Address }
// Local returns node info
func (node *Node) Local() pb.Node { return node.Info }
// Shutdown shuts down all node dependencies
func (node *Node) Shutdown() error { return nil }
// DialPointerDB dials destination with apikey and returns pointerdb Client
func (node *Node) DialPointerDB(destination Peer, apikey string) (pdbclient.Client, error) {
// TODO: use node.Transport instead of pdbclient.NewClient
/*
conn, err := node.Transport.DialNode(context.Background(), &destination.Info)
if err != nil {
return nil, err
}
return piececlient.NewPSClient
*/
// TODO: handle disconnect
return pdbclient.NewClient(node.Identity, destination.Addr(), apikey)
}
// DialOverlay dials destination and returns an overlay.Client
func (node *Node) DialOverlay(destination Peer) (overlay.Client, error) {
info := destination.Local()
conn, err := node.Transport.DialNode(context.Background(), &info)
if err != nil {
return nil, err
}
// TODO: handle disconnect
return overlay.NewClientFrom(pb.NewOverlayClient(conn)), nil
}

View File

@@ -87,12 +87,12 @@ type Planet struct {
peers []closablePeer
databases []io.Closer
- nodes []*Node
+ uplinks []*Uplink
Bootstrap *bootstrap.Peer
Satellites []*satellite.Peer
StorageNodes []*storagenode.Peer
- Uplinks []*Node
+ Uplinks []*Uplink
identities *Identities
@@ -172,7 +172,7 @@ func NewCustom(log *zap.Logger, config Config) (*Planet, error) {
return nil, errs.Combine(err, planet.Shutdown())
}
- planet.Uplinks, err = planet.newUplinks("uplink", config.UplinkCount)
+ planet.Uplinks, err = planet.newUplinks("uplink", config.UplinkCount, config.StorageNodeCount)
if err != nil {
return nil, errs.Combine(err, planet.Shutdown())
}
@@ -235,7 +235,7 @@ func (planet *Planet) StopPeer(peer Peer) error {
}
// Size returns number of nodes in the network
- func (planet *Planet) Size() int { return len(planet.nodes) + len(planet.peers) }
+ func (planet *Planet) Size() int { return len(planet.uplinks) + len(planet.peers) }
// Shutdown shuts down all the nodes and deletes temporary directories.
func (planet *Planet) Shutdown() error {
@@ -246,8 +246,8 @@ func (planet *Planet) Shutdown() error {
var errlist errs.Group
// shutdown in reverse order
- for i := len(planet.nodes) - 1; i >= 0; i-- {
- node := planet.nodes[i]
+ for i := len(planet.uplinks) - 1; i >= 0; i-- {
+ node := planet.uplinks[i]
errlist.Add(node.Shutdown())
}
for i := len(planet.peers) - 1; i >= 0; i-- {
@@ -263,14 +263,14 @@
}
// newUplinks creates and initializes uplinks
- func (planet *Planet) newUplinks(prefix string, count int) ([]*Node, error) {
- var xs []*Node
+ func (planet *Planet) newUplinks(prefix string, count, storageNodeCount int) ([]*Uplink, error) {
+ var xs []*Uplink
for i := 0; i < count; i++ {
- node, err := planet.newUplink(prefix + strconv.Itoa(i))
+ uplink, err := planet.newUplink(prefix+strconv.Itoa(i), storageNodeCount)
if err != nil {
return nil, err
}
- xs = append(xs, node)
+ xs = append(xs, uplink)
}
return xs, nil

View File

@@ -0,0 +1,261 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information
package testplanet
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"github.com/vivint/infectious"
"github.com/zeebo/errs"
"go.uber.org/zap"
"google.golang.org/grpc"
"storj.io/storj/internal/memory"
"storj.io/storj/pkg/eestream"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/metainfo/kvmetainfo"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/pointerdb/pdbclient"
"storj.io/storj/pkg/storage/buckets"
ecclient "storj.io/storj/pkg/storage/ec"
"storj.io/storj/pkg/storage/segments"
"storj.io/storj/pkg/storage/streams"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/stream"
"storj.io/storj/pkg/transport"
"storj.io/storj/satellite"
)
// Uplink is a general purpose uplink used in tests
type Uplink struct {
Log *zap.Logger
Info pb.Node
Identity *identity.FullIdentity
Transport transport.Client
StorageNodeCount int
}
// newUplink creates a new uplink
func (planet *Planet) newUplink(name string, storageNodeCount int) (*Uplink, error) {
identity, err := planet.NewIdentity()
if err != nil {
return nil, err
}
uplink := &Uplink{
Log: planet.log.Named(name),
Identity: identity,
StorageNodeCount: storageNodeCount,
}
uplink.Log.Debug("id=" + identity.ID.String())
uplink.Transport = transport.NewClient(identity)
uplink.Info = pb.Node{
Id: uplink.Identity.ID,
Type: pb.NodeType_UPLINK,
Address: &pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: "",
},
}
planet.uplinks = append(planet.uplinks, uplink)
return uplink, nil
}
// ID returns uplink id
func (uplink *Uplink) ID() storj.NodeID { return uplink.Info.Id }
// Addr returns uplink address
func (uplink *Uplink) Addr() string { return uplink.Info.Address.Address }
// Local returns uplink info
func (uplink *Uplink) Local() pb.Node { return uplink.Info }
// Shutdown shuts down all uplink dependencies
func (uplink *Uplink) Shutdown() error { return nil }
// DialPointerDB dials destination with apikey and returns pointerdb Client
func (uplink *Uplink) DialPointerDB(destination Peer, apikey string) (pdbclient.Client, error) {
// TODO: use uplink.Transport instead of pdbclient.NewClient
/*
conn, err := node.Transport.DialNode(context.Background(), &destination.Info)
if err != nil {
return nil, err
}
return piececlient.NewPSClient
*/
// TODO: handle disconnect
return pdbclient.NewClient(uplink.Identity, destination.Addr(), apikey)
}
// DialOverlay dials destination and returns an overlay.Client
func (uplink *Uplink) DialOverlay(destination Peer) (overlay.Client, error) {
info := destination.Local()
conn, err := uplink.Transport.DialNode(context.Background(), &info, grpc.WithBlock())
if err != nil {
return nil, err
}
// TODO: handle disconnect
return overlay.NewClientFrom(pb.NewOverlayClient(conn)), nil
}
// Upload uploads data to a specific satellite
func (uplink *Uplink) Upload(ctx context.Context, satellite *satellite.Peer, bucket string, path storj.Path, data []byte) error {
metainfo, streams, err := uplink.getMetainfo(satellite)
if err != nil {
return err
}
encScheme := uplink.getEncryptionScheme()
redScheme := uplink.getRedundancyScheme()
// create bucket if not exists
_, err = metainfo.GetBucket(ctx, bucket)
if err != nil {
if storj.ErrBucketNotFound.Has(err) {
_, err := metainfo.CreateBucket(ctx, bucket, &storj.Bucket{PathCipher: encScheme.Cipher})
if err != nil {
return err
}
} else {
return err
}
}
createInfo := storj.CreateObject{
RedundancyScheme: redScheme,
EncryptionScheme: encScheme,
}
obj, err := metainfo.CreateObject(ctx, bucket, path, &createInfo)
if err != nil {
return err
}
reader := bytes.NewReader(data)
err = uploadStream(ctx, streams, obj, reader)
if err != nil {
return err
}
return nil
}
func uploadStream(ctx context.Context, streams streams.Store, mutableObject storj.MutableObject, reader io.Reader) error {
mutableStream, err := mutableObject.CreateStream(ctx)
if err != nil {
return err
}
upload := stream.NewUpload(ctx, mutableStream, streams)
_, err = io.Copy(upload, reader)
return errs.Combine(err, upload.Close())
}
// Download downloads data from a specific satellite
func (uplink *Uplink) Download(ctx context.Context, satellite *satellite.Peer, bucket string, path storj.Path) ([]byte, error) {
metainfo, streams, err := uplink.getMetainfo(satellite)
if err != nil {
return []byte{}, err
}
readOnlyStream, err := metainfo.GetObjectStream(ctx, bucket, path)
if err != nil {
return []byte{}, err
}
download := stream.NewDownload(ctx, readOnlyStream, streams)
defer func() { err = errs.Combine(err, download.Close()) }()
data, err := ioutil.ReadAll(download)
if err != nil {
return []byte{}, err
}
return data, nil
}
func (uplink *Uplink) getMetainfo(satellite *satellite.Peer) (db storj.Metainfo, ss streams.Store, err error) {
encScheme := uplink.getEncryptionScheme()
redScheme := uplink.getRedundancyScheme()
// redundancy settings
minThreshold := int(redScheme.RequiredShares)
repairThreshold := int(redScheme.RepairShares)
successThreshold := int(redScheme.OptimalShares)
maxThreshold := int(redScheme.TotalShares)
erasureShareSize := 1 * memory.KiB
maxBufferMem := 4 * memory.MiB
// client settings
maxInlineSize := 4 * memory.KiB
segmentSize := 64 * memory.MiB
oc, err := uplink.DialOverlay(satellite)
if err != nil {
return nil, nil, err
}
pdb, err := uplink.DialPointerDB(satellite, "") // TODO pass api key?
if err != nil {
return nil, nil, err
}
ec := ecclient.NewClient(uplink.Identity, maxBufferMem.Int())
fc, err := infectious.NewFEC(minThreshold, maxThreshold)
if err != nil {
return nil, nil, err
}
rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, erasureShareSize.Int()), repairThreshold, successThreshold)
if err != nil {
return nil, nil, err
}
segments := segments.NewSegmentStore(oc, ec, pdb, rs, maxInlineSize.Int())
if erasureShareSize.Int()*minThreshold%int(encScheme.BlockSize) != 0 {
return nil, nil, fmt.Errorf("EncryptionBlockSize must be a multiple of ErasureShareSize * RS MinThreshold")
}
key := new(storj.Key)
copy(key[:], "enc.key")
streams, err := streams.NewStreamStore(segments, segmentSize.Int64(), key, int(encScheme.BlockSize), encScheme.Cipher)
if err != nil {
return nil, nil, err
}
buckets := buckets.NewStore(streams)
return kvmetainfo.New(buckets, streams, segments, pdb, key), streams, nil
}
func (uplink *Uplink) getRedundancyScheme() storj.RedundancyScheme {
return storj.RedundancyScheme{
Algorithm: storj.ReedSolomon,
RequiredShares: int16(1 * uplink.StorageNodeCount / 5),
RepairShares: int16(2 * uplink.StorageNodeCount / 5),
OptimalShares: int16(3 * uplink.StorageNodeCount / 5),
TotalShares: int16(4 * uplink.StorageNodeCount / 5),
}
}
func (uplink *Uplink) getEncryptionScheme() storj.EncryptionScheme {
return storj.EncryptionScheme{
Cipher: storj.AESGCM,
BlockSize: 1 * memory.KiB.Int32(),
}
}
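
The share counts above come from integer division on the planet's storage node count. As a hypothetical illustration (not code from this commit), for the 10-node planet used in the test below:

    // getRedundancyScheme with StorageNodeCount = 10:
    //   RequiredShares = 1*10/5 = 2
    //   RepairShares   = 2*10/5 = 4
    //   OptimalShares  = 3*10/5 = 6
    //   TotalShares    = 4*10/5 = 8
    scheme := (&Uplink{StorageNodeCount: 10}).getRedundancyScheme()

so every object is erasure coded 2-of-8 across the 10 storage nodes.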

View File

@@ -0,0 +1,41 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information
package testplanet_test
import (
"crypto/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
)
func TestUploadDownload(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, err := testplanet.New(t, 1, 10, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
time.Sleep(2 * time.Second)
expectedData := make([]byte, 5*memory.MiB)
_, err = rand.Read(expectedData)
assert.NoError(t, err)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "test/bucket", "test/path", expectedData)
assert.NoError(t, err)
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "test/bucket", "test/path")
assert.NoError(t, err)
assert.Equal(t, expectedData, data)
}
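
Given the settings in getMetainfo, this 5 MiB object should exercise the remote upload path end to end (a hedged reading of the numbers above, not something the test asserts directly):

    // 5 MiB > maxInlineSize (4 KiB)  -> stored as a remote segment, not inline
    // 5 MiB < segmentSize (64 MiB)   -> the whole object fits in a single segment
    // stripe = erasureShareSize * RequiredShares = 1 KiB * 2 = 2 KiB
    //   -> a multiple of the 1 KiB encryption BlockSize, so the divisibility check passes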

View File

@@ -1,7 +1,7 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
- package kvmetainfo
+ package kvmetainfo_test
import (
"context"
@@ -16,6 +16,7 @@ import (
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/pkg/eestream"
"storj.io/storj/pkg/metainfo/kvmetainfo"
"storj.io/storj/pkg/storage/buckets"
ecclient "storj.io/storj/pkg/storage/ec"
"storj.io/storj/pkg/storage/segments"
@@ -30,7 +31,7 @@ const (
)
func TestBucketsBasic(t *testing.T) {
- runTest(t, func(ctx context.Context, db *DB) {
+ runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
// Create new bucket
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
if assert.NoError(t, err) {
@@ -70,9 +71,9 @@ func TestBucketsBasic(t *testing.T) {
}
func TestBucketsReadNewWayWriteOldWay(t *testing.T) {
- runTest(t, func(ctx context.Context, db *DB) {
+ runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
// (Old API) Create new bucket
- _, err := db.buckets.Put(ctx, TestBucket, storj.AESGCM)
+ _, err := buckets.Put(ctx, TestBucket, storj.AESGCM)
assert.NoError(t, err)
// (New API) Check that bucket list include the new bucket
@@ -91,7 +92,7 @@ func TestBucketsReadNewWayWriteOldWay(t *testing.T) {
}
// (Old API) Delete the bucket
- err = db.buckets.Delete(ctx, TestBucket)
+ err = buckets.Delete(ctx, TestBucket)
assert.NoError(t, err)
// (New API) Check that the bucket list is empty
@@ -108,7 +109,7 @@ func TestBucketsReadNewWayWriteOldWay(t *testing.T) {
}
func TestBucketsReadOldWayWriteNewWay(t *testing.T) {
- runTest(t, func(ctx context.Context, db *DB) {
+ runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
// (New API) Create new bucket
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
if assert.NoError(t, err) {
@@ -116,7 +117,7 @@ func TestBucketsReadOldWayWriteNewWay(t *testing.T) {
}
// (Old API) Check that bucket list include the new bucket
- items, more, err := db.buckets.List(ctx, "", "", 0)
+ items, more, err := buckets.List(ctx, "", "", 0)
if assert.NoError(t, err) {
assert.False(t, more)
assert.Equal(t, 1, len(items))
@@ -124,7 +125,7 @@ func TestBucketsReadOldWayWriteNewWay(t *testing.T) {
}
// (Old API) Check that we can get the new bucket explicitly
- meta, err := db.buckets.Get(ctx, TestBucket)
+ meta, err := buckets.Get(ctx, TestBucket)
if assert.NoError(t, err) {
assert.Equal(t, storj.AESGCM, meta.PathEncryptionType)
}
@@ -134,20 +135,20 @@ func TestBucketsReadOldWayWriteNewWay(t *testing.T) {
assert.NoError(t, err)
// (Old API) Check that the bucket list is empty
- items, more, err = db.buckets.List(ctx, "", "", 0)
+ items, more, err = buckets.List(ctx, "", "", 0)
if assert.NoError(t, err) {
assert.False(t, more)
assert.Equal(t, 0, len(items))
}
// (Old API) Check that the bucket cannot be retrieved explicitly
- _, err = db.buckets.Get(ctx, TestBucket)
+ _, err = buckets.Get(ctx, TestBucket)
assert.True(t, storj.ErrBucketNotFound.Has(err))
})
}
func TestErrNoBucket(t *testing.T) {
- runTest(t, func(ctx context.Context, db *DB) {
+ runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
_, err := db.CreateBucket(ctx, "", nil)
assert.True(t, storj.ErrNoBucket.Has(err))
@@ -160,7 +161,7 @@ func TestErrNoBucket(t *testing.T) {
}
func TestBucketCreateCipher(t *testing.T) {
- runTest(t, func(ctx context.Context, db *DB) {
+ runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
forAllCiphers(func(cipher storj.Cipher) {
bucket, err := db.CreateBucket(ctx, "test", &storj.Bucket{PathCipher: cipher})
if assert.NoError(t, err) {
@@ -179,7 +180,7 @@ func TestBucketCreateCipher(t *testing.T) {
}
func TestListBucketsEmpty(t *testing.T) {
- runTest(t, func(ctx context.Context, db *DB) {
+ runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
_, err := db.ListBuckets(ctx, storj.BucketListOptions{})
assert.EqualError(t, err, "kvmetainfo: invalid direction 0")
@@ -199,7 +200,7 @@ func TestListBucketsEmpty(t *testing.T) {
}
func TestListBuckets(t *testing.T) {
- runTest(t, func(ctx context.Context, db *DB) {
+ runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
bucketNames := []string{"a", "aa", "b", "bb", "c"}
for _, name := range bucketNames {
@@ -307,7 +308,7 @@ func getBucketNames(bucketList storj.BucketList) []string {
return names
}
- func runTest(t *testing.T, test func(context.Context, *DB)) {
+ func runTest(t *testing.T, test func(context.Context, *kvmetainfo.DB, buckets.Store, streams.Store)) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
@@ -320,40 +321,40 @@ func runTest(t *testing.T, test func(context.Context, *DB)) {
planet.Start(ctx)
- db, err := newDB(planet)
+ db, buckets, streams, err := newMetainfoParts(planet)
if !assert.NoError(t, err) {
return
}
- test(ctx, db)
+ test(ctx, db, buckets, streams)
}
- func newDB(planet *testplanet.Planet) (*DB, error) {
+ func newMetainfoParts(planet *testplanet.Planet) (*kvmetainfo.DB, buckets.Store, streams.Store, error) {
// TODO(kaloyan): We should have a better way for configuring the Satellite's API Key
err := flag.Set("pointer-db.auth.api-key", TestAPIKey)
if err != nil {
- return nil, err
+ return nil, nil, nil, err
}
oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0])
if err != nil {
- return nil, err
+ return nil, nil, nil, err
}
pdb, err := planet.Uplinks[0].DialPointerDB(planet.Satellites[0], TestAPIKey)
if err != nil {
- return nil, err
+ return nil, nil, nil, err
}
ec := ecclient.NewClient(planet.Uplinks[0].Identity, 0)
fc, err := infectious.NewFEC(2, 4)
if err != nil {
- return nil, err
+ return nil, nil, nil, err
}
rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, int(1*memory.KB)), 3, 4)
if err != nil {
- return nil, err
+ return nil, nil, nil, err
}
segments := segments.NewSegmentStore(oc, ec, pdb, rs, int(8*memory.KB))
@@ -363,12 +364,12 @@ func newDB(planet *testplanet.Planet) (*DB, error) {
streams, err := streams.NewStreamStore(segments, int64(64*memory.MB), key, int(1*memory.KB), storj.AESGCM)
if err != nil {
- return nil, err
+ return nil, nil, nil, err
}
buckets := buckets.NewStore(streams)
- return New(buckets, streams, segments, pdb, key), nil
+ return kvmetainfo.New(buckets, streams, segments, pdb, key), buckets, streams, nil
}
func forAllCiphers(test func(cipher storj.Cipher)) {

View File

@@ -29,7 +29,8 @@ const (
committedPrefix = "l/"
)
- var defaultRS = storj.RedundancyScheme{
+ // DefaultRS defines default values for RedundancyScheme
+ var DefaultRS = storj.RedundancyScheme{
Algorithm: storj.ReedSolomon,
RequiredShares: 20,
RepairShares: 30,
@@ -38,7 +39,8 @@ var defaultRS = storj.RedundancyScheme{
ShareSize: 1 * memory.KB.Int32(),
}
- var defaultES = storj.EncryptionScheme{
+ // DefaultES defines default values for EncryptionScheme
+ var DefaultES = storj.EncryptionScheme{
Cipher: storj.AESGCM,
BlockSize: 1 * memory.KB.Int32(),
}
@@ -104,12 +106,12 @@ func (db *DB) CreateObject(ctx context.Context, bucket string, path storj.Path,
// if info.ContentType == "" {}
if info.RedundancyScheme.IsZero() {
- info.RedundancyScheme = defaultRS
+ info.RedundancyScheme = DefaultRS
}
if info.EncryptionScheme.IsZero() {
info.EncryptionScheme = storj.EncryptionScheme{
- Cipher: defaultES.Cipher,
+ Cipher: DefaultES.Cipher,
BlockSize: info.RedundancyScheme.ShareSize,
}
}

View File

@@ -1,7 +1,7 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
- package kvmetainfo
+ package kvmetainfo_test
import (
"context"
@@ -14,6 +14,9 @@ import (
"github.com/stretchr/testify/assert"
"storj.io/storj/internal/memory"
"storj.io/storj/pkg/metainfo/kvmetainfo"
"storj.io/storj/pkg/storage/buckets"
"storj.io/storj/pkg/storage/streams"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/stream"
)
@@ -35,7 +38,7 @@ func TestCreateObject(t *testing.T) {
BlockSize: 1 * memory.KB.Int32(),
}
- runTest(t, func(ctx context.Context, db *DB) {
+ runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
if !assert.NoError(t, err) {
return
@@ -48,8 +51,8 @@
}{
{
create: nil,
- expectedRS: defaultRS,
- expectedES: defaultES,
+ expectedRS: kvmetainfo.DefaultRS,
+ expectedES: kvmetainfo.DefaultES,
}, {
create: &storj.CreateObject{RedundancyScheme: customRS, EncryptionScheme: customES},
expectedRS: customRS,
@@ -57,10 +60,10 @@
}, {
create: &storj.CreateObject{RedundancyScheme: customRS},
expectedRS: customRS,
- expectedES: storj.EncryptionScheme{Cipher: defaultES.Cipher, BlockSize: customRS.ShareSize},
+ expectedES: storj.EncryptionScheme{Cipher: kvmetainfo.DefaultES.Cipher, BlockSize: customRS.ShareSize},
}, {
create: &storj.CreateObject{EncryptionScheme: customES},
- expectedRS: defaultRS,
+ expectedRS: kvmetainfo.DefaultRS,
expectedES: customES,
},
} {
@@ -84,13 +87,13 @@
}
func TestGetObject(t *testing.T) {
- runTest(t, func(ctx context.Context, db *DB) {
+ runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
if !assert.NoError(t, err) {
return
}
- upload(ctx, t, db, bucket, TestFile, nil)
+ upload(ctx, t, db, streams, bucket, TestFile, nil)
_, err = db.GetObject(ctx, "", "")
assert.True(t, storj.ErrNoBucket.Has(err))
@@ -114,7 +117,7 @@
}
func TestGetObjectStream(t *testing.T) {
- runTest(t, func(ctx context.Context, db *DB) {
+ runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
// we wait a second for all the nodes to complete bootstrapping off the satellite
time.Sleep(2 * time.Second)
@@ -129,9 +132,9 @@ func TestGetObjectStream(t *testing.T) {
return
}
upload(ctx, t, db, bucket, "empty-file", nil)
upload(ctx, t, db, bucket, "small-file", []byte("test"))
upload(ctx, t, db, bucket, "large-file", data)
upload(ctx, t, db, streams, bucket, "empty-file", nil)
upload(ctx, t, db, streams, bucket, "small-file", []byte("test"))
upload(ctx, t, db, streams, bucket, "large-file", data)
_, err = db.GetObjectStream(ctx, "", "")
assert.True(t, storj.ErrNoBucket.Has(err))
@@ -145,13 +148,13 @@
_, err = db.GetObjectStream(ctx, bucket.Name, "non-existing-file")
assert.True(t, storj.ErrObjectNotFound.Has(err))
assertStream(ctx, t, db, bucket, "empty-file", 0, []byte{})
assertStream(ctx, t, db, bucket, "small-file", 4, []byte("test"))
assertStream(ctx, t, db, bucket, "large-file", int64(32*memory.KB), data)
assertStream(ctx, t, db, streams, bucket, "empty-file", 0, []byte{})
assertStream(ctx, t, db, streams, bucket, "small-file", 4, []byte("test"))
assertStream(ctx, t, db, streams, bucket, "large-file", int64(32*memory.KB), data)
})
}
- func upload(ctx context.Context, t *testing.T, db *DB, bucket storj.Bucket, path storj.Path, data []byte) {
+ func upload(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams streams.Store, bucket storj.Bucket, path storj.Path, data []byte) {
obj, err := db.CreateObject(ctx, bucket.Name, path, nil)
if !assert.NoError(t, err) {
return
@@ -162,7 +165,7 @@ func upload(ctx context.Context, t *testing.T, db *DB, bucket storj.Bucket, path
return
}
- upload := stream.NewUpload(ctx, str, db.streams)
+ upload := stream.NewUpload(ctx, str, streams)
_, err = upload.Write(data)
if !assert.NoError(t, err) {
@@ -180,7 +183,7 @@ func upload(ctx context.Context, t *testing.T, db *DB, bucket storj.Bucket, path
}
}
- func assertStream(ctx context.Context, t *testing.T, db *DB, bucket storj.Bucket, path storj.Path, size int64, content []byte) {
+ func assertStream(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams streams.Store, bucket storj.Bucket, path storj.Path, size int64, content []byte) {
readOnly, err := db.GetObjectStream(ctx, bucket.Name, path)
if !assert.NoError(t, err) {
return
@@ -208,7 +211,7 @@ func assertStream(ctx context.Context, t *testing.T, db *DB, bucket storj.Bucket
assertInlineSegment(t, segments[0], content)
}
- download := stream.NewDownload(ctx, readOnly, db.streams)
+ download := stream.NewDownload(ctx, readOnly, streams)
defer func() {
err = download.Close()
assert.NoError(t, err)
@@ -253,13 +256,13 @@ func assertRemoteSegment(t *testing.T, segment storj.Segment) {
}
func TestDeleteObject(t *testing.T) {
- runTest(t, func(ctx context.Context, db *DB) {
+ runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
if !assert.NoError(t, err) {
return
}
- upload(ctx, t, db, bucket, TestFile, nil)
+ upload(ctx, t, db, streams, bucket, TestFile, nil)
err = db.DeleteObject(ctx, "", "")
assert.True(t, storj.ErrNoBucket.Has(err))
@@ -279,7 +282,7 @@
}
func TestListObjectsEmpty(t *testing.T) {
- runTest(t, func(ctx context.Context, db *DB) {
+ runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
bucket, err := db.CreateBucket(ctx, TestBucket, nil)
if !assert.NoError(t, err) {
return
@@ -307,7 +310,7 @@
}
func TestListObjects(t *testing.T) {
- runTest(t, func(ctx context.Context, db *DB) {
+ runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) {
bucket, err := db.CreateBucket(ctx, TestBucket, &storj.Bucket{PathCipher: storj.Unencrypted})
if !assert.NoError(t, err) {
return
@@ -320,7 +323,7 @@
}
for _, path := range filePaths {
- upload(ctx, t, db, bucket, path, nil)
+ upload(ctx, t, db, streams, bucket, path, nil)
}
otherBucket, err := db.CreateBucket(ctx, "otherbucket", nil)
@@ -328,7 +331,7 @@
return
}
- upload(ctx, t, db, otherBucket, "file-in-other-bucket", nil)
+ upload(ctx, t, db, streams, otherBucket, "file-in-other-bucket", nil)
for i, tt := range []struct {
options storj.ListOptions