storagenode: run used-space filewalker as a low IO subprocess
As part of fixing the IO priority of filewalker-related processes, such as garbage collection and the used-space calculation, this change allows the initial used-space calculation to run as a separate subprocess with lower IO priority. This can be enabled with the `--storage2.enable-lazy-filewalker` config item. It falls back to the old behaviour when the subprocess fails.

Updates https://github.com/storj/storj/issues/5349

Change-Id: Ia6ee98ce912de3e89fc5ca670cf4a30be73b36a6
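In outline, the fallback behaviour described above looks like this (an illustrative sketch with placeholder functions, not code from this commit):

package main

import (
	"context"
	"errors"
	"fmt"
)

// lazyWalk stands in for the new low-IO-priority subprocess walk; here it simply fails.
func lazyWalk(ctx context.Context) (piecesTotal, piecesContentSize int64, err error) {
	return 0, 0, errors.New("subprocess failed")
}

// walkInProcess stands in for the pre-existing in-process used-space calculation.
func walkInProcess(ctx context.Context) (piecesTotal, piecesContentSize int64, err error) {
	return 2048, 1536, nil
}

func main() {
	ctx := context.Background()

	// Try the low-IO subprocess first; on any error, fall back to the old behaviour.
	total, contentSize, err := lazyWalk(ctx)
	if err != nil {
		total, contentSize, err = walkInProcess(ctx)
	}
	fmt.Println(total, contentSize, err) // 2048 1536 <nil>
}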
This commit is contained in:
parent
7dc2a5e20b
commit
f076238748
104
cmd/storagenode/cmd_lazy_filewalker.go
Normal file
@@ -0,0 +1,104 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package main

import (
	"encoding/json"
	"io"
	"os"
	"runtime"

	"github.com/spf13/cobra"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/private/process"
	"storj.io/storj/storagenode/iopriority"
	"storj.io/storj/storagenode/pieces"
	"storj.io/storj/storagenode/pieces/lazyfilewalker"
	"storj.io/storj/storagenode/storagenodedb"
)

type filewalkerCfg struct {
	lazyfilewalker.Config
}

// DatabaseConfig returns the storagenodedb.Config that should be used with this LazyFilewalkerConfig.
func (config *filewalkerCfg) DatabaseConfig() storagenodedb.Config {
	return storagenodedb.Config{
		Storage:   config.Storage,
		Info:      config.Info,
		Info2:     config.Info2,
		Pieces:    config.Pieces,
		Filestore: config.Filestore,
		Driver:    config.Driver,
	}
}

const usedSpaceFilewalkerCmd = "used-space-filewalker"

func newUsedSpaceFilewalkerCmd() *cobra.Command {
	var cfg filewalkerCfg

	cmd := &cobra.Command{
		Use:   usedSpaceFilewalkerCmd,
		Short: "An internal subcommand used to run used-space calculation filewalker as a separate subprocess with lower IO priority",
		RunE: func(cmd *cobra.Command, args []string) error {
			return cmdUsedSpaceFilewalker(cmd, &cfg)
		},
		Hidden: true,
		Args:   cobra.ExactArgs(0),
	}

	process.Bind(cmd, &cfg)

	return cmd
}

func cmdUsedSpaceFilewalker(cmd *cobra.Command, cfg *filewalkerCfg) (err error) {
	if runtime.GOOS == "linux" {
		// Pin the current goroutine to the current OS thread, so we can set the IO priority
		// for the current thread.
		// This is necessary because Go does not use CLONE_IO when creating new threads,
		// so they do not share a single IO context.
		runtime.LockOSThread()
		defer runtime.UnlockOSThread()
	}

	err = iopriority.SetLowIOPriority()
	if err != nil {
		return err
	}

	ctx, _ := process.Ctx(cmd)
	log := zap.L().Named("used-space-filewalker")

	// We still need the DB in this case because we still have to deal with v0 pieces.
	// Once we drop support for v0 pieces, we can remove this.
	db, err := storagenodedb.OpenExisting(ctx, log.Named("db"), cfg.DatabaseConfig())
	if err != nil {
		return errs.New("Error starting master database on storage node: %v", err)
	}
	log.Info("Database started")
	defer func() {
		err = errs.Combine(err, db.Close())
	}()

	// Decode the data struct received from the main process.
	var req lazyfilewalker.UsedSpaceRequest
	if err = json.NewDecoder(io.Reader(os.Stdin)).Decode(&req); err != nil {
		return errs.New("Error decoding data from stdin: %v", err)
	}

	filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo())

	total, contentSize, err := filewalker.WalkAndComputeSpaceUsedBySatellite(ctx, req.SatelliteID)
	if err != nil {
		return err
	}
	resp := lazyfilewalker.UsedSpaceResponse{PiecesTotal: total, PiecesContentSize: contentSize}

	// Encode the response struct and write it to stdout.
	return json.NewEncoder(io.Writer(os.Stdout)).Encode(resp)
}
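To make the stdin/stdout protocol of this subcommand concrete, here is a small standalone sketch that round-trips the same JSON shapes (local stand-ins for lazyfilewalker.UsedSpaceRequest/UsedSpaceResponse; the satellite ID is a placeholder string, not a real node ID):

package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins mirroring the request/response structs used above, for illustration only.
type usedSpaceRequest struct {
	SatelliteID string `json:"satelliteID"`
}

type usedSpaceResponse struct {
	PiecesTotal       int64 `json:"piecesTotal"`
	PiecesContentSize int64 `json:"piecesContentSize"`
}

func main() {
	// What the parent process writes to the subcommand's stdin.
	in, _ := json.Marshal(usedSpaceRequest{SatelliteID: "<satellite node id>"})
	fmt.Println(string(in)) // {"satelliteID":"<satellite node id>"}

	// What the subcommand writes back to stdout once the walk finishes.
	out, _ := json.Marshal(usedSpaceResponse{PiecesTotal: 2048, PiecesContentSize: 1536})
	fmt.Println(string(out)) // {"piecesTotal":2048,"piecesContentSize":1536}
}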
@@ -4,6 +4,8 @@
 package main

 import (
+	"os"
+
 	"go.uber.org/zap"

 	"storj.io/private/process"
@@ -17,11 +19,22 @@ func main() {
 		return
 	}

-	rootCmd, _ := newRootCmd(true)
+	allowDefaults := !isFilewalkerCommand()
+	rootCmd, _ := newRootCmd(allowDefaults)

 	loggerFunc := func(logger *zap.Logger) *zap.Logger {
 		return logger.With(zap.String("Process", rootCmd.Use))
 	}

-	process.ExecWithCustomConfigAndLogger(rootCmd, false, process.LoadConfig, loggerFunc)
+	process.ExecWithCustomOptions(rootCmd, process.ExecOptions{
+		InitDefaultDebugServer: allowDefaults,
+		InitTracing:            allowDefaults,
+		InitProfiler:           allowDefaults,
+		LoggerFactory:          loggerFunc,
+		LoadConfig:             process.LoadConfig,
+	})
 }
+
+func isFilewalkerCommand() bool {
+	return len(os.Args) > 1 && os.Args[1] == usedSpaceFilewalkerCmd
+}
@@ -58,6 +58,8 @@ func newRootCmd(setDefaults bool) (*cobra.Command, *Factory) {
 		newIssueAPIKeyCmd(factory),
 		newGracefulExitInitCmd(factory),
 		newGracefulExitStatusCmd(factory),
+		// internal hidden commands
+		newUsedSpaceFilewalkerCmd(),
 	)

 	return cmd, factory
16
storagenode/iopriority/low_ioprio_darwin.go
Normal file
@@ -0,0 +1,16 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package iopriority

// #include <sys/resource.h>
import "C"

// SetLowIOPriority lowers the process I/O priority.
func SetLowIOPriority() error {
	r1, err := C.setiopolicy_np(C.IOPOL_TYPE_DISK, C.IOPOL_SCOPE_PROCESS, C.IOPOL_THROTTLE)
	if r1 != 0 {
		return err
	}
	return nil
}
29
storagenode/iopriority/low_ioprio_linux.go
Normal file
@@ -0,0 +1,29 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package iopriority

import (
	"syscall"
)

// These constants come from the definitions in linux's ioprio.h.
// See https://github.com/torvalds/linux/blob/master/include/uapi/linux/ioprio.h
const (
	ioprioClassShift = uint32(13)
	ioprioPrioMask   = (uint32(1) << ioprioClassShift) - 1

	ioprioWhoProcess = 1
	ioprioClassIdle  = 3
)

// SetLowIOPriority lowers the process I/O priority.
func SetLowIOPriority() error {
	// from the definition for the IOPRIO_PRIO_VALUE macro in Linux's ioprio.h
	ioprioPrioValue := ioprioClassIdle<<ioprioClassShift | (0 & ioprioPrioMask)
	_, _, err := syscall.Syscall(syscall.SYS_IOPRIO_SET, uintptr(ioprioWhoProcess), 0, uintptr(ioprioPrioValue))
	if err != 0 {
		return err
	}
	return nil
}
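As a worked check of the value composed above: the idle class 3 shifted left by 13 bits, combined with priority data 0, means ioprio_set receives 3<<13 = 24576. A standalone snippet repeating the arithmetic (illustrative only, not part of the change):

package main

import "fmt"

func main() {
	const (
		ioprioClassShift = uint32(13)
		ioprioPrioMask   = (uint32(1) << ioprioClassShift) - 1
		ioprioClassIdle  = 3
	)
	// Same composition as the IOPRIO_PRIO_VALUE macro referenced above.
	value := ioprioClassIdle<<ioprioClassShift | (0 & ioprioPrioMask)
	fmt.Println(value) // 24576
}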
17
storagenode/iopriority/low_ioprio_unix.go
Normal file
@@ -0,0 +1,17 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

//go:build !windows && !linux && !darwin
// +build !windows,!linux,!darwin

package iopriority

import "syscall"

// SetLowIOPriority lowers the process I/O priority.
func SetLowIOPriority() error {
	// This might not necessarily affect the process I/O priority as POSIX does not
	// mandate the concept of I/O priority. However, it is a "best effort" to lower
	// the process I/O priority.
	return syscall.Setpriority(syscall.PRIO_PROCESS, 0, 9)
}
13
storagenode/iopriority/low_ioprio_windows.go
Normal file
@@ -0,0 +1,13 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package iopriority

import (
	"golang.org/x/sys/windows"
)

// SetLowIOPriority lowers the process I/O priority.
func SetLowIOPriority() (err error) {
	return windows.SetPriorityClass(windows.CurrentProcess(), windows.PROCESS_MODE_BACKGROUND_BEGIN)
}
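Whatever the platform-specific mechanism, the call site is the same; a minimal usage sketch (illustrative, not from this commit):

package main

import (
	"log"

	"storj.io/storj/storagenode/iopriority"
)

func main() {
	// Drop this process's IO priority before starting disk-heavy filewalker work.
	if err := iopriority.SetLowIOPriority(); err != nil {
		log.Fatalf("could not lower IO priority: %v", err)
	}
	// ... run the scan ...
}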
@@ -51,6 +51,7 @@ import (
 	"storj.io/storj/storagenode/payouts"
 	"storj.io/storj/storagenode/payouts/estimatedpayouts"
 	"storj.io/storj/storagenode/pieces"
+	"storj.io/storj/storagenode/pieces/lazyfilewalker"
 	"storj.io/storj/storagenode/piecestore"
 	"storj.io/storj/storagenode/piecestore/usedserials"
 	"storj.io/storj/storagenode/piecetransfer"
@@ -242,18 +243,19 @@ type Peer struct {

 	Storage2 struct {
 		// TODO: lift things outside of it to organize better
-		Trust         *trust.Pool
-		Store         *pieces.Store
-		TrashChore    *pieces.TrashChore
-		BlobsCache    *pieces.BlobsUsageCache
-		CacheService  *pieces.CacheService
-		RetainService *retain.Service
-		PieceDeleter  *pieces.Deleter
-		Endpoint      *piecestore.Endpoint
-		Inspector     *inspector.Endpoint
-		Monitor       *monitor.Service
-		Orders        *orders.Service
-		FileWalker    *pieces.FileWalker
+		Trust          *trust.Pool
+		Store          *pieces.Store
+		TrashChore     *pieces.TrashChore
+		BlobsCache     *pieces.BlobsUsageCache
+		CacheService   *pieces.CacheService
+		RetainService  *retain.Service
+		PieceDeleter   *pieces.Deleter
+		Endpoint       *piecestore.Endpoint
+		Inspector      *inspector.Endpoint
+		Monitor        *monitor.Service
+		Orders         *orders.Service
+		FileWalker     *pieces.FileWalker
+		LazyFileWalker *lazyfilewalker.Supervisor
 	}

 	Collector *collector.Service
@@ -458,8 +460,27 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 	peer.Storage2.BlobsCache = pieces.NewBlobsUsageCache(peer.Log.Named("blobscache"), peer.DB.Pieces())
 	peer.Storage2.FileWalker = pieces.NewFileWalker(peer.Log.Named("filewalker"), peer.Storage2.BlobsCache, peer.DB.V0PieceInfo())

+	if config.Pieces.EnableLazyFilewalker {
+		executable, err := os.Executable()
+		if err != nil {
+			return nil, errs.Combine(err, peer.Close())
+		}
+
+		dbCfg := config.DatabaseConfig()
+		lazyfilewalkerCfg := lazyfilewalker.Config{
+			Storage:   dbCfg.Storage,
+			Info:      dbCfg.Info,
+			Info2:     dbCfg.Info2,
+			Pieces:    dbCfg.Pieces,
+			Filestore: dbCfg.Filestore,
+			Driver:    dbCfg.Driver,
+		}
+		peer.Storage2.LazyFileWalker = lazyfilewalker.NewSupervisor(peer.Log.Named("lazyfilewalker"), executable, lazyfilewalkerCfg.Args())
+	}
+
 	peer.Storage2.Store = pieces.NewStore(peer.Log.Named("pieces"),
 		peer.Storage2.FileWalker,
+		peer.Storage2.LazyFileWalker,
 		peer.Storage2.BlobsCache,
 		peer.DB.V0PieceInfo(),
 		peer.DB.PieceExpirationDB(),
@@ -113,7 +113,7 @@ func TestCacheInit(t *testing.T) {
 	cache := pieces.NewBlobsUsageCacheTest(log, nil, 0, 0, 0, nil)
 	cacheService := pieces.NewService(log,
 		cache,
-		pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
+		pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), nil, cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
 		1*time.Hour,
 		true,
 	)
@@ -152,7 +152,7 @@ func TestCacheInit(t *testing.T) {
 	cache = pieces.NewBlobsUsageCacheTest(log, nil, expectedPiecesTotal, expectedPiecesContentSize, expectedTrash, expectedTotalBySA)
 	cacheService = pieces.NewService(log,
 		cache,
-		pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
+		pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), nil, cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
 		1*time.Hour,
 		true,
 	)
@@ -163,7 +163,7 @@ func TestCacheInit(t *testing.T) {
 	cache = pieces.NewBlobsUsageCacheTest(log, nil, 0, 0, 0, nil)
 	cacheService = pieces.NewService(log,
 		cache,
-		pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
+		pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), nil, cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
 		1*time.Hour,
 		true,
 	)
@@ -227,7 +227,7 @@ func TestCachServiceRun(t *testing.T) {
 	cache := pieces.NewBlobsUsageCache(log, store)
 	cacheService := pieces.NewService(log,
 		cache,
-		pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
+		pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), nil, cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
 		1*time.Hour,
 		true,
 	)
@@ -314,7 +314,7 @@ func TestPersistCacheTotals(t *testing.T) {
 	cache := pieces.NewBlobsUsageCacheTest(log, nil, expectedPiecesTotal, expectedPiecesContentSize, expectedTrash, expectedTotalsBySA)
 	cacheService := pieces.NewService(log,
 		cache,
-		pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
+		pieces.NewStore(log, pieces.NewFileWalker(log, cache, nil), nil, cache, nil, nil, spaceUsedDB, pieces.DefaultConfig),
 		1*time.Hour,
 		true,
 	)
@@ -52,7 +52,7 @@ func TestDeleter(t *testing.T) {
 				WritePreallocSize: 4 * memory.MiB,
 				DeleteToTrash:     testCase.deleteToTrash,
 			}
-			store := pieces.NewStore(log, pieces.NewFileWalker(log, blobs, v0PieceInfo), blobs, v0PieceInfo, db.PieceExpirationDB(), nil, conf)
+			store := pieces.NewStore(log, pieces.NewFileWalker(log, blobs, v0PieceInfo), nil, blobs, v0PieceInfo, db.PieceExpirationDB(), nil, conf)
 			deleter := pieces.NewDeleter(log, store, 1, 10000)
 			defer ctx.Check(deleter.Close)
 			deleter.SetupTest()
@@ -5,6 +5,7 @@ package pieces

 import (
 	"context"
+	"os"

 	"go.uber.org/zap"

@@ -61,3 +62,21 @@ func (fw *FileWalker) WalkSatellitePieces(ctx context.Context, satellite storj.N

 	return err
 }
+
+// WalkAndComputeSpaceUsedBySatellite walks over all pieces for a given satellite, adds up and returns the total space used.
+func (fw *FileWalker) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (satPiecesTotal int64, satPiecesContentSize int64, err error) {
+	err = fw.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error {
+		pieceTotal, pieceContentSize, err := access.Size(ctx)
+		if err != nil {
+			if os.IsNotExist(err) {
+				return nil
+			}
+			return err
+		}
+		satPiecesTotal += pieceTotal
+		satPiecesContentSize += pieceContentSize
+		return nil
+	})
+
+	return satPiecesTotal, satPiecesContentSize, err
+}
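A minimal, illustrative caller of the new method (the storage path is a placeholder and the use of filestore.NewDir to open it is an assumption outside this diff; v0 pieces are skipped by passing nil):

package main

import (
	"context"
	"log"

	"go.uber.org/zap"

	"storj.io/common/storj"
	"storj.io/storj/storagenode/blobstore/filestore"
	"storj.io/storj/storagenode/pieces"
)

func main() {
	ctx := context.Background()
	logger := zap.NewExample()

	// Open a blob store over an existing pieces directory (path is a placeholder).
	dir, err := filestore.NewDir(logger, "/node/storage")
	if err != nil {
		log.Fatal(err)
	}
	blobs := filestore.New(logger, dir, filestore.DefaultConfig)
	defer func() { _ = blobs.Close() }()

	// nil V0PieceInfoDB: v0 pieces are ignored in this sketch.
	fw := pieces.NewFileWalker(logger, blobs, nil)

	var satelliteID storj.NodeID // placeholder; a real ID comes from the node's trust pool
	total, contentSize, err := fw.WalkAndComputeSpaceUsedBySatellite(ctx, satelliteID)
	if err != nil {
		log.Fatal(err)
	}
	logger.Info("space used", zap.Int64("piecesTotal", total), zap.Int64("piecesContentSize", contentSize))
}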
37
storagenode/pieces/lazyfilewalker/config.go
Normal file
@@ -0,0 +1,37 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package lazyfilewalker

import (
	"storj.io/storj/storagenode/blobstore/filestore"
)

// Config is the config for the lazyfilewalker process.
type Config struct {
	// TODO: just trying to match the names in storagenodedb.Config. Change these to be more descriptive.
	Storage   string `help:"path to the storage database directory"`
	Info      string `help:"path to the piecestore db"`
	Info2     string `help:"path to the info database"`
	Driver    string `help:"database driver to use" default:"sqlite3"`
	Pieces    string `help:"path to store pieces in"`
	Filestore filestore.Config
}

// Args returns the flags to be passed to the lazyfilewalker process.
func (config *Config) Args() []string {
	// TODO: of course, we shouldn't hardcode this.
	return []string{
		"--storage", config.Storage,
		"--info", config.Info,
		"--info2", config.Info2,
		"--pieces", config.Pieces,
		"--driver", config.Driver,
		"--filestore.write-buffer-size", config.Filestore.WriteBufferSize.String(),
		// set log output to stderr, so it doesn't interfere with the output of the command
		"--log.output", "stderr",
		// use the json formatter in the subprocess, so we could read lines and re-log them in the main process
		// with all the fields intact.
		"--log.encoding", "json",
	}
}
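For illustration, what a populated Config turns into as subprocess arguments (the paths and buffer size are placeholders; this is a usage sketch, not part of the change):

package main

import (
	"fmt"

	"storj.io/common/memory"
	"storj.io/storj/storagenode/blobstore/filestore"
	"storj.io/storj/storagenode/pieces/lazyfilewalker"
)

func main() {
	cfg := lazyfilewalker.Config{
		Storage:   "/node/storage",
		Info:      "/node/storage/piecestore.db",
		Info2:     "/node/storage/info.db",
		Pieces:    "/node/storage",
		Driver:    "sqlite3",
		Filestore: filestore.Config{WriteBufferSize: 128 * memory.KiB},
	}
	// Prints the flag list the supervisor prepends a subcommand name to, e.g.
	// [--storage /node/storage --info ... --log.output stderr --log.encoding json]
	fmt.Println(cfg.Args())
}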
110
storagenode/pieces/lazyfilewalker/lazyfilewalker.go
Normal file
@@ -0,0 +1,110 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package lazyfilewalker

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"

	"github.com/spacemonkeygo/monkit/v3"
	"github.com/zeebo/errs"
	"go.uber.org/zap"
	"golang.org/x/sys/execabs"

	"storj.io/common/storj"
)

var (
	errLazyFilewalker = errs.Class("lazyfilewalker")

	mon = monkit.Package()
)

// Supervisor performs filewalker operations in a subprocess with lower I/O priority.
type Supervisor struct {
	log *zap.Logger

	executable    string
	gcArgs        []string
	usedSpaceArgs []string
}

// NewSupervisor creates a new lazy filewalker Supervisor.
func NewSupervisor(log *zap.Logger, executable string, args []string) *Supervisor {
	return &Supervisor{
		log:           log,
		gcArgs:        append([]string{"gc-filewalker"}, args...),
		usedSpaceArgs: append([]string{"used-space-filewalker"}, args...),
		executable:    executable,
	}
}

// UsedSpaceRequest is the request struct for the used-space-filewalker process.
type UsedSpaceRequest struct {
	SatelliteID storj.NodeID `json:"satelliteID"`
}

// UsedSpaceResponse is the response struct for the used-space-filewalker process.
type UsedSpaceResponse struct {
	PiecesTotal       int64 `json:"piecesTotal"`
	PiecesContentSize int64 `json:"piecesContentSize"`
}

// WalkAndComputeSpaceUsedBySatellite returns the total used space by satellite.
func (fw *Supervisor) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal int64, piecesContentSize int64, err error) {
	defer mon.Task()(&ctx)(&err)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	req := UsedSpaceRequest{
		SatelliteID: satelliteID,
	}

	fw.log.Info("starting subprocess", zap.String("satelliteID", satelliteID.String()))
	cmd := execabs.CommandContext(ctx, fw.executable, fw.usedSpaceArgs...)

	var buf, outbuf bytes.Buffer
	writer := &zapWrapper{fw.log.Named("subprocess")}

	// encode the struct and write it to the buffer
	enc := json.NewEncoder(&buf)
	if err := enc.Encode(req); err != nil {
		return 0, 0, errLazyFilewalker.Wrap(err)
	}
	cmd.Stdin = &buf
	cmd.Stdout = &outbuf
	cmd.Stderr = writer

	if err := cmd.Start(); err != nil {
		fw.log.Error("failed to start subprocess", zap.Error(err))
		return 0, 0, err
	}

	fw.log.Info("subprocess started", zap.String("satelliteID", satelliteID.String()))

	if err := cmd.Wait(); err != nil {
		var exitErr *execabs.ExitError
		if errors.As(err, &exitErr) {
			fw.log.Info("subprocess exited with status", zap.Int("status", exitErr.ExitCode()), zap.Error(exitErr), zap.String("satelliteID", satelliteID.String()))
		} else {
			fw.log.Error("subprocess exited with error", zap.Error(err), zap.String("satelliteID", satelliteID.String()))
		}
		return 0, 0, errLazyFilewalker.Wrap(err)
	}

	fw.log.Info("subprocess finished successfully", zap.String("satelliteID", satelliteID.String()), zap.Int64("piecesTotal", piecesTotal), zap.Int64("piecesContentSize", piecesContentSize))

	// Decode and receive the response data struct from the subprocess
	var resp UsedSpaceResponse
	decoder := json.NewDecoder(&outbuf)
	if err := decoder.Decode(&resp); err != nil {
		fw.log.Error("failed to decode response from subprocess", zap.String("satelliteID", satelliteID.String()), zap.Error(err))
		return 0, 0, errLazyFilewalker.Wrap(err)
	}

	return resp.PiecesTotal, resp.PiecesContentSize, nil
}
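A minimal, illustrative caller of the Supervisor (the real wiring lives in the peer setup and pieces.Store changes below; the empty argument slice and zero satellite ID are placeholders):

package main

import (
	"context"
	"log"
	"os"

	"go.uber.org/zap"

	"storj.io/common/storj"
	"storj.io/storj/storagenode/pieces/lazyfilewalker"
)

func main() {
	ctx := context.Background()
	logger := zap.NewExample()

	// The storagenode binary itself provides the hidden used-space-filewalker subcommand.
	executable, err := os.Executable()
	if err != nil {
		log.Fatal(err)
	}

	// In the real code the args come from lazyfilewalker.Config.Args().
	supervisor := lazyfilewalker.NewSupervisor(logger, executable, nil)

	var satelliteID storj.NodeID // placeholder
	total, contentSize, err := supervisor.WalkAndComputeSpaceUsedBySatellite(ctx, satelliteID)
	if err != nil {
		// Callers are expected to fall back to the in-process filewalker here.
		log.Fatal(err)
	}
	logger.Info("done", zap.Int64("piecesTotal", total), zap.Int64("piecesContentSize", contentSize))
}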
113
storagenode/pieces/lazyfilewalker/zapwrapper.go
Normal file
@@ -0,0 +1,113 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package lazyfilewalker

import (
	"bytes"
	"encoding/json"
	"strconv"
	"strings"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

type zapWrapper struct {
	Log *zap.Logger
}

// Write writes the provided bytes to the underlying logger and
// returns the length of the bytes.
//
// Write will split the input on newlines, parse, and post each line as a new log entry
// to the logger.
func (w *zapWrapper) Write(p []byte) (n int, err error) {
	n = len(p)
	for len(p) > 0 {
		p, err = w.writeLine(p)
		if err != nil {
			return n, err
		}
	}

	return n, nil
}

type zapLogger struct {
	Caller  string        `json:"C"`
	Level   zapcore.Level `json:"L"`
	Message string        `json:"M"`
	Stack   string        `json:"S"`
	Name    string        `json:"N"`

	LogMap map[string]interface{} `json:"-"`
}

// writeLine writes a single line from the input and returns the remaining bytes.
func (w *zapWrapper) writeLine(b []byte) (remaining []byte, err error) {
	idx := bytes.IndexByte(b, '\n')
	if idx < 0 {
		// If there are no newlines, log the entire string.
		return nil, w.log(b)
	}
	// Split on the newline, log the left.
	b, remaining = b[:idx], b[idx+1:]

	return remaining, w.log(b)
}

func (w *zapWrapper) log(b []byte) error {
	logger := zapLogger{}
	if err := json.Unmarshal(b, &logger); err != nil {
		return err
	}
	// parse the unknown fields into a map
	if err := json.Unmarshal(b, &logger.LogMap); err != nil {
		return err
	}
	// remove the known fields that are already parsed from the map
	delete(logger.LogMap, "C")
	delete(logger.LogMap, "L")
	delete(logger.LogMap, "M")
	delete(logger.LogMap, "S")
	delete(logger.LogMap, "N")

	log := w.Log.Named(logger.Name)
	if ce := log.Check(logger.Level, logger.Message); ce != nil {
		if logger.Stack != "" {
			ce.Stack = logger.Stack
		}
		if caller := newEntryCaller(logger.Caller); caller != nil {
			ce.Caller = *caller
		}

		var fields []zapcore.Field
		for key, val := range logger.LogMap {
			fields = append(fields, zap.Any(key, val))
		}

		ce.Write(fields...)
	}

	return nil
}

func newEntryCaller(caller string) *zapcore.EntryCaller {
	if caller == "" {
		return nil
	}

	idx := strings.IndexByte(caller, ':')
	if idx <= 0 {
		return nil
	}

	file, line := caller[:idx], caller[idx+1:]
	lineNum, err := strconv.Atoi(line)
	if err != nil {
		return nil
	}
	entryCaller := zapcore.NewEntryCaller(0, file, lineNum, true)
	return &entryCaller
}
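For reference, the subprocess is started with --log.encoding json, so each stderr line carries the short keys the wrapper above parses ("L", "M", "N", "C", "S") plus any structured fields. A standalone sketch of the same parse-then-relog idea (zapWrapper itself is unexported; the sample line is fabricated for illustration):

package main

import (
	"encoding/json"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

func main() {
	// A line of the shape the wrapper expects from the subprocess's stderr.
	line := []byte(`{"L":"info","N":"used-space-filewalker","M":"Database started","satelliteID":"<id>"}`)

	var entry struct {
		Level   zapcore.Level `json:"L"`
		Name    string        `json:"N"`
		Message string        `json:"M"`
	}
	var fields map[string]interface{}
	if err := json.Unmarshal(line, &entry); err != nil {
		panic(err)
	}
	if err := json.Unmarshal(line, &fields); err != nil {
		panic(err)
	}
	// Drop the keys that were already parsed; keep the rest as structured fields.
	for _, k := range []string{"L", "N", "M", "C", "S"} {
		delete(fields, k)
	}

	logger := zap.NewExample().Named(entry.Name)
	if ce := logger.Check(entry.Level, entry.Message); ce != nil {
		var zfields []zapcore.Field
		for k, v := range fields {
			zfields = append(zfields, zap.Any(k, v))
		}
		ce.Write(zfields...)
	}
}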
@@ -32,8 +32,7 @@ func BenchmarkReadWrite(b *testing.B) {
 	require.NoError(b, err)
 	blobs := filestore.New(zap.NewNop(), dir, filestore.DefaultConfig)
 	defer ctx.Check(blobs.Close)

-	store := pieces.NewStore(zap.NewNop(), pieces.NewFileWalker(zap.NewNop(), blobs, nil), blobs, nil, nil, nil, pieces.DefaultConfig)
-
+	store := pieces.NewStore(zap.NewNop(), pieces.NewFileWalker(zap.NewNop(), blobs, nil), nil, blobs, nil, nil, nil, pieces.DefaultConfig)
 	// setup test parameters
 	const blockSize = int(256 * memory.KiB)
@@ -101,7 +100,7 @@ func readAndWritePiece(t *testing.T, content []byte) {
 	blobs := filestore.New(log, dir, filestore.DefaultConfig)
 	defer ctx.Check(blobs.Close)

-	store := pieces.NewStore(log, pieces.NewFileWalker(log, blobs, nil), blobs, nil, nil, nil, pieces.DefaultConfig)
+	store := pieces.NewStore(log, pieces.NewFileWalker(log, blobs, nil), nil, blobs, nil, nil, nil, pieces.DefaultConfig)

 	// test parameters
 	satelliteID := testrand.NodeID()
@@ -19,6 +19,7 @@ import (
 	"storj.io/common/storj"
 	"storj.io/storj/storagenode/blobstore"
 	"storj.io/storj/storagenode/blobstore/filestore"
+	"storj.io/storj/storagenode/pieces/lazyfilewalker"
 )

 var (
@@ -157,6 +158,9 @@ type SatelliteUsage struct {
 type Config struct {
 	WritePreallocSize memory.Size `help:"file preallocated for uploading" default:"4MiB"`
 	DeleteToTrash     bool        `help:"move pieces to trash upon deletion. Warning: if set to false, you risk disqualification for failed audits if a satellite database is restored from backup." default:"true"`
+	// TODO(clement): default is set to false for now.
+	// I will test and monitor on my node for some time before changing the default to true.
+	EnableLazyFilewalker bool `help:"run garbage collection and used-space calculation filewalkers as a separate subprocess with lower IO priority" releaseDefault:"false" devDefault:"true" testDefault:"false"`
 }

 // DefaultConfig is the default value for the Config.
@@ -176,7 +180,8 @@ type Store struct {
 	spaceUsedDB PieceSpaceUsedDB
 	v0PieceInfo V0PieceInfoDB

-	Filewalker *FileWalker
+	Filewalker     *FileWalker
+	lazyFilewalker *lazyfilewalker.Supervisor
 }

 // StoreForTest is a wrapper around Store to be used only in test scenarios. It enables writing
@@ -186,7 +191,7 @@ type StoreForTest struct {
 }

 // NewStore creates a new piece store.
-func NewStore(log *zap.Logger, fw *FileWalker, blobs blobstore.Blobs, v0PieceInfo V0PieceInfoDB, expirationInfo PieceExpirationDB, spaceUsedDB PieceSpaceUsedDB, config Config) *Store {
+func NewStore(log *zap.Logger, fw *FileWalker, lazyFilewalker *lazyfilewalker.Supervisor, blobs blobstore.Blobs, v0PieceInfo V0PieceInfoDB, expirationInfo PieceExpirationDB, spaceUsedDB PieceSpaceUsedDB, config Config) *Store {
 	return &Store{
 		log:    log,
 		config: config,
@@ -195,6 +200,7 @@ func NewStore(log *zap.Logger, fw *FileWalker, blobs blobstore.Blobs, v0PieceInf
 		spaceUsedDB:    spaceUsedDB,
 		v0PieceInfo:    v0PieceInfo,
 		Filewalker:     fw,
+		lazyFilewalker: lazyFilewalker,
 	}
 }

@@ -676,18 +682,20 @@ func (store *Store) SpaceUsedTotalAndBySatellite(ctx context.Context) (piecesTot
 		var satPiecesTotal int64
 		var satPiecesContentSize int64

-		err := store.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error {
-			pieceTotal, pieceContentSize, err := access.Size(ctx)
+		failover := true
+		if store.config.EnableLazyFilewalker && store.lazyFilewalker != nil {
+			satPiecesTotal, satPiecesContentSize, err = store.lazyFilewalker.WalkAndComputeSpaceUsedBySatellite(ctx, satelliteID)
 			if err != nil {
-				if os.IsNotExist(err) {
-					return nil
-				}
-				return err
+				store.log.Error("failed to lazywalk space used by satellite", zap.Error(err), zap.Stringer("Satellite ID", satelliteID))
+			} else {
+				failover = false
 			}
-			satPiecesTotal += pieceTotal
-			satPiecesContentSize += pieceContentSize
-			return nil
-		})
+		}
+
+		if failover {
+			satPiecesTotal, satPiecesContentSize, err = store.Filewalker.WalkAndComputeSpaceUsedBySatellite(ctx, satelliteID)
+		}
+
 		if err != nil {
 			group.Add(err)
 		}
@@ -47,7 +47,7 @@ func TestPieces(t *testing.T) {
 	defer ctx.Check(blobs.Close)

 	fw := pieces.NewFileWalker(log, blobs, nil)
-	store := pieces.NewStore(log, fw, blobs, nil, nil, nil, pieces.DefaultConfig)
+	store := pieces.NewStore(log, fw, nil, blobs, nil, nil, nil, pieces.DefaultConfig)

 	satelliteID := testidentity.MustPregeneratedSignedIdentity(0, storj.LatestIDVersion()).ID
 	pieceID := storj.NewPieceID()
@@ -326,7 +326,7 @@ func TestTrashAndRestore(t *testing.T) {
 	require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")

 	fw := pieces.NewFileWalker(log, blobs, v0PieceInfo)
-	store := pieces.NewStore(log, fw, blobs, v0PieceInfo, db.PieceExpirationDB(), nil, pieces.DefaultConfig)
+	store := pieces.NewStore(log, fw, nil, blobs, v0PieceInfo, db.PieceExpirationDB(), nil, pieces.DefaultConfig)
 	tStore := &pieces.StoreForTest{store}

 	var satelliteURLs []trust.SatelliteURL
@@ -566,7 +566,7 @@ func TestPieceVersionMigrate(t *testing.T) {
 	defer ctx.Check(blobs.Close)

 	fw := pieces.NewFileWalker(log, blobs, v0PieceInfo)
-	store := pieces.NewStore(log, fw, blobs, v0PieceInfo, nil, nil, pieces.DefaultConfig)
+	store := pieces.NewStore(log, fw, nil, blobs, v0PieceInfo, nil, nil, pieces.DefaultConfig)

 	// write as a v0 piece
 	tStore := &pieces.StoreForTest{store}
@@ -652,7 +652,7 @@ func TestMultipleStorageFormatVersions(t *testing.T) {
 	defer ctx.Check(blobs.Close)

 	fw := pieces.NewFileWalker(log, blobs, nil)
-	store := pieces.NewStore(log, fw, blobs, nil, nil, nil, pieces.DefaultConfig)
+	store := pieces.NewStore(log, fw, nil, blobs, nil, nil, nil, pieces.DefaultConfig)
 	tStore := &pieces.StoreForTest{store}

 	const pieceSize = 1024
@@ -709,7 +709,7 @@ func TestGetExpired(t *testing.T) {

 	blobs := db.Pieces()
 	fw := pieces.NewFileWalker(log, blobs, v0PieceInfo)
-	store := pieces.NewStore(log, fw, blobs, v0PieceInfo, expirationInfo, db.PieceSpaceUsedDB(), pieces.DefaultConfig)
+	store := pieces.NewStore(log, fw, nil, blobs, v0PieceInfo, expirationInfo, db.PieceSpaceUsedDB(), pieces.DefaultConfig)

 	now := time.Now()
 	testDates := []struct {
@@ -780,7 +780,7 @@ func TestOverwriteV0WithV1(t *testing.T) {

 	blobs := db.Pieces()
 	fw := pieces.NewFileWalker(log, blobs, v0PieceInfo)
-	store := pieces.NewStore(log, fw, blobs, v0PieceInfo, expirationInfo, db.PieceSpaceUsedDB(), pieces.DefaultConfig)
+	store := pieces.NewStore(log, fw, nil, blobs, v0PieceInfo, expirationInfo, db.PieceSpaceUsedDB(), pieces.DefaultConfig)

 	satelliteID := testrand.NodeID()
 	pieceID := testrand.PieceID()
@@ -35,7 +35,7 @@ func TestRetainPieces(t *testing.T) {
 	blobs := db.Pieces()
 	v0PieceInfo := db.V0PieceInfo()
 	fw := pieces.NewFileWalker(log, blobs, v0PieceInfo)
-	store := pieces.NewStore(log, fw, blobs, v0PieceInfo, db.PieceExpirationDB(), db.PieceSpaceUsedDB(), pieces.DefaultConfig)
+	store := pieces.NewStore(log, fw, nil, blobs, v0PieceInfo, db.PieceExpirationDB(), db.PieceSpaceUsedDB(), pieces.DefaultConfig)
 	testStore := pieces.StoreForTest{Store: store}

 	const numPieces = 100