From 1cb3cbaecfee52b8eaa9679d60f1307dc6f7a7d3 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Tue, 2 Aug 2022 14:52:24 +0300 Subject: [PATCH] cmd/storj-sim: ignore context canceled and flush output It's possible that content was not being flushed from processes. For now, ignore other process failures under storj-sim network test. Once we get other processes stable, we can repropagate the error. Change-Id: I01ed572d7c779ab6451124f1e24e3d1168b3ea79 --- cmd/storj-sim/network.go | 29 +++++++++---- cmd/storj-sim/prefix.go | 80 +++++++++++++++++++++++++++++++++++- cmd/storj-sim/prefix_test.go | 2 + cmd/storj-sim/process.go | 35 ++++++++++++---- 4 files changed, 129 insertions(+), 17 deletions(-) diff --git a/cmd/storj-sim/network.go b/cmd/storj-sim/network.go index 5dc2c3063..7f7477719 100644 --- a/cmd/storj-sim/network.go +++ b/cmd/storj-sim/network.go @@ -86,6 +86,7 @@ func networkExec(flags *Flags, args []string, command string) error { if err != nil { return err } + defer func() { _ = processes.Output.Flush() }() ctx, cancel := NewCLIContext(context.Background()) defer cancel() @@ -133,6 +134,7 @@ func networkEnv(flags *Flags, args []string) error { if err != nil { return err } + defer func() { _ = processes.Output.Flush() }() // run exec before, since it will load env vars from configs for _, process := range processes.List { @@ -168,6 +170,7 @@ func networkTest(flags *Flags, command string, args []string) error { if err != nil { return err } + defer func() { _ = processes.Output.Flush() }() ctx, cancel := NewCLIContext(context.Background()) @@ -184,23 +187,33 @@ func networkTest(flags *Flags, command string, args []string) error { process.Status.Started.Wait(ctx) } if err := ctx.Err(); err != nil { - return err + // If the context has been cancelled, it means that one of the processes failed. + // Wait for the processes to shut down themselves and return the first error. + return fmt.Errorf("network canceled: %w", group.Wait()) } cmd := exec.CommandContext(ctx, command, args...) cmd.Env = append(os.Environ(), processes.Env()...) + stdout := processes.Output.Prefixed("test:out") + defer func() { _ = stdout.Flush() }() stderr := processes.Output.Prefixed("test:err") + defer func() { _ = stderr.Flush() }() cmd.Stdout, cmd.Stderr = stdout, stderr + processgroup.Setup(cmd) if printCommands { fmt.Fprintf(processes.Output, "exec: %v\n", strings.Join(cmd.Args, " ")) } errRun := cmd.Run() + if errRun != nil { + fmt.Fprintf(processes.Output, "test command failed: %v\n", errRun) + } cancel() - return errs.Combine(errRun, processes.Close(), group.Wait()) + _ = group.Wait() + return errs.Combine(errRun, processes.Close()) } func networkDestroy(flags *Flags, args []string) error { @@ -506,7 +519,7 @@ func newNetwork(flags *Flags) (*Processes, error) { var consoleAddress string err := readConfigString(&consoleAddress, satellite.Directory, "console.address") if err != nil { - return err + return fmt.Errorf("failed to read config string: %w", err) } // try with 100ms delays until we hit 3s @@ -514,14 +527,14 @@ func newNetwork(flags *Flags) (*Processes, error) { for apiKey == "" { apiKey, err = newConsoleEndpoints(consoleAddress).createOrGetAPIKey(context.Background()) if err != nil && time.Since(start) > 3*time.Second { - return err + return fmt.Errorf("failed to create account: %w", err) } time.Sleep(100 * time.Millisecond) } satNodeID, err := identity.NodeIDFromCertPath(filepath.Join(satellite.Directory, "identity.cert")) if err != nil { - return err + return fmt.Errorf("failed to get node id from path: %w", err) } nodeURL := storj.NodeURL{ ID: satNodeID, @@ -530,17 +543,17 @@ func newNetwork(flags *Flags) (*Processes, error) { access, err := uplink.RequestAccessWithPassphrase(context.Background(), nodeURL.String(), apiKey, "") if err != nil { - return err + return fmt.Errorf("failed to get passphrase: %w", err) } accessData, err := access.Serialize() if err != nil { - return err + return fmt.Errorf("failed to serialize access: %w", err) } vip.Set("access", accessData) if err := vip.WriteConfig(); err != nil { - return err + return fmt.Errorf("failed to write config: %w", err) } } diff --git a/cmd/storj-sim/prefix.go b/cmd/storj-sim/prefix.go index 7b77d74c1..dd77d3f58 100644 --- a/cmd/storj-sim/prefix.go +++ b/cmd/storj-sim/prefix.go @@ -56,8 +56,14 @@ type prefixWriter struct { buffer []byte } +// WriterFlusher implements io.Writer and flushing of pending content. +type WriterFlusher interface { + io.Writer + Flush() error +} + // Prefixed returns a new writer that has writes with specified prefix. -func (writer *PrefixWriter) Prefixed(prefix string) io.Writer { +func (writer *PrefixWriter) Prefixed(prefix string) WriterFlusher { writer.mu.Lock() writer.prefixlen = max(writer.prefixlen, len(prefix)) writer.mu.Unlock() @@ -75,6 +81,11 @@ func (writer *PrefixWriter) Write(data []byte) (int, error) { return writer.root.Write(data) } +// Flush any pending content. +func (writer *PrefixWriter) Flush() error { + return writer.root.Flush() +} + // Write implements io.Writer that prefixes lines. func (writer *prefixWriter) Write(data []byte) (int, error) { if len(data) == 0 { @@ -175,3 +186,70 @@ func (writer *prefixWriter) Write(data []byte) (int, error) { return len(data), nil } + +// Flush flushes any pending data. +func (writer *prefixWriter) Flush() error { + writer.local.Lock() + defer writer.local.Unlock() + + buffer := writer.buffer + writer.buffer = nil + if len(buffer) == 0 { + return nil + } + + writer.mu.Lock() + defer writer.mu.Unlock() + + prefix := writer.prefix + id := writer.id + timeText := writer.nowFunc().Format(timeFormat) + for len(buffer) > 0 { + pos := bytes.IndexByte(buffer, '\n') + insertbreak := false + + // did not find a linebreak + if pos < 0 { + pos = len(buffer) + } + + // try to find a nice place where to break the line + if pos < 0 || pos > writer.maxline { + pos = writer.maxline - 1 + for p := pos; p >= writer.maxline*2/3; p-- { + // is there a space we can break on? + if buffer[p] == ' ' { + pos = p + break + } + } + insertbreak = true + } + + _, err := fmt.Fprintf(writer.dst, "%-*s %-*s %s | ", writer.prefixlen, prefix, maxIDLength, id, timeText) + if err != nil { + return err + } + + _, err = writer.dst.Write(buffer[:pos]) + buffer = buffer[pos:] + if err != nil { + return err + } + _, err = writer.dst.Write([]byte{'\n'}) + if err != nil { + return err + } + + // remove the linebreak from buffer, if it's not an insert + if !insertbreak && len(buffer) > 0 { + buffer = buffer[1:] + } + + prefix = "" + id = "" + timeText = emptyTimeField + } + + return nil +} diff --git a/cmd/storj-sim/prefix_test.go b/cmd/storj-sim/prefix_test.go index 7938b74b6..89ad87440 100644 --- a/cmd/storj-sim/prefix_test.go +++ b/cmd/storj-sim/prefix_test.go @@ -17,7 +17,9 @@ import ( func TestPrefixWriter(t *testing.T) { root := NewPrefixWriter("", storjSimMaxLineLen, ioutil.Discard) alpha := root.Prefixed("alpha") + defer func() { _ = alpha.Flush() }() beta := root.Prefixed("beta") + defer func() { _ = beta.Flush() }() var group errgroup.Group defer func() { diff --git a/cmd/storj-sim/process.go b/cmd/storj-sim/process.go index 014c9c9d4..0951bf4a1 100644 --- a/cmd/storj-sim/process.go +++ b/cmd/storj-sim/process.go @@ -5,6 +5,7 @@ package main import ( "context" + "errors" "fmt" "io" "net" @@ -50,6 +51,8 @@ func NewProcesses(dir string, failfast bool) *Processes { // Exec executes a command on all processes. func (processes *Processes) Exec(ctx context.Context, command string) error { + defer func() { _ = processes.Output.Flush() }() + var group *errgroup.Group if processes.FailFast { group, ctx = errgroup.WithContext(ctx) @@ -65,7 +68,14 @@ func (processes *Processes) Start(ctx context.Context, group *errgroup.Group, co for _, p := range processes.List { process := p group.Go(func() error { - return process.Exec(ctx, command) + err := process.Exec(ctx, command) + if errors.Is(err, context.Canceled) { + err = nil + } + if err != nil { + err = fmt.Errorf("%v failed: %w", process.Name, err) + } + return err }) } } @@ -81,6 +91,8 @@ func (processes *Processes) Env() []string { // Close closes all the processes and their resources. func (processes *Processes) Close() error { + defer func() { _ = processes.Output.Flush() }() + var errlist errs.Group for _, process := range processes.List { errlist.Add(process.Close()) @@ -168,8 +180,8 @@ type Process struct { ExecBefore map[string]func(*Process) error Arguments Arguments - stdout io.Writer - stderr io.Writer + stdout WriterFlusher + stderr WriterFlusher } // New creates a process which can be run in the specified directory. @@ -203,6 +215,9 @@ func (process *Process) WaitForExited(dependency *Process) { // Exec runs the process using the arguments for a given command. func (process *Process) Exec(ctx context.Context, command string) (err error) { + defer func() { _ = process.stdout.Flush() }() + defer func() { _ = process.stderr.Flush() }() + // ensure that we always release all status fences defer process.Status.Started.Release() defer process.Status.Exited.Release() @@ -213,20 +228,20 @@ func (process *Process) Exec(ctx context.Context, command string) (err error) { // wait for dependencies to start for _, fence := range process.Wait { if !fence.Wait(ctx) { - return ctx.Err() + return fmt.Errorf("waiting dependencies: %w", ctx.Err()) } } // in case we have an explicit delay then sleep if process.Delay > 0 { if !sync2.Sleep(ctx, process.Delay) { - return ctx.Err() + return fmt.Errorf("waiting for delay: %w", ctx.Err()) } } if exec, ok := process.ExecBefore[command]; ok { if err := exec(process); err != nil { - return err + return fmt.Errorf("executing pre-actions: %w", err) } } @@ -275,7 +290,7 @@ func (process *Process) Exec(ctx context.Context, command string) (err error) { if printCommands { fmt.Fprintf(process.processes.Output, "%s running: %v\n", process.Name, strings.Join(cmd.Args, " ")) defer func() { - fmt.Fprintf(process.processes.Output, "%s exited: %v\n", process.Name, err) + fmt.Fprintf(process.processes.Output, "%s exited (code:%d): %v\n", process.Name, cmd.ProcessState.ExitCode(), err) }() } @@ -304,13 +319,17 @@ func (process *Process) Exec(ctx context.Context, command string) (err error) { // wait for process completion err = cmd.Wait() - + if errors.Is(err, context.Canceled) && ctx.Err() != nil { + // Ignore error caused by context cancellation. + err = nil + } // clear the error if the process was killed if status, ok := cmd.ProcessState.Sys().(syscall.WaitStatus); ok { if status.Signaled() && status.Signal() == os.Kill { err = nil } } + return err }