storage/redis/redisserver: simplify redisserver creation

Change-Id: I881576a7881db671b5abeeca7120a022987cc47f
This commit is contained in:
Cameron Ayer 2020-01-31 13:28:42 -05:00 committed by Cameron
parent b22bf16b35
commit 33d696b096
7 changed files with 75 additions and 70 deletions

View File

@ -256,12 +256,12 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
} }
planet.databases = append(planet.databases, pointerDB) planet.databases = append(planet.databases, pointerDB)
liveAccountingServer := redisserver.NewMini() redis, err := redisserver.Mini()
addr, _, err := liveAccountingServer.Run()
if err != nil { if err != nil {
return xs, errs.Wrap(err) return nil, err
} }
planet.databases = append(planet.databases, liveAccountingServer)
planet.databases = append(planet.databases, redis)
config := satellite.Config{ config := satellite.Config{
Server: server.Config{ Server: server.Config{
@ -382,7 +382,7 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
Interval: defaultInterval, Interval: defaultInterval,
}, },
LiveAccounting: live.Config{ LiveAccounting: live.Config{
StorageBackend: "redis://" + addr + "?db=0", StorageBackend: "redis://" + redis.Addr() + "?db=0",
}, },
Mail: mailservice.Config{ Mail: mailservice.Config{
SMTPServerAddress: "smtp.mail.test:587", SMTPServerAddress: "smtp.mail.test:587",
@ -541,6 +541,8 @@ func createNewSystem(log *zap.Logger, peer *satellite.Core, api *satellite.API,
system.Accounting.ProjectUsage = peer.Accounting.ProjectUsage system.Accounting.ProjectUsage = peer.Accounting.ProjectUsage
system.Accounting.ReportedRollup = peer.Accounting.ReportedRollupChore system.Accounting.ReportedRollup = peer.Accounting.ReportedRollupChore
system.LiveAccounting = peer.LiveAccounting
system.Marketing.Listener = api.Marketing.Listener system.Marketing.Listener = api.Marketing.Listener
system.Marketing.Endpoint = api.Marketing.Endpoint system.Marketing.Endpoint = api.Marketing.Endpoint

View File

@ -21,12 +21,12 @@ func RunDBs(t *testing.T, test func(*testing.T, extensions.RevocationDB, storage
ctx := testcontext.New(t) ctx := testcontext.New(t)
defer ctx.Cleanup() defer ctx.Cleanup()
addr, cleanup, err := redisserver.Start() redis, err := redisserver.Mini()
require.NoError(t, err) require.NoError(t, err)
defer cleanup() defer ctx.Check(redis.Close)
// Test using redis-backed revocation DB // Test using redis-backed revocation DB
dbURL := "redis://" + addr + "?db=0" dbURL := "redis://" + redis.Addr() + "?db=0"
db, err := revocation.NewDB(dbURL) db, err := revocation.NewDB(dbURL)
require.NoError(t, err) require.NoError(t, err)
defer ctx.Check(db.Close) defer ctx.Check(db.Close)

View File

@ -32,15 +32,15 @@ func TestLiveAccountingCache(t *testing.T) {
ctx := testcontext.New(t) ctx := testcontext.New(t)
defer ctx.Cleanup() defer ctx.Cleanup()
address, cleanup, err := redisserver.Start() redis, err := redisserver.Mini()
require.NoError(t, err) require.NoError(t, err)
defer cleanup() defer ctx.Check(redis.Close)
for _, tt := range tests { for _, tt := range tests {
var config live.Config var config live.Config
if tt.backend == "redis" { if tt.backend == "redis" {
config = live.Config{ config = live.Config{
StorageBackend: "redis://" + address + "?db=0", StorageBackend: "redis://" + redis.Addr() + "?db=0",
} }
} }
@ -75,12 +75,12 @@ func TestRedisCacheConcurrency(t *testing.T) {
ctx := testcontext.New(t) ctx := testcontext.New(t)
defer ctx.Cleanup() defer ctx.Cleanup()
address, cleanup, err := redisserver.Start() redis, err := redisserver.Mini()
require.NoError(t, err) require.NoError(t, err)
defer cleanup() defer ctx.Check(redis.Close)
config := live.Config{ config := live.Config{
StorageBackend: "redis://" + address + "?db=0", StorageBackend: "redis://" + redis.Addr() + "?db=0",
} }
cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config) cache, err := live.NewCache(zaptest.NewLogger(t).Named("live-accounting"), config)
require.NoError(t, err) require.NoError(t, err)
@ -109,9 +109,9 @@ func TestRedisCacheConcurrency(t *testing.T) {
func populateCache(ctx context.Context, cache accounting.Cache) (projectIDs []uuid.UUID, sum int64, _ error) { func populateCache(ctx context.Context, cache accounting.Cache) (projectIDs []uuid.UUID, sum int64, _ error) {
const ( const (
valuesListSize = 1000 valuesListSize = 10
valueMultiplier = 4096 valueMultiplier = 4096
numProjects = 200 numProjects = 100
) )
// make a largish list of varying values // make a largish list of varying values
someValues := make([]int64, valuesListSize) someValues := make([]int64, valuesListSize)
@ -162,15 +162,15 @@ func TestGetAllProjectTotals(t *testing.T) {
ctx := testcontext.New(t) ctx := testcontext.New(t)
defer ctx.Cleanup() defer ctx.Cleanup()
address, cleanup, err := redisserver.Start() redis, err := redisserver.Mini()
require.NoError(t, err) require.NoError(t, err)
defer cleanup() defer ctx.Check(redis.Close)
for _, tt := range tests { for _, tt := range tests {
var config live.Config var config live.Config
if tt.backend == "redis" { if tt.backend == "redis" {
config = live.Config{ config = live.Config{
StorageBackend: "redis://" + address + "?db=0", StorageBackend: "redis://" + redis.Addr() + "?db=0",
} }
} }

View File

@ -67,12 +67,11 @@ func TestGrapqhlMutation(t *testing.T) {
) )
require.NoError(t, err) require.NoError(t, err)
miniredis := redisserver.NewMini() redis, err := redisserver.Mini()
addr, cleanup, err := miniredis.Run()
require.NoError(t, err) require.NoError(t, err)
defer cleanup() defer ctx.Check(redis.Close)
cache, err := live.NewCache(log.Named("cache"), live.Config{StorageBackend: "redis://" + addr + "?db=0"}) cache, err := live.NewCache(log.Named("cache"), live.Config{StorageBackend: "redis://" + redis.Addr() + "?db=0"})
require.NoError(t, err) require.NoError(t, err)
projectUsage := accounting.NewService(db.ProjectAccounting(), cache, 0) projectUsage := accounting.NewService(db.ProjectAccounting(), cache, 0)

View File

@ -52,12 +52,11 @@ func TestGraphqlQuery(t *testing.T) {
) )
require.NoError(t, err) require.NoError(t, err)
miniredis := redisserver.NewMini() redis, err := redisserver.Mini()
addr, cleanup, err := miniredis.Run()
require.NoError(t, err) require.NoError(t, err)
defer cleanup() defer ctx.Check(redis.Close)
cache, err := live.NewCache(log.Named("cache"), live.Config{StorageBackend: "redis://" + addr + "?db=0"}) cache, err := live.NewCache(log.Named("cache"), live.Config{StorageBackend: "redis://" + redis.Addr() + "?db=0"})
require.NoError(t, err) require.NoError(t, err)
projectUsage := accounting.NewService(db.ProjectAccounting(), cache, 0) projectUsage := accounting.NewService(db.ProjectAccounting(), cache, 0)

View File

@ -6,18 +6,20 @@ package redis
import ( import (
"testing" "testing"
"github.com/stretchr/testify/require"
"storj.io/storj/storage/redis/redisserver" "storj.io/storj/storage/redis/redisserver"
"storj.io/storj/storage/testsuite" "storj.io/storj/storage/testsuite"
) )
func TestSuite(t *testing.T) { func TestSuite(t *testing.T) {
addr, cleanup, err := redisserver.Start() redis, err := redisserver.Start()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer cleanup() defer func() { require.NoError(t, redis.Close()) }()
client, err := NewClient(addr, "", 1) client, err := NewClient(redis.Addr(), "", 1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -34,13 +36,13 @@ func TestInvalidConnection(t *testing.T) {
} }
func BenchmarkSuite(b *testing.B) { func BenchmarkSuite(b *testing.B) {
addr, cleanup, err := redisserver.Start() redis, err := redisserver.Start()
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
defer cleanup() defer func() { require.NoError(b, redis.Close()) }()
client, err := NewClient(addr, "", 1) client, err := NewClient(redis.Addr(), "", 1)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }

View File

@ -30,9 +30,10 @@ const (
fallbackPort = 6379 fallbackPort = 6379
) )
// Mini is a wrapper for *miniredis.MiniRedis which implements the io.Closer interface. // Server represents a redis server.
type Mini struct { type Server interface {
server *miniredis.Miniredis Addr() string
Close() error
} }
func freeport() (addr string, port int) { func freeport() (addr string, port int) {
@ -50,26 +51,24 @@ func freeport() (addr string, port int) {
} }
// Start starts a redis-server when available, otherwise falls back to miniredis // Start starts a redis-server when available, otherwise falls back to miniredis
func Start() (addr string, cleanup func(), err error) { func Start() (Server, error) {
addr, cleanup, err = Process() server, err := Process()
if err != nil { if err != nil {
log.Println("failed to start redis-server: ", err) log.Println("failed to start redis-server: ", err)
mini := NewMini() return Mini()
return mini.Run()
} }
return addr, cleanup, err return server, err
} }
// Process starts a redis-server test process // Process starts a redis-server test process
func Process() (addr string, cleanup func(), err error) { func Process() (Server, error) {
tmpdir, err := ioutil.TempDir("", "storj-redis") tmpdir, err := ioutil.TempDir("", "storj-redis")
if err != nil { if err != nil {
return "", nil, err return nil, err
} }
// find a suitable port for listening // find a suitable port for listening
var port int addr, port := freeport()
addr, port = freeport()
// write a configuration file, because redis doesn't support flags // write a configuration file, because redis doesn't support flags
confpath := filepath.Join(tmpdir, "test.conf") confpath := filepath.Join(tmpdir, "test.conf")
@ -86,7 +85,7 @@ func Process() (addr string, cleanup func(), err error) {
conf := strings.Join(arguments, "\n") + "\n" conf := strings.Join(arguments, "\n") + "\n"
err = ioutil.WriteFile(confpath, []byte(conf), 0755) err = ioutil.WriteFile(confpath, []byte(conf), 0755)
if err != nil { if err != nil {
return "", nil, err return nil, err
} }
// start the process // start the process
@ -95,15 +94,15 @@ func Process() (addr string, cleanup func(), err error) {
read, write, err := os.Pipe() read, write, err := os.Pipe()
if err != nil { if err != nil {
return "", nil, err return nil, err
} }
cmd.Stdout = write cmd.Stdout = write
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
return "", nil, err return nil, err
} }
cleanup = func() { cleanup := func() {
processgroup.Kill(cmd) processgroup.Kill(cmd)
_ = os.RemoveAll(tmpdir) _ = os.RemoveAll(tmpdir)
} }
@ -129,48 +128,52 @@ func Process() (addr string, cleanup func(), err error) {
case err := <-waitForReady: case err := <-waitForReady:
if err != nil { if err != nil {
cleanup() cleanup()
return "", nil, err return nil, err
} }
case <-time.After(3 * time.Second): case <-time.After(3 * time.Second):
cleanup() cleanup()
return "", nil, errors.New("redis timeout") return nil, errors.New("redis timeout")
} }
// test whether we can actually connect // test whether we can actually connect
if err := pingServer(addr); err != nil { if err := pingServer(addr); err != nil {
cleanup() cleanup()
return "", nil, fmt.Errorf("unable to ping: %v", err) return nil, fmt.Errorf("unable to ping: %v", err)
} }
return addr, cleanup, nil return &process{addr, cleanup}, nil
} }
type process struct {
addr string
close func()
}
func (process *process) Addr() string { return process.addr }
func (process *process) Close() error { process.close(); return nil }
func pingServer(addr string) error { func pingServer(addr string) error {
client := redis.NewClient(&redis.Options{Addr: addr, DB: 1}) client := redis.NewClient(&redis.Options{Addr: addr, DB: 1})
defer func() { _ = client.Close() }() defer func() { _ = client.Close() }()
return client.Ping().Err() return client.Ping().Err()
} }
// NewMini creates a new Mini. // Mini starts miniredis server
func NewMini() *Mini { func Mini() (Server, error) {
return &Mini{ server, err := miniredis.Run()
server: miniredis.NewMiniRedis(),
}
}
// Run starts the miniredis server.
func (mini *Mini) Run() (addr string, cleanup func(), err error) {
err = mini.server.Start()
if err != nil { if err != nil {
return "", nil, err return nil, err
} }
return mini.server.Addr(), func() {
mini.server.Close() return &miniserver{server}, nil
}, nil
} }
// Close closes the miniredis server. type miniserver struct {
func (mini *Mini) Close() error { *miniredis.Miniredis
mini.server.Close() }
// Close closes the underlying miniredis server
func (s *miniserver) Close() error {
s.Miniredis.Close()
return nil return nil
} }