Bolt backed overlay cache (#94)

* wip

* add separate `Process` tests for bolt and redis-backed overlay

* more testing

* fix gitignore

* fix linter error

* goimports goimports GOIMPORTS GoImPortS!!!!

* fix port madness

* forgot to add

* add `mux` as handler and shorten context timeouts

* gofreakingimports

* fix comments

* refactor test & add logger/monkit registry

* debugging travis

* add comment

* Set redisAddress to empty string for bolt-test

* travis experiment

* refactoring tests

* Merge remote-tracking branch 'upstream/master' into bolt-backed-overlay-cache
This commit is contained in:
Bryan White 2018-06-20 16:28:46 +02:00 committed by Dennis Coyle
parent 5ee6b76d41
commit c6f8144221
7 changed files with 100 additions and 20 deletions

5
.gitignore vendored
View File

@ -27,3 +27,8 @@ debug
# vendor # vendor
vendor vendor
# Test redis log and snapshot files
*test_redis-server.log
*dump.rdb
*test_bolt.db

14
Gopkg.lock generated
View File

@ -294,12 +294,6 @@
packages = ["."] packages = ["."]
revision = "9b6edb34372a110992ea2e174886cb03ff971230" revision = "9b6edb34372a110992ea2e174886cb03ff971230"
[[projects]]
name = "github.com/julienschmidt/httprouter"
packages = ["."]
revision = "8c199fb6259ffc1af525cc3ad52ee60ba8359669"
version = "v1.1"
[[projects]] [[projects]]
name = "github.com/klauspost/cpuid" name = "github.com/klauspost/cpuid"
packages = ["."] packages = ["."]
@ -469,6 +463,12 @@
packages = ["."] packages = ["."]
revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b" revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b"
[[projects]]
branch = "master"
name = "github.com/mr-tron/base58"
packages = ["base58"]
revision = "c1bdf7c52f59d6685ca597b9955a443ff95eeee6"
[[projects]] [[projects]]
name = "github.com/nats-io/go-nats" name = "github.com/nats-io/go-nats"
packages = [ packages = [
@ -953,6 +953,6 @@
[solve-meta] [solve-meta]
analyzer-name = "dep" analyzer-name = "dep"
analyzer-version = 1 analyzer-version = 1
inputs-digest = "101bde363c549e6e970d993982cd39dc7751cb980569d98187078b0dd8dbcdff" inputs-digest = "04d30c7df217efd3f219ce531076d0aad52d5fd12423f7e9c86766b5e613389b"
solver-name = "gps-cdcl" solver-name = "gps-cdcl"
solver-version = 1 solver-version = 1

View File

@ -22,6 +22,11 @@ import (
// KvStore is an in-memory, crappy key/value store type for testing // KvStore is an in-memory, crappy key/value store type for testing
type KvStore map[string]storage.Value type KvStore map[string]storage.Value
// Empty checks if there are any keys in the store
func (k *KvStore) Empty() bool {
return len(*k) == 0
}
// MockKeyValueStore is a `KeyValueStore` type used for testing (see storj.io/storj/storage/common.go) // MockKeyValueStore is a `KeyValueStore` type used for testing (see storj.io/storj/storage/common.go)
type MockKeyValueStore struct { type MockKeyValueStore struct {
Data KvStore Data KvStore

View File

@ -151,7 +151,9 @@ func redisTestClient(t *testing.T, data test.KvStore) storage.KeyValueStore {
t.Fatal(err) t.Fatal(err)
} }
populateStorage(t, client, data) if !(data.Empty()) {
populateStorage(t, client, data)
}
return client return client
} }
@ -171,7 +173,9 @@ func boltTestClient(t *testing.T, data test.KvStore) (_ storage.KeyValueStore, _
assert.NoError(t, err) assert.NoError(t, err)
} }
populateStorage(t, client, data) if !(data.Empty()) {
populateStorage(t, client, data)
}
return client, cleanup return client, cleanup
} }

View File

