satellite: adds and enables cockroachdb compatibility for tests

Change-Id: I85a3ad8c3b9d7e15ea8675b6c55af0002933db57
This commit is contained in:
Cameron Ayer 2019-12-10 11:32:54 -05:00 committed by Jennifer Li Johnson
parent a47d7ac89b
commit a4f9865b47
12 changed files with 176 additions and 81 deletions

View File

@ -59,10 +59,12 @@ pipeline {
stage('Tests') {
environment {
STORJ_COCKROACH_TEST = 'cockroach://root@localhost:26257/testcockroach?sslmode=disable'
STORJ_POSTGRES_TEST = 'postgres://postgres@localhost/teststorj?sslmode=disable'
COVERFLAGS = "${ env.BRANCH_NAME != 'master' ? '' : '-coverprofile=.build/coverprofile -coverpkg=-coverpkg=storj.io/storj/private/...,storj.io/storj/lib/...,storj.io/storj/pkg/...,storj.io/storj/satellite/...,storj.io/storj/storage/...,storj.io/storj/storagenode/...,storj.io/storj/uplink/...,storj.io/storj/versioncontrol/...'}"
}
steps {
sh 'cockroach sql --insecure --host=localhost:26257 -e \'create database testcockroach;\''
sh 'psql -U postgres -c \'create database teststorj;\''
sh 'go run scripts/use-ports.go -from 1024 -to 10000 &'
sh 'go test -parallel 4 -p 6 -vet=off $COVERFLAGS -timeout 20m -json -race ./... 2>&1 | tee .build/tests.json | go run ./scripts/xunit.go -out .build/tests.xml'

View File

@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/peertls/tlsopts"
@ -19,6 +20,8 @@ import (
"storj.io/storj/private/testcontext"
"storj.io/storj/private/testidentity"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/storagenode"
)
// TestRPCBuild prints a statement so that in test output you can know whether
@ -167,8 +170,17 @@ func TestDialNode(t *testing.T) {
func TestDialNode_BadServerCertificate(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 0, StorageNodeCount: 2, UplinkCount: 0,
Reconfigure: testplanet.DisablePeerCAWhitelist,
Identities: testidentity.NewPregeneratedIdentities(storj.LatestIDVersion()),
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Server.UsePeerCAWhitelist = false
},
StorageNode: func(index int, config *storagenode.Config) {
config.Server.UsePeerCAWhitelist = false
},
Identities: func(log *zap.Logger, version storj.IDVersion) *testidentity.Identities {
return testidentity.NewPregeneratedIdentities(version)
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
whitelistPath, err := planet.WriteWhitelist(storj.LatestIDVersion())

View File

@ -56,13 +56,19 @@ func TestTempCockroachDB(t *testing.T) {
require.NoError(t, err)
require.Equalf(t, 1, count, "Expected 1 DB with matching name, but counted %d", count)
// close testDB but leave otherConn open
// close testDB
err = testDB.Close()
require.NoError(t, err)
// make a new connection back to the master connstr just to check that our temp db
// really was dropped
plainDBConn, err := sql.Open("cockroach", *pgtest.CrdbConnStr)
require.NoError(t, err)
defer ctx.Check(plainDBConn.Close)
// assert new test db was deleted (we expect this connection to keep working, even though its
// database was deleted out from under it!)
row = otherConn.QueryRow(`SELECT COUNT(*) FROM pg_database WHERE datname = current_database()`)
row = plainDBConn.QueryRow(`SELECT COUNT(*) FROM pg_database WHERE datname = $1`, dbName)
err = row.Scan(&count)
require.NoError(t, err)
require.Equalf(t, 0, count, "Expected 0 DB with matching name, but counted %d (deletion failure?)", count)

View File

@ -50,7 +50,6 @@ type Config struct {
StorageNodeCount int
UplinkCount int
Identities *testidentity.Identities
IdentityVersion *storj.IDVersion
Reconfigure Reconfigure
@ -152,15 +151,17 @@ func NewCustom(log *zap.Logger, config Config) (*Planet, error) {
version := storj.LatestIDVersion()
config.IdentityVersion = &version
}
if config.Identities == nil {
config.Identities = testidentity.NewPregeneratedSignedIdentities(*config.IdentityVersion)
}
planet := &Planet{
log: log,
id: config.Name + "/" + pgutil.CreateRandomTestingSchemaName(6),
config: config,
identities: config.Identities,
}
if config.Reconfigure.Identities != nil {
planet.identities = config.Reconfigure.Identities(log, *config.IdentityVersion)
} else {
planet.identities = testidentity.NewPregeneratedSignedIdentities(*config.IdentityVersion)
}
var err error

View File

@ -9,6 +9,8 @@ import (
"go.uber.org/zap"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/private/testidentity"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/storagenode"
@ -24,6 +26,8 @@ type Reconfigure struct {
NewStorageNodeDB func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error)
StorageNode func(index int, config *storagenode.Config)
UniqueIPCount int
Identities func(log *zap.Logger, version storj.IDVersion) *testidentity.Identities
}
// DisablePeerCAWhitelist returns a `Reconfigure` that sets `UsePeerCAWhitelist` for

View File

@ -289,7 +289,7 @@ func TestProjectUsageCustomLimit(t *testing.T) {
// Execute test: check that the uplink gets an error when they have exceeded storage limits and try to upload a file
actualErr := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
assert.Error(t, actualErr)
require.Error(t, actualErr)
})
}
@ -404,22 +404,22 @@ func TestUsageRollups(t *testing.T) {
t.Run("test project total", func(t *testing.T) {
projTotal1, err := usageRollups.GetProjectTotal(ctx, project1, start, now)
assert.NoError(t, err)
assert.NotNil(t, projTotal1)
require.NoError(t, err)
require.NotNil(t, projTotal1)
projTotal2, err := usageRollups.GetProjectTotal(ctx, project2, start, now)
assert.NoError(t, err)
assert.NotNil(t, projTotal2)
require.NoError(t, err)
require.NotNil(t, projTotal2)
})
t.Run("test bucket usage rollups", func(t *testing.T) {
rollups1, err := usageRollups.GetBucketUsageRollups(ctx, project1, start, now)
assert.NoError(t, err)
assert.NotNil(t, rollups1)
require.NoError(t, err)
require.NotNil(t, rollups1)
rollups2, err := usageRollups.GetBucketUsageRollups(ctx, project2, start, now)
assert.NoError(t, err)
assert.NotNil(t, rollups2)
require.NoError(t, err)
require.NotNil(t, rollups2)
})
t.Run("test bucket totals", func(t *testing.T) {
@ -429,39 +429,55 @@ func TestUsageRollups(t *testing.T) {
}
totals1, err := usageRollups.GetBucketTotals(ctx, project1, cursor, start, now)
assert.NoError(t, err)
assert.NotNil(t, totals1)
require.NoError(t, err)
require.NotNil(t, totals1)
totals2, err := usageRollups.GetBucketTotals(ctx, project2, cursor, start, now)
assert.NoError(t, err)
assert.NotNil(t, totals2)
require.NoError(t, err)
require.NotNil(t, totals2)
})
t.Run("Get paged", func(t *testing.T) {
// SQL injection test: e.g. a search of "buck%' OR 'x' != '" would, if interpolated unescaped into a LIKE clause, make it always true
bucketsPage, err := usageRollups.GetBucketTotals(ctx, project1, accounting.BucketUsageCursor{Limit: 5, Search: "buck%' OR 'x' != '", Page: 1}, start, now)
assert.NoError(t, err)
assert.NotNil(t, bucketsPage)
require.NoError(t, err)
require.NotNil(t, bucketsPage)
assert.Equal(t, uint64(0), bucketsPage.TotalCount)
assert.Equal(t, uint(0), bucketsPage.CurrentPage)
assert.Equal(t, uint(0), bucketsPage.PageCount)
assert.Equal(t, 0, len(bucketsPage.BucketUsages))
bucketsPage, err = usageRollups.GetBucketTotals(ctx, project1, accounting.BucketUsageCursor{Limit: 3, Search: "", Page: 1}, start, now)
assert.NoError(t, err)
assert.NotNil(t, bucketsPage)
require.NoError(t, err)
require.NotNil(t, bucketsPage)
assert.Equal(t, uint64(5), bucketsPage.TotalCount)
assert.Equal(t, uint(1), bucketsPage.CurrentPage)
assert.Equal(t, uint(2), bucketsPage.PageCount)
assert.Equal(t, 3, len(bucketsPage.BucketUsages))
bucketsPage, err = usageRollups.GetBucketTotals(ctx, project1, accounting.BucketUsageCursor{Limit: 5, Search: "buck", Page: 1}, start, now)
assert.NoError(t, err)
assert.NotNil(t, bucketsPage)
require.NoError(t, err)
require.NotNil(t, bucketsPage)
assert.Equal(t, uint64(5), bucketsPage.TotalCount)
assert.Equal(t, uint(1), bucketsPage.CurrentPage)
assert.Equal(t, uint(1), bucketsPage.PageCount)
assert.Equal(t, 5, len(bucketsPage.BucketUsages))
bucketsPage, err = usageRollups.GetBucketTotals(ctx, project1, accounting.BucketUsageCursor{Limit: 5, Search: "bucket_0", Page: 1}, start, now)
require.NoError(t, err)
require.NotNil(t, bucketsPage)
assert.Equal(t, uint64(1), bucketsPage.TotalCount)
assert.Equal(t, uint(1), bucketsPage.CurrentPage)
assert.Equal(t, uint(1), bucketsPage.PageCount)
assert.Equal(t, 1, len(bucketsPage.BucketUsages))
bucketsPage, err = usageRollups.GetBucketTotals(ctx, project1, accounting.BucketUsageCursor{Limit: 5, Search: "buck\xff", Page: 1}, start, now)
require.NoError(t, err)
require.NotNil(t, bucketsPage)
assert.Equal(t, uint64(0), bucketsPage.TotalCount)
assert.Equal(t, uint(0), bucketsPage.CurrentPage)
assert.Equal(t, uint(0), bucketsPage.PageCount)
assert.Equal(t, 0, len(bucketsPage.BucketUsages))
})
})
}

View File

@ -19,12 +19,12 @@ import (
)
func TestServiceSuccess(t *testing.T) {
endpoint := &endpointHappyPath{}
tokenCount := 2
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
Reconfigure: testplanet.Reconfigure{
ReferralManagerServer: func(logger *zap.Logger) pb.ReferralManagerServer {
endpoint := &endpointHappyPath{}
endpoint.SetTokenCount(tokenCount)
return endpoint
},
@ -55,11 +55,11 @@ func TestServiceSuccess(t *testing.T) {
}
func TestServiceRedeemFailure(t *testing.T) {
endpoint := &endpointFailedPath{}
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
Reconfigure: testplanet.Reconfigure{
ReferralManagerServer: func(logger *zap.Logger) pb.ReferralManagerServer {
endpoint := &endpointFailedPath{}
endpoint.SetTokenCount(2)
return endpoint
},

View File

@ -92,6 +92,9 @@ func TestSequential(t *testing.T) {
list, err := q.SelectN(ctx, N)
require.NoError(t, err)
require.Len(t, list, N)
sort.SliceStable(list, func(i, j int) bool { return list[i].LostPieces[0] < list[j].LostPieces[0] })
for i := 0; i < N; i++ {
require.True(t, pb.Equal(addSegs[i], &list[i]))
}

View File

@ -30,7 +30,7 @@ func TestOffer_Database(t *testing.T) {
AwardCreditDurationDays: 60,
InviteeCreditDurationDays: 30,
RedeemableCap: 50,
ExpiresAt: time.Now().UTC().Add(time.Hour * 1),
ExpiresAt: time.Now().UTC().Add(time.Hour * 1).Truncate(time.Millisecond),
Status: rewards.Active,
Type: rewards.Referral,
},
@ -42,7 +42,7 @@ func TestOffer_Database(t *testing.T) {
AwardCreditDurationDays: 0,
InviteeCreditDurationDays: 30,
RedeemableCap: 50,
ExpiresAt: time.Now().UTC().Add(time.Hour * 1),
ExpiresAt: time.Now().UTC().Add(time.Hour * 1).Truncate(time.Millisecond),
Status: rewards.Active,
Type: rewards.FreeCredit,
},
@ -54,7 +54,7 @@ func TestOffer_Database(t *testing.T) {
AwardCreditDurationDays: 0,
InviteeCreditDurationDays: 30,
RedeemableCap: 50,
ExpiresAt: time.Now().UTC().Add(time.Hour * 1),
ExpiresAt: time.Now().UTC().Add(time.Hour * 1).Truncate(time.Millisecond),
Status: rewards.Active,
Type: rewards.Partner,
},

View File

@ -9,7 +9,6 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
"github.com/lib/pq"
@ -97,21 +96,10 @@ func newData(snap *dbschema.Snapshot) string {
return tokens[1]
}
var (
dbxschema struct {
sync.Once
*dbschema.Schema
err error
}
)
// loadDBXSchema loads the schema produced by the dbx-generated SQL script.
func loadDBXSchema(connstr, dbxscript string) (*dbschema.Schema, error) {
dbxschema.Do(func() {
dbxschema.Schema, dbxschema.err = loadSchemaFromSQL(connstr, dbxscript)
})
return dbxschema.Schema, dbxschema.err
return loadSchemaFromSQL(connstr, dbxscript)
}
// loadSchemaFromSQL inserts script into connstr and loads schema.

View File

@ -372,11 +372,68 @@ func (db *ProjectAccounting) GetBucketUsageRollups(ctx context.Context, projectI
return bucketUsageRollups, nil
}
// prefixIncrement returns the lexicographically lowest byte string which is
// greater than origPrefix and does not have origPrefix as a prefix. If no such
// byte string exists (origPrefix is empty, or origPrefix contains only 0xff
// bytes), returns false for ok.
//
// examples: prefixIncrement([]byte("abc"))          -> ([]byte("abd"), true)
//           prefixIncrement([]byte("ab\xff\xff"))   -> ([]byte("ac"), true)
//           prefixIncrement([]byte(""))             -> (nil, false)
//           prefixIncrement([]byte("\x00"))         -> ([]byte("\x01"), true)
//           prefixIncrement([]byte("\xff\xff\xff")) -> (nil, false)
func prefixIncrement(origPrefix []byte) (incremented []byte, ok bool) {
	// work on a copy so the caller's slice is never mutated
	scratch := append([]byte(nil), origPrefix...)
	// walk backwards to the rightmost byte that can be bumped; every
	// trailing 0xff byte is dropped from the result
	for pos := len(scratch) - 1; pos >= 0; pos-- {
		if scratch[pos] == 0xff {
			continue
		}
		scratch[pos]++
		return scratch[:pos+1], true
	}
	// origPrefix was empty or all 0xff bytes: no such string exists
	return nil, false
}
// prefixMatch builds a SQL expression that evaluates to true exactly when the
// value of expr starts with the given prefix.
//
// It also returns the extra query argument (the incremented prefix used as the
// exclusive upper bound) when one is needed; the caller must still supply the
// prefix itself as a parameter, and append the returned value only when it is
// non-nil.
//
// The returned SQL uses `?` markers and must be passed through Rebind(),
// because the final parameter numbering ($N) is not known here.
func (db *ProjectAccounting) prefixMatch(expr string, prefix []byte) (string, []byte, error) {
	upperBound, bounded := prefixIncrement(prefix)
	var placeholder string
	switch db.db.implementation {
	case dbutil.Postgres:
		placeholder = "?"
	case dbutil.Cockroach:
		// cockroach placeholders carry an explicit BYTEA type annotation
		placeholder = "?:::BYTEA"
	default:
		return "", nil, errs.New("invalid dbType: %v", db.db.driver)
	}
	if !bounded {
		// prefix was empty or all 0xff bytes: only a lower bound is possible
		return fmt.Sprintf(`(%s >= %s)`, expr, placeholder), nil, nil
	}
	return fmt.Sprintf(`(%s >= %s AND %s < %s)`, expr, placeholder, expr, placeholder), upperBound, nil
}
// GetBucketTotals retrieves bucket usage totals for period of time
func (db *ProjectAccounting) GetBucketTotals(ctx context.Context, projectID uuid.UUID, cursor accounting.BucketUsageCursor, since, before time.Time) (_ *accounting.BucketUsagePage, err error) {
defer mon.Task()(&ctx)(&err)
since = timeTruncateDown(since)
search := cursor.Search + "%"
bucketPrefix := []byte(cursor.Search)
if cursor.Limit > 50 {
cursor.Limit = 50
@ -391,21 +448,32 @@ func (db *ProjectAccounting) GetBucketTotals(ctx context.Context, projectID uuid
Offset: uint64((cursor.Page - 1) * cursor.Limit),
}
bucketNameRange, incrPrefix, err := db.prefixMatch("bucket_name", bucketPrefix)
if err != nil {
return nil, err
}
countQuery := db.db.Rebind(`SELECT COUNT(DISTINCT bucket_name)
FROM bucket_bandwidth_rollups
WHERE project_id = ? AND interval_start >= ? AND interval_start <= ?
AND bucket_name LIKE ?`)
AND ` + bucketNameRange)
countRow := db.db.QueryRowContext(ctx,
countQuery,
args := []interface{}{
projectID[:],
since, before,
search)
since,
before,
bucketPrefix,
}
if incrPrefix != nil {
args = append(args, incrPrefix)
}
countRow := db.db.QueryRowContext(ctx, countQuery, args...)
err = countRow.Scan(&page.TotalCount)
if err != nil {
return nil, err
}
if page.TotalCount == 0 {
return page, nil
}
@ -413,27 +481,30 @@ func (db *ProjectAccounting) GetBucketTotals(ctx context.Context, projectID uuid
return nil, errs.New("page is out of range")
}
var buckets []string
bucketsQuery := db.db.Rebind(`SELECT DISTINCT bucket_name
FROM bucket_bandwidth_rollups
WHERE project_id = ? AND interval_start >= ? AND interval_start <= ?
AND bucket_name LIKE ?
ORDER BY bucket_name ASC
AND ` + bucketNameRange + ` ORDER BY bucket_name ASC
LIMIT ? OFFSET ?`)
bucketRows, err := db.db.QueryContext(ctx,
bucketsQuery,
args = []interface{}{
projectID[:],
since, before,
[]byte(search),
page.Limit,
page.Offset)
since,
before,
bucketPrefix,
}
if incrPrefix != nil {
args = append(args, incrPrefix)
}
args = append(args, page.Limit, page.Offset)
bucketRows, err := db.db.QueryContext(ctx, bucketsQuery, args...)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, bucketRows.Close()) }()
var buckets []string
defer func() { err = errs.Combine(err, bucketRows.Close()) }()
for bucketRows.Next() {
var bucket string
err = bucketRows.Scan(&bucket)
@ -444,7 +515,7 @@ func (db *ProjectAccounting) GetBucketTotals(ctx context.Context, projectID uuid
buckets = append(buckets, bucket)
}
roullupsQuery := db.db.Rebind(`SELECT SUM(settled), SUM(inline), action
rollupsQuery := db.db.Rebind(`SELECT SUM(settled), SUM(inline), action
FROM bucket_bandwidth_rollups
WHERE project_id = ? AND bucket_name = ? AND interval_start >= ? AND interval_start <= ?
GROUP BY action`)
@ -465,7 +536,7 @@ func (db *ProjectAccounting) GetBucketTotals(ctx context.Context, projectID uuid
}
// get bucket_bandwidth_rollups
rollupsRows, err := db.db.QueryContext(ctx, roullupsQuery, projectID[:], []byte(bucket), since, before)
rollupsRows, err := db.db.QueryContext(ctx, rollupsQuery, projectID[:], []byte(bucket), since, before)
if err != nil {
return nil, err
}

View File

@ -6,7 +6,6 @@ package satellitedbtest
// This package should be referenced only in test files!
import (
"flag"
"strconv"
"strings"
"testing"
@ -38,8 +37,6 @@ type Database struct {
Message string
}
var runCockroachTests = flag.Bool("run-cockroach-tests", false, "If set, don't skip the CockroachDB-using tests, even though they are not yet fully supported")
// Databases returns default databases.
func Databases() []SatelliteDatabases {
return []SatelliteDatabases{
@ -49,7 +46,7 @@ func Databases() []SatelliteDatabases {
},
{
MasterDB: Database{"Cockroach", *pgtest.CrdbConnStr, "Cockroach flag missing, example: -cockroach-test-db=" + pgtest.DefaultCrdbConnStr + " or use STORJ_COCKROACH_TEST environment variable."},
PointerDB: Database{"Postgres", *pgtest.ConnStr, ""},
PointerDB: Database{"Cockroach", *pgtest.CrdbConnStr, ""},
},
}
}
@ -168,11 +165,6 @@ func Run(t *testing.T, test func(t *testing.T, db satellite.DB)) {
t.Run(dbInfo.MasterDB.Name+"/"+dbInfo.PointerDB.Name, func(t *testing.T) {
t.Parallel()
// TODO: remove this skip and this flag once all the sql is cockroachdb compatible
if dbInfo.MasterDB.Name == "Cockroach" && !*runCockroachTests {
t.Skip("CockroachDB not supported yet")
}
db, err := CreateMasterDB(t, "T", 0, dbInfo.MasterDB)
if err != nil {
t.Fatal(err)