From 2f8b3da1af7eea8b802894e2120439a7c47538fa Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Mon, 4 Feb 2019 17:56:10 +0100 Subject: [PATCH] Upload/download for testplanet Uplink (#1183) * Upload/download for testplanet Uplink * check error in tests * cleanup * refactor node -> uplink * add missing test file * rest of refactoring * workaround to resolve cycles in tests * rename method * add missing comments * review comments * use KiB --- internal/testplanet/node.go | 94 --------- internal/testplanet/planet.go | 20 +- internal/testplanet/uplink.go | 261 ++++++++++++++++++++++++ internal/testplanet/uplink_test.go | 41 ++++ pkg/metainfo/kvmetainfo/buckets_test.go | 51 ++--- pkg/metainfo/kvmetainfo/objects.go | 10 +- pkg/metainfo/kvmetainfo/objects_test.go | 53 ++--- 7 files changed, 372 insertions(+), 158 deletions(-) delete mode 100644 internal/testplanet/node.go create mode 100644 internal/testplanet/uplink.go create mode 100644 internal/testplanet/uplink_test.go diff --git a/internal/testplanet/node.go b/internal/testplanet/node.go deleted file mode 100644 index 8a919d82b..000000000 --- a/internal/testplanet/node.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information - -package testplanet - -import ( - "context" - - "go.uber.org/zap" - - "storj.io/storj/pkg/identity" - "storj.io/storj/pkg/overlay" - "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/pointerdb/pdbclient" - "storj.io/storj/pkg/storj" - "storj.io/storj/pkg/transport" -) - -// Node is a general purpose -type Node struct { - Log *zap.Logger - Info pb.Node - Identity *identity.FullIdentity - Transport transport.Client -} - -// newUplink creates a new uplink -func (planet *Planet) newUplink(name string) (*Node, error) { - identity, err := planet.NewIdentity() - if err != nil { - return nil, err - } - - node := &Node{ - Log: planet.log.Named(name), - Identity: identity, - } - - node.Log.Debug("id=" + identity.ID.String()) - - node.Transport = transport.NewClient(identity) - - node.Info = pb.Node{ - Id: node.Identity.ID, - Type: pb.NodeType_UPLINK, - Address: &pb.NodeAddress{ - Transport: pb.NodeTransport_TCP_TLS_GRPC, - Address: "", - }, - } - - planet.nodes = append(planet.nodes, node) - - return node, nil -} - -// ID returns node id -func (node *Node) ID() storj.NodeID { return node.Info.Id } - -// Addr returns node address -func (node *Node) Addr() string { return node.Info.Address.Address } - -// Local returns node info -func (node *Node) Local() pb.Node { return node.Info } - -// Shutdown shuts down all node dependencies -func (node *Node) Shutdown() error { return nil } - -// DialPointerDB dials destination with apikey and returns pointerdb Client -func (node *Node) DialPointerDB(destination Peer, apikey string) (pdbclient.Client, error) { - // TODO: use node.Transport instead of pdbclient.NewClient - /* - conn, err := node.Transport.DialNode(context.Background(), &destination.Info) - if err != nil { - return nil, err - } - return piececlient.NewPSClient - */ - - // TODO: handle disconnect - return pdbclient.NewClient(node.Identity, destination.Addr(), apikey) -} - -// DialOverlay dials destination and returns an overlay.Client -func (node *Node) DialOverlay(destination Peer) (overlay.Client, error) { - info := destination.Local() - conn, err := node.Transport.DialNode(context.Background(), &info) - if err != nil { - return nil, err - } - - // TODO: handle disconnect - return overlay.NewClientFrom(pb.NewOverlayClient(conn)), nil -} diff --git a/internal/testplanet/planet.go b/internal/testplanet/planet.go index 0f02f225f..8d0a1e2ff 100644 --- a/internal/testplanet/planet.go +++ b/internal/testplanet/planet.go @@ -87,12 +87,12 @@ type Planet struct { peers []closablePeer databases []io.Closer - nodes []*Node + uplinks []*Uplink Bootstrap *bootstrap.Peer Satellites []*satellite.Peer StorageNodes []*storagenode.Peer - Uplinks []*Node + Uplinks []*Uplink identities *Identities @@ -172,7 +172,7 @@ func NewCustom(log *zap.Logger, config Config) (*Planet, error) { return nil, errs.Combine(err, planet.Shutdown()) } - planet.Uplinks, err = planet.newUplinks("uplink", config.UplinkCount) + planet.Uplinks, err = planet.newUplinks("uplink", config.UplinkCount, config.StorageNodeCount) if err != nil { return nil, errs.Combine(err, planet.Shutdown()) } @@ -235,7 +235,7 @@ func (planet *Planet) StopPeer(peer Peer) error { } // Size returns number of nodes in the network -func (planet *Planet) Size() int { return len(planet.nodes) + len(planet.peers) } +func (planet *Planet) Size() int { return len(planet.uplinks) + len(planet.peers) } // Shutdown shuts down all the nodes and deletes temporary directories. func (planet *Planet) Shutdown() error { @@ -246,8 +246,8 @@ func (planet *Planet) Shutdown() error { var errlist errs.Group // shutdown in reverse order - for i := len(planet.nodes) - 1; i >= 0; i-- { - node := planet.nodes[i] + for i := len(planet.uplinks) - 1; i >= 0; i-- { + node := planet.uplinks[i] errlist.Add(node.Shutdown()) } for i := len(planet.peers) - 1; i >= 0; i-- { @@ -263,14 +263,14 @@ func (planet *Planet) Shutdown() error { } // newUplinks creates initializes uplinks -func (planet *Planet) newUplinks(prefix string, count int) ([]*Node, error) { - var xs []*Node +func (planet *Planet) newUplinks(prefix string, count, storageNodeCount int) ([]*Uplink, error) { + var xs []*Uplink for i := 0; i < count; i++ { - node, err := planet.newUplink(prefix + strconv.Itoa(i)) + uplink, err := planet.newUplink(prefix+strconv.Itoa(i), storageNodeCount) if err != nil { return nil, err } - xs = append(xs, node) + xs = append(xs, uplink) } return xs, nil diff --git a/internal/testplanet/uplink.go b/internal/testplanet/uplink.go new file mode 100644 index 000000000..b87858703 --- /dev/null +++ b/internal/testplanet/uplink.go @@ -0,0 +1,261 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information + +package testplanet + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + + "github.com/vivint/infectious" + "github.com/zeebo/errs" + "go.uber.org/zap" + "google.golang.org/grpc" + + "storj.io/storj/internal/memory" + "storj.io/storj/pkg/eestream" + "storj.io/storj/pkg/identity" + "storj.io/storj/pkg/metainfo/kvmetainfo" + "storj.io/storj/pkg/overlay" + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/pointerdb/pdbclient" + "storj.io/storj/pkg/storage/buckets" + ecclient "storj.io/storj/pkg/storage/ec" + "storj.io/storj/pkg/storage/segments" + "storj.io/storj/pkg/storage/streams" + "storj.io/storj/pkg/storj" + "storj.io/storj/pkg/stream" + "storj.io/storj/pkg/transport" + "storj.io/storj/satellite" +) + +// Uplink is a general purpose +type Uplink struct { + Log *zap.Logger + Info pb.Node + Identity *identity.FullIdentity + Transport transport.Client + StorageNodeCount int +} + +// newUplink creates a new uplink +func (planet *Planet) newUplink(name string, storageNodeCount int) (*Uplink, error) { + identity, err := planet.NewIdentity() + if err != nil { + return nil, err + } + + uplink := &Uplink{ + Log: planet.log.Named(name), + Identity: identity, + StorageNodeCount: storageNodeCount, + } + + uplink.Log.Debug("id=" + identity.ID.String()) + + uplink.Transport = transport.NewClient(identity) + + uplink.Info = pb.Node{ + Id: uplink.Identity.ID, + Type: pb.NodeType_UPLINK, + Address: &pb.NodeAddress{ + Transport: pb.NodeTransport_TCP_TLS_GRPC, + Address: "", + }, + } + + planet.uplinks = append(planet.uplinks, uplink) + + return uplink, nil +} + +// ID returns uplink id +func (uplink *Uplink) ID() storj.NodeID { return uplink.Info.Id } + +// Addr returns uplink address +func (uplink *Uplink) Addr() string { return uplink.Info.Address.Address } + +// Local returns uplink info +func (uplink *Uplink) Local() pb.Node { return uplink.Info } + +// Shutdown shuts down all uplink dependencies +func (uplink *Uplink) Shutdown() error { return nil } + +// DialPointerDB dials destination with apikey and returns pointerdb Client +func (uplink *Uplink) DialPointerDB(destination Peer, apikey string) (pdbclient.Client, error) { + // TODO: use node.Transport instead of pdbclient.NewClient + /* + conn, err := node.Transport.DialNode(context.Background(), &destination.Info) + if err != nil { + return nil, err + } + return piececlient.NewPSClient + */ + + // TODO: handle disconnect + return pdbclient.NewClient(uplink.Identity, destination.Addr(), apikey) +} + +// DialOverlay dials destination and returns an overlay.Client +func (uplink *Uplink) DialOverlay(destination Peer) (overlay.Client, error) { + info := destination.Local() + conn, err := uplink.Transport.DialNode(context.Background(), &info, grpc.WithBlock()) + if err != nil { + return nil, err + } + + // TODO: handle disconnect + return overlay.NewClientFrom(pb.NewOverlayClient(conn)), nil +} + +// Upload data to specific satellite +func (uplink *Uplink) Upload(ctx context.Context, satellite *satellite.Peer, bucket string, path storj.Path, data []byte) error { + metainfo, streams, err := uplink.getMetainfo(satellite) + if err != nil { + return err + } + + encScheme := uplink.getEncryptionScheme() + redScheme := uplink.getRedundancyScheme() + + // create bucket if not exists + _, err = metainfo.GetBucket(ctx, bucket) + if err != nil { + if storj.ErrBucketNotFound.Has(err) { + _, err := metainfo.CreateBucket(ctx, bucket, &storj.Bucket{PathCipher: encScheme.Cipher}) + if err != nil { + return err + } + } else { + return err + } + } + + createInfo := storj.CreateObject{ + RedundancyScheme: redScheme, + EncryptionScheme: encScheme, + } + obj, err := metainfo.CreateObject(ctx, bucket, path, &createInfo) + if err != nil { + return err + } + + reader := bytes.NewReader(data) + err = uploadStream(ctx, streams, obj, reader) + if err != nil { + return err + } + + return nil +} + +func uploadStream(ctx context.Context, streams streams.Store, mutableObject storj.MutableObject, reader io.Reader) error { + mutableStream, err := mutableObject.CreateStream(ctx) + if err != nil { + return err + } + + upload := stream.NewUpload(ctx, mutableStream, streams) + + _, err = io.Copy(upload, reader) + + return errs.Combine(err, upload.Close()) +} + +// Download data from specific satellite +func (uplink *Uplink) Download(ctx context.Context, satellite *satellite.Peer, bucket string, path storj.Path) ([]byte, error) { + metainfo, streams, err := uplink.getMetainfo(satellite) + if err != nil { + return []byte{}, err + } + + readOnlyStream, err := metainfo.GetObjectStream(ctx, bucket, path) + if err != nil { + return []byte{}, err + } + + download := stream.NewDownload(ctx, readOnlyStream, streams) + defer func() { err = errs.Combine(err, download.Close()) }() + + data, err := ioutil.ReadAll(download) + if err != nil { + return []byte{}, err + } + return data, nil +} + +func (uplink *Uplink) getMetainfo(satellite *satellite.Peer) (db storj.Metainfo, ss streams.Store, err error) { + encScheme := uplink.getEncryptionScheme() + redScheme := uplink.getRedundancyScheme() + + // redundancy settings + minThreshold := int(redScheme.RequiredShares) + repairThreshold := int(redScheme.RepairShares) + successThreshold := int(redScheme.OptimalShares) + maxThreshold := int(redScheme.TotalShares) + erasureShareSize := 1 * memory.KiB + maxBufferMem := 4 * memory.MiB + + // client settings + maxInlineSize := 4 * memory.KiB + segmentSize := 64 * memory.MiB + + oc, err := uplink.DialOverlay(satellite) + if err != nil { + return nil, nil, err + } + + pdb, err := uplink.DialPointerDB(satellite, "") // TODO pass api key? + if err != nil { + return nil, nil, err + } + + ec := ecclient.NewClient(uplink.Identity, maxBufferMem.Int()) + fc, err := infectious.NewFEC(minThreshold, maxThreshold) + if err != nil { + return nil, nil, err + } + + rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, erasureShareSize.Int()), repairThreshold, successThreshold) + if err != nil { + return nil, nil, err + } + + segments := segments.NewSegmentStore(oc, ec, pdb, rs, maxInlineSize.Int()) + + if erasureShareSize.Int()*minThreshold%int(encScheme.BlockSize) != 0 { + return nil, nil, fmt.Errorf("EncryptionBlockSize must be a multiple of ErasureShareSize * RS MinThreshold") + } + + key := new(storj.Key) + copy(key[:], "enc.key") + + streams, err := streams.NewStreamStore(segments, segmentSize.Int64(), key, int(encScheme.BlockSize), encScheme.Cipher) + if err != nil { + return nil, nil, err + } + + buckets := buckets.NewStore(streams) + + return kvmetainfo.New(buckets, streams, segments, pdb, key), streams, nil +} + +func (uplink *Uplink) getRedundancyScheme() storj.RedundancyScheme { + return storj.RedundancyScheme{ + Algorithm: storj.ReedSolomon, + RequiredShares: int16(1 * uplink.StorageNodeCount / 5), + RepairShares: int16(2 * uplink.StorageNodeCount / 5), + OptimalShares: int16(3 * uplink.StorageNodeCount / 5), + TotalShares: int16(4 * uplink.StorageNodeCount / 5), + } +} + +func (uplink *Uplink) getEncryptionScheme() storj.EncryptionScheme { + return storj.EncryptionScheme{ + Cipher: storj.AESGCM, + BlockSize: 1 * memory.KiB.Int32(), + } +} diff --git a/internal/testplanet/uplink_test.go b/internal/testplanet/uplink_test.go new file mode 100644 index 000000000..61c4ba5be --- /dev/null +++ b/internal/testplanet/uplink_test.go @@ -0,0 +1,41 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information + +package testplanet_test + +import ( + "crypto/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "storj.io/storj/internal/memory" + "storj.io/storj/internal/testcontext" + "storj.io/storj/internal/testplanet" +) + +func TestUploadDownload(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + planet, err := testplanet.New(t, 1, 10, 1) + require.NoError(t, err) + defer ctx.Check(planet.Shutdown) + + planet.Start(ctx) + time.Sleep(2 * time.Second) + + expectedData := make([]byte, 5*memory.MiB) + _, err = rand.Read(expectedData) + assert.NoError(t, err) + + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "test/bucket", "test/path", expectedData) + assert.NoError(t, err) + + data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "test/bucket", "test/path") + assert.NoError(t, err) + + assert.Equal(t, expectedData, data) +} diff --git a/pkg/metainfo/kvmetainfo/buckets_test.go b/pkg/metainfo/kvmetainfo/buckets_test.go index f2ed715a5..05c9bf174 100644 --- a/pkg/metainfo/kvmetainfo/buckets_test.go +++ b/pkg/metainfo/kvmetainfo/buckets_test.go @@ -1,7 +1,7 @@ // Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. -package kvmetainfo +package kvmetainfo_test import ( "context" @@ -16,6 +16,7 @@ import ( "storj.io/storj/internal/testcontext" "storj.io/storj/internal/testplanet" "storj.io/storj/pkg/eestream" + "storj.io/storj/pkg/metainfo/kvmetainfo" "storj.io/storj/pkg/storage/buckets" ecclient "storj.io/storj/pkg/storage/ec" "storj.io/storj/pkg/storage/segments" @@ -30,7 +31,7 @@ const ( ) func TestBucketsBasic(t *testing.T) { - runTest(t, func(ctx context.Context, db *DB) { + runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) { // Create new bucket bucket, err := db.CreateBucket(ctx, TestBucket, nil) if assert.NoError(t, err) { @@ -70,9 +71,9 @@ func TestBucketsBasic(t *testing.T) { } func TestBucketsReadNewWayWriteOldWay(t *testing.T) { - runTest(t, func(ctx context.Context, db *DB) { + runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) { // (Old API) Create new bucket - _, err := db.buckets.Put(ctx, TestBucket, storj.AESGCM) + _, err := buckets.Put(ctx, TestBucket, storj.AESGCM) assert.NoError(t, err) // (New API) Check that bucket list include the new bucket @@ -91,7 +92,7 @@ func TestBucketsReadNewWayWriteOldWay(t *testing.T) { } // (Old API) Delete the bucket - err = db.buckets.Delete(ctx, TestBucket) + err = buckets.Delete(ctx, TestBucket) assert.NoError(t, err) // (New API) Check that the bucket list is empty @@ -108,7 +109,7 @@ func TestBucketsReadNewWayWriteOldWay(t *testing.T) { } func TestBucketsReadOldWayWriteNewWay(t *testing.T) { - runTest(t, func(ctx context.Context, db *DB) { + runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) { // (New API) Create new bucket bucket, err := db.CreateBucket(ctx, TestBucket, nil) if assert.NoError(t, err) { @@ -116,7 +117,7 @@ func TestBucketsReadOldWayWriteNewWay(t *testing.T) { } // (Old API) Check that bucket list include the new bucket - items, more, err := db.buckets.List(ctx, "", "", 0) + items, more, err := buckets.List(ctx, "", "", 0) if assert.NoError(t, err) { assert.False(t, more) assert.Equal(t, 1, len(items)) @@ -124,7 +125,7 @@ func TestBucketsReadOldWayWriteNewWay(t *testing.T) { } // (Old API) Check that we can get the new bucket explicitly - meta, err := db.buckets.Get(ctx, TestBucket) + meta, err := buckets.Get(ctx, TestBucket) if assert.NoError(t, err) { assert.Equal(t, storj.AESGCM, meta.PathEncryptionType) } @@ -134,20 +135,20 @@ func TestBucketsReadOldWayWriteNewWay(t *testing.T) { assert.NoError(t, err) // (Old API) Check that the bucket list is empty - items, more, err = db.buckets.List(ctx, "", "", 0) + items, more, err = buckets.List(ctx, "", "", 0) if assert.NoError(t, err) { assert.False(t, more) assert.Equal(t, 0, len(items)) } // (Old API) Check that the bucket cannot be get explicitly - _, err = db.buckets.Get(ctx, TestBucket) + _, err = buckets.Get(ctx, TestBucket) assert.True(t, storj.ErrBucketNotFound.Has(err)) }) } func TestErrNoBucket(t *testing.T) { - runTest(t, func(ctx context.Context, db *DB) { + runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) { _, err := db.CreateBucket(ctx, "", nil) assert.True(t, storj.ErrNoBucket.Has(err)) @@ -160,7 +161,7 @@ func TestErrNoBucket(t *testing.T) { } func TestBucketCreateCipher(t *testing.T) { - runTest(t, func(ctx context.Context, db *DB) { + runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) { forAllCiphers(func(cipher storj.Cipher) { bucket, err := db.CreateBucket(ctx, "test", &storj.Bucket{PathCipher: cipher}) if assert.NoError(t, err) { @@ -179,7 +180,7 @@ func TestBucketCreateCipher(t *testing.T) { } func TestListBucketsEmpty(t *testing.T) { - runTest(t, func(ctx context.Context, db *DB) { + runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) { _, err := db.ListBuckets(ctx, storj.BucketListOptions{}) assert.EqualError(t, err, "kvmetainfo: invalid direction 0") @@ -199,7 +200,7 @@ func TestListBucketsEmpty(t *testing.T) { } func TestListBuckets(t *testing.T) { - runTest(t, func(ctx context.Context, db *DB) { + runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) { bucketNames := []string{"a", "aa", "b", "bb", "c"} for _, name := range bucketNames { @@ -307,7 +308,7 @@ func getBucketNames(bucketList storj.BucketList) []string { return names } -func runTest(t *testing.T, test func(context.Context, *DB)) { +func runTest(t *testing.T, test func(context.Context, *kvmetainfo.DB, buckets.Store, streams.Store)) { ctx := testcontext.New(t) defer ctx.Cleanup() @@ -320,40 +321,40 @@ func runTest(t *testing.T, test func(context.Context, *DB)) { planet.Start(ctx) - db, err := newDB(planet) + db, buckets, streams, err := newMetainfoParts(planet) if !assert.NoError(t, err) { return } - test(ctx, db) + test(ctx, db, buckets, streams) } -func newDB(planet *testplanet.Planet) (*DB, error) { +func newMetainfoParts(planet *testplanet.Planet) (*kvmetainfo.DB, buckets.Store, streams.Store, error) { // TODO(kaloyan): We should have a better way for configuring the Satellite's API Key err := flag.Set("pointer-db.auth.api-key", TestAPIKey) if err != nil { - return nil, err + return nil, nil, nil, err } oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0]) if err != nil { - return nil, err + return nil, nil, nil, err } pdb, err := planet.Uplinks[0].DialPointerDB(planet.Satellites[0], TestAPIKey) if err != nil { - return nil, err + return nil, nil, nil, err } ec := ecclient.NewClient(planet.Uplinks[0].Identity, 0) fc, err := infectious.NewFEC(2, 4) if err != nil { - return nil, err + return nil, nil, nil, err } rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, int(1*memory.KB)), 3, 4) if err != nil { - return nil, err + return nil, nil, nil, err } segments := segments.NewSegmentStore(oc, ec, pdb, rs, int(8*memory.KB)) @@ -363,12 +364,12 @@ func newDB(planet *testplanet.Planet) (*DB, error) { streams, err := streams.NewStreamStore(segments, int64(64*memory.MB), key, int(1*memory.KB), storj.AESGCM) if err != nil { - return nil, err + return nil, nil, nil, err } buckets := buckets.NewStore(streams) - return New(buckets, streams, segments, pdb, key), nil + return kvmetainfo.New(buckets, streams, segments, pdb, key), buckets, streams, nil } func forAllCiphers(test func(cipher storj.Cipher)) { diff --git a/pkg/metainfo/kvmetainfo/objects.go b/pkg/metainfo/kvmetainfo/objects.go index 2fb6a82f1..556bd088b 100644 --- a/pkg/metainfo/kvmetainfo/objects.go +++ b/pkg/metainfo/kvmetainfo/objects.go @@ -29,7 +29,8 @@ const ( committedPrefix = "l/" ) -var defaultRS = storj.RedundancyScheme{ +// DefaultRS default values for RedundancyScheme +var DefaultRS = storj.RedundancyScheme{ Algorithm: storj.ReedSolomon, RequiredShares: 20, RepairShares: 30, @@ -38,7 +39,8 @@ var defaultRS = storj.RedundancyScheme{ ShareSize: 1 * memory.KB.Int32(), } -var defaultES = storj.EncryptionScheme{ +// DefaultES default values for EncryptionScheme +var DefaultES = storj.EncryptionScheme{ Cipher: storj.AESGCM, BlockSize: 1 * memory.KB.Int32(), } @@ -104,12 +106,12 @@ func (db *DB) CreateObject(ctx context.Context, bucket string, path storj.Path, // if info.ContentType == "" {} if info.RedundancyScheme.IsZero() { - info.RedundancyScheme = defaultRS + info.RedundancyScheme = DefaultRS } if info.EncryptionScheme.IsZero() { info.EncryptionScheme = storj.EncryptionScheme{ - Cipher: defaultES.Cipher, + Cipher: DefaultES.Cipher, BlockSize: info.RedundancyScheme.ShareSize, } } diff --git a/pkg/metainfo/kvmetainfo/objects_test.go b/pkg/metainfo/kvmetainfo/objects_test.go index 13e1a8e85..2daf586d7 100644 --- a/pkg/metainfo/kvmetainfo/objects_test.go +++ b/pkg/metainfo/kvmetainfo/objects_test.go @@ -1,7 +1,7 @@ // Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. -package kvmetainfo +package kvmetainfo_test import ( "context" @@ -14,6 +14,9 @@ import ( "github.com/stretchr/testify/assert" "storj.io/storj/internal/memory" + "storj.io/storj/pkg/metainfo/kvmetainfo" + "storj.io/storj/pkg/storage/buckets" + "storj.io/storj/pkg/storage/streams" "storj.io/storj/pkg/storj" "storj.io/storj/pkg/stream" ) @@ -35,7 +38,7 @@ func TestCreateObject(t *testing.T) { BlockSize: 1 * memory.KB.Int32(), } - runTest(t, func(ctx context.Context, db *DB) { + runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) { bucket, err := db.CreateBucket(ctx, TestBucket, nil) if !assert.NoError(t, err) { return @@ -48,8 +51,8 @@ func TestCreateObject(t *testing.T) { }{ { create: nil, - expectedRS: defaultRS, - expectedES: defaultES, + expectedRS: kvmetainfo.DefaultRS, + expectedES: kvmetainfo.DefaultES, }, { create: &storj.CreateObject{RedundancyScheme: customRS, EncryptionScheme: customES}, expectedRS: customRS, @@ -57,10 +60,10 @@ func TestCreateObject(t *testing.T) { }, { create: &storj.CreateObject{RedundancyScheme: customRS}, expectedRS: customRS, - expectedES: storj.EncryptionScheme{Cipher: defaultES.Cipher, BlockSize: customRS.ShareSize}, + expectedES: storj.EncryptionScheme{Cipher: kvmetainfo.DefaultES.Cipher, BlockSize: customRS.ShareSize}, }, { create: &storj.CreateObject{EncryptionScheme: customES}, - expectedRS: defaultRS, + expectedRS: kvmetainfo.DefaultRS, expectedES: customES, }, } { @@ -84,13 +87,13 @@ func TestCreateObject(t *testing.T) { } func TestGetObject(t *testing.T) { - runTest(t, func(ctx context.Context, db *DB) { + runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) { bucket, err := db.CreateBucket(ctx, TestBucket, nil) if !assert.NoError(t, err) { return } - upload(ctx, t, db, bucket, TestFile, nil) + upload(ctx, t, db, streams, bucket, TestFile, nil) _, err = db.GetObject(ctx, "", "") assert.True(t, storj.ErrNoBucket.Has(err)) @@ -114,7 +117,7 @@ func TestGetObject(t *testing.T) { } func TestGetObjectStream(t *testing.T) { - runTest(t, func(ctx context.Context, db *DB) { + runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) { // we wait a second for all the nodes to complete bootstrapping off the satellite time.Sleep(2 * time.Second) @@ -129,9 +132,9 @@ func TestGetObjectStream(t *testing.T) { return } - upload(ctx, t, db, bucket, "empty-file", nil) - upload(ctx, t, db, bucket, "small-file", []byte("test")) - upload(ctx, t, db, bucket, "large-file", data) + upload(ctx, t, db, streams, bucket, "empty-file", nil) + upload(ctx, t, db, streams, bucket, "small-file", []byte("test")) + upload(ctx, t, db, streams, bucket, "large-file", data) _, err = db.GetObjectStream(ctx, "", "") assert.True(t, storj.ErrNoBucket.Has(err)) @@ -145,13 +148,13 @@ func TestGetObjectStream(t *testing.T) { _, err = db.GetObjectStream(ctx, bucket.Name, "non-existing-file") assert.True(t, storj.ErrObjectNotFound.Has(err)) - assertStream(ctx, t, db, bucket, "empty-file", 0, []byte{}) - assertStream(ctx, t, db, bucket, "small-file", 4, []byte("test")) - assertStream(ctx, t, db, bucket, "large-file", int64(32*memory.KB), data) + assertStream(ctx, t, db, streams, bucket, "empty-file", 0, []byte{}) + assertStream(ctx, t, db, streams, bucket, "small-file", 4, []byte("test")) + assertStream(ctx, t, db, streams, bucket, "large-file", int64(32*memory.KB), data) }) } -func upload(ctx context.Context, t *testing.T, db *DB, bucket storj.Bucket, path storj.Path, data []byte) { +func upload(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams streams.Store, bucket storj.Bucket, path storj.Path, data []byte) { obj, err := db.CreateObject(ctx, bucket.Name, path, nil) if !assert.NoError(t, err) { return @@ -162,7 +165,7 @@ func upload(ctx context.Context, t *testing.T, db *DB, bucket storj.Bucket, path return } - upload := stream.NewUpload(ctx, str, db.streams) + upload := stream.NewUpload(ctx, str, streams) _, err = upload.Write(data) if !assert.NoError(t, err) { @@ -180,7 +183,7 @@ func upload(ctx context.Context, t *testing.T, db *DB, bucket storj.Bucket, path } } -func assertStream(ctx context.Context, t *testing.T, db *DB, bucket storj.Bucket, path storj.Path, size int64, content []byte) { +func assertStream(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams streams.Store, bucket storj.Bucket, path storj.Path, size int64, content []byte) { readOnly, err := db.GetObjectStream(ctx, bucket.Name, path) if !assert.NoError(t, err) { return @@ -208,7 +211,7 @@ func assertStream(ctx context.Context, t *testing.T, db *DB, bucket storj.Bucket assertInlineSegment(t, segments[0], content) } - download := stream.NewDownload(ctx, readOnly, db.streams) + download := stream.NewDownload(ctx, readOnly, streams) defer func() { err = download.Close() assert.NoError(t, err) @@ -253,13 +256,13 @@ func assertRemoteSegment(t *testing.T, segment storj.Segment) { } func TestDeleteObject(t *testing.T) { - runTest(t, func(ctx context.Context, db *DB) { + runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) { bucket, err := db.CreateBucket(ctx, TestBucket, nil) if !assert.NoError(t, err) { return } - upload(ctx, t, db, bucket, TestFile, nil) + upload(ctx, t, db, streams, bucket, TestFile, nil) err = db.DeleteObject(ctx, "", "") assert.True(t, storj.ErrNoBucket.Has(err)) @@ -279,7 +282,7 @@ func TestDeleteObject(t *testing.T) { } func TestListObjectsEmpty(t *testing.T) { - runTest(t, func(ctx context.Context, db *DB) { + runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) { bucket, err := db.CreateBucket(ctx, TestBucket, nil) if !assert.NoError(t, err) { return @@ -307,7 +310,7 @@ func TestListObjectsEmpty(t *testing.T) { } func TestListObjects(t *testing.T) { - runTest(t, func(ctx context.Context, db *DB) { + runTest(t, func(ctx context.Context, db *kvmetainfo.DB, buckets buckets.Store, streams streams.Store) { bucket, err := db.CreateBucket(ctx, TestBucket, &storj.Bucket{PathCipher: storj.Unencrypted}) if !assert.NoError(t, err) { return @@ -320,7 +323,7 @@ func TestListObjects(t *testing.T) { } for _, path := range filePaths { - upload(ctx, t, db, bucket, path, nil) + upload(ctx, t, db, streams, bucket, path, nil) } otherBucket, err := db.CreateBucket(ctx, "otherbucket", nil) @@ -328,7 +331,7 @@ func TestListObjects(t *testing.T) { return } - upload(ctx, t, db, otherBucket, "file-in-other-bucket", nil) + upload(ctx, t, db, streams, otherBucket, "file-in-other-bucket", nil) for i, tt := range []struct { options storj.ListOptions