storj/storage/redis/redisserver/server.go
Ivan Fraixedes ce26616647
satellite/accounting/live: Use Redis client directly
We have to adapt the live accounting so that the packages that use it
can differentiate between errors, which lets them ignore some of them and
makes our satellite resilient to Redis downtime.

To differentiate errors we would have to change not only the live accounting
but also storage/redis.Client; however, that may require some dirty
workarounds or break other parts of the implementation that depend on
it.

On the other hand, we want to get rid of storage/redis.Client because
it has more functionality than what we are using, and the process of
removing it has already been started.

Hence, we have refactored the live accounting to directly use the Redis
client library for later on (in a future commit) adapt the satellite for
being resilient to Redis downtime.

Last but not least, a test for expired bandwidth keys has been added,
and with it a bug was spotted and fixed.

Change-Id: Ibd191522cd20f6a9a15e5ccb7beb83a678e530ff
2021-01-12 15:33:29 +01:00

206 lines
4.3 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
// Package redisserver is a package for starting a redis test server.
package redisserver
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"os/exec"
"path/filepath"
"runtime/pprof"
"strconv"
"strings"
"time"
"github.com/alicebob/miniredis/v2"
"github.com/go-redis/redis"
"storj.io/common/processgroup"
)
// Fallback endpoint returned by freeport when it cannot open a listener on
// an ephemeral port.
const (
// fallbackAddr is the host:port form of the fallback endpoint.
fallbackAddr = "localhost:6379"
// fallbackPort is the port number matching fallbackAddr.
fallbackPort = 6379
)
// Server represents a redis server.
type Server interface {
// Addr returns the address on which the server can be reached.
Addr() string
// Close shuts the server down and releases the resources it holds.
Close() error
// TestingFastForward enforces the TTL of keys in implementations that do
// not expire keys by themselves (e.g. Miniredis). This method is a no-op in
// implementations which support expiration as usual.
//
// All the keys whose TTL minus d becomes <= 0 will be removed.
TestingFastForward(d time.Duration)
}
// freeport asks the operating system for an unused TCP port on 127.0.0.1 by
// briefly listening on port 0, and returns the resulting address together
// with its port number.
//
// When the probe listener cannot be opened, the fallback address and port
// are returned instead.
func freeport() (addr string, port int) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return fallbackAddr, fallbackPort
	}

	tcp := ln.Addr().(*net.TCPAddr)

	// The listener only served to discover the port; release it so that the
	// caller can bind to it.
	_ = ln.Close()

	// NOTE(review): presumably this pause gives the OS time to fully release
	// the port before it is reused — confirm.
	time.Sleep(time.Second)

	return tcp.String(), tcp.Port
}
// Start starts a redis-server when available, otherwise falls back to miniredis.
//
// The error that prevented redis-server from starting is logged and then
// discarded; the returned error, if any, comes from the miniredis fallback.
func Start(ctx context.Context) (Server, error) {
	proc, procErr := Process(ctx)
	if procErr == nil {
		return proc, nil
	}

	log.Println("failed to start redis-server: ", procErr)
	return Mini(ctx)
}
// Process starts a redis-server test process.
//
// It creates a temporary directory, writes a minimal configuration file into
// it, launches the "redis-server" binary on a port chosen by freeport, waits
// up to three seconds for the server to announce on stdout that it is ready,
// and finally pings it.
//
// On success the returned Server owns the child process and the temporary
// directory; both are released by its Close method. On failure everything
// that was created along the way is cleaned up before the error is returned.
//
// ctx is currently not used.
func Process(ctx context.Context) (Server, error) {
	tmpdir, err := ioutil.TempDir("", "storj-redis")
	if err != nil {
		return nil, err
	}
	// removeTmpdir is used on the error paths that happen before the child
	// process exists, so that a failed start does not leave the directory
	// behind.
	removeTmpdir := func() { _ = os.RemoveAll(tmpdir) }

	// find a suitable port for listening
	addr, port := freeport()

	// write a configuration file, because redis doesn't support flags
	confpath := filepath.Join(tmpdir, "test.conf")
	arguments := []string{
		"daemonize no",
		"bind 127.0.0.1",
		"port " + strconv.Itoa(port),
		"timeout 0",
		"databases 2",
		"dbfilename dump.rdb",
		"dir " + tmpdir,
	}
	conf := strings.Join(arguments, "\n") + "\n"
	if err := ioutil.WriteFile(confpath, []byte(conf), 0755); err != nil {
		removeTmpdir()
		return nil, err
	}

	// start the process
	cmd := exec.Command("redis-server", confpath)
	processgroup.Setup(cmd)

	read, write, err := os.Pipe()
	if err != nil {
		removeTmpdir()
		return nil, err
	}
	cmd.Stdout = write

	if err := cmd.Start(); err != nil {
		_ = read.Close()
		_ = write.Close()
		removeTmpdir()
		return nil, err
	}
	// The child holds its own copy of the write end. Closing ours is what
	// lets the reader below observe EOF once the child exits; otherwise the
	// reader would block forever and both descriptors would leak.
	_ = write.Close()

	cleanup := func() {
		processgroup.Kill(cmd)
		// Reap the child so that it does not linger as a zombie and so that
		// it no longer uses tmpdir when the directory is removed.
		_ = cmd.Wait()
		_ = os.RemoveAll(tmpdir)
	}

	// wait for redis to become ready
	waitForReady := make(chan error, 1)
	go func() {
		defer func() { _ = read.Close() }()

		// wait for the message that looks like
		// v3 "The server is now ready to accept connections on port 6379"
		// v4 "Ready to accept connections"
		ready := false
		scanner := bufio.NewScanner(read)
		for scanner.Scan() {
			if strings.Contains(scanner.Text(), "to accept") {
				ready = true
				break
			}
		}

		switch scanErr := scanner.Err(); {
		case scanErr != nil:
			waitForReady <- scanErr
		case !ready:
			// EOF without the ready message: the server exited early.
			waitForReady <- errors.New("redis-server exited before becoming ready")
		default:
			waitForReady <- nil
		}

		// Keep draining the output so that the server never blocks on a full
		// pipe; this returns once the child exits.
		_, _ = io.Copy(ioutil.Discard, read)
	}()

	select {
	case err := <-waitForReady:
		if err != nil {
			cleanup()
			return nil, err
		}
	case <-time.After(3 * time.Second):
		cleanup()
		return nil, errors.New("redis timeout")
	}

	// test whether we can actually connect
	if err := pingServer(addr); err != nil {
		cleanup()
		return nil, fmt.Errorf("unable to ping: %w", err)
	}

	return &process{addr, cleanup}, nil
}
// process is the Server implementation returned for an external redis-server
// process.
type process struct {
	// addr is the address reported by Addr.
	addr string
	// close is the teardown callback invoked by Close.
	close func()
}

