1cb3cbaecf
It's possible that content was not being flushed from processes. For now, ignore other process failures under storj-sim network test. Once we get other processes stable, we can repropagate the error. Change-Id: I01ed572d7c779ab6451124f1e24e3d1168b3ea79
376 lines
9.2 KiB
Go
376 lines
9.2 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/zeebo/errs"
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"storj.io/common/processgroup"
|
|
"storj.io/common/sync2"
|
|
)
|
|
|
|
// Processes contains list of processes.
|
|
type Processes struct {
|
|
Output *PrefixWriter
|
|
Directory string
|
|
List []*Process
|
|
|
|
FailFast bool
|
|
MaxStartupWait time.Duration
|
|
}
|
|
|
|
const storjSimMaxLineLen = 10000
|
|
|
|
// NewProcesses returns a group of processes.
|
|
func NewProcesses(dir string, failfast bool) *Processes {
|
|
return &Processes{
|
|
Output: NewPrefixWriter("sim", storjSimMaxLineLen, os.Stdout),
|
|
Directory: dir,
|
|
List: nil,
|
|
|
|
FailFast: failfast,
|
|
MaxStartupWait: time.Minute,
|
|
}
|
|
}
|
|
|
|
// Exec executes a command on all processes.
|
|
func (processes *Processes) Exec(ctx context.Context, command string) error {
|
|
defer func() { _ = processes.Output.Flush() }()
|
|
|
|
var group *errgroup.Group
|
|
if processes.FailFast {
|
|
group, ctx = errgroup.WithContext(ctx)
|
|
} else {
|
|
group = &errgroup.Group{}
|
|
}
|
|
processes.Start(ctx, group, command)
|
|
return group.Wait()
|
|
}
|
|
|
|
// Start executes all processes using specified errgroup.Group.
|
|
func (processes *Processes) Start(ctx context.Context, group *errgroup.Group, command string) {
|
|
for _, p := range processes.List {
|
|
process := p
|
|
group.Go(func() error {
|
|
err := process.Exec(ctx, command)
|
|
if errors.Is(err, context.Canceled) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
err = fmt.Errorf("%v failed: %w", process.Name, err)
|
|
}
|
|
return err
|
|
})
|
|
}
|
|
}
|
|
|
|
// Env returns environment flags for other nodes.
|
|
func (processes *Processes) Env() []string {
|
|
var env []string
|
|
for _, process := range processes.List {
|
|
env = append(env, process.Info.Env()...)
|
|
}
|
|
return env
|
|
}
|
|
|
|
// Close closes all the processes and their resources.
|
|
func (processes *Processes) Close() error {
|
|
defer func() { _ = processes.Output.Flush() }()
|
|
|
|
var errlist errs.Group
|
|
for _, process := range processes.List {
|
|
errlist.Add(process.Close())
|
|
}
|
|
return errlist.Err()
|
|
}
|
|
|
|
// Info represents public information about the process.
type Info struct {
	// Name identifies the process; Env derives the variable prefix from it.
	Name string
	// Executable is the executable name stored for the process.
	Executable string
	// Address is exported by Env as <NAME>_ADDR when non-empty.
	Address string
	// Directory is exported by Env as <NAME>_DIR when non-empty.
	Directory string
	// ID is exported by Env as <NAME>_ID when non-empty.
	ID string
	// Pid is exported by Env as <NAME>_PID when non-zero.
	Pid int
	// Extra holds additional variables registered through AddExtra.
	Extra []EnvVar
}

// EnvVar represents an environment variable like Key=Value.
type EnvVar struct {
	Key   string
	Value string
}

// AddExtra registers one more key/value pair to be exported by Env.
func (info *Info) AddExtra(key, value string) {
	extra := EnvVar{Key: key, Value: value}
	info.Extra = append(info.Extra, extra)
}

