cmd/storj-sim: ignore context canceled and flush output

It's possible that content was not being flushed from processes.

For now, ignore other process failures under storj-sim network test.
Once we get other processes stable, we can repropagate the error.

Change-Id: I01ed572d7c779ab6451124f1e24e3d1168b3ea79
This commit is contained in:
Egon Elbre 2022-08-02 14:52:24 +03:00
parent 98b8c7be06
commit 1cb3cbaecf
4 changed files with 129 additions and 17 deletions

View File

@ -86,6 +86,7 @@ func networkExec(flags *Flags, args []string, command string) error {
if err != nil { if err != nil {
return err return err
} }
defer func() { _ = processes.Output.Flush() }()
ctx, cancel := NewCLIContext(context.Background()) ctx, cancel := NewCLIContext(context.Background())
defer cancel() defer cancel()
@ -133,6 +134,7 @@ func networkEnv(flags *Flags, args []string) error {
if err != nil { if err != nil {
return err return err
} }
defer func() { _ = processes.Output.Flush() }()
// run exec before, since it will load env vars from configs // run exec before, since it will load env vars from configs
for _, process := range processes.List { for _, process := range processes.List {
@ -168,6 +170,7 @@ func networkTest(flags *Flags, command string, args []string) error {
if err != nil { if err != nil {
return err return err
} }
defer func() { _ = processes.Output.Flush() }()
ctx, cancel := NewCLIContext(context.Background()) ctx, cancel := NewCLIContext(context.Background())
@ -184,23 +187,33 @@ func networkTest(flags *Flags, command string, args []string) error {
process.Status.Started.Wait(ctx) process.Status.Started.Wait(ctx)
} }
if err := ctx.Err(); err != nil { if err := ctx.Err(); err != nil {
return err // If the context has been cancelled, it means that one of the processes failed.
// Wait for the processes to shut down themselves and return the first error.
return fmt.Errorf("network canceled: %w", group.Wait())
} }
cmd := exec.CommandContext(ctx, command, args...) cmd := exec.CommandContext(ctx, command, args...)
cmd.Env = append(os.Environ(), processes.Env()...) cmd.Env = append(os.Environ(), processes.Env()...)
stdout := processes.Output.Prefixed("test:out") stdout := processes.Output.Prefixed("test:out")
defer func() { _ = stdout.Flush() }()
stderr := processes.Output.Prefixed("test:err") stderr := processes.Output.Prefixed("test:err")
defer func() { _ = stderr.Flush() }()
cmd.Stdout, cmd.Stderr = stdout, stderr cmd.Stdout, cmd.Stderr = stdout, stderr
processgroup.Setup(cmd) processgroup.Setup(cmd)
if printCommands { if printCommands {
fmt.Fprintf(processes.Output, "exec: %v\n", strings.Join(cmd.Args, " ")) fmt.Fprintf(processes.Output, "exec: %v\n", strings.Join(cmd.Args, " "))
} }
errRun := cmd.Run() errRun := cmd.Run()
if errRun != nil {
fmt.Fprintf(processes.Output, "test command failed: %v\n", errRun)
}
cancel() cancel()
return errs.Combine(errRun, processes.Close(), group.Wait()) _ = group.Wait()
return errs.Combine(errRun, processes.Close())
} }
func networkDestroy(flags *Flags, args []string) error { func networkDestroy(flags *Flags, args []string) error {
@ -506,7 +519,7 @@ func newNetwork(flags *Flags) (*Processes, error) {
var consoleAddress string var consoleAddress string
err := readConfigString(&consoleAddress, satellite.Directory, "console.address") err := readConfigString(&consoleAddress, satellite.Directory, "console.address")
if err != nil { if err != nil {
return err return fmt.Errorf("failed to read config string: %w", err)
} }
// try with 100ms delays until we hit 3s // try with 100ms delays until we hit 3s
@ -514,14 +527,14 @@ func newNetwork(flags *Flags) (*Processes, error) {
for apiKey == "" { for apiKey == "" {
apiKey, err = newConsoleEndpoints(consoleAddress).createOrGetAPIKey(context.Background()) apiKey, err = newConsoleEndpoints(consoleAddress).createOrGetAPIKey(context.Background())
if err != nil && time.Since(start) > 3*time.Second { if err != nil && time.Since(start) > 3*time.Second {
return err return fmt.Errorf("failed to create account: %w", err)
} }
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
} }
satNodeID, err := identity.NodeIDFromCertPath(filepath.Join(satellite.Directory, "identity.cert")) satNodeID, err := identity.NodeIDFromCertPath(filepath.Join(satellite.Directory, "identity.cert"))
if err != nil { if err != nil {
return err return fmt.Errorf("failed to get node id from path: %w", err)
} }
nodeURL := storj.NodeURL{ nodeURL := storj.NodeURL{
ID: satNodeID, ID: satNodeID,
@ -530,17 +543,17 @@ func newNetwork(flags *Flags) (*Processes, error) {
access, err := uplink.RequestAccessWithPassphrase(context.Background(), nodeURL.String(), apiKey, "") access, err := uplink.RequestAccessWithPassphrase(context.Background(), nodeURL.String(), apiKey, "")
if err != nil { if err != nil {
return err return fmt.Errorf("failed to get passphrase: %w", err)
} }
accessData, err := access.Serialize() accessData, err := access.Serialize()
if err != nil { if err != nil {
return err return fmt.Errorf("failed to serialize access: %w", err)
} }
vip.Set("access", accessData) vip.Set("access", accessData)
if err := vip.WriteConfig(); err != nil { if err := vip.WriteConfig(); err != nil {
return err return fmt.Errorf("failed to write config: %w", err)
} }
} }

View File

@ -56,8 +56,14 @@ type prefixWriter struct {
buffer []byte buffer []byte
} }
// WriterFlusher implements io.Writer and flushing of pending content.
type WriterFlusher interface {
io.Writer
Flush() error
}
// Prefixed returns a new writer that has writes with specified prefix. // Prefixed returns a new writer that has writes with specified prefix.
func (writer *PrefixWriter) Prefixed(prefix string) io.Writer { func (writer *PrefixWriter) Prefixed(prefix string) WriterFlusher {
writer.mu.Lock() writer.mu.Lock()
writer.prefixlen = max(writer.prefixlen, len(prefix)) writer.prefixlen = max(writer.prefixlen, len(prefix))
writer.mu.Unlock() writer.mu.Unlock()
@ -75,6 +81,11 @@ func (writer *PrefixWriter) Write(data []byte) (int, error) {
return writer.root.Write(data) return writer.root.Write(data)
} }
// Flush any pending content.
func (writer *PrefixWriter) Flush() error {
return writer.root.Flush()
}
// Write implements io.Writer that prefixes lines. // Write implements io.Writer that prefixes lines.
func (writer *prefixWriter) Write(data []byte) (int, error) { func (writer *prefixWriter) Write(data []byte) (int, error) {
if len(data) == 0 { if len(data) == 0 {
@ -175,3 +186,70 @@ func (writer *prefixWriter) Write(data []byte) (int, error) {
return len(data), nil return len(data), nil
} }
// Flush flushes any pending data.
func (writer *prefixWriter) Flush() error {
writer.local.Lock()
defer writer.local.Unlock()
buffer := writer.buffer
writer.buffer = nil
if len(buffer) == 0 {
return nil
}
writer.mu.Lock()
defer writer.mu.Unlock()
prefix := writer.prefix
id := writer.id
timeText := writer.nowFunc().Format(timeFormat)
for len(buffer) > 0 {
pos := bytes.IndexByte(buffer, '\n')
insertbreak := false
// did not find a linebreak
if pos < 0 {
pos = len(buffer)
}
// try to find a nice place where to break the line
if pos < 0 || pos > writer.maxline {
pos = writer.maxline - 1
for p := pos; p >= writer.maxline*2/3; p-- {
// is there a space we can break on?
if buffer[p] == ' ' {
pos = p
break
}
}
insertbreak = true
}
_, err := fmt.Fprintf(writer.dst, "%-*s %-*s %s | ", writer.prefixlen, prefix, maxIDLength, id, timeText)
if err != nil {
return err
}
_, err = writer.dst.Write(buffer[:pos])
buffer = buffer[pos:]
if err != nil {
return err
}
_, err = writer.dst.Write([]byte{'\n'})
if err != nil {
return err
}
// remove the linebreak from buffer, if it's not an insert
if !insertbreak && len(buffer) > 0 {
buffer = buffer[1:]
}
prefix = ""
id = ""
timeText = emptyTimeField
}
return nil
}

View File

@ -17,7 +17,9 @@ import (
func TestPrefixWriter(t *testing.T) { func TestPrefixWriter(t *testing.T) {
root := NewPrefixWriter("", storjSimMaxLineLen, ioutil.Discard) root := NewPrefixWriter("", storjSimMaxLineLen, ioutil.Discard)
alpha := root.Prefixed("alpha") alpha := root.Prefixed("alpha")
defer func() { _ = alpha.Flush() }()
beta := root.Prefixed("beta") beta := root.Prefixed("beta")
defer func() { _ = beta.Flush() }()
var group errgroup.Group var group errgroup.Group
defer func() { defer func() {

View File

@ -5,6 +5,7 @@ package main
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -50,6 +51,8 @@ func NewProcesses(dir string, failfast bool) *Processes {
// Exec executes a command on all processes. // Exec executes a command on all processes.
func (processes *Processes) Exec(ctx context.Context, command string) error { func (processes *Processes) Exec(ctx context.Context, command string) error {
defer func() { _ = processes.Output.Flush() }()
var group *errgroup.Group var group *errgroup.Group
if processes.FailFast { if processes.FailFast {
group, ctx = errgroup.WithContext(ctx) group, ctx = errgroup.WithContext(ctx)
@ -65,7 +68,14 @@ func (processes *Processes) Start(ctx context.Context, group *errgroup.Group, co
for _, p := range processes.List { for _, p := range processes.List {
process := p process := p
group.Go(func() error { group.Go(func() error {
return process.Exec(ctx, command) err := process.Exec(ctx, command)
if errors.Is(err, context.Canceled) {
err = nil
}
if err != nil {
err = fmt.Errorf("%v failed: %w", process.Name, err)
}
return err
}) })
} }
} }
@ -81,6 +91,8 @@ func (processes *Processes) Env() []string {
// Close closes all the processes and their resources. // Close closes all the processes and their resources.
func (processes *Processes) Close() error { func (processes *Processes) Close() error {
defer func() { _ = processes.Output.Flush() }()
var errlist errs.Group var errlist errs.Group
for _, process := range processes.List { for _, process := range processes.List {
errlist.Add(process.Close()) errlist.Add(process.Close())
@ -168,8 +180,8 @@ type Process struct {
ExecBefore map[string]func(*Process) error ExecBefore map[string]func(*Process) error
Arguments Arguments Arguments Arguments
stdout io.Writer stdout WriterFlusher
stderr io.Writer stderr WriterFlusher
} }
// New creates a process which can be run in the specified directory. // New creates a process which can be run in the specified directory.
@ -203,6 +215,9 @@ func (process *Process) WaitForExited(dependency *Process) {
// Exec runs the process using the arguments for a given command. // Exec runs the process using the arguments for a given command.
func (process *Process) Exec(ctx context.Context, command string) (err error) { func (process *Process) Exec(ctx context.Context, command string) (err error) {
defer func() { _ = process.stdout.Flush() }()
defer func() { _ = process.stderr.Flush() }()
// ensure that we always release all status fences // ensure that we always release all status fences
defer process.Status.Started.Release() defer process.Status.Started.Release()
defer process.Status.Exited.Release() defer process.Status.Exited.Release()
@ -213,20 +228,20 @@ func (process *Process) Exec(ctx context.Context, command string) (err error) {
// wait for dependencies to start // wait for dependencies to start
for _, fence := range process.Wait { for _, fence := range process.Wait {
if !fence.Wait(ctx) { if !fence.Wait(ctx) {
return ctx.Err() return fmt.Errorf("waiting dependencies: %w", ctx.Err())
} }
} }
// in case we have an explicit delay then sleep // in case we have an explicit delay then sleep
if process.Delay > 0 { if process.Delay > 0 {
if !sync2.Sleep(ctx, process.Delay) { if !sync2.Sleep(ctx, process.Delay) {
return ctx.Err() return fmt.Errorf("waiting for delay: %w", ctx.Err())
} }
} }
if exec, ok := process.ExecBefore[command]; ok { if exec, ok := process.ExecBefore[command]; ok {
if err := exec(process); err != nil { if err := exec(process); err != nil {
return err return fmt.Errorf("executing pre-actions: %w", err)
} }
} }
@ -275,7 +290,7 @@ func (process *Process) Exec(ctx context.Context, command string) (err error) {
if printCommands { if printCommands {
fmt.Fprintf(process.processes.Output, "%s running: %v\n", process.Name, strings.Join(cmd.Args, " ")) fmt.Fprintf(process.processes.Output, "%s running: %v\n", process.Name, strings.Join(cmd.Args, " "))
defer func() { defer func() {
fmt.Fprintf(process.processes.Output, "%s exited: %v\n", process.Name, err) fmt.Fprintf(process.processes.Output, "%s exited (code:%d): %v\n", process.Name, cmd.ProcessState.ExitCode(), err)
}() }()
} }
@ -304,13 +319,17 @@ func (process *Process) Exec(ctx context.Context, command string) (err error) {
// wait for process completion // wait for process completion
err = cmd.Wait() err = cmd.Wait()
if errors.Is(err, context.Canceled) && ctx.Err() != nil {
// Ignore error caused by context cancellation.
err = nil
}
// clear the error if the process was killed // clear the error if the process was killed
if status, ok := cmd.ProcessState.Sys().(syscall.WaitStatus); ok { if status, ok := cmd.ProcessState.Sys().(syscall.WaitStatus); ok {
if status.Signaled() && status.Signal() == os.Kill { if status.Signaled() && status.Signal() == os.Kill {
err = nil err = nil
} }
} }
return err return err
} }