@ -15,24 +15,26 @@ import (
"gopkg.in/spacemonkeygo/monkit.v2" "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/kademlia" "storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/process"
proto "storj.io/storj/protos/overlay" proto "storj.io/storj/protos/overlay"
) )
var ( var (
redisAddress, redisPassword, httpPort, bootstrapIP, bootstrapPort, localPort string redisAddress, redisPassword, httpPort, bootstrapIP, bootstrapPort, localPort, boltdbPath string
db int db int
srvPort uint srvPort uint
) )
func init() { func init() {
flag.StringVar(&httpPort, "httpPort", "", "The port for the health endpoint") flag.StringVar(&httpPort, "httpPort", "", "The port for the health endpoint")
flag.StringVar(&redisAddress, "redisAddress", "", "The <IP:PORT> string to use for connection to a redis cache") flag.StringVar(&redisAddress, "redisAddress", "", "The <IP:PORT> string to use for connection to a redis cache")
flag.StringVar(&redisPassword, "redisPassword", "", "The password used for authentication to a secured redis instance") flag.StringVar(&redisPassword, "redisPassword", "", "The password used for authentication to a secured redis instance")
flag.StringVar(&boltdbPath, "boltdbPath", "", "The path to the boltdb file that should be loaded or created")
flag.IntVar(&db, "db", 0, "The network cache database") flag.IntVar(&db, "db", 0, "The network cache database")
flag.UintVar(&srvPort, "srvPort", 8080, "Port to listen on") flag.UintVar(&srvPort, "srvPort", 8080, "Port to listen on")
flag.StringVar(&bootstrapIP, "bootstrapIP", "", "Optional IP to bootstrap node against") flag.StringVar(&bootstrapIP, "bootstrapIP", "", "Optional IP to bootstrap node against")
flag.StringVar(&bootstrapPort, "bootstrapPort", "", "Optional port of node to bootstrap against") flag.StringVar(&bootstrapPort, "bootstrapPort", "", "Optional port of node to bootstrap against")
flag.StringVar(&localPort, "localPort", "8080", "Specify a different port to listen on locally") flag.StringVar(&localPort, "localPort", "8081", "Specify a different port to listen on locally")
} }
// NewServer creates a new Overlay Service Server // NewServer creates a new Overlay Service Server
@ -100,6 +102,18 @@ func (s *Service) Process(ctx context.Context) error {
s.logger.Error("Failed to create a new redis overlay client", zap.Error(err)) s.logger.Error("Failed to create a new redis overlay client", zap.Error(err))
return err return err
} }
} else if boltdbPath != "" {
cache, err = NewBoltOverlayCache(boltdbPath, kad)
if err != nil {
s.logger.Error("Failed to create a new boltdb overlay client", zap.Error(err))
return err
}
} else {
return process.ErrUsage.New("You must specify one of `--boltdbPath` or `--redisAddress`")
}
if boltdbPath != "" {
} }
if err := cache.Bootstrap(ctx); err != nil { if err := cache.Bootstrap(ctx); err != nil {
@ -118,8 +132,9 @@ func (s *Service) Process(ctx context.Context) error {
grpcServer := NewServer(kad, cache, s.logger, s.metrics) grpcServer := NewServer(kad, cache, s.logger, s.metrics)
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "OK") }) mux := http.NewServeMux()
go func() { http.ListenAndServe(fmt.Sprintf(":%s", httpPort), nil) }() mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "OK") })
go func() { http.ListenAndServe(fmt.Sprintf(":%s", httpPort), mux) }()
go cache.Walk(ctx) go cache.Walk(ctx)
// If the passed context times out or is cancelled, shutdown the gRPC server // If the passed context times out or is cancelled, shutdown the gRPC server

View File

@ -5,18 +5,33 @@ package overlay
import ( import (
"context" "context"
"flag"
"fmt" "fmt"
"log"
"net" "net"
"os"
"path/filepath"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeebo/errs"
"go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/test" "storj.io/storj/internal/test"
"storj.io/storj/pkg/process"
proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package
) )
func newTestService(t *testing.T) Service {
return Service{
logger: zap.NewNop(),
metrics: monkit.Default,
}
}
func TestNewServer(t *testing.T) { func TestNewServer(t *testing.T) {
t.SkipNow() t.SkipNow()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 0)) lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 0))
@ -30,8 +45,8 @@ func TestNewServer(t *testing.T) {
} }
func TestNewClient(t *testing.T) { func TestNewClient(t *testing.T) {
//a := "35.232.202.229:8080" // a := "35.232.202.229:8080"
//c, err := NewClient(&a, grpc.WithInsecure()) // c, err := NewClient(&a, grpc.WithInsecure())
t.SkipNow() t.SkipNow()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 0)) lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 0))
assert.NoError(t, err) assert.NoError(t, err)
@ -48,12 +63,46 @@ func TestNewClient(t *testing.T) {
assert.NotNil(t, r) assert.NotNil(t, r)
} }
func TestProcess(t *testing.T) { func TestProcess_redis(t *testing.T) {
flag.Set("localPort", "0")
done := test.EnsureRedis(t) done := test.EnsureRedis(t)
defer done() defer done()
o := Service{} o := newTestService(t)
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) ctx, _ := context.WithTimeout(context.Background(), 500*time.Millisecond)
err := o.Process(ctx) err := o.Process(ctx)
assert.NoError(t, err) assert.NoError(t, err)
} }
func TestProcess_bolt(t *testing.T) {
flag.Set("localPort", "0")
flag.Set("redisAddress", "")
boltdbPath, err := filepath.Abs("test_bolt.db")
assert.NoError(t, err)
if err != nil {
defer func() {
if err := os.Remove(boltdbPath); err != nil {
log.Println(errs.New("error while removing test bolt db: %s", err))
}
}()
}
flag.Set("boltdbPath", boltdbPath)
o := newTestService(t)
ctx, _ := context.WithTimeout(context.Background(), 500*time.Millisecond)
err = o.Process(ctx)
assert.NoError(t, err)
}
func TestProcess_error(t *testing.T) {
flag.Set("localPort", "0")
flag.Set("boltdbPath", "")
flag.Set("redisAddress", "")
o := newTestService(t)
ctx, _ := context.WithTimeout(context.Background(), 500*time.Millisecond)
err := o.Process(ctx)
assert.True(t, process.ErrUsage.Has(err))
}

View File

@ -23,6 +23,8 @@ var (
// Error is a process error class // Error is a process error class
Error = errs.Class("ProcessError") Error = errs.Class("ProcessError")
// ErrUsage is used when a user didn't use compatible or required options
ErrUsage = errs.Class("UsageError")
) )
// ID is the type used to specify a ID key in the process context // ID is the type used to specify a ID key in the process context