// Env returns the process information as NAME_KEY=value entries.
//
// The prefix is the upper-cased Name with every rune outside [0-9a-zA-Z]
// replaced by an underscore. Empty fields (and a zero Pid) are skipped, the
// URL entry is only emitted when both ID and Address are set, and extra keys
// are upper-cased. The result is nil when nothing is exported.
func (info *Info) Env() []string {
	sanitize := func(r rune) rune {
		isDigit := '0' <= r && r <= '9'
		isLower := 'a' <= r && r <= 'z'
		isUpper := 'A' <= r && r <= 'Z'
		if isDigit || isLower || isUpper {
			return r
		}
		return '_'
	}
	prefix := strings.Map(sanitize, strings.ToUpper(info.Name)) + "_"

	var vars []string
	add := func(key, value string) {
		vars = append(vars, prefix+key+"="+value)
	}

	hasID, hasAddress := info.ID != "", info.Address != ""
	if hasID {
		add("ID", info.ID)
	}
	if hasAddress {
		add("ADDR", info.Address)
	}
	if hasID && hasAddress {
		add("URL", info.ID+"@"+info.Address)
	}
	if info.Directory != "" {
		add("DIR", info.Directory)
	}
	if info.Pid != 0 {
		add("PID", strconv.Itoa(info.Pid))
	}
	for i := range info.Extra {
		add(strings.ToUpper(info.Extra[i].Key), info.Extra[i].Value)
	}
	return vars
}
|
|
|
|
// Arguments contains arguments based on the main command.
|
|
type Arguments map[string][]string
|
|
|
|
// Process is a type for monitoring the process.
|
|
type Process struct {
|
|
processes *Processes
|
|
|
|
Info
|
|
|
|
Delay time.Duration
|
|
Wait []*sync2.Fence
|
|
Status struct {
|
|
Started sync2.Fence
|
|
Exited sync2.Fence
|
|
}
|
|
|
|
ExecBefore map[string]func(*Process) error
|
|
Arguments Arguments
|
|
|
|
stdout WriterFlusher
|
|
stderr WriterFlusher
|
|
}
|
|
|
|
// New creates a process which can be run in the specified directory.
|
|
func (processes *Processes) New(info Info) *Process {
|
|
output := processes.Output.Prefixed(info.Name)
|
|
|
|
process := &Process{
|
|
processes: processes,
|
|
|
|
Info: info,
|
|
ExecBefore: map[string]func(*Process) error{},
|
|
Arguments: Arguments{},
|
|
|
|
stdout: output,
|
|
stderr: output,
|
|
}
|
|
|
|
processes.List = append(processes.List, process)
|
|
return process
|
|
}
|
|
|
|
// WaitForStart ensures that process will wait on dependency before starting.
|
|
func (process *Process) WaitForStart(dependency *Process) {
|
|
process.Wait = append(process.Wait, &dependency.Status.Started)
|
|
}
|
|
|
|
// WaitForExited ensures that process will wait on dependency before starting.
|
|
func (process *Process) WaitForExited(dependency *Process) {
|
|
process.Wait = append(process.Wait, &dependency.Status.Exited)
|
|
}
|
|
|
|
// Exec runs the process using the arguments for a given command.
|
|
func (process *Process) Exec(ctx context.Context, command string) (err error) {
|
|
defer func() { _ = process.stdout.Flush() }()
|
|
defer func() { _ = process.stderr.Flush() }()
|
|
|
|
// ensure that we always release all status fences
|
|
defer process.Status.Started.Release()
|
|
defer process.Status.Exited.Release()
|
|
|
|
ctx, cancelProcess := context.WithCancel(ctx)
|
|
defer cancelProcess()
|
|
|
|
// wait for dependencies to start
|
|
for _, fence := range process.Wait {
|
|
if !fence.Wait(ctx) {
|
|
return fmt.Errorf("waiting dependencies: %w", ctx.Err())
|
|
}
|
|
}
|
|
|
|
// in case we have an explicit delay then sleep
|
|
if process.Delay > 0 {
|
|
if !sync2.Sleep(ctx, process.Delay) {
|
|
return fmt.Errorf("waiting for delay: %w", ctx.Err())
|
|
}
|
|
}
|
|
|
|
if exec, ok := process.ExecBefore[command]; ok {
|
|
if err := exec(process); err != nil {
|
|
return fmt.Errorf("executing pre-actions: %w", err)
|
|
}
|
|
}
|
|
|
|
executable := process.Executable
|
|
|
|
// use executable inside the directory, if it exists
|
|
localExecutable := exe(filepath.Join(process.Directory, executable))
|
|
if _, err := os.Lstat(localExecutable); !os.IsNotExist(err) {
|
|
executable = localExecutable
|
|
}
|
|
|
|
if _, ok := process.Arguments[command]; !ok {
|
|
fmt.Fprintf(process.processes.Output, "%s running: %s\n", process.Name, command)
|
|
return
|
|
}
|
|
|
|
cmd := exec.CommandContext(ctx, executable, process.Arguments[command]...)
|
|
cmd.Dir = process.processes.Directory
|
|
cmd.Env = append(os.Environ(), "STORJ_LOG_NOTIME=1")
|
|
|
|
{ // setup standard output with logging into file
|
|
outfile, err1 := os.OpenFile(filepath.Join(process.Directory, "stdout.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
|
if err1 != nil {
|
|
return fmt.Errorf("open stdout: %w", err1)
|
|
}
|
|
defer func() { err = errs.Combine(err, outfile.Close()) }()
|
|
|
|
cmd.Stdout = io.MultiWriter(process.stdout, outfile)
|
|
}
|
|
|
|
{ // setup standard error with logging into file
|
|
errfile, err2 := os.OpenFile(filepath.Join(process.Directory, "stderr.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
|
if err2 != nil {
|
|
return fmt.Errorf("open stderr: %w", err2)
|
|
}
|
|
defer func() {
|
|
err = errs.Combine(err, errfile.Close())
|
|
}()
|
|
|
|
cmd.Stderr = io.MultiWriter(process.stderr, errfile)
|
|
}
|
|
|
|
// ensure that it is part of this process group
|
|
processgroup.Setup(cmd)
|
|
|
|
if printCommands {
|
|
fmt.Fprintf(process.processes.Output, "%s running: %v\n", process.Name, strings.Join(cmd.Args, " "))
|
|
defer func() {
|
|
fmt.Fprintf(process.processes.Output, "%s exited (code:%d): %v\n", process.Name, cmd.ProcessState.ExitCode(), err)
|
|
}()
|
|
}
|
|
|
|
// start the process
|
|
err = cmd.Start()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
process.Info.Pid = cmd.Process.Pid
|
|
|
|
if command == "setup" || process.Address == "" {
|
|
// during setup we aren't starting the addresses, so we can release the dependencies immediately
|
|
process.Status.Started.Release()
|
|
} else {
|
|
// release started when we are able to connect to the process address
|
|
go func() {
|
|
defer process.Status.Started.Release()
|
|
|
|
err := process.waitForAddress(process.processes.MaxStartupWait)
|
|
if err != nil {
|
|
fmt.Fprintf(process.processes.Output, "failed to wait startup: %v", err)
|
|
cancelProcess()
|
|
}
|
|
}()
|
|
}
|
|
|
|
// wait for process completion
|
|
err = cmd.Wait()
|
|
if errors.Is(err, context.Canceled) && ctx.Err() != nil {
|
|
// Ignore error caused by context cancellation.
|
|
err = nil
|
|
}
|
|
// clear the error if the process was killed
|
|
if status, ok := cmd.ProcessState.Sys().(syscall.WaitStatus); ok {
|
|
if status.Signaled() && status.Signal() == os.Kill {
|
|
err = nil
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// waitForAddress will monitor starting when we are able to start the process.
|
|
func (process *Process) waitForAddress(maxStartupWait time.Duration) error {
|
|
start := time.Now()
|
|
for !process.Status.Started.Released() {
|
|
if tryConnect(process.Info.Address) {
|
|
return nil
|
|
}
|
|
|
|
// wait a bit before retrying to reduce load
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
if time.Since(start) > maxStartupWait {
|
|
return fmt.Errorf("%s did not start in required time %v", process.Name, maxStartupWait)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// tryConnect reports whether a TCP connection to address can be established.
func tryConnect(address string) bool {
	conn, dialErr := net.Dial("tcp", address)
	connected := dialErr == nil
	if connected {
		// write empty byte slice to trigger refresh on connection
		_, _ = conn.Write([]byte{})
		// write and close errors are ignored: only reachability matters here
		_ = conn.Close()
	}
	return connected
}
|
|
|
|
// Close closes process resources.
|
|
func (process *Process) Close() error { return nil }
|
|
|
|
// exe returns name as an executable file name for the current platform:
// ".exe" is appended on Windows and name is returned unchanged elsewhere.
func exe(name string) string {
	suffix := ""
	if runtime.GOOS == "windows" {
		suffix = ".exe"
	}
	return name + suffix
}
|