storj-sdk (#969)
This commit is contained in:
parent
6f931759a6
commit
387fb14e23
11
.travis.yml
11
.travis.yml
@ -65,19 +65,18 @@ matrix:
|
|||||||
- gospace istidy
|
- gospace istidy
|
||||||
|
|
||||||
### service integration tests ###
|
### service integration tests ###
|
||||||
- env: MODE=captplanet
|
- env: MODE=integration
|
||||||
services:
|
services:
|
||||||
- redis
|
- redis
|
||||||
install:
|
install:
|
||||||
- source scripts/install-awscli.sh
|
- source scripts/install-awscli.sh
|
||||||
- go install -race storj.io/storj/cmd/captplanet
|
- go install -race storj.io/storj/cmd/{storj-sdk,satellite,storagenode,uplink,gateway,certificates,captplanet}
|
||||||
- go install -race storj.io/storj/cmd/storagenode
|
|
||||||
- go install -race storj.io/storj/cmd/certificates
|
|
||||||
script:
|
script:
|
||||||
- make test-captplanet
|
- make test-storj-sdk
|
||||||
- make test-certificate-signing
|
- make test-certificate-signing
|
||||||
|
- make test-captplanet
|
||||||
|
|
||||||
### docker tests ###
|
### Docker tests ###
|
||||||
- env: MODE=docker
|
- env: MODE=docker
|
||||||
services:
|
services:
|
||||||
- docker
|
- docker
|
||||||
|
5
Makefile
5
Makefile
@ -77,6 +77,11 @@ test-captplanet: ## Test source with captain planet (travis)
|
|||||||
@echo "Running ${@}"
|
@echo "Running ${@}"
|
||||||
@./scripts/test-captplanet.sh
|
@./scripts/test-captplanet.sh
|
||||||
|
|
||||||
|
.PHONY: test-storj-sdk
|
||||||
|
test-storj-sdk: ## Test source with storj-sdk (travis)
|
||||||
|
@echo "Running ${@}"
|
||||||
|
@./scripts/test-storj-sdk.sh
|
||||||
|
|
||||||
.PHONY: test-certificate-signing
|
.PHONY: test-certificate-signing
|
||||||
test-certificate-signing: ## Test certificate signing service and storagenode setup (travis)
|
test-certificate-signing: ## Test certificate signing service and storagenode setup (travis)
|
||||||
@echo "Running ${@}"
|
@echo "Running ${@}"
|
||||||
|
@ -17,16 +17,18 @@ func NewCLIContext(root context.Context) (context.Context, func()) {
|
|||||||
|
|
||||||
signal.Notify(signals, os.Interrupt)
|
signal.Notify(signals, os.Interrupt)
|
||||||
|
|
||||||
|
stop := func() {
|
||||||
|
signal.Stop(signals)
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-signals:
|
case <-signals:
|
||||||
cancel()
|
stop()
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return ctx, func() {
|
return ctx, stop
|
||||||
signal.Stop(signals)
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -14,6 +14,7 @@ import (
|
|||||||
// Flags contains different flags for commands
|
// Flags contains different flags for commands
|
||||||
type Flags struct {
|
type Flags struct {
|
||||||
Directory string
|
Directory string
|
||||||
|
Host string
|
||||||
|
|
||||||
SatelliteCount int
|
SatelliteCount int
|
||||||
StorageNodeCount int
|
StorageNodeCount int
|
||||||
@ -39,12 +40,13 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
rootCmd.PersistentFlags().StringVarP(&flags.Directory, "config-dir", "", configDir, "base project directory")
|
rootCmd.PersistentFlags().StringVarP(&flags.Directory, "config-dir", "", configDir, "base project directory")
|
||||||
|
rootCmd.PersistentFlags().StringVarP(&flags.Host, "host", "", "127.0.0.1", "host to use for network")
|
||||||
|
|
||||||
rootCmd.PersistentFlags().IntVarP(&flags.SatelliteCount, "satellites", "", 1, "number of satellites to start")
|
rootCmd.PersistentFlags().IntVarP(&flags.SatelliteCount, "satellites", "", 1, "number of satellites to start")
|
||||||
rootCmd.PersistentFlags().IntVarP(&flags.StorageNodeCount, "storage-nodes", "", 10, "number of storage nodes to start")
|
rootCmd.PersistentFlags().IntVarP(&flags.StorageNodeCount, "storage-nodes", "", 10, "number of storage nodes to start")
|
||||||
rootCmd.PersistentFlags().IntVarP(&flags.Identities, "identities", "", 10, "number of identities to create")
|
rootCmd.PersistentFlags().IntVarP(&flags.Identities, "identities", "", 10, "number of identities to create")
|
||||||
|
|
||||||
rootCmd.PersistentFlags().BoolVarP(&printCommands, "", "x", false, "print commands as they are run")
|
rootCmd.PersistentFlags().BoolVarP(&printCommands, "print-commands", "x", false, "print commands as they are run")
|
||||||
|
|
||||||
networkCmd := &cobra.Command{
|
networkCmd := &cobra.Command{
|
||||||
Use: "network",
|
Use: "network",
|
||||||
|
@ -5,6 +5,8 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
@ -13,20 +15,18 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"storj.io/storj/internal/fpath"
|
"storj.io/storj/internal/fpath"
|
||||||
"storj.io/storj/internal/processgroup"
|
"storj.io/storj/internal/processgroup"
|
||||||
"storj.io/storj/pkg/utils"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const folderPermissions = 0744
|
const folderPermissions = 0744
|
||||||
|
|
||||||
func networkExec(flags *Flags, args []string, command string) error {
|
func networkExec(flags *Flags, args []string, command string) error {
|
||||||
processes, err := newNetwork(flags.Directory, flags.SatelliteCount, flags.StorageNodeCount)
|
processes, err := newNetwork(flags)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -41,7 +41,7 @@ func networkExec(flags *Flags, args []string, command string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func networkTest(flags *Flags, command string, args []string) error {
|
func networkTest(flags *Flags, command string, args []string) error {
|
||||||
processes, err := newNetwork(flags.Directory, flags.SatelliteCount, flags.StorageNodeCount)
|
processes, err := newNetwork(flags)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -51,7 +51,9 @@ func networkTest(flags *Flags, command string, args []string) error {
|
|||||||
var group errgroup.Group
|
var group errgroup.Group
|
||||||
processes.Start(ctx, &group, "run")
|
processes.Start(ctx, &group, "run")
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
for _, process := range processes.List {
|
||||||
|
process.Status.Started.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
cmd := exec.CommandContext(ctx, command, args...)
|
cmd := exec.CommandContext(ctx, command, args...)
|
||||||
cmd.Env = append(os.Environ(), processes.Env()...)
|
cmd.Env = append(os.Environ(), processes.Env()...)
|
||||||
@ -80,102 +82,138 @@ func networkDestroy(flags *Flags, args []string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// newNetwork creates a default network
|
// newNetwork creates a default network
|
||||||
func newNetwork(dir string, satelliteCount, storageNodeCount int) (*Processes, error) {
|
func newNetwork(flags *Flags) (*Processes, error) {
|
||||||
processes := NewProcesses()
|
// with common adds all common arguments to the process
|
||||||
|
withCommon := func(all Arguments) Arguments {
|
||||||
|
for command, args := range all {
|
||||||
|
all[command] = append([]string{
|
||||||
|
"--log.level", "debug",
|
||||||
|
"--config-dir", ".",
|
||||||
|
command,
|
||||||
|
}, args...)
|
||||||
|
}
|
||||||
|
return all
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
processes := NewProcesses()
|
||||||
host = "127.0.0.1"
|
var (
|
||||||
|
configDir = flags.Directory
|
||||||
|
host = flags.Host
|
||||||
gatewayPort = 9000
|
gatewayPort = 9000
|
||||||
satellitePort = 10000
|
satellitePort = 10000
|
||||||
storageNodePort = 11000
|
storageNodePort = 11000
|
||||||
|
difficulty = "10"
|
||||||
)
|
)
|
||||||
|
|
||||||
defaultSatellite := net.JoinHostPort(host, strconv.Itoa(satellitePort+0))
|
var bootstrapSatellite *Process
|
||||||
|
|
||||||
arguments := func(name, command, addr string, rest ...string) []string {
|
// Create satellites making the first satellite bootstrap
|
||||||
return append([]string{
|
for i := 0; i < flags.SatelliteCount; i++ {
|
||||||
"--log.level", "debug",
|
process := processes.New(Info{
|
||||||
"--config-dir", ".",
|
Name: fmt.Sprintf("satellite/%d", i),
|
||||||
command,
|
Executable: "satellite",
|
||||||
"--server.address", addr,
|
Directory: filepath.Join(configDir, "satellite", fmt.Sprint(i)),
|
||||||
}, rest...)
|
Address: net.JoinHostPort(host, strconv.Itoa(satellitePort+i)),
|
||||||
}
|
})
|
||||||
|
|
||||||
for i := 0; i < satelliteCount; i++ {
|
bootstrapAddr := process.Address
|
||||||
name := fmt.Sprintf("satellite/%d", i)
|
if bootstrapSatellite != nil {
|
||||||
|
bootstrapAddr = bootstrapSatellite.Address
|
||||||
dir := filepath.Join(dir, "satellite", fmt.Sprint(i))
|
process.WaitForStart(bootstrapSatellite)
|
||||||
if err := os.MkdirAll(dir, folderPermissions); err != nil {
|
} else {
|
||||||
return nil, err
|
bootstrapSatellite = process
|
||||||
}
|
}
|
||||||
|
|
||||||
process, err := processes.New(name, "satellite", dir)
|
process.Arguments = withCommon(Arguments{
|
||||||
if err != nil {
|
"setup": {
|
||||||
return nil, utils.CombineErrors(err, processes.Close())
|
"--ca.difficulty", difficulty,
|
||||||
}
|
},
|
||||||
process.Info.Address = net.JoinHostPort(host, strconv.Itoa(satellitePort+i))
|
"run": {
|
||||||
|
"--kademlia.bootstrap-addr", bootstrapAddr,
|
||||||
process.Arguments["setup"] = arguments(name, "setup", process.Info.Address)
|
"--server.address", process.Address,
|
||||||
process.Arguments["run"] = arguments(name, "run", process.Info.Address,
|
},
|
||||||
"--kademlia.bootstrap-addr", defaultSatellite,
|
})
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
gatewayArguments := func(name, command string, addr string, rest ...string) []string {
|
// Create gateways for each satellite
|
||||||
return append([]string{
|
for i := 0; i < flags.SatelliteCount; i++ {
|
||||||
"--log.level", "debug",
|
accessKey, secretKey := randomKey(), randomKey()
|
||||||
"--config-dir", ".",
|
|
||||||
command,
|
|
||||||
"--server.address", addr,
|
|
||||||
}, rest...)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < satelliteCount; i++ {
|
|
||||||
name := fmt.Sprintf("gateway/%d", i)
|
|
||||||
|
|
||||||
dir := filepath.Join(dir, "gateway", fmt.Sprint(i))
|
|
||||||
if err := os.MkdirAll(dir, folderPermissions); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
satellite := processes.List[i]
|
satellite := processes.List[i]
|
||||||
|
|
||||||
process, err := processes.New(name, "gateway", dir)
|
process := processes.New(Info{
|
||||||
if err != nil {
|
Name: fmt.Sprintf("gateway/%d", i),
|
||||||
return nil, utils.CombineErrors(err, processes.Close())
|
Executable: "gateway",
|
||||||
}
|
Directory: filepath.Join(configDir, "gateway", fmt.Sprint(i)),
|
||||||
process.Info.Address = net.JoinHostPort(host, strconv.Itoa(gatewayPort+i))
|
Address: net.JoinHostPort(host, strconv.Itoa(gatewayPort+i)),
|
||||||
|
Extra: []string{
|
||||||
|
"ACCESS_KEY=" + accessKey,
|
||||||
|
"SECRET_KEY=" + secretKey,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
process.Arguments["setup"] = gatewayArguments(name, "setup", process.Info.Address,
|
// gateway must wait for the corresponding satellite to start up
|
||||||
"--satellite-addr", satellite.Info.Address,
|
process.WaitForStart(satellite)
|
||||||
)
|
|
||||||
process.Arguments["run"] = gatewayArguments(name, "run", process.Info.Address)
|
process.Arguments = withCommon(Arguments{
|
||||||
|
"setup": {
|
||||||
|
"--satellite-addr", satellite.Address,
|
||||||
|
"--ca.difficulty", difficulty,
|
||||||
|
},
|
||||||
|
"run": {
|
||||||
|
"--server.address", process.Address,
|
||||||
|
"--minio.access-key", accessKey,
|
||||||
|
"--minio.secret-key", secretKey,
|
||||||
|
|
||||||
|
"--client.overlay-addr", satellite.Address,
|
||||||
|
"--client.pointer-db-addr", satellite.Address,
|
||||||
|
|
||||||
|
"--rs.min-threshold", strconv.Itoa(1 * flags.StorageNodeCount / 5),
|
||||||
|
"--rs.repair-threshold", strconv.Itoa(2 * flags.StorageNodeCount / 5),
|
||||||
|
"--rs.success-threshold", strconv.Itoa(3 * flags.StorageNodeCount / 5),
|
||||||
|
"--rs.max-threshold", strconv.Itoa(4 * flags.StorageNodeCount / 5),
|
||||||
|
},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < storageNodeCount; i++ {
|
// Create storage nodes
|
||||||
name := fmt.Sprintf("storage/%d", i)
|
for i := 0; i < flags.StorageNodeCount; i++ {
|
||||||
|
process := processes.New(Info{
|
||||||
|
Name: fmt.Sprintf("storagenode/%d", i),
|
||||||
|
Executable: "storagenode",
|
||||||
|
Directory: filepath.Join(configDir, "storage", fmt.Sprint(i)),
|
||||||
|
Address: net.JoinHostPort(host, strconv.Itoa(storageNodePort+i)),
|
||||||
|
})
|
||||||
|
|
||||||
dir := filepath.Join(dir, "storage", fmt.Sprint(i))
|
// storage node must wait for bootstrap to start
|
||||||
if err := os.MkdirAll(dir, folderPermissions); err != nil {
|
process.WaitForStart(bootstrapSatellite)
|
||||||
|
|
||||||
|
process.Arguments = withCommon(Arguments{
|
||||||
|
"setup": {
|
||||||
|
"--ca.difficulty", difficulty,
|
||||||
|
"--piecestore.agreementsender.overlay-addr", bootstrapSatellite.Address,
|
||||||
|
},
|
||||||
|
"run": {
|
||||||
|
"--piecestore.agreementsender.overlay-addr", bootstrapSatellite.Address,
|
||||||
|
"--kademlia.bootstrap-addr", bootstrapSatellite.Address,
|
||||||
|
"--kademlia.operator.email", fmt.Sprintf("storage%d@example.com", i),
|
||||||
|
"--kademlia.operator.wallet", "0x0123456789012345678901234567890123456789",
|
||||||
|
"--server.address", process.Address,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create directories for all processes
|
||||||
|
for _, process := range processes.List {
|
||||||
|
if err := os.MkdirAll(process.Directory, folderPermissions); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
process, err := processes.New(name, "storagenode", dir)
|
|
||||||
if err != nil {
|
|
||||||
return nil, utils.CombineErrors(err, processes.Close())
|
|
||||||
}
|
|
||||||
process.Info.Address = net.JoinHostPort(host, strconv.Itoa(storageNodePort+i))
|
|
||||||
|
|
||||||
process.Arguments["setup"] = arguments(name, "setup", process.Info.Address,
|
|
||||||
"--piecestore.agreementsender.overlay-addr", defaultSatellite,
|
|
||||||
)
|
|
||||||
process.Arguments["run"] = arguments(name, "run", process.Info.Address,
|
|
||||||
"--piecestore.agreementsender.overlay-addr", defaultSatellite,
|
|
||||||
"--kademlia.bootstrap-addr", defaultSatellite,
|
|
||||||
"--kademlia.operator.email", fmt.Sprintf("storage%d@example.com", i),
|
|
||||||
"--kademlia.operator.wallet", "0x0123456789012345678901234567890123456789",
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return processes, nil
|
return processes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func randomKey() string {
|
||||||
|
var data [10]byte
|
||||||
|
_, _ = rand.Read(data[:])
|
||||||
|
return hex.EncodeToString(data[:])
|
||||||
|
}
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PrefixWriter writes to the specified output with prefixes.
|
// PrefixWriter writes to the specified output with prefixes.
|
||||||
@ -15,9 +16,18 @@ type PrefixWriter struct {
|
|||||||
root *prefixWriter
|
root *prefixWriter
|
||||||
maxline int
|
maxline int
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
len int
|
prefixlen int
|
||||||
dst io.Writer
|
dst io.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
const maxIDLength = 10
|
||||||
|
|
||||||
|
func max(a, b int) int {
|
||||||
|
if a > b {
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPrefixWriter creates a writer than can prefix all lines written to it.
|
// NewPrefixWriter creates a writer than can prefix all lines written to it.
|
||||||
@ -34,18 +44,17 @@ func NewPrefixWriter(defaultPrefix string, dst io.Writer) *PrefixWriter {
|
|||||||
type prefixWriter struct {
|
type prefixWriter struct {
|
||||||
*PrefixWriter
|
*PrefixWriter
|
||||||
prefix string
|
prefix string
|
||||||
|
id string
|
||||||
buffer []byte
|
buffer []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prefixed returns a new writer that has writes with specified prefix.
|
// Prefixed returns a new writer that has writes with specified prefix.
|
||||||
func (writer *PrefixWriter) Prefixed(prefix string) io.Writer {
|
func (writer *PrefixWriter) Prefixed(prefix string) io.Writer {
|
||||||
writer.mu.Lock()
|
writer.mu.Lock()
|
||||||
if len(prefix) > writer.len {
|
writer.prefixlen = max(writer.prefixlen, len(prefix))
|
||||||
writer.len = len(prefix)
|
|
||||||
}
|
|
||||||
writer.mu.Unlock()
|
writer.mu.Unlock()
|
||||||
|
|
||||||
return &prefixWriter{writer, prefix, make([]byte, 0, writer.maxline)}
|
return &prefixWriter{writer, prefix, "", make([]byte, 0, writer.maxline)}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write implements io.Writer that prefixes lines.
|
// Write implements io.Writer that prefixes lines.
|
||||||
@ -59,6 +68,18 @@ func (writer *prefixWriter) Write(data []byte) (int, error) {
|
|||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var newID string
|
||||||
|
if writer.id == "" {
|
||||||
|
if start := bytes.Index(data, []byte("Node ")); start > 0 {
|
||||||
|
if end := bytes.Index(data[start:], []byte(" started")); end > 0 {
|
||||||
|
newID = string(data[start+5 : start+end])
|
||||||
|
if len(newID) > maxIDLength {
|
||||||
|
newID = newID[:maxIDLength]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
buffer := data
|
buffer := data
|
||||||
|
|
||||||
// buffer everything that hasn't been written yet
|
// buffer everything that hasn't been written yet
|
||||||
@ -78,7 +99,13 @@ func (writer *prefixWriter) Write(data []byte) (int, error) {
|
|||||||
writer.mu.Lock()
|
writer.mu.Lock()
|
||||||
defer writer.mu.Unlock()
|
defer writer.mu.Unlock()
|
||||||
|
|
||||||
|
if newID != "" {
|
||||||
|
writer.id = newID
|
||||||
|
}
|
||||||
|
|
||||||
prefix := writer.prefix
|
prefix := writer.prefix
|
||||||
|
id := writer.id
|
||||||
|
timeText := time.Now().Format("15:04:05.000")
|
||||||
for len(buffer) > 0 {
|
for len(buffer) > 0 {
|
||||||
pos := bytes.IndexByte(buffer, '\n') + 1
|
pos := bytes.IndexByte(buffer, '\n') + 1
|
||||||
breakline := false
|
breakline := false
|
||||||
@ -98,7 +125,7 @@ func (writer *prefixWriter) Write(data []byte) (int, error) {
|
|||||||
breakline = true
|
breakline = true
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := fmt.Fprintf(writer.dst, "%-*s | ", writer.len, prefix)
|
_, err := fmt.Fprintf(writer.dst, "%-*s %-*s %s | ", writer.prefixlen, prefix, maxIDLength, id, timeText)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return len(data), err
|
return len(data), err
|
||||||
}
|
}
|
||||||
@ -118,6 +145,8 @@ func (writer *prefixWriter) Write(data []byte) (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
prefix = ""
|
prefix = ""
|
||||||
|
id = ""
|
||||||
|
timeText = " "
|
||||||
}
|
}
|
||||||
|
|
||||||
return len(data), nil
|
return len(data), nil
|
||||||
|
@ -7,14 +7,19 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/zeebo/errs"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"storj.io/storj/internal/processgroup"
|
"storj.io/storj/internal/processgroup"
|
||||||
|
"storj.io/storj/internal/sync2"
|
||||||
"storj.io/storj/pkg/utils"
|
"storj.io/storj/pkg/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -70,16 +75,18 @@ func (processes *Processes) Close() error {
|
|||||||
return utils.CombineErrors(errs...)
|
return utils.CombineErrors(errs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProcessInfo represents public information about the process
|
// Info represents public information about the process
|
||||||
type ProcessInfo struct {
|
type Info struct {
|
||||||
Name string
|
Name string
|
||||||
ID string
|
Executable string
|
||||||
Address string
|
Address string
|
||||||
Directory string
|
Directory string
|
||||||
|
ID string
|
||||||
|
Extra []string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Env returns process flags
|
// Env returns process flags
|
||||||
func (info *ProcessInfo) Env() []string {
|
func (info *Info) Env() []string {
|
||||||
name := strings.ToUpper(info.Name)
|
name := strings.ToUpper(info.Name)
|
||||||
|
|
||||||
name = strings.Map(func(r rune) rune {
|
name = strings.Map(func(r rune) rune {
|
||||||
@ -105,83 +112,162 @@ func (info *ProcessInfo) Env() []string {
|
|||||||
if info.Directory != "" {
|
if info.Directory != "" {
|
||||||
env = append(env, name+"_DIR="+info.Directory)
|
env = append(env, name+"_DIR="+info.Directory)
|
||||||
}
|
}
|
||||||
|
for _, extra := range info.Extra {
|
||||||
|
env = append(env, name+"_"+extra)
|
||||||
|
}
|
||||||
return env
|
return env
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Arguments contains arguments based on the main command
|
||||||
|
type Arguments map[string][]string
|
||||||
|
|
||||||
// Process is a type for monitoring the process
|
// Process is a type for monitoring the process
|
||||||
type Process struct {
|
type Process struct {
|
||||||
processes *Processes
|
processes *Processes
|
||||||
|
|
||||||
Name string
|
Info
|
||||||
Directory string
|
|
||||||
Executable string
|
|
||||||
|
|
||||||
Info ProcessInfo
|
Delay time.Duration
|
||||||
|
Wait []*sync2.Fence
|
||||||
|
Status struct {
|
||||||
|
Started sync2.Fence
|
||||||
|
Exited sync2.Fence
|
||||||
|
}
|
||||||
|
|
||||||
Arguments map[string][]string
|
Arguments Arguments
|
||||||
|
|
||||||
stdout io.Writer
|
stdout io.Writer
|
||||||
stderr io.Writer
|
stderr io.Writer
|
||||||
|
|
||||||
outfile *os.File
|
|
||||||
errfile *os.File
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a process which can be run in the specified directory
|
// New creates a process which can be run in the specified directory
|
||||||
func (processes *Processes) New(name, executable, directory string) (*Process, error) {
|
func (processes *Processes) New(info Info) *Process {
|
||||||
outfile, err1 := os.OpenFile(filepath.Join(directory, "stderr.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
output := processes.Output.Prefixed(info.Name)
|
||||||
errfile, err2 := os.OpenFile(filepath.Join(directory, "stdout.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
|
||||||
|
|
||||||
err := utils.CombineErrors(err1, err2)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
output := processes.Output.Prefixed(name)
|
|
||||||
|
|
||||||
process := &Process{
|
process := &Process{
|
||||||
processes: processes,
|
processes: processes,
|
||||||
|
|
||||||
Name: name,
|
Info: info,
|
||||||
Directory: directory,
|
Arguments: Arguments{},
|
||||||
Executable: executable,
|
|
||||||
|
|
||||||
Info: ProcessInfo{
|
stdout: output,
|
||||||
Name: name,
|
stderr: output,
|
||||||
Directory: directory,
|
|
||||||
},
|
|
||||||
Arguments: map[string][]string{},
|
|
||||||
|
|
||||||
stdout: io.MultiWriter(output, outfile),
|
|
||||||
stderr: io.MultiWriter(output, errfile),
|
|
||||||
|
|
||||||
outfile: outfile,
|
|
||||||
errfile: errfile,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
processes.List = append(processes.List, process)
|
processes.List = append(processes.List, process)
|
||||||
|
return process
|
||||||
|
}
|
||||||
|
|
||||||
return process, nil
|
// WaitForStart ensures that process will wait on dependency before starting.
|
||||||
|
func (process *Process) WaitForStart(dependency *Process) {
|
||||||
|
process.Wait = append(process.Wait, &dependency.Status.Started)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exec runs the process using the arguments for a given command
|
// Exec runs the process using the arguments for a given command
|
||||||
func (process *Process) Exec(ctx context.Context, command string) error {
|
func (process *Process) Exec(ctx context.Context, command string) (err error) {
|
||||||
|
// ensure that we always release all status fences
|
||||||
|
defer process.Status.Started.Release()
|
||||||
|
defer process.Status.Exited.Release()
|
||||||
|
|
||||||
cmd := exec.CommandContext(ctx, process.Executable, process.Arguments[command]...)
|
cmd := exec.CommandContext(ctx, process.Executable, process.Arguments[command]...)
|
||||||
cmd.Dir = process.Directory
|
cmd.Dir = process.Directory
|
||||||
cmd.Stdout, cmd.Stderr = process.stdout, process.stderr
|
cmd.Env = append(os.Environ(), "STORJ_LOG_NOTIME=1")
|
||||||
|
|
||||||
|
{ // setup standard output with logging into file
|
||||||
|
outfile, err1 := os.OpenFile(filepath.Join(process.Directory, "stdout.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
|
if err1 != nil {
|
||||||
|
return fmt.Errorf("open stdout: %v", err1)
|
||||||
|
}
|
||||||
|
defer func() { err = errs.Combine(err, outfile.Close()) }()
|
||||||
|
|
||||||
|
cmd.Stdout = io.MultiWriter(process.stdout, outfile)
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // setup standard error with logging into file
|
||||||
|
errfile, err2 := os.OpenFile(filepath.Join(process.Directory, "stderr.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
|
if err2 != nil {
|
||||||
|
return fmt.Errorf("open stderr: %v", err2)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
err = errs.Combine(err, errfile.Close())
|
||||||
|
}()
|
||||||
|
|
||||||
|
cmd.Stderr = io.MultiWriter(process.stderr, errfile)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure that it is part of this process group
|
||||||
processgroup.Setup(cmd)
|
processgroup.Setup(cmd)
|
||||||
|
|
||||||
if printCommands {
|
// wait for dependencies to start
|
||||||
fmt.Fprintf(process.processes.Output, "exec: %v\n", strings.Join(cmd.Args, " "))
|
for _, fence := range process.Wait {
|
||||||
|
fence.Wait()
|
||||||
}
|
}
|
||||||
return cmd.Run()
|
|
||||||
|
// in case we have an explicit delay then sleep
|
||||||
|
if process.Delay > 0 {
|
||||||
|
if !sync2.Sleep(ctx, process.Delay) {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if printCommands {
|
||||||
|
fmt.Fprintf(process.processes.Output, "%s running: %v\n", process.Name, strings.Join(cmd.Args, " "))
|
||||||
|
defer func() {
|
||||||
|
fmt.Fprintf(process.processes.Output, "%s exited: %v\n", process.Name, err)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// start the process
|
||||||
|
err = cmd.Start()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch command {
|
||||||
|
case "setup":
|
||||||
|
// during setup we aren't starting the addresses, so we can release the dependencies immediately
|
||||||
|
process.Status.Started.Release()
|
||||||
|
default:
|
||||||
|
// release started when we are able to connect to the process address
|
||||||
|
go process.monitorAddress()
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for process completion
|
||||||
|
err = cmd.Wait()
|
||||||
|
|
||||||
|
// clear the error if the process was killed
|
||||||
|
if status, ok := cmd.ProcessState.Sys().(syscall.WaitStatus); ok {
|
||||||
|
if status.Signaled() && status.Signal() == os.Kill {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// monitorAddress will monitor starting when we are able to start the process.
|
||||||
|
func (process *Process) monitorAddress() {
|
||||||
|
for !process.Status.Started.Released() {
|
||||||
|
if process.tryConnect() {
|
||||||
|
process.Status.Started.Release()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// wait a bit before retrying to reduce load
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// tryConnect will try to connect to the process public address
|
||||||
|
func (process *Process) tryConnect() bool {
|
||||||
|
conn, err := net.Dial("tcp", process.Info.Address)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// write empty byte slice to trigger refresh on connection
|
||||||
|
_, _ = conn.Write([]byte{})
|
||||||
|
// ignoring errors, because we only care about being able to connect
|
||||||
|
_ = conn.Close()
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes process resources
|
// Close closes process resources
|
||||||
func (process *Process) Close() error {
|
func (process *Process) Close() error { return nil }
|
||||||
return utils.CombineErrors(
|
|
||||||
process.outfile.Close(),
|
|
||||||
process.errfile.Close(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
@ -39,7 +39,7 @@ func runTestPlanet(flags *Flags, command string, args []string) error {
|
|||||||
|
|
||||||
// add satellites to environment
|
// add satellites to environment
|
||||||
for i, satellite := range planet.Satellites {
|
for i, satellite := range planet.Satellites {
|
||||||
env = append(env, (&ProcessInfo{
|
env = append(env, (&Info{
|
||||||
Name: "satellite/" + strconv.Itoa(i),
|
Name: "satellite/" + strconv.Itoa(i),
|
||||||
ID: satellite.ID().String(),
|
ID: satellite.ID().String(),
|
||||||
Address: satellite.Addr(),
|
Address: satellite.Addr(),
|
||||||
@ -48,7 +48,7 @@ func runTestPlanet(flags *Flags, command string, args []string) error {
|
|||||||
|
|
||||||
// add storage nodes to environment
|
// add storage nodes to environment
|
||||||
for i, storage := range planet.StorageNodes {
|
for i, storage := range planet.StorageNodes {
|
||||||
env = append(env, (&ProcessInfo{
|
env = append(env, (&Info{
|
||||||
Name: "storage/" + strconv.Itoa(i),
|
Name: "storage/" + strconv.Itoa(i),
|
||||||
ID: storage.ID().String(),
|
ID: storage.ID().String(),
|
||||||
Address: storage.Addr(),
|
Address: storage.Addr(),
|
||||||
|
64
scripts/test-storj-sdk-aws.sh
Executable file
64
scripts/test-storj-sdk-aws.sh
Executable file
@ -0,0 +1,64 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
set -ueo pipefail
|
||||||
|
|
||||||
|
#setup tmpdir for testfiles and cleanup
|
||||||
|
TMPDIR=$(mktemp -d -t tmp.XXXXXXXXXX)
|
||||||
|
cleanup(){
|
||||||
|
rm -rf "$TMPDIR"
|
||||||
|
}
|
||||||
|
trap cleanup EXIT
|
||||||
|
|
||||||
|
SRC_DIR=$TMPDIR/source
|
||||||
|
DST_DIR=$(mktemp -d -t tmp.XXXXXXXXXX)
|
||||||
|
mkdir -p $SRC_DIR $DST_DIR
|
||||||
|
|
||||||
|
aws configure set aws_access_key_id $GATEWAY_0_ACCESS_KEY
|
||||||
|
aws configure set aws_secret_access_key $GATEWAY_0_SECRET_KEY
|
||||||
|
aws configure set default.region us-east-1
|
||||||
|
|
||||||
|
head -c 1024 </dev/urandom > $SRC_DIR/small-upload-testfile # create 1mb file of random bytes (inline)
|
||||||
|
head -c 5120 </dev/urandom > $SRC_DIR/big-upload-testfile # create 5mb file of random bytes (remote)
|
||||||
|
head -c 5 </dev/urandom > $SRC_DIR/multipart-upload-testfile # create 5kb file of random bytes (remote)
|
||||||
|
|
||||||
|
echo "Creating Bucket"
|
||||||
|
aws s3 --endpoint=http://$GATEWAY_0_ADDR mb s3://bucket
|
||||||
|
|
||||||
|
echo "Uploading Files"
|
||||||
|
aws configure set default.s3.multipart_threshold 1TB
|
||||||
|
aws s3 --endpoint=http://$GATEWAY_0_ADDR cp $SRC_DIR/small-upload-testfile s3://bucket/small-testfile
|
||||||
|
aws s3 --endpoint=http://$GATEWAY_0_ADDR cp $SRC_DIR/big-upload-testfile s3://bucket/big-testfile
|
||||||
|
|
||||||
|
# Wait 5 seconds to trigger any error related to one of the different intervals
|
||||||
|
sleep 5
|
||||||
|
|
||||||
|
echo "Uploading Multipart File"
|
||||||
|
aws configure set default.s3.multipart_threshold 4KB
|
||||||
|
aws s3 --endpoint=http://$GATEWAY_0_ADDR cp $SRC_DIR/multipart-upload-testfile s3://bucket/multipart-testfile
|
||||||
|
|
||||||
|
echo "Downloading Files"
|
||||||
|
aws s3 --endpoint=http://$GATEWAY_0_ADDR ls s3://bucket
|
||||||
|
aws s3 --endpoint=http://$GATEWAY_0_ADDR cp s3://bucket/small-testfile $DST_DIR/small-download-testfile
|
||||||
|
aws s3 --endpoint=http://$GATEWAY_0_ADDR cp s3://bucket/big-testfile $DST_DIR/big-download-testfile
|
||||||
|
aws s3 --endpoint=http://$GATEWAY_0_ADDR cp s3://bucket/multipart-testfile $DST_DIR/multipart-download-testfile
|
||||||
|
aws s3 --endpoint=http://$GATEWAY_0_ADDR rb s3://bucket --force
|
||||||
|
|
||||||
|
if cmp $SRC_DIR/small-upload-testfile $DST_DIR/small-download-testfile
|
||||||
|
then
|
||||||
|
echo "small-upload-testfile file matches uploaded file";
|
||||||
|
else
|
||||||
|
echo "small-upload-testfile file does not match uploaded file";
|
||||||
|
fi
|
||||||
|
|
||||||
|
if cmp $SRC_DIR/big-upload-testfile $DST_DIR/big-download-testfile
|
||||||
|
then
|
||||||
|
echo "big-upload-testfile file matches uploaded file";
|
||||||
|
else
|
||||||
|
echo "big-upload-testfile file does not match uploaded file";
|
||||||
|
fi
|
||||||
|
|
||||||
|
if cmp $SRC_DIR/multipart-upload-testfile $DST_DIR/multipart-download-testfile
|
||||||
|
then
|
||||||
|
echo "multipart-upload-testfile file matches uploaded file";
|
||||||
|
else
|
||||||
|
echo "multipart-upload-testfile file does not match uploaded file";
|
||||||
|
fi
|
29
scripts/test-storj-sdk.sh
Executable file
29
scripts/test-storj-sdk.sh
Executable file
@ -0,0 +1,29 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
set -ueo pipefail
|
||||||
|
|
||||||
|
SCRIPTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
|
||||||
|
|
||||||
|
go install -race -v storj.io/storj/cmd/{storj-sdk,satellite,storagenode,uplink,gateway}
|
||||||
|
|
||||||
|
# setup tmpdir for testfiles and cleanup
|
||||||
|
TMP=$(mktemp -d -t tmp.XXXXXXXXXX)
|
||||||
|
cleanup(){
|
||||||
|
rm -rf "$TMP"
|
||||||
|
}
|
||||||
|
trap cleanup EXIT
|
||||||
|
|
||||||
|
export STORJ_LOCAL_NETWORK=$TMP
|
||||||
|
|
||||||
|
# setup the network
|
||||||
|
storj-sdk -x network setup
|
||||||
|
|
||||||
|
# run aws-cli tests
|
||||||
|
storj-sdk -x network test bash $SCRIPTDIR/test-storj-sdk-aws.sh
|
||||||
|
storj-sdk -x network destroy
|
||||||
|
|
||||||
|
# ipv6 tests disabled because aws-cli doesn't seem to support connecting to ipv6 host
|
||||||
|
# # setup the network with ipv6
|
||||||
|
# storj-sdk -x --host "::1" network setup
|
||||||
|
# # run aws-cli tests using ipv6
|
||||||
|
# storj-sdk -x --host "::1" network test bash $SCRIPTDIR/test-storj-sdk-aws.sh
|
||||||
|
# storj-sdk -x network destroy
|
Loading…
Reference in New Issue
Block a user