storagenode/pieces/lazyfilewalker: add execwrapper package
The execwrapper package wraps the exec.Cmd and has a Command interface that mimics the behaviour of the exec.Cmd. This is useful for testing the lazyfilewalker subprocesses by stubbing instead of spawning a real subprocess. Updates https://github.com/storj/storj/issues/5349 Change-Id: I14084139c76a531f2b6d7163f9aa35c3f5e192d7
This commit is contained in:
parent
ec8bfe6b94
commit
291e639ac2
27
storagenode/pieces/lazyfilewalker/execwrapper/command.go
Normal file
27
storagenode/pieces/lazyfilewalker/execwrapper/command.go
Normal file
@ -0,0 +1,27 @@
|
||||
// Copyright (C) 2023 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package execwrapper
|
||||
|
||||
import "io"
|
||||
|
||||
// A Command is an external command being prepared or run.
|
||||
// It tries to mimic the exec.Cmd.
|
||||
type Command interface {
|
||||
// Start starts the command but does not wait for it to complete.
|
||||
// It returns an error if the command fails to start.
|
||||
// The command must be started before calling Wait.
|
||||
Start() error
|
||||
// Wait waits for the command to exit and waits for any copying to stdin or copying
|
||||
// from stdout or stderr to complete.
|
||||
// Start must be called before calling Wait.
|
||||
Wait() error
|
||||
// Run starts the specified command and waits for it to complete.
|
||||
Run() error
|
||||
// SetIn sets the stdin of the command.
|
||||
SetIn(io.Reader)
|
||||
// SetOut sets the stdout of the command.
|
||||
SetOut(io.Writer)
|
||||
// SetErr sets the stderr of the command.
|
||||
SetErr(io.Writer)
|
||||
}
|
59
storagenode/pieces/lazyfilewalker/execwrapper/exec.go
Normal file
59
storagenode/pieces/lazyfilewalker/execwrapper/exec.go
Normal file
@ -0,0 +1,59 @@
|
||||
// Copyright (C) 2023 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package execwrapper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"golang.org/x/sys/execabs"
|
||||
)
|
||||
|
||||
var _ Command = (*Cmd)(nil)
|
||||
|
||||
// Cmd is an external command being prepared or run.
|
||||
type Cmd struct {
|
||||
cmd *execabs.Cmd
|
||||
}
|
||||
|
||||
// SetIn sets the stdin of the command.
|
||||
func (c *Cmd) SetIn(reader io.Reader) {
|
||||
c.cmd.Stdin = reader
|
||||
}
|
||||
|
||||
// SetOut sets the stdout of the command.
|
||||
func (c *Cmd) SetOut(writer io.Writer) {
|
||||
c.cmd.Stdout = writer
|
||||
}
|
||||
|
||||
// SetErr sets the stderr of the command.
|
||||
func (c *Cmd) SetErr(writer io.Writer) {
|
||||
c.cmd.Stderr = writer
|
||||
}
|
||||
|
||||
// CommandContext returns the Cmd struct to execute the named program with the given arguments.
|
||||
func CommandContext(ctx context.Context, executable string, args ...string) *Cmd {
|
||||
return &Cmd{
|
||||
cmd: execabs.CommandContext(ctx, executable, args...),
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts the specified command and waits for it to complete.
|
||||
func (c *Cmd) Run() error {
|
||||
return c.cmd.Run()
|
||||
}
|
||||
|
||||
// Start starts the command but does not wait for it to complete.
|
||||
// It returns an error if the command fails to start.
|
||||
// The command must be started before calling Wait.
|
||||
func (c *Cmd) Start() error {
|
||||
return c.cmd.Start()
|
||||
}
|
||||
|
||||
// Wait waits for the command to exit and waits for any copying to stdin or copying
|
||||
// from stdout or stderr to complete.
|
||||
// Start must be called before calling Wait.
|
||||
func (c *Cmd) Wait() error {
|
||||
return c.cmd.Wait()
|
||||
}
|
@ -11,6 +11,8 @@ import (
|
||||
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sys/execabs"
|
||||
|
||||
"storj.io/storj/storagenode/pieces/lazyfilewalker/execwrapper"
|
||||
)
|
||||
|
||||
// process is a subprocess that can be used to perform filewalker operations.
|
||||
@ -18,10 +20,15 @@ type process struct {
|
||||
log *zap.Logger
|
||||
executable string
|
||||
args []string
|
||||
|
||||
cmd execwrapper.Command
|
||||
}
|
||||
|
||||
func newProcess(log *zap.Logger, executable string, args []string) *process {
|
||||
// newProcess creates a new process.
|
||||
// The cmd argument can be used to replace the subprocess with a runner for testing, it can be nil.
|
||||
func newProcess(cmd execwrapper.Command, log *zap.Logger, executable string, args []string) *process {
|
||||
return &process{
|
||||
cmd: cmd,
|
||||
log: log,
|
||||
executable: executable,
|
||||
args: args,
|
||||
@ -48,19 +55,22 @@ func (p *process) run(ctx context.Context, req, resp interface{}) (err error) {
|
||||
return errLazyFilewalker.Wrap(err)
|
||||
}
|
||||
|
||||
cmd := execabs.CommandContext(ctx, p.executable, p.args...)
|
||||
cmd.Stdin = &buf
|
||||
cmd.Stdout = &outbuf
|
||||
cmd.Stderr = writer
|
||||
if p.cmd == nil {
|
||||
p.cmd = execwrapper.CommandContext(ctx, p.executable, p.args...)
|
||||
}
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
p.cmd.SetIn(&buf)
|
||||
p.cmd.SetOut(&outbuf)
|
||||
p.cmd.SetErr(writer)
|
||||
|
||||
if err := p.cmd.Start(); err != nil {
|
||||
p.log.Error("failed to start subprocess", zap.Error(err))
|
||||
return errLazyFilewalker.Wrap(err)
|
||||
}
|
||||
|
||||
p.log.Info("subprocess started")
|
||||
|
||||
if err := cmd.Wait(); err != nil {
|
||||
if err := p.cmd.Wait(); err != nil {
|
||||
var exitErr *execabs.ExitError
|
||||
if errors.As(err, &exitErr) {
|
||||
p.log.Info("subprocess exited with status", zap.Int("status", exitErr.ExitCode()), zap.Error(exitErr))
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
|
||||
"storj.io/common/bloomfilter"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/storagenode/pieces/lazyfilewalker/execwrapper"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -38,6 +39,9 @@ type Supervisor struct {
|
||||
executable string
|
||||
gcArgs []string
|
||||
usedSpaceArgs []string
|
||||
|
||||
testingGCCmd execwrapper.Command
|
||||
testingUsedSpaceCmd execwrapper.Command
|
||||
}
|
||||
|
||||
// NewSupervisor creates a new lazy filewalker Supervisor.
|
||||
@ -50,6 +54,18 @@ func NewSupervisor(log *zap.Logger, executable string, args []string) *Superviso
|
||||
}
|
||||
}
|
||||
|
||||
// TestingSetGCCmd sets the command for the gc-filewalker subprocess.
|
||||
// The cmd acts as a replacement for the subprocess.
|
||||
func (fw *Supervisor) TestingSetGCCmd(cmd execwrapper.Command) {
|
||||
fw.testingGCCmd = cmd
|
||||
}
|
||||
|
||||
// TestingSetUsedSpaceCmd sets the command for the used-space-filewalker subprocess.
|
||||
// The cmd acts as a replacement for the subprocess.
|
||||
func (fw *Supervisor) TestingSetUsedSpaceCmd(cmd execwrapper.Command) {
|
||||
fw.testingUsedSpaceCmd = cmd
|
||||
}
|
||||
|
||||
// UsedSpaceRequest is the request struct for the used-space-filewalker process.
|
||||
type UsedSpaceRequest struct {
|
||||
SatelliteID storj.NodeID `json:"satelliteID"`
|
||||
@ -86,7 +102,7 @@ func (fw *Supervisor) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, sa
|
||||
|
||||
log := fw.log.Named(UsedSpaceFilewalkerCmdName).With(zap.String("satelliteID", satelliteID.String()))
|
||||
|
||||
err = newProcess(log, fw.executable, fw.usedSpaceArgs).run(ctx, req, &resp)
|
||||
err = newProcess(fw.testingUsedSpaceCmd, log, fw.executable, fw.usedSpaceArgs).run(ctx, req, &resp)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
@ -111,7 +127,7 @@ func (fw *Supervisor) WalkSatellitePiecesToTrash(ctx context.Context, satelliteI
|
||||
|
||||
log := fw.log.Named(GCFilewalkerCmdName).With(zap.String("satelliteID", satelliteID.String()))
|
||||
|
||||
err = newProcess(log, fw.executable, fw.gcArgs).run(ctx, req, &resp)
|
||||
err = newProcess(fw.testingGCCmd, log, fw.executable, fw.gcArgs).run(ctx, req, &resp)
|
||||
if err != nil {
|
||||
return nil, 0, 0, err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user