// Addr returns the address stored when the value was created.
func (p *process) Addr() string { return p.addr }

// Close runs the teardown callback. It always returns nil.
func (p *process) Close() error {
	p.close()
	return nil
}

// TestingFastForward does nothing for this implementation.
func (p *process) TestingFastForward(time.Duration) {}
// pingServer opens a short-lived client against database 1 of the redis
// server at addr, sends a PING and returns its error, if any. The client is
// closed before returning.
func pingServer(addr string) error {
	options := &redis.Options{
		Addr: addr,
		DB:   1,
	}
	client := redis.NewClient(options)
	defer func() { _ = client.Close() }()

	pong := client.Ping()
	return pong.Err()
}
// Mini starts miniredis server.
//
// The server is started inside pprof.Do with the label db=miniredis applied
// to ctx.
func Mini(ctx context.Context) (Server, error) {
	var (
		mini   *miniredis.Miniredis
		runErr error
	)

	labels := pprof.Labels("db", "miniredis")
	pprof.Do(ctx, labels, func(context.Context) {
		mini, runErr = miniredis.Run()
	})

	if runErr != nil {
		return nil, runErr
	}
	return &miniserver{Miniredis: mini}, nil
}
// miniserver is the Server implementation returned by Mini. Methods that are
// not declared here are promoted from the embedded miniredis instance.
type miniserver struct {
	*miniredis.Miniredis
}

// Close closes the underlying miniredis server. It always returns nil.
func (m *miniserver) Close() error {
	m.Miniredis.Close()
	return nil
}

// TestingFastForward forwards d to the FastForward method of the embedded
// miniredis instance.
func (m *miniserver) TestingFastForward(d time.Duration) {
	m.Miniredis.FastForward(d)
}