{storagenode/pieces,cmd/storagenode}: refactor lazyfilewalker commands and tests

With this change we are directly testing how the command
is executed when the args are passed

Change-Id: Ibb33926014c9d71c928e0fd374bf4edc5a8a1232
This commit is contained in:
Clement Sam 2023-05-24 21:26:33 +00:00 committed by Clement Sam
parent 8c4a9f9277
commit b64179c82a
9 changed files with 164 additions and 173 deletions

View File

@ -16,6 +16,7 @@ import (
"storj.io/private/process" "storj.io/private/process"
"storj.io/storj/storagenode/pieces/lazyfilewalker" "storj.io/storj/storagenode/pieces/lazyfilewalker"
"storj.io/storj/storagenode/pieces/lazyfilewalker/execwrapper"
"storj.io/storj/storagenode/storagenodedb" "storj.io/storj/storagenode/storagenodedb"
) )
@ -24,23 +25,92 @@ type FilewalkerCfg struct {
lazyfilewalker.Config lazyfilewalker.Config
} }
// DatabaseConfig returns the storagenodedb.Config that should be used with this lazyfilewalker.
func (config *FilewalkerCfg) DatabaseConfig() storagenodedb.Config {
return storagenodedb.Config{
Storage: config.Storage,
Info: config.Info,
Info2: config.Info2,
Pieces: config.Pieces,
Filestore: config.Filestore,
Driver: config.Driver,
}
}
// RunOptions defines the options for the lazyfilewalker runners. // RunOptions defines the options for the lazyfilewalker runners.
type RunOptions struct { type RunOptions struct {
Ctx context.Context Ctx context.Context
Logger *zap.Logger Logger *zap.Logger
Config *FilewalkerCfg config *FilewalkerCfg
stdin io.Reader stdin io.Reader
stderr io.Writer stderr io.Writer
stdout io.Writer stdout io.Writer
} }
type nopWriterSyncCloser struct { // LazyFilewalkerCmd is a wrapper for the lazyfilewalker commands.
io.Writer type LazyFilewalkerCmd struct {
Command *cobra.Command
*RunOptions
} }
func (cw nopWriterSyncCloser) Close() error { return nil } var _ execwrapper.Command = (*LazyFilewalkerCmd)(nil)
func (cw nopWriterSyncCloser) Sync() error { return nil }
// SetArgs sets arguments for the command.
// The command or executable path should be passed as the first argument.
func (cmd *LazyFilewalkerCmd) SetArgs(args []string) {
if len(args) > 0 {
// arg[0] is the command name or executable path, which we don't need
// args[1] is the lazyfilewalker subcommand.
args = args[2:]
}
cmd.Command.SetArgs(args)
}
// Run runs the LazyFileWalker.
func (cmd *LazyFilewalkerCmd) Run() error {
return cmd.Command.ExecuteContext(cmd.Ctx)
}
// Start starts the LazyFileWalker command, assuming it behaves like the Start method on exec.Cmd.
// This is a no-op and only exists to satisfy the execwrapper.Command interface.
// Wait must be called to actually run the command.
func (cmd *LazyFilewalkerCmd) Start() error {
return nil
}
// Wait waits for the LazyFileWalker to finish, assuming it behaves like the Wait method on exec.Cmd.
func (cmd *LazyFilewalkerCmd) Wait() error {
return cmd.Run()
}
func (r *RunOptions) normalize(cmd *cobra.Command) {
if r.Ctx == nil {
ctx, _ := process.Ctx(cmd)
r.Ctx = ctx
}
if r.stdin == nil {
r.SetIn(os.Stdin)
}
if r.stdout == nil {
r.SetOut(os.Stdout)
}
if r.stderr == nil {
r.SetErr(os.Stderr)
}
if r.Logger == nil {
r.Logger = zap.L()
}
}
// SetIn sets the stdin reader.
func (r *RunOptions) SetIn(reader io.Reader) {
r.stdin = reader
}
// SetOut sets the stdout writer. // SetOut sets the stdout writer.
func (r *RunOptions) SetOut(writer io.Writer) { func (r *RunOptions) SetOut(writer io.Writer) {
@ -50,13 +120,16 @@ func (r *RunOptions) SetOut(writer io.Writer) {
// SetErr sets the stderr writer. // SetErr sets the stderr writer.
func (r *RunOptions) SetErr(writer io.Writer) { func (r *RunOptions) SetErr(writer io.Writer) {
r.stderr = writer r.stderr = writer
writerkey := "zapwriter" r.tryCreateNewLogger()
}
func (r *RunOptions) tryCreateNewLogger() {
// If the writer is os.Stderr, we don't need to register it because the stderr // If the writer is os.Stderr, we don't need to register it because the stderr
// writer is registered by default. // writer is registered by default.
if writer == os.Stderr { if r.stderr == os.Stderr {
return return
} }
writerkey := "zapwriter"
err := zap.RegisterSink(writerkey, func(u *url.URL) (zap.Sink, error) { err := zap.RegisterSink(writerkey, func(u *url.URL) (zap.Sink, error) {
return nopWriterSyncCloser{r.stderr}, nil return nopWriterSyncCloser{r.stderr}, nil
@ -87,71 +160,9 @@ func (r *RunOptions) SetErr(writer io.Writer) {
r.Logger = logger r.Logger = logger
} }
// SetIn sets the stdin reader. type nopWriterSyncCloser struct {
func (r *RunOptions) SetIn(reader io.Reader) { io.Writer
r.stdin = reader
} }
// DefaultRunOpts returns the default RunOptions. func (cw nopWriterSyncCloser) Close() error { return nil }
func DefaultRunOpts(ctx context.Context, logger *zap.Logger, config *FilewalkerCfg) *RunOptions { func (cw nopWriterSyncCloser) Sync() error { return nil }
return &RunOptions{
Ctx: ctx,
Logger: logger,
Config: config,
stdin: os.Stdin,
stdout: os.Stdout,
stderr: os.Stderr,
}
}
// DatabaseConfig returns the storagenodedb.Config that should be used with this lazyfilewalker.
func (config *FilewalkerCfg) DatabaseConfig() storagenodedb.Config {
return storagenodedb.Config{
Storage: config.Storage,
Info: config.Info,
Info2: config.Info2,
Pieces: config.Pieces,
Filestore: config.Filestore,
Driver: config.Driver,
}
}
// NewUsedSpaceFilewalkerCmd creates a new cobra command for running used-space calculation filewalker.
func NewUsedSpaceFilewalkerCmd() *cobra.Command {
var cfg FilewalkerCfg
cmd := &cobra.Command{
Use: lazyfilewalker.UsedSpaceFilewalkerCmdName,
Short: "An internal subcommand used to run used-space calculation filewalker as a separate subprocess with lower IO priority",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, _ := process.Ctx(cmd)
return NewUsedSpaceLazyFilewalkerWithConfig(ctx, zap.L(), &cfg).Run()
},
Hidden: true,
Args: cobra.ExactArgs(0),
}
process.Bind(cmd, &cfg)
return cmd
}
// NewGCFilewalkerCmd creates a new cobra command for running garbage collection filewalker.
func NewGCFilewalkerCmd() *cobra.Command {
var cfg FilewalkerCfg
cmd := &cobra.Command{
Use: lazyfilewalker.GCFilewalkerCmdName,
Short: "An internal subcommand used to run garbage collection filewalker as a separate subprocess with lower IO priority",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, _ := process.Ctx(cmd)
return NewGCLazyFilewalkerWithConfig(ctx, zap.L(), &cfg).Run()
},
Hidden: true,
Args: cobra.ExactArgs(0),
}
process.Bind(cmd, &cfg)
return cmd
}

View File

@ -4,44 +4,53 @@
package internalcmd package internalcmd
import ( import (
"context"
"encoding/json" "encoding/json"
"io"
"runtime" "runtime"
"github.com/spf13/cobra"
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
"storj.io/common/bloomfilter" "storj.io/common/bloomfilter"
"storj.io/private/process"
"storj.io/storj/storagenode/iopriority" "storj.io/storj/storagenode/iopriority"
"storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/pieces/lazyfilewalker" "storj.io/storj/storagenode/pieces/lazyfilewalker"
"storj.io/storj/storagenode/pieces/lazyfilewalker/execwrapper"
"storj.io/storj/storagenode/storagenodedb" "storj.io/storj/storagenode/storagenodedb"
) )
// GCLazyFileWalker is an execwrapper.Command for the gc-filewalker. // NewGCFilewalkerCmd creates a new cobra command for running garbage collection filewalker.
type GCLazyFileWalker struct { func NewGCFilewalkerCmd() *LazyFilewalkerCmd {
*RunOptions var cfg FilewalkerCfg
} var runOpts RunOptions
var _ execwrapper.Command = (*GCLazyFileWalker)(nil) cmd := &cobra.Command{
Use: lazyfilewalker.GCFilewalkerCmdName,
Short: "An internal subcommand used to run garbage collection filewalker as a separate subprocess with lower IO priority",
RunE: func(cmd *cobra.Command, args []string) error {
runOpts.normalize(cmd)
runOpts.config = &cfg
// NewGCLazyFilewalker creates a new GCLazyFileWalker instance. return gcCmdRun(&runOpts)
func NewGCLazyFilewalker(ctx context.Context, logger *zap.Logger, config lazyfilewalker.Config) *GCLazyFileWalker { },
return NewGCLazyFilewalkerWithConfig(ctx, logger, &FilewalkerCfg{config}) FParseErrWhitelist: cobra.FParseErrWhitelist{
} UnknownFlags: true,
},
Hidden: true,
Args: cobra.ExactArgs(0),
}
// NewGCLazyFilewalkerWithConfig creates a new GCLazyFileWalker instance with the given config. process.Bind(cmd, &cfg)
func NewGCLazyFilewalkerWithConfig(ctx context.Context, logger *zap.Logger, config *FilewalkerCfg) *GCLazyFileWalker {
return &GCLazyFileWalker{ return &LazyFilewalkerCmd{
RunOptions: DefaultRunOpts(ctx, logger, config), Command: cmd,
RunOptions: &runOpts,
} }
} }
// Run runs the GCLazyFileWalker. // Run runs the GCLazyFileWalker.
func (g *GCLazyFileWalker) Run() (err error) { func gcCmdRun(g *RunOptions) (err error) {
if g.Config.LowerIOPriority { if g.config.LowerIOPriority {
if runtime.GOOS == "linux" { if runtime.GOOS == "linux" {
// Pin the current goroutine to the current OS thread, so we can set the IO priority // Pin the current goroutine to the current OS thread, so we can set the IO priority
// for the current thread. // for the current thread.
@ -75,7 +84,7 @@ func (g *GCLazyFileWalker) Run() (err error) {
// We still need the DB in this case because we still have to deal with v0 pieces. // We still need the DB in this case because we still have to deal with v0 pieces.
// Once we drop support for v0 pieces, we can remove this. // Once we drop support for v0 pieces, we can remove this.
db, err := storagenodedb.OpenExisting(g.Ctx, log.Named("db"), g.Config.DatabaseConfig()) db, err := storagenodedb.OpenExisting(g.Ctx, log.Named("db"), g.config.DatabaseConfig())
if err != nil { if err != nil {
return errs.New("Error starting master database on storage node: %v", err) return errs.New("Error starting master database on storage node: %v", err)
} }
@ -109,30 +118,3 @@ func (g *GCLazyFileWalker) Run() (err error) {
// encode the response struct and write it to stdout // encode the response struct and write it to stdout
return json.NewEncoder(g.stdout).Encode(resp) return json.NewEncoder(g.stdout).Encode(resp)
} }
// Start starts the GCLazyFileWalker, assuming it behaves like the Start method on exec.Cmd.
// This is a no-op and only exists to satisfy the execwrapper.Command interface.
// Wait must be called to actually run the command.
func (g *GCLazyFileWalker) Start() error {
return nil
}
// Wait waits for the GCLazyFileWalker to finish, assuming it behaves like the Wait method on exec.Cmd.
func (g *GCLazyFileWalker) Wait() error {
return g.Run()
}
// SetIn sets the stdin of the GCLazyFileWalker.
func (g *GCLazyFileWalker) SetIn(reader io.Reader) {
g.RunOptions.SetIn(reader)
}
// SetOut sets the stdout of the GCLazyFileWalker.
func (g *GCLazyFileWalker) SetOut(writer io.Writer) {
g.RunOptions.SetOut(writer)
}
// SetErr sets the stderr of the GCLazyFileWalker.
func (g *GCLazyFileWalker) SetErr(writer io.Writer) {
g.RunOptions.SetErr(writer)
}

View File

@ -4,43 +4,52 @@
package internalcmd package internalcmd
import ( import (
"context"
"encoding/json" "encoding/json"
"io"
"runtime" "runtime"
"github.com/spf13/cobra"
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
"storj.io/private/process"
"storj.io/storj/storagenode/iopriority" "storj.io/storj/storagenode/iopriority"
"storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/pieces/lazyfilewalker" "storj.io/storj/storagenode/pieces/lazyfilewalker"
"storj.io/storj/storagenode/pieces/lazyfilewalker/execwrapper"
"storj.io/storj/storagenode/storagenodedb" "storj.io/storj/storagenode/storagenodedb"
) )
// UsedSpaceLazyFileWalker is an execwrapper.Command for the used-space-filewalker. // NewUsedSpaceFilewalkerCmd creates a new cobra command for running used-space calculation filewalker.
type UsedSpaceLazyFileWalker struct { func NewUsedSpaceFilewalkerCmd() *LazyFilewalkerCmd {
*RunOptions var cfg FilewalkerCfg
} var runOpts RunOptions
var _ execwrapper.Command = (*UsedSpaceLazyFileWalker)(nil) cmd := &cobra.Command{
Use: lazyfilewalker.UsedSpaceFilewalkerCmdName,
Short: "An internal subcommand used to run used-space calculation filewalker as a separate subprocess with lower IO priority",
RunE: func(cmd *cobra.Command, args []string) error {
runOpts.normalize(cmd)
runOpts.config = &cfg
// NewUsedSpaceLazyFilewalker creates a new UsedSpaceLazyFileWalker instance. return usedSpaceCmdRun(&runOpts)
func NewUsedSpaceLazyFilewalker(ctx context.Context, logger *zap.Logger, config lazyfilewalker.Config) *UsedSpaceLazyFileWalker { },
return NewUsedSpaceLazyFilewalkerWithConfig(ctx, logger, &FilewalkerCfg{config}) FParseErrWhitelist: cobra.FParseErrWhitelist{
} UnknownFlags: true,
},
Hidden: true,
Args: cobra.ExactArgs(0),
}
// NewUsedSpaceLazyFilewalkerWithConfig creates a new UsedSpaceLazyFileWalker instance with the given config. process.Bind(cmd, &cfg)
func NewUsedSpaceLazyFilewalkerWithConfig(ctx context.Context, logger *zap.Logger, config *FilewalkerCfg) *UsedSpaceLazyFileWalker {
return &UsedSpaceLazyFileWalker{ return &LazyFilewalkerCmd{
RunOptions: DefaultRunOpts(ctx, logger, config), Command: cmd,
RunOptions: &runOpts,
} }
} }
// Run runs the UsedSpaceLazyFileWalker. // Run runs the UsedSpaceLazyFileWalker.
func (u *UsedSpaceLazyFileWalker) Run() (err error) { func usedSpaceCmdRun(opts *RunOptions) (err error) {
if u.Config.LowerIOPriority { if opts.config.LowerIOPriority {
if runtime.GOOS == "linux" { if runtime.GOOS == "linux" {
// Pin the current goroutine to the current OS thread, so we can set the IO priority // Pin the current goroutine to the current OS thread, so we can set the IO priority
// for the current thread. // for the current thread.
@ -55,11 +64,11 @@ func (u *UsedSpaceLazyFileWalker) Run() (err error) {
return err return err
} }
} }
log := u.Logger log := opts.Logger
// Decode the data struct received from the main process // Decode the data struct received from the main process
var req lazyfilewalker.UsedSpaceRequest var req lazyfilewalker.UsedSpaceRequest
if err = json.NewDecoder(u.stdin).Decode(&req); err != nil { if err = json.NewDecoder(opts.stdin).Decode(&req); err != nil {
return errs.New("Error decoding data from stdin: %v", err) return errs.New("Error decoding data from stdin: %v", err)
} }
@ -69,7 +78,7 @@ func (u *UsedSpaceLazyFileWalker) Run() (err error) {
// We still need the DB in this case because we still have to deal with v0 pieces. // We still need the DB in this case because we still have to deal with v0 pieces.
// Once we drop support for v0 pieces, we can remove this. // Once we drop support for v0 pieces, we can remove this.
db, err := storagenodedb.OpenExisting(u.Ctx, log.Named("db"), u.Config.DatabaseConfig()) db, err := storagenodedb.OpenExisting(opts.Ctx, log.Named("db"), opts.config.DatabaseConfig())
if err != nil { if err != nil {
return errs.New("Error starting master database on storage node: %v", err) return errs.New("Error starting master database on storage node: %v", err)
} }
@ -81,7 +90,7 @@ func (u *UsedSpaceLazyFileWalker) Run() (err error) {
log.Info("used-space-filewalker started") log.Info("used-space-filewalker started")
filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo()) filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo())
total, contentSize, err := filewalker.WalkAndComputeSpaceUsedBySatellite(u.Ctx, req.SatelliteID) total, contentSize, err := filewalker.WalkAndComputeSpaceUsedBySatellite(opts.Ctx, req.SatelliteID)
if err != nil { if err != nil {
return err return err
} }
@ -90,32 +99,5 @@ func (u *UsedSpaceLazyFileWalker) Run() (err error) {
log.Info("used-space-filewalker completed", zap.Int64("piecesTotal", total), zap.Int64("piecesContentSize", contentSize)) log.Info("used-space-filewalker completed", zap.Int64("piecesTotal", total), zap.Int64("piecesContentSize", contentSize))
// encode the response struct and write it to stdout // encode the response struct and write it to stdout
return json.NewEncoder(u.stdout).Encode(resp) return json.NewEncoder(opts.stdout).Encode(resp)
}
// Start starts the GCLazyFileWalker, assuming it behaves like the Start method on exec.Cmd.
// This is a no-op and only exists to satisfy the execwrapper.Command interface.
// Wait must be called to actually run the command.
func (u *UsedSpaceLazyFileWalker) Start() error {
return nil
}
// Wait waits for the GCLazyFileWalker to finish, assuming it behaves like the Wait method on exec.Cmd.
func (u *UsedSpaceLazyFileWalker) Wait() error {
return u.Run()
}
// SetIn sets the stdin of the UsedSpaceLazyFileWalker.
func (u *UsedSpaceLazyFileWalker) SetIn(reader io.Reader) {
u.RunOptions.SetIn(reader)
}
// SetOut sets the stdout of the UsedSpaceLazyFileWalker.
func (u *UsedSpaceLazyFileWalker) SetOut(writer io.Writer) {
u.RunOptions.SetOut(writer)
}
// SetErr sets the stderr of the UsedSpaceLazyFileWalker.
func (u *UsedSpaceLazyFileWalker) SetErr(writer io.Writer) {
u.RunOptions.SetErr(writer)
} }

View File

@ -60,8 +60,8 @@ func newRootCmd(setDefaults bool) (*cobra.Command, *Factory) {
newGracefulExitInitCmd(factory), newGracefulExitInitCmd(factory),
newGracefulExitStatusCmd(factory), newGracefulExitStatusCmd(factory),
// internal hidden commands // internal hidden commands
internalcmd.NewUsedSpaceFilewalkerCmd(), internalcmd.NewUsedSpaceFilewalkerCmd().Command,
internalcmd.NewGCFilewalkerCmd(), internalcmd.NewGCFilewalkerCmd().Command,
) )
return cmd, factory return cmd, factory

View File

@ -311,7 +311,10 @@ func TestCacheServiceRun_LazyFilewalker(t *testing.T) {
lazyFwCfg := dbConfig.LazyFilewalkerConfig() lazyFwCfg := dbConfig.LazyFilewalkerConfig()
lazyFwCfg.LowerIOPriority = false lazyFwCfg.LowerIOPriority = false
lazyFw := lazyfilewalker.NewSupervisor(log, lazyFwCfg, "") lazyFw := lazyfilewalker.NewSupervisor(log, lazyFwCfg, "")
lazyFw.TestingSetUsedSpaceCmd(internalcmd.NewUsedSpaceLazyFilewalker(ctx, log.Named("used-space-filewalker.subprocess"), lazyFwCfg)) cmd := internalcmd.NewUsedSpaceFilewalkerCmd()
cmd.Logger = log.Named("used-space-filewalker")
cmd.Ctx = ctx
lazyFw.TestingSetUsedSpaceCmd(cmd)
// Now instantiate the cache // Now instantiate the cache
cache := pieces.NewBlobsUsageCache(log, store) cache := pieces.NewBlobsUsageCache(log, store)

View File

@ -24,4 +24,6 @@ type Command interface {
SetOut(io.Writer) SetOut(io.Writer)
// SetErr sets the stderr of the command. // SetErr sets the stderr of the command.
SetErr(io.Writer) SetErr(io.Writer)
// SetArgs sets arguments for the command including the command or executable path as the first argument.
SetArgs([]string)
} }

View File

@ -32,6 +32,11 @@ func (c *Cmd) SetErr(writer io.Writer) {
c.cmd.Stderr = writer c.cmd.Stderr = writer
} }
// SetArgs sets arguments for the command including the command or executable path as the first argument.
func (c *Cmd) SetArgs(args []string) {
c.cmd.Args = args
}
// CommandContext returns the Cmd struct to execute the named program with the given arguments. // CommandContext returns the Cmd struct to execute the named program with the given arguments.
func CommandContext(ctx context.Context, executable string, args ...string) *Cmd { func CommandContext(ctx context.Context, executable string, args ...string) *Cmd {
return &Cmd{ return &Cmd{

View File

@ -57,6 +57,9 @@ func (p *process) run(ctx context.Context, req, resp interface{}) (err error) {
if p.cmd == nil { if p.cmd == nil {
p.cmd = execwrapper.CommandContext(ctx, p.executable, p.args...) p.cmd = execwrapper.CommandContext(ctx, p.executable, p.args...)
} else {
args := append([]string{p.executable}, p.args...)
p.cmd.SetArgs(args)
} }
p.cmd.SetIn(&buf) p.cmd.SetIn(&buf)

View File

@ -211,7 +211,10 @@ func TestRetainPieces_lazyFilewalker(t *testing.T) {
lazyFwCfg := db.Config().LazyFilewalkerConfig() lazyFwCfg := db.Config().LazyFilewalkerConfig()
lazyFw := lazyfilewalker.NewSupervisor(log, lazyFwCfg, "") lazyFw := lazyfilewalker.NewSupervisor(log, lazyFwCfg, "")
lazyFw.TestingSetGCCmd(internalcmd.NewGCLazyFilewalker(ctx, log.Named("gc-filewalker.subprocess"), lazyFwCfg)) cmd := internalcmd.NewGCFilewalkerCmd()
cmd.Logger = log.Named("used-space-filewalker")
cmd.Ctx = ctx
lazyFw.TestingSetGCCmd(cmd)
store := pieces.NewStore(log, fw, lazyFw, blobs, v0PieceInfo, db.PieceExpirationDB(), db.PieceSpaceUsedDB(), cfg) store := pieces.NewStore(log, fw, lazyFw, blobs, v0PieceInfo, db.PieceExpirationDB(), db.PieceSpaceUsedDB(), cfg)
testStore := pieces.StoreForTest{Store: store} testStore := pieces.StoreForTest{Store: store}