storj/cmd/storj-sim/network.go
Jennifer Li Johnson d2502bb51b Adds tests for kad replacement and restores kad operator configs (#3094)
* test that all nodes can check in with all satellites

* keep kademlia config

* add untrusted satellite test

* use getversion

* remove kademlia config changes in test-sim-backwards.sh

* add kademlia flags back to storj-sim storagenode

* reset kademlia flags in storagenode entrypoint
2019-09-20 16:02:23 -04:00

566 lines
16 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"errors"
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"time"
"github.com/spf13/viper"
"github.com/zeebo/errs"
"golang.org/x/sync/errgroup"
"storj.io/storj/internal/dbutil/pgutil"
"storj.io/storj/internal/fpath"
"storj.io/storj/internal/processgroup"
"storj.io/storj/lib/uplink"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/storj"
)
const (
maxInstanceCount = 100
maxStoragenodeCount = 200
folderPermissions = 0744
)
var (
defaultAPIKeyData = "13YqgH45XZLg7nm6KsQ72QgXfjbDu2uhTaeSdMVP2A85QuANthM9K58ww5Y4nhMowrZDoqdA4Kyqt1ioQghQcm9fT5uR2drPHpFEqeb"
defaultAPIKey, _ = uplink.ParseAPIKey(defaultAPIKeyData)
)
const (
// The following values of peer class and endpoints are used
// to create a port with a consistent format for storj-sim services.
// Peer class
satellitePeer = 0
gatewayPeer = 1
versioncontrolPeer = 2
bootstrapPeer = 3
storagenodePeer = 4
// Endpoint
publicGRPC = 0
privateGRPC = 1
publicHTTP = 2
privateHTTP = 3
debugHTTP = 9
)
// port creates a port with a consistent format for storj-sim services.
// The port format is: "1PXXE", where P is the peer class, XX is the index of the instance, and E is the endpoint.
func port(peerclass, index, endpoint int) string {
port := 10000 + peerclass*1000 + index*10 + endpoint
return strconv.Itoa(port)
}
func networkExec(flags *Flags, args []string, command string) error {
processes, err := newNetwork(flags)
if err != nil {
return err
}
ctx, cancel := NewCLIContext(context.Background())
defer cancel()
if command == "setup" {
identities, err := identitySetup(processes)
if err != nil {
return err
}
err = identities.Exec(ctx, command)
if err != nil {
return err
}
}
err = processes.Exec(ctx, command)
closeErr := processes.Close()
return errs.Combine(err, closeErr)
}
func networkEnv(flags *Flags, args []string) error {
flags.OnlyEnv = true
processes, err := newNetwork(flags)
if err != nil {
return err
}
// run exec before, since it will load env vars from configs
for _, process := range processes.List {
if exec := process.ExecBefore["run"]; exec != nil {
if err := exec(process); err != nil {
return err
}
}
}
if len(args) == 1 {
envprefix := strings.ToUpper(args[0] + "=")
// find the environment value that the environment variable is set to
for _, env := range processes.Env() {
if strings.HasPrefix(strings.ToUpper(env), envprefix) {
fmt.Println(env[len(envprefix):])
return nil
}
}
return nil
}
for _, env := range processes.Env() {
fmt.Println(env)
}
return nil
}
func networkTest(flags *Flags, command string, args []string) error {
processes, err := newNetwork(flags)
if err != nil {
return err
}
ctx, cancel := NewCLIContext(context.Background())
var group errgroup.Group
processes.Start(ctx, &group, "run")
for _, process := range processes.List {
process.Status.Started.Wait()
}
cmd := exec.CommandContext(ctx, command, args...)
cmd.Env = append(os.Environ(), processes.Env()...)
stdout := processes.Output.Prefixed("test:out")
stderr := processes.Output.Prefixed("test:err")
cmd.Stdout, cmd.Stderr = stdout, stderr
processgroup.Setup(cmd)
if printCommands {
fmt.Fprintf(processes.Output, "exec: %v\n", strings.Join(cmd.Args, " "))
}
errRun := cmd.Run()
cancel()
return errs.Combine(errRun, processes.Close(), group.Wait())
}
func networkDestroy(flags *Flags, args []string) error {
if fpath.IsRoot(flags.Directory) {
return errors.New("safety check: disallowed to remove root directory " + flags.Directory)
}
if printCommands {
fmt.Println("sim | exec: rm -rf", flags.Directory)
}
return os.RemoveAll(flags.Directory)
}
// newNetwork creates a default network
func newNetwork(flags *Flags) (*Processes, error) {
_, filename, _, ok := runtime.Caller(0)
if !ok {
return nil, errs.New("no caller information")
}
storjRoot := strings.TrimSuffix(filename, "/cmd/storj-sim/network.go")
// with common adds all common arguments to the process
withCommon := func(dir string, all Arguments) Arguments {
common := []string{"--metrics.app-suffix", "sim", "--log.level", "debug", "--config-dir", dir}
if flags.IsDev {
common = append(common, "--defaults", "dev")
}
for command, args := range all {
all[command] = append(append(common, command), args...)
}
return all
}
processes := NewProcesses(flags.Directory)
var host = flags.Host
versioncontrol := processes.New(Info{
Name: "versioncontrol/0",
Executable: "versioncontrol",
Directory: filepath.Join(processes.Directory, "versioncontrol", "0"),
Address: net.JoinHostPort(host, port(versioncontrolPeer, 0, publicGRPC)),
})
versioncontrol.Arguments = withCommon(versioncontrol.Directory, Arguments{
"setup": {
"--address", versioncontrol.Address,
"--debug.addr", net.JoinHostPort(host, port(versioncontrolPeer, 0, debugHTTP)),
},
"run": {},
})
versioncontrol.ExecBefore["run"] = func(process *Process) error {
return readConfigString(&versioncontrol.Address, versioncontrol.Directory, "address")
}
bootstrap := processes.New(Info{
Name: "bootstrap/0",
Executable: "bootstrap",
Directory: filepath.Join(processes.Directory, "bootstrap", "0"),
Address: net.JoinHostPort(host, port(bootstrapPeer, 0, publicGRPC)),
})
// gateway must wait for the versioncontrol to start up
bootstrap.WaitForStart(versioncontrol)
bootstrap.Arguments = withCommon(bootstrap.Directory, Arguments{
"setup": {
"--identity-dir", bootstrap.Directory,
"--web.address", net.JoinHostPort(host, port(bootstrapPeer, 0, publicHTTP)),
"--server.address", bootstrap.Address,
"--server.private-address", net.JoinHostPort(host, port(bootstrapPeer, 0, privateGRPC)),
"--kademlia.bootstrap-addr", bootstrap.Address,
"--kademlia.operator.email", "bootstrap@mail.test",
"--kademlia.operator.wallet", "0x0123456789012345678901234567890123456789",
"--server.extensions.revocation=false",
"--server.use-peer-ca-whitelist=false",
"--version.server-address", fmt.Sprintf("http://%s/", versioncontrol.Address),
"--debug.addr", net.JoinHostPort(host, port(bootstrapPeer, 0, debugHTTP)),
},
"run": {},
})
bootstrap.ExecBefore["run"] = func(process *Process) error {
return readConfigString(&bootstrap.Address, bootstrap.Directory, "server.address")
}
// Create satellites making all satellites wait for bootstrap to start
if flags.SatelliteCount > maxInstanceCount {
return nil, fmt.Errorf("exceeded the max instance count of %d with Satellite count of %d", maxInstanceCount, flags.SatelliteCount)
}
var satellites []*Process
for i := 0; i < flags.SatelliteCount; i++ {
process := processes.New(Info{
Name: fmt.Sprintf("satellite/%d", i),
Executable: "satellite",
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i)),
Address: net.JoinHostPort(host, port(satellitePeer, i, publicGRPC)),
})
satellites = append(satellites, process)
// satellite must wait for bootstrap to start
process.WaitForStart(bootstrap)
consoleAuthToken := "secure_token"
process.Arguments = withCommon(process.Directory, Arguments{
"setup": {
"--identity-dir", process.Directory,
"--console.address", net.JoinHostPort(host, port(satellitePeer, i, publicHTTP)),
"--console.static-dir", filepath.Join(storjRoot, "web/satellite/"),
// TODO: remove console.auth-token after vanguard release
"--console.auth-token", consoleAuthToken,
"--marketing.base-url", "",
"--marketing.address", net.JoinHostPort(host, port(satellitePeer, i, privateHTTP)),
"--marketing.static-dir", filepath.Join(storjRoot, "web/marketing/"),
"--server.address", process.Address,
"--server.private-address", net.JoinHostPort(host, port(satellitePeer, i, privateGRPC)),
"--server.extensions.revocation=false",
"--server.use-peer-ca-whitelist=false",
"--mail.smtp-server-address", "smtp.gmail.com:587",
"--mail.from", "Storj <yaroslav-satellite-test@storj.io>",
"--mail.template-path", filepath.Join(storjRoot, "web/satellite/static/emails"),
"--version.server-address", fmt.Sprintf("http://%s/", versioncontrol.Address),
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugHTTP)),
},
"run": {},
})
if flags.Postgres != "" {
process.Arguments["setup"] = append(process.Arguments["setup"],
"--database", pgutil.ConnstrWithSchema(flags.Postgres, fmt.Sprintf("satellite/%d", i)),
"--metainfo.database-url", pgutil.ConnstrWithSchema(flags.Postgres, fmt.Sprintf("satellite/%d/meta", i)),
)
}
process.ExecBefore["run"] = func(process *Process) error {
return readConfigString(&process.Address, process.Directory, "server.address")
}
}
// Create gateways for each satellite
for i, satellite := range satellites {
satellite := satellite
process := processes.New(Info{
Name: fmt.Sprintf("gateway/%d", i),
Executable: "gateway",
Directory: filepath.Join(processes.Directory, "gateway", fmt.Sprint(i)),
Address: net.JoinHostPort(host, port(gatewayPeer, i, publicGRPC)),
Extra: []string{},
})
scopeData, err := (&uplink.Scope{
SatelliteAddr: satellite.Address,
APIKey: defaultAPIKey,
EncryptionAccess: uplink.NewEncryptionAccessWithDefaultKey(storj.Key{}),
}).Serialize()
if err != nil {
return nil, err
}
// gateway must wait for the corresponding satellite to start up
process.WaitForStart(satellite)
process.Arguments = withCommon(process.Directory, Arguments{
"setup": {
"--non-interactive",
"--scope", scopeData,
"--identity-dir", process.Directory,
"--server.address", process.Address,
"--rs.min-threshold", strconv.Itoa(1 * flags.StorageNodeCount / 5),
"--rs.repair-threshold", strconv.Itoa(2 * flags.StorageNodeCount / 5),
"--rs.success-threshold", strconv.Itoa(3 * flags.StorageNodeCount / 5),
"--rs.max-threshold", strconv.Itoa(4 * flags.StorageNodeCount / 5),
"--tls.extensions.revocation=false",
"--tls.use-peer-ca-whitelist=false",
"--debug.addr", net.JoinHostPort(host, port(gatewayPeer, i, debugHTTP)),
},
"run": {},
})
process.ExecBefore["run"] = func(process *Process) error {
err := readConfigString(&process.Address, process.Directory, "server.address")
if err != nil {
return err
}
vip := viper.New()
vip.AddConfigPath(process.Directory)
if err := vip.ReadInConfig(); err != nil {
return err
}
// TODO: maybe all the config flags should be exposed for all processes?
// check if gateway config has an api key, if it's not
// create example project with key and add it to the config
// so that gateway can have access to the satellite
if runScopeData := vip.GetString("scope"); !flags.OnlyEnv && runScopeData == scopeData {
var consoleAddress string
err := readConfigString(&consoleAddress, satellite.Directory, "console.address")
if err != nil {
return err
}
host := "http://" + consoleAddress
createRegistrationTokenAddress := host + "/registrationToken/?projectsLimit=1"
consoleActivationAddress := host + "/activation/?token="
consoleAPIAddress := host + "/api/graphql/v0"
// wait for console server to start
time.Sleep(3 * time.Second)
var apiKey string
if err := addExampleProjectWithKey(&apiKey, createRegistrationTokenAddress, consoleActivationAddress, consoleAPIAddress); err != nil {
return err
}
scope, err := uplink.ParseScope(runScopeData)
if err != nil {
return err
}
scope.APIKey, err = uplink.ParseAPIKey(apiKey)
if err != nil {
return err
}
scopeData, err := scope.Serialize()
if err != nil {
return err
}
vip.Set("scope", scopeData)
if err := vip.WriteConfig(); err != nil {
return err
}
}
if runScopeData := vip.GetString("scope"); runScopeData != scopeData {
process.Extra = append(process.Extra, "SCOPE="+runScopeData)
}
accessKey := vip.GetString("minio.access-key")
secretKey := vip.GetString("minio.secret-key")
process.Extra = append(process.Extra,
"ACCESS_KEY="+accessKey,
"SECRET_KEY="+secretKey,
)
return nil
}
}
// Create storage nodes
if flags.StorageNodeCount > maxStoragenodeCount {
return nil, fmt.Errorf("exceeded the max instance count of %d with Storage Node count of %d", maxStoragenodeCount, flags.StorageNodeCount)
}
for i := 0; i < flags.StorageNodeCount; i++ {
process := processes.New(Info{
Name: fmt.Sprintf("storagenode/%d", i),
Executable: "storagenode",
Directory: filepath.Join(processes.Directory, "storagenode", fmt.Sprint(i)),
Address: net.JoinHostPort(host, port(storagenodePeer, i, publicGRPC)),
})
// storage node must wait for bootstrap and satellites to start
process.WaitForStart(bootstrap)
for _, satellite := range satellites {
process.WaitForStart(satellite)
}
process.Arguments = withCommon(process.Directory, Arguments{
"setup": {
"--identity-dir", process.Directory,
"--console.address", net.JoinHostPort(host, port(storagenodePeer, i, publicHTTP)),
"--console.static-dir", filepath.Join(storjRoot, "web/storagenode/"),
"--server.address", process.Address,
"--server.private-address", net.JoinHostPort(host, port(storagenodePeer, i, privateGRPC)),
"--kademlia.operator.email", fmt.Sprintf("storage%d@mail.test", i),
"--kademlia.operator.wallet", "0x0123456789012345678901234567890123456789",
"--storage2.monitor.minimum-disk-space", "0",
"--storage2.monitor.minimum-bandwidth", "0",
"--server.extensions.revocation=false",
"--server.use-peer-ca-whitelist=false",
"--version.server-address", fmt.Sprintf("http://%s/", versioncontrol.Address),
"--debug.addr", net.JoinHostPort(host, port(storagenodePeer, i, debugHTTP)),
},
"run": {},
})
process.ExecBefore["setup"] = func(process *Process) error {
whitelisted := []string{}
for _, satellite := range satellites {
peer, err := identity.PeerConfig{
CertPath: filepath.Join(satellite.Directory, "identity.cert"),
}.Load()
if err != nil {
return err
}
whitelisted = append(whitelisted, peer.ID.String()+"@"+satellite.Address)
}
process.Arguments["setup"] = append(process.Arguments["setup"],
"--storage.whitelisted-satellites", strings.Join(whitelisted, ","),
)
return nil
}
process.ExecBefore["run"] = func(process *Process) error {
return readConfigString(&process.Address, process.Directory, "server.address")
}
}
{ // verify that we have all binaries
missing := map[string]bool{}
for _, process := range processes.List {
_, err := exec.LookPath(process.Executable)
if err != nil {
missing[process.Executable] = true
}
}
if len(missing) > 0 {
var list []string
for executable := range missing {
list = append(list, executable)
}
sort.Strings(list)
return nil, fmt.Errorf("some executables cannot be found: %v", list)
}
}
// Create directories for all processes
for _, process := range processes.List {
if err := os.MkdirAll(process.Directory, folderPermissions); err != nil {
return nil, err
}
}
return processes, nil
}
func identitySetup(network *Processes) (*Processes, error) {
processes := NewProcesses(network.Directory)
for _, process := range network.List {
if process.Info.Executable == "gateway" {
// gateways don't need an identity
continue
}
identity := processes.New(Info{
Name: "identity/" + process.Info.Name,
Executable: "identity",
Directory: process.Directory,
Address: "",
})
identity.Arguments = Arguments{
"setup": {
"--identity-dir", process.Directory,
"--concurrency", "1",
"--difficulty", "8",
"create", ".",
},
}
}
// create directories for all processes
for _, process := range processes.List {
if err := os.MkdirAll(process.Directory, folderPermissions); err != nil {
return nil, err
}
}
return processes, nil
}
// readConfigString reads from dir/config.yaml flagName returns the value in `into`
func readConfigString(into *string, dir, flagName string) error {
vip := viper.New()
vip.AddConfigPath(dir)
if err := vip.ReadInConfig(); err != nil {
return err
}
if v := vip.GetString(flagName); v != "" {
*into = v
}
return nil
}