From c6f814422141a9e94032e3d1abf57c2dc9e60014 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 20 Jun 2018 16:28:46 +0200 Subject: [PATCH] Bolt backed overlay cache (#94) * wip * add separate `Process` tests for bolt and redis-backed overlay * more testing * fix gitignore * fix linter error * goimports goimports GOIMPORTS GoImPortS!!!! * fix port madness * forgot to add * add `mux` as handler and shorten context timeouts * gofreakingimports * fix comments * refactor test & add logger/monkit registry * debugging travis * add comment * Set redisAddress to empty string for bolt-test * travis experiment * refactoring tests * Merge remote-tracking branch 'upstream/master' into bolt-backed-overlay-cache --- .gitignore | 5 +++ Gopkg.lock | 14 +++---- internal/test/{util.go => storage.go} | 5 +++ pkg/overlay/cache_test.go | 8 +++- pkg/overlay/service.go | 27 +++++++++--- pkg/overlay/service_test.go | 59 ++++++++++++++++++++++++--- pkg/process/process.go | 2 + 7 files changed, 100 insertions(+), 20 deletions(-) rename internal/test/{util.go => storage.go} (97%) diff --git a/.gitignore b/.gitignore index ab199c972..e6ee4cc87 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,8 @@ debug # vendor vendor + +# Test redis log and snapshot files +*test_redis-server.log +*dump.rdb +*test_bolt.db diff --git a/Gopkg.lock b/Gopkg.lock index 9a4ba6151..eea2daa2a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -294,12 +294,6 @@ packages = ["."] revision = "9b6edb34372a110992ea2e174886cb03ff971230" -[[projects]] - name = "github.com/julienschmidt/httprouter" - packages = ["."] - revision = "8c199fb6259ffc1af525cc3ad52ee60ba8359669" - version = "v1.1" - [[projects]] name = "github.com/klauspost/cpuid" packages = ["."] @@ -469,6 +463,12 @@ packages = ["."] revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b" +[[projects]] + branch = "master" + name = "github.com/mr-tron/base58" + packages = ["base58"] + revision = "c1bdf7c52f59d6685ca597b9955a443ff95eeee6" + [[projects]] name = "github.com/nats-io/go-nats" packages = [ @@ -953,6 +953,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "101bde363c549e6e970d993982cd39dc7751cb980569d98187078b0dd8dbcdff" + inputs-digest = "04d30c7df217efd3f219ce531076d0aad52d5fd12423f7e9c86766b5e613389b" solver-name = "gps-cdcl" solver-version = 1 diff --git a/internal/test/util.go b/internal/test/storage.go similarity index 97% rename from internal/test/util.go rename to internal/test/storage.go index ae111a75e..ed2386e33 100644 --- a/internal/test/util.go +++ b/internal/test/storage.go @@ -22,6 +22,11 @@ import ( // KvStore is an in-memory, crappy key/value store type for testing type KvStore map[string]storage.Value +// Empty checks if there are any keys in the store +func (k *KvStore) Empty() bool { + return len(*k) == 0 +} + // MockKeyValueStore is a `KeyValueStore` type used for testing (see storj.io/storj/storage/common.go) type MockKeyValueStore struct { Data KvStore diff --git a/pkg/overlay/cache_test.go b/pkg/overlay/cache_test.go index bbdf6fa77..5740b533a 100644 --- a/pkg/overlay/cache_test.go +++ b/pkg/overlay/cache_test.go @@ -151,7 +151,9 @@ func redisTestClient(t *testing.T, data test.KvStore) storage.KeyValueStore { t.Fatal(err) } - populateStorage(t, client, data) + if !(data.Empty()) { + populateStorage(t, client, data) + } return client } @@ -171,7 +173,9 @@ func boltTestClient(t *testing.T, data test.KvStore) (_ storage.KeyValueStore, _ assert.NoError(t, err) } - populateStorage(t, client, data) + if !(data.Empty()) { + populateStorage(t, client, data) + } return client, cleanup } diff --git a/pkg/overlay/service.go b/pkg/overlay/service.go index fa3d1a855..af0e40d3e 100644 --- a/pkg/overlay/service.go +++ b/pkg/overlay/service.go @@ -15,24 +15,26 @@ import ( "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/pkg/kademlia" + "storj.io/storj/pkg/process" proto "storj.io/storj/protos/overlay" ) var ( - redisAddress, redisPassword, httpPort, bootstrapIP, bootstrapPort, localPort string - db int - srvPort uint + redisAddress, redisPassword, httpPort, bootstrapIP, bootstrapPort, localPort, boltdbPath string + db int + srvPort uint ) func init() { flag.StringVar(&httpPort, "httpPort", "", "The port for the health endpoint") flag.StringVar(&redisAddress, "redisAddress", "", "The string to use for connection to a redis cache") flag.StringVar(&redisPassword, "redisPassword", "", "The password used for authentication to a secured redis instance") + flag.StringVar(&boltdbPath, "boltdbPath", "", "The path to the boltdb file that should be loaded or created") flag.IntVar(&db, "db", 0, "The network cache database") flag.UintVar(&srvPort, "srvPort", 8080, "Port to listen on") flag.StringVar(&bootstrapIP, "bootstrapIP", "", "Optional IP to bootstrap node against") flag.StringVar(&bootstrapPort, "bootstrapPort", "", "Optional port of node to bootstrap against") - flag.StringVar(&localPort, "localPort", "8080", "Specify a different port to listen on locally") + flag.StringVar(&localPort, "localPort", "8081", "Specify a different port to listen on locally") } // NewServer creates a new Overlay Service Server @@ -100,6 +102,18 @@ func (s *Service) Process(ctx context.Context) error { s.logger.Error("Failed to create a new redis overlay client", zap.Error(err)) return err } + } else if boltdbPath != "" { + cache, err = NewBoltOverlayCache(boltdbPath, kad) + if err != nil { + s.logger.Error("Failed to create a new boltdb overlay client", zap.Error(err)) + return err + } + } else { + return process.ErrUsage.New("You must specify one of `--boltdbPath` or `--redisAddress`") + } + + if boltdbPath != "" { + } if err := cache.Bootstrap(ctx); err != nil { @@ -118,8 +132,9 @@ func (s *Service) Process(ctx context.Context) error { grpcServer := NewServer(kad, cache, s.logger, s.metrics) - http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "OK") }) - go func() { http.ListenAndServe(fmt.Sprintf(":%s", httpPort), nil) }() + mux := http.NewServeMux() + mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "OK") }) + go func() { http.ListenAndServe(fmt.Sprintf(":%s", httpPort), mux) }() go cache.Walk(ctx) // If the passed context times out or is cancelled, shutdown the gRPC server diff --git a/pkg/overlay/service_test.go b/pkg/overlay/service_test.go index f5d73c195..d3c41618b 100644 --- a/pkg/overlay/service_test.go +++ b/pkg/overlay/service_test.go @@ -5,18 +5,33 @@ package overlay import ( "context" + "flag" "fmt" + "log" "net" + "os" + "path/filepath" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/zeebo/errs" + "go.uber.org/zap" "google.golang.org/grpc" + "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/internal/test" + "storj.io/storj/pkg/process" proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package ) +func newTestService(t *testing.T) Service { + return Service{ + logger: zap.NewNop(), + metrics: monkit.Default, + } +} + func TestNewServer(t *testing.T) { t.SkipNow() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 0)) @@ -30,8 +45,8 @@ func TestNewServer(t *testing.T) { } func TestNewClient(t *testing.T) { - //a := "35.232.202.229:8080" - //c, err := NewClient(&a, grpc.WithInsecure()) + // a := "35.232.202.229:8080" + // c, err := NewClient(&a, grpc.WithInsecure()) t.SkipNow() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 0)) assert.NoError(t, err) @@ -48,12 +63,46 @@ func TestNewClient(t *testing.T) { assert.NotNil(t, r) } -func TestProcess(t *testing.T) { +func TestProcess_redis(t *testing.T) { + flag.Set("localPort", "0") done := test.EnsureRedis(t) defer done() - o := Service{} - ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) + o := newTestService(t) + ctx, _ := context.WithTimeout(context.Background(), 500*time.Millisecond) err := o.Process(ctx) assert.NoError(t, err) } + +func TestProcess_bolt(t *testing.T) { + flag.Set("localPort", "0") + flag.Set("redisAddress", "") + boltdbPath, err := filepath.Abs("test_bolt.db") + assert.NoError(t, err) + + if err != nil { + defer func() { + if err := os.Remove(boltdbPath); err != nil { + log.Println(errs.New("error while removing test bolt db: %s", err)) + } + }() + } + + flag.Set("boltdbPath", boltdbPath) + + o := newTestService(t) + ctx, _ := context.WithTimeout(context.Background(), 500*time.Millisecond) + err = o.Process(ctx) + assert.NoError(t, err) +} + +func TestProcess_error(t *testing.T) { + flag.Set("localPort", "0") + flag.Set("boltdbPath", "") + flag.Set("redisAddress", "") + + o := newTestService(t) + ctx, _ := context.WithTimeout(context.Background(), 500*time.Millisecond) + err := o.Process(ctx) + assert.True(t, process.ErrUsage.Has(err)) +} diff --git a/pkg/process/process.go b/pkg/process/process.go index f50dd7f4d..4cf4a714b 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -23,6 +23,8 @@ var ( // Error is a process error class Error = errs.Class("ProcessError") + // ErrUsage is used when a user didn't use compatible or required options + ErrUsage = errs.Class("UsageError") ) // ID is the type used to specify a ID key in the process context