paul cannon 5b9fd4e50a cmd/storj-sim: fix prefix writer infinite loop bug
Fix case where line is larger than maxline and writer ended up in an
infinite loop trying to buffer more data.

Change-Id: I243da738b331279d6bf27255778b5798e7f37f95
2020-09-11 12:33:04 +00:00

338 lines
8.2 KiB

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
// Processes contains list of processes.
type Processes struct {
Output *PrefixWriter
Directory string
List []*Process
MaxStartupWait time.Duration
const storjSimMaxLineLen = 10000
// NewProcesses returns a group of processes.
func NewProcesses(dir string) *Processes {
return &Processes{
Output: NewPrefixWriter("sim", storjSimMaxLineLen, os.Stdout),
Directory: dir,
List: nil,
MaxStartupWait: time.Minute,
// Exec executes a command on all processes.
func (processes *Processes) Exec(ctx context.Context, command string) error {
var group errgroup.Group
processes.Start(ctx, &group, command)
return group.Wait()
// Start executes all processes using specified errgroup.Group.
func (processes *Processes) Start(ctx context.Context, group *errgroup.Group, command string) {
for _, p := range processes.List {
process := p
group.Go(func() error {
return process.Exec(ctx, command)
// Env returns environment flags for other nodes.
func (processes *Processes) Env() []string {
var env []string
for _, process := range processes.List {
env = append(env, process.Info.Env()...)
return env
// Close closes all the processes and their resources.
func (processes *Processes) Close() error {
var errlist errs.Group
for _, process := range processes.List {
return errlist.Err()
// Info represents public information about the process.
type Info struct {
Name string
Executable string
Address string
Directory string
ID string
Pid int
Extra []EnvVar
// EnvVar represents an environment variable like Key=Value.
type EnvVar struct {
Key string
Value string
// AddExtra appends an extra environment variable to the process info.
func (info *Info) AddExtra(key, value string) {
info.Extra = append(info.Extra, EnvVar{Key: key, Value: value})
// Env returns process flags.
func (info *Info) Env() []string {
name := strings.ToUpper(info.Name)
name = strings.Map(func(r rune) rune {
switch {
case '0' <= r && r <= '9':
return r
case 'a' <= r && r <= 'z':
return r
case 'A' <= r && r <= 'Z':
return r
return '_'
}, name)
var env []string
if info.ID != "" {
env = append(env, name+"_ID="+info.ID)
if info.Address != "" {
env = append(env, name+"_ADDR="+info.Address)
if info.Directory != "" {
env = append(env, name+"_DIR="+info.Directory)
if info.Pid != 0 {
env = append(env, name+"_PID="+strconv.Itoa(info.Pid))
for _, extra := range info.Extra {
env = append(env, name+"_"+strings.ToUpper(extra.Key)+"="+extra.Value)
return env
// Arguments contains arguments based on the main command.
type Arguments map[string][]string
// Process is a type for monitoring the process.
type Process struct {
processes *Processes
Delay time.Duration
Wait []*sync2.Fence
Status struct {
Started sync2.Fence
Exited sync2.Fence
ExecBefore map[string]func(*Process) error
Arguments Arguments
stdout io.Writer
stderr io.Writer
// New creates a process which can be run in the specified directory.
func (processes *Processes) New(info Info) *Process {
output := processes.Output.Prefixed(info.Name)
process := &Process{
processes: processes,
Info: info,
ExecBefore: map[string]func(*Process) error{},
Arguments: Arguments{},
stdout: output,
stderr: output,
processes.List = append(processes.List, process)
return process
// WaitForStart ensures that process will wait on dependency before starting.
func (process *Process) WaitForStart(dependency *Process) {
process.Wait = append(process.Wait, &dependency.Status.Started)
// WaitForExited ensures that process will wait on dependency before starting.
func (process *Process) WaitForExited(dependency *Process) {
process.Wait = append(process.Wait, &dependency.Status.Exited)
// Exec runs the process using the arguments for a given command.
func (process *Process) Exec(ctx context.Context, command string) (err error) {
// ensure that we always release all status fences
defer process.Status.Started.Release()
defer process.Status.Exited.Release()
ctx, cancelProcess := context.WithCancel(ctx)
defer cancelProcess()
// wait for dependencies to start
for _, fence := range process.Wait {
if !fence.Wait(ctx) {
return ctx.Err()
// in case we have an explicit delay then sleep
if process.Delay > 0 {
if !sync2.Sleep(ctx, process.Delay) {
return ctx.Err()
if exec, ok := process.ExecBefore[command]; ok {
if err := exec(process); err != nil {
return err
executable := process.Executable
// use executable inside the directory, if it exists
localExecutable := filepath.Join(process.Directory, executable)
if _, err := os.Lstat(localExecutable); !os.IsNotExist(err) {
executable = localExecutable
if _, ok := process.Arguments[command]; !ok {
fmt.Fprintf(process.processes.Output, "%s running: %s\n", process.Name, command)
cmd := exec.CommandContext(ctx, executable, process.Arguments[command]...)
cmd.Dir = process.processes.Directory
cmd.Env = append(os.Environ(), "STORJ_LOG_NOTIME=1")
{ // setup standard output with logging into file
outfile, err1 := os.OpenFile(filepath.Join(process.Directory, "stdout.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err1 != nil {
return fmt.Errorf("open stdout: %v", err1)
defer func() { err = errs.Combine(err, outfile.Close()) }()
cmd.Stdout = io.MultiWriter(process.stdout, outfile)
{ // setup standard error with logging into file
errfile, err2 := os.OpenFile(filepath.Join(process.Directory, "stderr.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err2 != nil {
return fmt.Errorf("open stderr: %v", err2)
defer func() {
err = errs.Combine(err, errfile.Close())
cmd.Stderr = io.MultiWriter(process.stderr, errfile)
// ensure that it is part of this process group
if printCommands {
fmt.Fprintf(process.processes.Output, "%s running: %v\n", process.Name, strings.Join(cmd.Args, " "))
defer func() {
fmt.Fprintf(process.processes.Output, "%s exited: %v\n", process.Name, err)
// start the process
err = cmd.Start()
if err != nil {
return err
process.Info.Pid = cmd.Process.Pid
if command == "setup" || process.Address == "" {
// during setup we aren't starting the addresses, so we can release the dependencies immediately
} else {
// release started when we are able to connect to the process address
go func() {
defer process.Status.Started.Release()
err := process.waitForAddress(process.processes.MaxStartupWait)
if err != nil {
fmt.Fprintf(process.processes.Output, "failed to wait startup: %v", err)
// wait for process completion
err = cmd.Wait()
// clear the error if the process was killed
if status, ok := cmd.ProcessState.Sys().(syscall.WaitStatus); ok {
if status.Signaled() && status.Signal() == os.Kill {
err = nil
return err
// waitForAddress will monitor starting when we are able to start the process.
func (process *Process) waitForAddress(maxStartupWait time.Duration) error {
start := time.Now()
for !process.Status.Started.Released() {
if process.tryConnect() {
return nil
// wait a bit before retrying to reduce load
time.Sleep(50 * time.Millisecond)
if time.Since(start) > maxStartupWait {
return fmt.Errorf("%s did not start in required time %v", process.Name, maxStartupWait)
return nil
// tryConnect will try to connect to the process public address.
func (process *Process) tryConnect() bool {
conn, err := net.Dial("tcp", process.Info.Address)
if err != nil {
return false
// write empty byte slice to trigger refresh on connection
_, _ = conn.Write([]byte{})
// ignoring errors, because we only care about being able to connect
_ = conn.Close()
return true
// Close closes process resources.
func (process *Process) Close() error { return nil }