all: remove usages of testplanet.New
Ensure that tests use testplanet.Run, so they always run against all database backends.

Change-Id: I6b0209e6a4912cf3328bd35b2c31bb8598930acb
parent 76fdb5d863
commit 5a4745eddb
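For context, this is the shape of the change applied throughout the diff below, as a minimal sketch: a test that previously created and started a planet by hand via testplanet.New now hands its body to testplanet.Run, which owns the planet lifecycle and repeats the callback for every supported database backend. The function name TestSomething and the import paths are illustrative assumptions, not code from this commit:

package example_test

import (
    "testing"

    "github.com/stretchr/testify/require"

    "storj.io/common/testcontext"       // assumed import path for this revision
    "storj.io/storj/private/testplanet" // assumed import path for this revision
)

// Old style: manual lifecycle, runs against a single database backend.
func TestSomethingOld(t *testing.T) {
    ctx := testcontext.New(t)
    defer ctx.Cleanup()

    // 1 satellite, 4 storage nodes, 1 uplink.
    planet, err := testplanet.New(t, 1, 4, 1)
    require.NoError(t, err)
    defer ctx.Check(planet.Shutdown)
    planet.Start(ctx)

    // ... test body using planet ...
}

// New style: testplanet.Run starts and shuts down the planet and runs the
// callback once per supported database backend.
func TestSomethingNew(t *testing.T) {
    testplanet.Run(t, testplanet.Config{
        SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
    }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
        // ... test body using planet ...
    })
}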
@@ -161,9 +161,7 @@ func TestPutGetList(t *testing.T) {
})
}

func runTest(ctx context.Context, t *testing.T, apiKey, satelliteAddr string,
test putGetListTest) {

func runTest(ctx context.Context, t *testing.T, apiKey, satelliteAddr string, test putGetListTest) {
errCatch := func(fn func() error) { require.NoError(t, fn()) }

cfg := &uplink.Config{}

@@ -16,6 +16,7 @@ import (
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/hash"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vivint/infectious"
"go.uber.org/zap/zaptest"

@@ -65,7 +66,7 @@ var (
)

func TestMakeBucketWithLocation(t *testing.T) {
runTest(t, func(ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
runTest(t, func(t *testing.T, ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
// Check the error when creating bucket with empty name
err := layer.MakeBucketWithLocation(ctx, "", "")
assert.Equal(t, minio.BucketNameInvalid{}, err)

@@ -88,7 +89,7 @@ func TestMakeBucketWithLocation(t *testing.T) {
}

func TestGetBucketInfo(t *testing.T) {
runTest(t, func(ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
runTest(t, func(t *testing.T, ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
// Check the error when getting info about bucket with empty name
_, err := layer.GetBucketInfo(ctx, "")
assert.Equal(t, minio.BucketNameInvalid{}, err)

@@ -111,7 +112,7 @@ func TestGetBucketInfo(t *testing.T) {
}

func TestDeleteBucket(t *testing.T) {
runTest(t, func(ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
runTest(t, func(t *testing.T, ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
// Check the error when deleting bucket with empty name
err := layer.DeleteBucket(ctx, "")
assert.Equal(t, minio.BucketNameInvalid{}, err)

@@ -146,7 +147,7 @@ func TestDeleteBucket(t *testing.T) {
}

func TestListBuckets(t *testing.T) {
runTest(t, func(ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
runTest(t, func(t *testing.T, ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
// Check that empty list is return if no buckets exist yet
bucketInfos, err := layer.ListBuckets(ctx)
assert.NoError(t, err)

@@ -174,31 +175,29 @@ func TestListBuckets(t *testing.T) {
}

func TestPutObject(t *testing.T) {
data, err := hash.NewReader(bytes.NewReader([]byte("test")),
int64(len("test")),
"098f6bcd4621d373cade4e832627b4f6",
"9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08")
if err != nil {
t.Fatal(err)
}
runTest(t, func(t *testing.T, ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
data, err := hash.NewReader(bytes.NewReader([]byte("test")),
int64(len("test")),
"098f6bcd4621d373cade4e832627b4f6",
"9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08")
require.NoError(t, err)

metadata := map[string]string{
"content-type": "media/foo",
"key1": "value1",
"key2": "value2",
}
metadata := map[string]string{
"content-type": "media/foo",
"key1": "value1",
"key2": "value2",
}

serMetaInfo := pb.SerializableMeta{
ContentType: metadata["content-type"],
UserDefined: map[string]string{
"key1": metadata["key1"],
"key2": metadata["key2"],
},
}
serMetaInfo := pb.SerializableMeta{
ContentType: metadata["content-type"],
UserDefined: map[string]string{
"key1": metadata["key1"],
"key2": metadata["key2"],
},
}

runTest(t, func(ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
// Check the error when putting an object to a bucket with empty name
_, err := layer.PutObject(ctx, "", "", nil, nil)
_, err = layer.PutObject(ctx, "", "", nil, nil)
assert.Equal(t, minio.BucketNameInvalid{}, err)

// Check the error when putting an object to a non-existing bucket

@@ -242,7 +241,7 @@ func TestPutObject(t *testing.T) {
}

func TestGetObjectInfo(t *testing.T) {
runTest(t, func(ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
runTest(t, func(t *testing.T, ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
// Check the error when getting an object from a bucket with empty name
_, err := layer.GetObjectInfo(ctx, "", "")
assert.Equal(t, minio.BucketNameInvalid{}, err)

@@ -287,7 +286,7 @@ func TestGetObjectInfo(t *testing.T) {
}

func TestGetObject(t *testing.T) {
runTest(t, func(ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
runTest(t, func(t *testing.T, ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
// Check the error when getting an object from a bucket with empty name
err := layer.GetObject(ctx, "", "", 0, 0, nil, "")
assert.Equal(t, minio.BucketNameInvalid{}, err)

@@ -350,7 +349,7 @@ func TestGetObject(t *testing.T) {
}

func TestCopyObject(t *testing.T) {
runTest(t, func(ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
runTest(t, func(t *testing.T, ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
// Check the error when copying an object from a bucket with empty name
_, err := layer.CopyObject(ctx, "", TestFile, DestBucket, DestFile, minio.ObjectInfo{})
assert.Equal(t, minio.BucketNameInvalid{}, err)

@@ -420,7 +419,7 @@ func TestCopyObject(t *testing.T) {
}

func TestDeleteObject(t *testing.T) {
runTest(t, func(ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
runTest(t, func(t *testing.T, ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
// Check the error when deleting an object from a bucket with empty name
err := layer.DeleteObject(ctx, "", "")
assert.Equal(t, minio.BucketNameInvalid{}, err)

@@ -456,7 +455,7 @@ func TestDeleteObject(t *testing.T) {
}

func TestListObjects(t *testing.T) {
testListObjects(t, func(ctx context.Context, layer minio.ObjectLayer, bucket, prefix, marker, delimiter string, maxKeys int) ([]string, []minio.ObjectInfo, bool, error) {
testListObjects(t, func(t *testing.T, ctx context.Context, layer minio.ObjectLayer, bucket, prefix, marker, delimiter string, maxKeys int) ([]string, []minio.ObjectInfo, bool, error) {
list, err := layer.ListObjects(ctx, TestBucket, prefix, marker, delimiter, maxKeys)
if err != nil {
return nil, nil, false, err

@@ -466,7 +465,7 @@ func TestListObjects(t *testing.T) {
}

func TestListObjectsV2(t *testing.T) {
testListObjects(t, func(ctx context.Context, layer minio.ObjectLayer, bucket, prefix, marker, delimiter string, maxKeys int) ([]string, []minio.ObjectInfo, bool, error) {
testListObjects(t, func(t *testing.T, ctx context.Context, layer minio.ObjectLayer, bucket, prefix, marker, delimiter string, maxKeys int) ([]string, []minio.ObjectInfo, bool, error) {
list, err := layer.ListObjectsV2(ctx, TestBucket, prefix, marker, delimiter, maxKeys, false, "")
if err != nil {
return nil, nil, false, err

@@ -475,8 +474,8 @@ func TestListObjectsV2(t *testing.T) {
})
}

func testListObjects(t *testing.T, listObjects func(context.Context, minio.ObjectLayer, string, string, string, string, int) ([]string, []minio.ObjectInfo, bool, error)) {
runTest(t, func(ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
func testListObjects(t *testing.T, listObjects func(*testing.T, context.Context, minio.ObjectLayer, string, string, string, string, int) ([]string, []minio.ObjectInfo, bool, error)) {
runTest(t, func(t *testing.T, ctx context.Context, layer minio.ObjectLayer, m *kvmetainfo.DB, strms streams.Store) {
// Check the error when listing objects with unsupported delimiter
_, err := layer.ListObjects(ctx, TestBucket, "", "", "#", 0)
assert.Equal(t, minio.UnsupportedDelimiter{Delimiter: "#"}, err)

@@ -634,7 +633,7 @@ func testListObjects(t *testing.T, listObjects func(context.Context, minio.Objec
errTag := fmt.Sprintf("%d. %+v", i, tt)

// Check that the expected objects can be listed using the Minio API
prefixes, objects, isTruncated, err := listObjects(ctx, layer, TestBucket, tt.prefix, tt.marker, tt.delimiter, tt.maxKeys)
prefixes, objects, isTruncated, err := listObjects(t, ctx, layer, TestBucket, tt.prefix, tt.marker, tt.delimiter, tt.maxKeys)
if assert.NoError(t, err, errTag) {
assert.Equal(t, tt.more, isTruncated, errTag)
assert.Equal(t, tt.prefixes, prefixes, errTag)

@@ -660,25 +659,15 @@ func testListObjects(t *testing.T, listObjects func(context.Context, minio.Objec
})
}

func runTest(t *testing.T, test func(context.Context, minio.ObjectLayer, *kvmetainfo.DB, streams.Store)) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
func runTest(t *testing.T, test func(*testing.T, context.Context, minio.ObjectLayer, *kvmetainfo.DB, streams.Store)) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
layer, m, strms, err := initEnv(ctx, t, planet)
require.NoError(t, err)

planet, err := testplanet.New(t, 1, 4, 1)
if !assert.NoError(t, err) {
return
}

defer ctx.Check(planet.Shutdown)

planet.Start(ctx)

layer, m, strms, err := initEnv(ctx, t, planet)
if !assert.NoError(t, err) {
return
}

test(ctx, layer, m, strms)
test(t, ctx, layer, m, strms)
})
}

func initEnv(ctx context.Context, t *testing.T, planet *testplanet.Planet) (minio.ObjectLayer, *kvmetainfo.DB, streams.Store, error) {

@@ -14,6 +14,7 @@ import (
"github.com/minio/cli"
minio "github.com/minio/minio/cmd"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

@@ -37,78 +38,74 @@ type config struct {
func TestUploadDownload(t *testing.T) {
t.Skip("disable because, keeps stalling CI intermittently")

ctx := testcontext.New(t)
defer ctx.Cleanup()
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// add project to satisfy constraint
_, err := planet.Satellites[0].DB.Console().Projects().Insert(ctx, &console.Project{
Name: "testProject",
})
require.NoError(t, err)

planet, err := testplanet.New(t, 1, 30, 1)
assert.NoError(t, err)
var gwCfg config
gwCfg.Minio.Dir = ctx.Dir("minio")
gwCfg.Server.Address = "127.0.0.1:7777"

defer ctx.Check(planet.Shutdown)
uplinkCfg := planet.Uplinks[0].GetConfig(planet.Satellites[0])

// add project to satisfy constraint
_, err = planet.Satellites[0].DB.Console().Projects().Insert(context.Background(), &console.Project{
Name: "testProject",
})
assert.NoError(t, err)
planet.Start(ctx)

var gwCfg config
gwCfg.Minio.Dir = ctx.Dir("minio")
gwCfg.Server.Address = "127.0.0.1:7777"
// create identity for gateway
ca, err := testidentity.NewTestCA(ctx)
assert.NoError(t, err)
identity, err := ca.NewIdentity()
assert.NoError(t, err)

uplinkCfg := planet.Uplinks[0].GetConfig(planet.Satellites[0])
// setup and start gateway
go func() {
// TODO: this leaks the gateway server, however it shouldn't
err := runGateway(ctx, gwCfg, uplinkCfg, zaptest.NewLogger(t), identity)
if err != nil {
t.Log(err)
}
}()

planet.Start(ctx)
time.Sleep(100 * time.Millisecond)

// create identity for gateway
ca, err := testidentity.NewTestCA(ctx)
assert.NoError(t, err)
identity, err := ca.NewIdentity()
assert.NoError(t, err)
client, err := s3client.NewMinio(s3client.Config{
S3Gateway: gwCfg.Server.Address,
Satellite: planet.Satellites[0].Addr(),
AccessKey: gwCfg.Minio.AccessKey,
SecretKey: gwCfg.Minio.SecretKey,
APIKey: uplinkCfg.Legacy.Client.APIKey,
EncryptionKey: "fake-encryption-key",
NoSSL: true,
})
assert.NoError(t, err)

// setup and start gateway
go func() {
// TODO: this leaks the gateway server, however it shouldn't
err := runGateway(ctx, gwCfg, uplinkCfg, zaptest.NewLogger(t), identity)
if err != nil {
t.Log(err)
bucket := "bucket"

err = client.MakeBucket(bucket, "")
assert.NoError(t, err)

// generate enough data for a remote segment
data := []byte{}
for i := 0; i < 5000; i++ {
data = append(data, 'a')
}
}()

time.Sleep(100 * time.Millisecond)
objectName := "testdata"

client, err := s3client.NewMinio(s3client.Config{
S3Gateway: gwCfg.Server.Address,
Satellite: planet.Satellites[0].Addr(),
AccessKey: gwCfg.Minio.AccessKey,
SecretKey: gwCfg.Minio.SecretKey,
APIKey: uplinkCfg.Legacy.Client.APIKey,
EncryptionKey: "fake-encryption-key",
NoSSL: true,
err = client.Upload(bucket, objectName, data)
assert.NoError(t, err)

buffer := make([]byte, len(data))

bytes, err := client.Download(bucket, objectName, buffer)
assert.NoError(t, err)

assert.Equal(t, string(data), string(bytes))
})
assert.NoError(t, err)

bucket := "bucket"

err = client.MakeBucket(bucket, "")
assert.NoError(t, err)

// generate enough data for a remote segment
data := []byte{}
for i := 0; i < 5000; i++ {
data = append(data, 'a')
}

objectName := "testdata"

err = client.Upload(bucket, objectName, data)
assert.NoError(t, err)

buffer := make([]byte, len(data))

bytes, err := client.Download(bucket, objectName, buffer)
assert.NoError(t, err)

assert.Equal(t, string(data), string(bytes))
}

// runGateway creates and starts a gateway

@@ -4,8 +4,6 @@
package testplanet_test

import (
"context"
"strconv"
"testing"
"time"

@@ -72,22 +70,3 @@ func TestContact(t *testing.T) {
}
})
}

func BenchmarkCreate(b *testing.B) {
storageNodes := []int{4, 10, 100}
for _, count := range storageNodes {
storageNodeCount := count
b.Run(strconv.Itoa(storageNodeCount), func(b *testing.B) {
ctx := context.Background()
for i := 0; i < b.N; i++ {
planet, err := testplanet.New(nil, 1, storageNodeCount, 1)
require.NoError(b, err)

planet.Start(ctx)

err = planet.Shutdown()
require.NoError(b, err)
}
})
}
}

@@ -8,7 +8,6 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"storj.io/common/memory"

@@ -22,105 +21,39 @@ import (
"storj.io/storj/storagenode/pieces"
)

func TestNewDeletePiecesService(t *testing.T) {
type params struct {
maxConcurrentConns int
dialer rpc.Dialer
log *zap.Logger
}
var testCases = []struct {
desc string
args params
errMsg string
}{
{
desc: "ok",
args: params{
maxConcurrentConns: 10,
dialer: rpc.NewDefaultDialer(nil),
log: zaptest.NewLogger(t),
},
},
{
desc: "error: 0 maxConcurrentCons",
args: params{
maxConcurrentConns: 0,
dialer: rpc.NewDefaultDialer(nil),
log: zaptest.NewLogger(t),
},
errMsg: "greater than 0",
},
{
desc: "error: negative maxConcurrentCons",
args: params{
maxConcurrentConns: -3,
dialer: rpc.NewDefaultDialer(nil),
log: zaptest.NewLogger(t),
},
errMsg: "greater than 0",
},
{
desc: "error: zero dialer",
args: params{
maxConcurrentConns: 87,
dialer: rpc.Dialer{},
log: zaptest.NewLogger(t),
},
errMsg: "dialer cannot be its zero value",
},
{
desc: "error: nil logger",
args: params{
maxConcurrentConns: 2,
dialer: rpc.NewDefaultDialer(nil),
log: nil,
},
errMsg: "logger cannot be nil",
},
}
func TestDeletePiecesService_New_Error(t *testing.T) {
log := zaptest.NewLogger(t)
dialer := rpc.NewDefaultDialer(nil)

for _, tc := range testCases {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
_, err := metainfo.NewDeletePiecesService(nil, dialer, 8)
require.True(t, metainfo.ErrDeletePieces.Has(err), err)
require.Contains(t, err.Error(), "logger cannot be nil")

svc, err := metainfo.NewDeletePiecesService(tc.args.log, tc.args.dialer, tc.args.maxConcurrentConns)
if tc.errMsg != "" {
require.Error(t, err)
require.True(t, metainfo.ErrDeletePieces.Has(err), "unexpected error class")
require.Contains(t, err.Error(), tc.errMsg)
return
}
_, err = metainfo.NewDeletePiecesService(log, rpc.Dialer{}, 87)
require.True(t, metainfo.ErrDeletePieces.Has(err), err)
require.Contains(t, err.Error(), "dialer cannot be its zero value")

require.NoError(t, err)
require.NotNil(t, svc)
_, err = metainfo.NewDeletePiecesService(log, dialer, 0)
require.True(t, metainfo.ErrDeletePieces.Has(err), err)
require.Contains(t, err.Error(), "greater than 0")

})
}
_, err = metainfo.NewDeletePiecesService(log, dialer, -3)
require.True(t, metainfo.ErrDeletePieces.Has(err), err)
require.Contains(t, err.Error(), "greater than 0")
}

func TestDeletePiecesService_DeletePieces(t *testing.T) {
t.Run("all nodes up", func(t *testing.T) {
t.Parallel()

ctx := testcontext.New(t)
defer ctx.Cleanup()

planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)

var (
uplnk = planet.Uplinks[0]
satelliteSys = planet.Satellites[0]
)
func TestDeletePiecesService_DeletePieces_AllNodesUp(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplnk := planet.Uplinks[0]
satelliteSys := planet.Satellites[0]

{
data := testrand.Bytes(10 * memory.KiB)
// Use RSConfig for ensuring that we don't have long-tail cancellations
// and the upload doesn't leave garbage in the SNs
err = uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
err := uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
Client: cmd.ClientConfig{
SegmentSize: 10 * memory.KiB,
},

@@ -165,7 +98,7 @@ func TestDeletePiecesService_DeletePieces(t *testing.T) {
nodesPieces = append(nodesPieces, nodePieces)
}

err = satelliteSys.API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 0.75)
err := satelliteSys.API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 0.75)
require.NoError(t, err)

// calculate the SNs used space after delete the pieces

@@ -183,28 +116,20 @@ func TestDeletePiecesService_DeletePieces(t *testing.T) {
t.Fatalf("deleted used space is less than 0.75%%. Got %f", deletedUsedSpace)
}
})
}

t.Run("some nodes down", func(t *testing.T) {
t.Parallel()

ctx := testcontext.New(t)
defer ctx.Cleanup()

planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)

var (
uplnk = planet.Uplinks[0]
satelliteSys = planet.Satellites[0]
)
func TestDeletePiecesService_DeletePieces_SomeNodesDown(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplnk := planet.Uplinks[0]
satelliteSys := planet.Satellites[0]

{
data := testrand.Bytes(10 * memory.KiB)
// Use RSConfig for ensuring that we don't have long-tail cancellations
// and the upload doesn't leave garbage in the SNs
err = uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
err := uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
Client: cmd.ClientConfig{
SegmentSize: 10 * memory.KiB,
},

@@ -246,7 +171,7 @@ func TestDeletePiecesService_DeletePieces(t *testing.T) {
}
}

err = satelliteSys.API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 0.9999)
err := satelliteSys.API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 0.9999)
require.NoError(t, err)

// Check that storage nodes which are online when deleting pieces don't

@@ -260,28 +185,20 @@ func TestDeletePiecesService_DeletePieces(t *testing.T) {

require.Zero(t, totalUsedSpace, "totalUsedSpace online nodes")
})
}

t.Run("all nodes down", func(t *testing.T) {
t.Parallel()

ctx := testcontext.New(t)
defer ctx.Cleanup()

planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)

var (
uplnk = planet.Uplinks[0]
satelliteSys = planet.Satellites[0]
)
func TestDeletePiecesService_DeletePieces_AllNodesDown(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplnk := planet.Uplinks[0]
satelliteSys := planet.Satellites[0]

{
data := testrand.Bytes(10 * memory.KiB)
// Use RSConfig for ensuring that we don't have long-tail cancellations
// and the upload doesn't leave garbage in the SNs
err = uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
err := uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
Client: cmd.ClientConfig{
SegmentSize: 10 * memory.KiB,
},

@@ -327,7 +244,7 @@ func TestDeletePiecesService_DeletePieces(t *testing.T) {
require.NoError(t, planet.StopPeer(sn))
}

err = satelliteSys.API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 0.9999)
err := satelliteSys.API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 0.9999)
require.NoError(t, err)

var totalUsedSpace int64

@@ -340,28 +257,20 @@ func TestDeletePiecesService_DeletePieces(t *testing.T) {

require.Equal(t, expectedTotalUsedSpace, totalUsedSpace, "totalUsedSpace")
})
}

t.Run("invalid dialer", func(t *testing.T) {
t.Parallel()

ctx := testcontext.New(t)
defer ctx.Cleanup()

planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)

var (
uplnk = planet.Uplinks[0]
satelliteSys = planet.Satellites[0]
)
func TestDeletePiecesService_DeletePieces_InvalidDialer(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplnk := planet.Uplinks[0]
satelliteSys := planet.Satellites[0]

{
data := testrand.Bytes(10 * memory.KiB)
// Use RSConfig for ensuring that we don't have long-tail cancellations
// and the upload doesn't leave garbage in the SNs
err = uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
err := uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
Client: cmd.ClientConfig{
SegmentSize: 10 * memory.KiB,
},

@@ -413,6 +322,7 @@ func TestDeletePiecesService_DeletePieces(t *testing.T) {
zaptest.NewLogger(t), dialer, len(nodesPieces)-1,
)
require.NoError(t, err)
defer ctx.Check(service.Close)

err = service.DeletePieces(ctx, nodesPieces, 0.75)
require.NoError(t, err)

@@ -427,40 +337,31 @@ func TestDeletePiecesService_DeletePieces(t *testing.T) {
// because no node can be dialed the SNs used space should be the same
require.Equal(t, expectedTotalUsedSpace, totalUsedSpaceAfterDelete)
})
}

t.Run("empty nodes pieces", func(t *testing.T) {
t.Parallel()
func TestDeletePiecesService_DeletePieces_Invalid(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
deletePiecesService := planet.Satellites[0].API.Metainfo.DeletePiecesService

ctx := testcontext.New(t)
defer ctx.Cleanup()
t.Run("empty node pieces", func(t *testing.T) {
t.Parallel()
err := deletePiecesService.DeletePieces(ctx, metainfo.NodesPieces{}, 0.75)
require.Error(t, err)
assert.False(t, metainfo.ErrDeletePieces.Has(err), err)
assert.Contains(t, err.Error(), "invalid number of tasks")
})

planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)

err = planet.Satellites[0].API.Metainfo.DeletePiecesService.DeletePieces(ctx, metainfo.NodesPieces{}, 0.75)
require.Error(t, err)
assert.False(t, metainfo.ErrDeletePieces.Has(err), "unexpected error class")
assert.Contains(t, err.Error(), "invalid number of tasks")
})

t.Run("invalid threshold", func(t *testing.T) {
t.Parallel()

ctx := testcontext.New(t)
defer ctx.Cleanup()

planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)

nodesPieces := make(metainfo.NodesPieces, 1)
nodesPieces[0].Pieces = make([]storj.PieceID, 2)
err = planet.Satellites[0].API.Metainfo.DeletePiecesService.DeletePieces(ctx, nodesPieces, 1)
require.Error(t, err)
assert.False(t, metainfo.ErrDeletePieces.Has(err), "unexpected error class")
assert.Contains(t, err.Error(), "invalid successThreshold")
t.Run("invalid threshold", func(t *testing.T) {
t.Parallel()
nodesPieces := metainfo.NodesPieces{
{Pieces: make([]storj.PieceID, 2)},
}
err := deletePiecesService.DeletePieces(ctx, nodesPieces, 1)
require.Error(t, err)
assert.False(t, metainfo.ErrDeletePieces.Has(err), err)
assert.Contains(t, err.Error(), "invalid successThreshold")
})
})
}

@@ -508,112 +508,103 @@ func TestDeletePiece(t *testing.T) {
func TestTooManyRequests(t *testing.T) {
t.Skip("flaky, because of EOF issues")

ctx := testcontext.New(t)
defer ctx.Cleanup()

const uplinkCount = 6
const maxConcurrent = 3
const expectedFailures = uplinkCount - maxConcurrent

log := zaptest.NewLogger(t)

planet, err := testplanet.NewCustom(log, testplanet.Config{
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: uplinkCount,
Reconfigure: testplanet.Reconfigure{
StorageNode: func(index int, config *storagenode.Config) {
config.Storage2.MaxConcurrentRequests = maxConcurrent
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
doneWaiting := make(chan struct{})
failedCount := int64(expectedFailures)

uploads, _ := errgroup.WithContext(ctx)
defer ctx.Check(uploads.Wait)

for i, uplink := range planet.Uplinks {
i, uplink := i, uplink
uploads.Go(func() (err error) {
storageNode := planet.StorageNodes[0].Local()
config := piecestore.DefaultConfig
config.UploadBufferSize = 0 // disable buffering so we can detect write error early

client, err := piecestore.Dial(ctx, uplink.Dialer, &storageNode.Node, uplink.Log, config)
if err != nil {
return err
}
defer func() {
if cerr := client.Close(); cerr != nil {
uplink.Log.Error("close failed", zap.Error(cerr))
err = errs.Combine(err, cerr)
}
}()

pieceID := storj.PieceID{byte(i + 1)}
serialNumber := testrand.SerialNumber()

orderLimit, piecePrivateKey := GenerateOrderLimit(
t,
planet.Satellites[0].ID(),
planet.StorageNodes[0].ID(),
pieceID,
pb.PieceAction_PUT,
serialNumber,
24*time.Hour,
24*time.Hour,
int64(10000),
)

satelliteSigner := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
orderLimit, err = signing.SignOrderLimit(ctx, satelliteSigner, orderLimit)
if err != nil {
return err
}

upload, err := client.Upload(ctx, orderLimit, piecePrivateKey)
if err != nil {
if errs2.IsRPC(err, rpcstatus.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("upload failed", zap.Stringer("Piece ID", pieceID), zap.Error(err))
return err
}

_, err = upload.Write(make([]byte, orderLimit.Limit))
if err != nil {
if errs2.IsRPC(err, rpcstatus.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("write failed", zap.Stringer("Piece ID", pieceID), zap.Error(err))
return err
}

_, err = upload.Commit(ctx)
if err != nil {
if errs2.IsRPC(err, rpcstatus.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("commit failed", zap.Stringer("Piece ID", pieceID), zap.Error(err))
return err
}

return nil
})
}
})
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)

planet.Start(ctx)

doneWaiting := make(chan struct{})
failedCount := int64(expectedFailures)

uploads, _ := errgroup.WithContext(ctx)
defer ctx.Check(uploads.Wait)

for i, uplink := range planet.Uplinks {
i, uplink := i, uplink
uploads.Go(func() (err error) {
storageNode := planet.StorageNodes[0].Local()
config := piecestore.DefaultConfig
config.UploadBufferSize = 0 // disable buffering so we can detect write error early

client, err := piecestore.Dial(ctx, uplink.Dialer, &storageNode.Node, uplink.Log, config)
if err != nil {
return err
}
defer func() {
if cerr := client.Close(); cerr != nil {
uplink.Log.Error("close failed", zap.Error(cerr))
err = errs.Combine(err, cerr)
}
}()

pieceID := storj.PieceID{byte(i + 1)}
serialNumber := testrand.SerialNumber()

orderLimit, piecePrivateKey := GenerateOrderLimit(
t,
planet.Satellites[0].ID(),
planet.StorageNodes[0].ID(),
pieceID,
pb.PieceAction_PUT,
serialNumber,
24*time.Hour,
24*time.Hour,
int64(10000),
)

satelliteSigner := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
orderLimit, err = signing.SignOrderLimit(ctx, satelliteSigner, orderLimit)
if err != nil {
return err
}

upload, err := client.Upload(ctx, orderLimit, piecePrivateKey)
if err != nil {
if errs2.IsRPC(err, rpcstatus.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("upload failed", zap.Stringer("Piece ID", pieceID), zap.Error(err))
return err
}

_, err = upload.Write(make([]byte, orderLimit.Limit))
if err != nil {
if errs2.IsRPC(err, rpcstatus.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("write failed", zap.Stringer("Piece ID", pieceID), zap.Error(err))
return err
}

_, err = upload.Commit(ctx)
if err != nil {
if errs2.IsRPC(err, rpcstatus.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("commit failed", zap.Stringer("Piece ID", pieceID), zap.Error(err))
return err
}

return nil
})
}
}

func GenerateOrderLimit(t *testing.T, satellite storj.NodeID, storageNode storj.NodeID, pieceID storj.PieceID, action pb.PieceAction, serialNumber storj.SerialNumber, pieceExpiration, orderExpiration time.Duration, limit int64) (*pb.OrderLimit, storj.PiecePrivateKey) {