8fd841b910
This PR fixes below issues: 1. remove concurrent installation for various versions We were doing this to decrease the amount of execution time the versions test. However, it's returning incorrect exit code when there's an installation failure. Right now, we are only installing two versions of `storj-sim` and the rest are only doing uplink cli installation. The performance of this test should be hugely impacted by the setup step now. 2. only remove release settings instead of deleting the entire file uplink CLI is referrencing `private/version` package. Therefore, we cannot delete it 3. add back `GATEWAY_0_API_KEY` in storj-sim In order to set up older version of uplink cli, we need access to the gate way api key. Change-Id: Ia3c37c197bd007b6e1f7c2bd71adde42181d46f0
726 lines
21 KiB
Go
726 lines
21 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net"
|
|
"net/url"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/alessio/shellescape"
|
|
"github.com/btcsuite/btcutil/base58"
|
|
"github.com/spf13/viper"
|
|
"github.com/zeebo/errs"
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"storj.io/common/fpath"
|
|
"storj.io/common/identity"
|
|
"storj.io/common/pb"
|
|
"storj.io/common/processgroup"
|
|
"storj.io/common/storj"
|
|
"storj.io/storj/private/dbutil"
|
|
"storj.io/storj/private/dbutil/pgutil"
|
|
"storj.io/uplink"
|
|
)
|
|
|
|
const (
|
|
maxInstanceCount = 100
|
|
maxStoragenodeCount = 200
|
|
|
|
folderPermissions = 0744
|
|
)
|
|
|
|
var (
|
|
defaultAccess = "17jgVrPRktsquJQFzpT533WHmZnF6QDkuv8w3Ry5XPzAkh3vj7D1dbJ5MatQRiyRE2ZEiA1Y6fYnhoWqr2n7VgycdXSUPz1QzhthBsHqGXCrRcjSp8RbbVE1VJqDej9nLgB5YDPh3Q5JrVjQeMe9saHAL5rE5tUYJAeynVdre8HeTJMXcwau5"
|
|
)
|
|
|
|
const (
|
|
// The following values of peer class and endpoints are used
|
|
// to create a port with a consistent format for storj-sim services.
|
|
|
|
// Peer classes.
|
|
satellitePeer = 0
|
|
gatewayPeer = 1
|
|
versioncontrolPeer = 2
|
|
storagenodePeer = 3
|
|
|
|
// Endpoints.
|
|
publicRPC = 0
|
|
privateRPC = 1
|
|
publicHTTP = 2
|
|
privateHTTP = 3
|
|
debugHTTP = 9
|
|
|
|
// Satellite specific constants.
|
|
redisPort = 4
|
|
adminHTTP = 5
|
|
debugAdminHTTP = 6
|
|
debugPeerHTTP = 7
|
|
debugRepairerHTTP = 8
|
|
debugGCHTTP = 10
|
|
)
|
|
|
|
// port creates a port with a consistent format for storj-sim services.
|
|
// The port format is: "1PXXE", where P is the peer class, XX is the index of the instance, and E is the endpoint.
|
|
func port(peerclass, index, endpoint int) string {
|
|
port := 10000 + peerclass*1000 + index*10 + endpoint
|
|
return strconv.Itoa(port)
|
|
}
|
|
|
|
func networkExec(flags *Flags, args []string, command string) error {
|
|
processes, err := newNetwork(flags)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx, cancel := NewCLIContext(context.Background())
|
|
defer cancel()
|
|
|
|
if command == "setup" {
|
|
if flags.Postgres == "" {
|
|
return errors.New("postgres connection URL is required for running storj-sim. Example: `storj-sim network setup --postgres=<connection URL>`.\nSee docs for more details https://github.com/storj/docs/blob/master/Test-network.md#running-tests-with-postgres")
|
|
}
|
|
|
|
identities, err := identitySetup(processes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = identities.Exec(ctx, command)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
err = processes.Exec(ctx, command)
|
|
closeErr := processes.Close()
|
|
|
|
return errs.Combine(err, closeErr)
|
|
}
|
|
|
|
func escapeEnv(env string) string {
|
|
// TODO(jeff): escape env variables appropriately on windows. perhaps the
|
|
// env output should be of the form `set KEY=VALUE` as well.
|
|
if runtime.GOOS == "windows" {
|
|
return env
|
|
}
|
|
|
|
parts := strings.SplitN(env, "=", 2)
|
|
if len(parts) != 2 {
|
|
return env
|
|
}
|
|
return parts[0] + "=" + shellescape.Quote(parts[1])
|
|
}
|
|
|
|
func networkEnv(flags *Flags, args []string) error {
|
|
flags.OnlyEnv = true
|
|
|
|
processes, err := newNetwork(flags)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// run exec before, since it will load env vars from configs
|
|
for _, process := range processes.List {
|
|
if exec := process.ExecBefore["run"]; exec != nil {
|
|
if err := exec(process); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(args) == 1 {
|
|
envprefix := strings.ToUpper(args[0] + "=")
|
|
// find the environment value that the environment variable is set to
|
|
for _, env := range processes.Env() {
|
|
if strings.HasPrefix(strings.ToUpper(env), envprefix) {
|
|
fmt.Println(escapeEnv(env[len(envprefix):]))
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
for _, env := range processes.Env() {
|
|
fmt.Println(escapeEnv(env))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func networkTest(flags *Flags, command string, args []string) error {
|
|
processes, err := newNetwork(flags)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx, cancel := NewCLIContext(context.Background())
|
|
|
|
var group errgroup.Group
|
|
processes.Start(ctx, &group, "run")
|
|
|
|
for _, process := range processes.List {
|
|
process.Status.Started.Wait(ctx)
|
|
}
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
cmd := exec.CommandContext(ctx, command, args...)
|
|
cmd.Env = append(os.Environ(), processes.Env()...)
|
|
stdout := processes.Output.Prefixed("test:out")
|
|
stderr := processes.Output.Prefixed("test:err")
|
|
cmd.Stdout, cmd.Stderr = stdout, stderr
|
|
processgroup.Setup(cmd)
|
|
|
|
if printCommands {
|
|
fmt.Fprintf(processes.Output, "exec: %v\n", strings.Join(cmd.Args, " "))
|
|
}
|
|
errRun := cmd.Run()
|
|
|
|
cancel()
|
|
return errs.Combine(errRun, processes.Close(), group.Wait())
|
|
}
|
|
|
|
func networkDestroy(flags *Flags, args []string) error {
|
|
if fpath.IsRoot(flags.Directory) {
|
|
return errors.New("safety check: disallowed to remove root directory " + flags.Directory)
|
|
}
|
|
if printCommands {
|
|
fmt.Println("sim | exec: rm -rf", flags.Directory)
|
|
}
|
|
return os.RemoveAll(flags.Directory)
|
|
}
|
|
|
|
// newNetwork creates a default network.
|
|
func newNetwork(flags *Flags) (*Processes, error) {
|
|
_, filename, _, ok := runtime.Caller(0)
|
|
if !ok {
|
|
return nil, errs.New("no caller information")
|
|
}
|
|
storjRoot := strings.TrimSuffix(filename, "/cmd/storj-sim/network.go")
|
|
|
|
// with common adds all common arguments to the process
|
|
withCommon := func(dir string, all Arguments) Arguments {
|
|
common := []string{"--metrics.app-suffix", "sim", "--log.level", "debug", "--config-dir", dir}
|
|
if flags.IsDev {
|
|
common = append(common, "--defaults", "dev")
|
|
}
|
|
for command, args := range all {
|
|
all[command] = append(append(common, command), args...)
|
|
}
|
|
return all
|
|
}
|
|
|
|
processes := NewProcesses(flags.Directory)
|
|
|
|
var host = flags.Host
|
|
versioncontrol := processes.New(Info{
|
|
Name: "versioncontrol/0",
|
|
Executable: "versioncontrol",
|
|
Directory: filepath.Join(processes.Directory, "versioncontrol", "0"),
|
|
Address: net.JoinHostPort(host, port(versioncontrolPeer, 0, publicRPC)),
|
|
})
|
|
|
|
versioncontrol.Arguments = withCommon(versioncontrol.Directory, Arguments{
|
|
"setup": {
|
|
"--address", versioncontrol.Address,
|
|
"--debug.addr", net.JoinHostPort(host, port(versioncontrolPeer, 0, debugHTTP)),
|
|
"--binary.gateway.rollout.seed", "0000000000000000000000000000000000000000000000000000000000000001",
|
|
"--binary.identity.rollout.seed", "0000000000000000000000000000000000000000000000000000000000000001",
|
|
"--binary.satellite.rollout.seed", "0000000000000000000000000000000000000000000000000000000000000001",
|
|
"--binary.storagenode-updater.rollout.seed", "0000000000000000000000000000000000000000000000000000000000000001",
|
|
"--binary.storagenode.rollout.seed", "0000000000000000000000000000000000000000000000000000000000000001",
|
|
"--binary.uplink.rollout.seed", "0000000000000000000000000000000000000000000000000000000000000001",
|
|
},
|
|
"run": {},
|
|
})
|
|
|
|
versioncontrol.ExecBefore["run"] = func(process *Process) error {
|
|
return readConfigString(&versioncontrol.Address, versioncontrol.Directory, "address")
|
|
}
|
|
// gateway must wait for the versioncontrol to start up
|
|
|
|
// Create satellites
|
|
if flags.SatelliteCount > maxInstanceCount {
|
|
return nil, fmt.Errorf("exceeded the max instance count of %d with Satellite count of %d", maxInstanceCount, flags.SatelliteCount)
|
|
}
|
|
|
|
// set up redis servers
|
|
var redisServers []*Process
|
|
|
|
if flags.Redis == "" {
|
|
for i := 0; i < flags.SatelliteCount; i++ {
|
|
rp := port(satellitePeer, i, redisPort)
|
|
process := processes.New(Info{
|
|
Name: fmt.Sprintf("redis/%d", i),
|
|
Executable: "redis-server",
|
|
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i), "redis"),
|
|
Address: net.JoinHostPort(host, rp),
|
|
})
|
|
redisServers = append(redisServers, process)
|
|
|
|
process.ExecBefore["setup"] = func(process *Process) error {
|
|
confpath := filepath.Join(process.Directory, "redis.conf")
|
|
arguments := []string{
|
|
"daemonize no",
|
|
"bind " + host,
|
|
"port " + rp,
|
|
"timeout 0",
|
|
"databases 2",
|
|
"dbfilename sim.rdb",
|
|
"dir ./",
|
|
}
|
|
conf := strings.Join(arguments, "\n") + "\n"
|
|
err := ioutil.WriteFile(confpath, []byte(conf), 0755)
|
|
return err
|
|
}
|
|
process.Arguments = Arguments{
|
|
"run": []string{filepath.Join(process.Directory, "redis.conf")},
|
|
}
|
|
}
|
|
}
|
|
|
|
var satellites []*Process
|
|
for i := 0; i < flags.SatelliteCount; i++ {
|
|
apiProcess := processes.New(Info{
|
|
Name: fmt.Sprintf("satellite/%d", i),
|
|
Executable: "satellite",
|
|
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i)),
|
|
Address: net.JoinHostPort(host, port(satellitePeer, i, publicRPC)),
|
|
})
|
|
satellites = append(satellites, apiProcess)
|
|
|
|
redisAddress := flags.Redis
|
|
redisPortBase := flags.RedisStartDB + i*2
|
|
if redisAddress == "" {
|
|
redisAddress = redisServers[i].Address
|
|
redisPortBase = 0
|
|
apiProcess.WaitForStart(redisServers[i])
|
|
}
|
|
|
|
apiProcess.Arguments = withCommon(apiProcess.Directory, Arguments{
|
|
"setup": {
|
|
"--identity-dir", apiProcess.Directory,
|
|
"--console.address", net.JoinHostPort(host, port(satellitePeer, i, publicHTTP)),
|
|
"--console.static-dir", filepath.Join(storjRoot, "web/satellite/"),
|
|
"--console.open-registration-enabled",
|
|
"--console.rate-limit.burst", "100",
|
|
"--marketing.base-url", "",
|
|
"--marketing.address", net.JoinHostPort(host, port(satellitePeer, i, privateHTTP)),
|
|
"--marketing.static-dir", filepath.Join(storjRoot, "web/marketing/"),
|
|
"--server.address", apiProcess.Address,
|
|
"--server.private-address", net.JoinHostPort(host, port(satellitePeer, i, privateRPC)),
|
|
|
|
"--live-accounting.storage-backend", "redis://" + redisAddress + "?db=" + strconv.Itoa(redisPortBase),
|
|
"--server.revocation-dburl", "redis://" + redisAddress + "?db=" + strconv.Itoa(redisPortBase+1),
|
|
|
|
"--server.extensions.revocation=false",
|
|
"--server.use-peer-ca-whitelist=false",
|
|
|
|
"--mail.smtp-server-address", "smtp.gmail.com:587",
|
|
"--mail.from", "Storj <yaroslav-satellite-test@storj.io>",
|
|
"--mail.template-path", filepath.Join(storjRoot, "web/satellite/static/emails"),
|
|
"--version.server-address", fmt.Sprintf("http://%s/", versioncontrol.Address),
|
|
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugHTTP)),
|
|
|
|
"--admin.address", net.JoinHostPort(host, port(satellitePeer, i, adminHTTP)),
|
|
},
|
|
"run": {"api"},
|
|
})
|
|
|
|
if flags.Postgres != "" {
|
|
masterDBURL, err := namespacedDatabaseURL(flags.Postgres, fmt.Sprintf("satellite/%d", i))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
metainfoDBURL, err := namespacedDatabaseURL(flags.Postgres, fmt.Sprintf("satellite/%d/meta", i))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
apiProcess.Arguments["setup"] = append(apiProcess.Arguments["setup"],
|
|
"--database", masterDBURL,
|
|
"--metainfo.database-url", metainfoDBURL,
|
|
)
|
|
}
|
|
apiProcess.ExecBefore["run"] = func(process *Process) error {
|
|
if err := readConfigString(&process.Address, process.Directory, "server.address"); err != nil {
|
|
return err
|
|
}
|
|
|
|
satNodeID, err := identity.NodeIDFromCertPath(filepath.Join(apiProcess.Directory, "identity.cert"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
process.AddExtra("ID", satNodeID.String())
|
|
return nil
|
|
}
|
|
|
|
migrationProcess := processes.New(Info{
|
|
Name: fmt.Sprintf("satellite-migration/%d", i),
|
|
Executable: "satellite",
|
|
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i)),
|
|
})
|
|
migrationProcess.Arguments = withCommon(apiProcess.Directory, Arguments{
|
|
"run": {
|
|
"migration",
|
|
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugPeerHTTP)),
|
|
},
|
|
})
|
|
apiProcess.WaitForExited(migrationProcess)
|
|
|
|
coreProcess := processes.New(Info{
|
|
Name: fmt.Sprintf("satellite-core/%d", i),
|
|
Executable: "satellite",
|
|
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i)),
|
|
Address: "",
|
|
})
|
|
coreProcess.Arguments = withCommon(apiProcess.Directory, Arguments{
|
|
"run": {
|
|
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugPeerHTTP)),
|
|
},
|
|
})
|
|
coreProcess.WaitForExited(migrationProcess)
|
|
|
|
adminProcess := processes.New(Info{
|
|
Name: fmt.Sprintf("satellite-admin/%d", i),
|
|
Executable: "satellite",
|
|
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i)),
|
|
Address: net.JoinHostPort(host, port(satellitePeer, i, adminHTTP)),
|
|
})
|
|
adminProcess.Arguments = withCommon(apiProcess.Directory, Arguments{
|
|
"run": {
|
|
"admin",
|
|
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugAdminHTTP)),
|
|
},
|
|
})
|
|
adminProcess.WaitForExited(migrationProcess)
|
|
|
|
repairProcess := processes.New(Info{
|
|
Name: fmt.Sprintf("satellite-repairer/%d", i),
|
|
Executable: "satellite",
|
|
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i)),
|
|
})
|
|
repairProcess.Arguments = withCommon(apiProcess.Directory, Arguments{
|
|
"run": {
|
|
"repair",
|
|
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugRepairerHTTP)),
|
|
},
|
|
})
|
|
repairProcess.WaitForExited(migrationProcess)
|
|
|
|
garbageCollectionProcess := processes.New(Info{
|
|
Name: fmt.Sprintf("satellite-garbage-collection/%d", i),
|
|
Executable: "satellite",
|
|
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i)),
|
|
})
|
|
garbageCollectionProcess.Arguments = withCommon(apiProcess.Directory, Arguments{
|
|
"run": {
|
|
"garbage-collection",
|
|
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugGCHTTP)),
|
|
},
|
|
})
|
|
garbageCollectionProcess.WaitForExited(migrationProcess)
|
|
}
|
|
|
|
// Create gateways for each satellite
|
|
for i, satellite := range satellites {
|
|
satellite := satellite
|
|
process := processes.New(Info{
|
|
Name: fmt.Sprintf("gateway/%d", i),
|
|
Executable: "gateway",
|
|
Directory: filepath.Join(processes.Directory, "gateway", fmt.Sprint(i)),
|
|
Address: net.JoinHostPort(host, port(gatewayPeer, i, publicRPC)),
|
|
})
|
|
|
|
// gateway must wait for the corresponding satellite to start up
|
|
process.WaitForStart(satellite)
|
|
|
|
accessData := defaultAccess
|
|
|
|
process.Arguments = withCommon(process.Directory, Arguments{
|
|
"setup": {
|
|
"--non-interactive",
|
|
|
|
"--access", accessData,
|
|
"--server.address", process.Address,
|
|
|
|
"--debug.addr", net.JoinHostPort(host, port(gatewayPeer, i, debugHTTP)),
|
|
},
|
|
|
|
"run": {},
|
|
})
|
|
|
|
process.ExecBefore["run"] = func(process *Process) (err error) {
|
|
err = readConfigString(&process.Address, process.Directory, "server.address")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
vip := viper.New()
|
|
vip.AddConfigPath(process.Directory)
|
|
if err := vip.ReadInConfig(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// TODO: maybe all the config flags should be exposed for all processes?
|
|
|
|
// check if gateway config has an api key, if it's not
|
|
// create example project with key and add it to the config
|
|
// so that gateway can have access to the satellite
|
|
if runAccessData := vip.GetString("access"); !flags.OnlyEnv && runAccessData == accessData {
|
|
var consoleAddress string
|
|
err := readConfigString(&consoleAddress, satellite.Directory, "console.address")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// try with 100ms delays until we hit 3s
|
|
apiKey, start := "", time.Now()
|
|
for apiKey == "" {
|
|
apiKey, err = newConsoleEndpoints(consoleAddress).createOrGetAPIKey()
|
|
if err != nil && time.Since(start) > 3*time.Second {
|
|
return err
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
satNodeID, err := identity.NodeIDFromCertPath(filepath.Join(satellite.Directory, "identity.cert"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
nodeURL := storj.NodeURL{
|
|
ID: satNodeID,
|
|
Address: satellite.Address,
|
|
}
|
|
|
|
access, err := uplink.RequestAccessWithPassphrase(context.Background(), nodeURL.String(), apiKey, "")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
accessData, err := access.Serialize()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
vip.Set("access", accessData)
|
|
|
|
if err := vip.WriteConfig(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if runAccessData := vip.GetString("access"); runAccessData != accessData {
|
|
process.AddExtra("ACCESS", runAccessData)
|
|
|
|
if apiKey, err := getAPIKey(runAccessData); err == nil {
|
|
process.AddExtra("API_KEY", apiKey)
|
|
}
|
|
}
|
|
|
|
process.AddExtra("ACCESS_KEY", vip.GetString("minio.access-key"))
|
|
process.AddExtra("SECRET_KEY", vip.GetString("minio.secret-key"))
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Create storage nodes
|
|
if flags.StorageNodeCount > maxStoragenodeCount {
|
|
return nil, fmt.Errorf("exceeded the max instance count of %d with Storage Node count of %d", maxStoragenodeCount, flags.StorageNodeCount)
|
|
}
|
|
for i := 0; i < flags.StorageNodeCount; i++ {
|
|
process := processes.New(Info{
|
|
Name: fmt.Sprintf("storagenode/%d", i),
|
|
Executable: "storagenode",
|
|
Directory: filepath.Join(processes.Directory, "storagenode", fmt.Sprint(i)),
|
|
Address: net.JoinHostPort(host, port(storagenodePeer, i, publicRPC)),
|
|
})
|
|
|
|
for _, satellite := range satellites {
|
|
process.WaitForStart(satellite)
|
|
}
|
|
|
|
process.Arguments = withCommon(process.Directory, Arguments{
|
|
"setup": {
|
|
"--identity-dir", process.Directory,
|
|
"--console.address", net.JoinHostPort(host, port(storagenodePeer, i, publicHTTP)),
|
|
"--console.static-dir", filepath.Join(storjRoot, "web/storagenode/"),
|
|
"--server.address", process.Address,
|
|
"--server.private-address", net.JoinHostPort(host, port(storagenodePeer, i, privateRPC)),
|
|
|
|
"--operator.email", fmt.Sprintf("storage%d@mail.test", i),
|
|
"--operator.wallet", "0x0123456789012345678901234567890123456789",
|
|
|
|
"--storage2.monitor.minimum-disk-space", "0",
|
|
|
|
"--server.extensions.revocation=false",
|
|
"--server.use-peer-ca-whitelist=false",
|
|
|
|
"--version.server-address", fmt.Sprintf("http://%s/", versioncontrol.Address),
|
|
"--debug.addr", net.JoinHostPort(host, port(storagenodePeer, i, debugHTTP)),
|
|
|
|
"--tracing.app", fmt.Sprintf("storagenode/%d", i),
|
|
},
|
|
"run": {},
|
|
})
|
|
|
|
process.ExecBefore["setup"] = func(process *Process) error {
|
|
whitelisted := []string{}
|
|
for _, satellite := range satellites {
|
|
peer, err := identity.PeerConfig{
|
|
CertPath: filepath.Join(satellite.Directory, "identity.cert"),
|
|
}.Load()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
whitelisted = append(whitelisted, peer.ID.String()+"@"+satellite.Address)
|
|
}
|
|
|
|
process.Arguments["setup"] = append(process.Arguments["setup"],
|
|
"--storage2.trust.sources", strings.Join(whitelisted, ","),
|
|
)
|
|
return nil
|
|
}
|
|
|
|
process.ExecBefore["run"] = func(process *Process) error {
|
|
return readConfigString(&process.Address, process.Directory, "server.address")
|
|
}
|
|
}
|
|
|
|
{ // verify that we have all binaries
|
|
missing := map[string]bool{}
|
|
for _, process := range processes.List {
|
|
_, err := exec.LookPath(process.Executable)
|
|
if err != nil {
|
|
missing[process.Executable] = true
|
|
}
|
|
}
|
|
if len(missing) > 0 {
|
|
var list []string
|
|
for executable := range missing {
|
|
list = append(list, executable)
|
|
}
|
|
sort.Strings(list)
|
|
return nil, fmt.Errorf("some executables cannot be found: %v", list)
|
|
}
|
|
}
|
|
|
|
// Create directories for all processes
|
|
for _, process := range processes.List {
|
|
if err := os.MkdirAll(process.Directory, folderPermissions); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return processes, nil
|
|
}
|
|
|
|
func identitySetup(network *Processes) (*Processes, error) {
|
|
processes := NewProcesses(network.Directory)
|
|
|
|
for _, process := range network.List {
|
|
if process.Info.Executable == "gateway" || process.Info.Executable == "redis-server" {
|
|
// gateways and redis-servers don't need an identity
|
|
continue
|
|
}
|
|
|
|
if strings.Contains(process.Name, "satellite-") {
|
|
// we only need to create the identity once for the satellite system, we create the
|
|
// identity for the satellite process and share it with these other satellite processes
|
|
continue
|
|
}
|
|
|
|
identity := processes.New(Info{
|
|
Name: "identity/" + process.Info.Name,
|
|
Executable: "identity",
|
|
Directory: process.Directory,
|
|
Address: "",
|
|
})
|
|
|
|
identity.Arguments = Arguments{
|
|
"setup": {
|
|
"--identity-dir", process.Directory,
|
|
"--concurrency", "1",
|
|
"--difficulty", "8",
|
|
"create", ".",
|
|
},
|
|
}
|
|
}
|
|
|
|
// create directories for all processes
|
|
for _, process := range processes.List {
|
|
if err := os.MkdirAll(process.Directory, folderPermissions); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return processes, nil
|
|
}
|
|
|
|
// getAPIKey parses an access string to return its corresponding api key.
|
|
func getAPIKey(access string) (apiKey string, err error) {
|
|
data, version, err := base58.CheckDecode(access)
|
|
if err != nil || version != 0 {
|
|
return "", errors.New("invalid access grant format")
|
|
}
|
|
|
|
p := new(pb.Scope)
|
|
if err := pb.Unmarshal(data, p); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
apiKey = base58.CheckEncode(p.ApiKey, 0)
|
|
return apiKey, nil
|
|
}
|
|
|
|
// readConfigString reads from dir/config.yaml flagName returns the value in `into`.
|
|
func readConfigString(into *string, dir, flagName string) error {
|
|
vip := viper.New()
|
|
vip.AddConfigPath(dir)
|
|
if err := vip.ReadInConfig(); err != nil {
|
|
return err
|
|
}
|
|
if v := vip.GetString(flagName); v != "" {
|
|
*into = v
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// namespacedDatabaseURL returns an equivalent database url with the given namespace
|
|
// so that a database opened with the url does not conflict with other databases
|
|
// opened with a different namespace.
|
|
func namespacedDatabaseURL(dbURL, namespace string) (string, error) {
|
|
parsed, err := url.Parse(dbURL)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
switch dbutil.ImplementationForScheme(parsed.Scheme) {
|
|
case dbutil.Postgres:
|
|
return pgutil.ConnstrWithSchema(dbURL, namespace), nil
|
|
|
|
case dbutil.Cockroach:
|
|
parsed.Path += "/" + namespace
|
|
return parsed.String(), nil
|
|
|
|
default:
|
|
return "", errs.New("unable to namespace db url: %q", dbURL)
|
|
}
|
|
}
|