diff --git a/cmd/storagenode/cmd_lazy_filewalker.go b/cmd/storagenode/cmd_lazy_filewalker.go
index b3d9c8ae9..a0b788ec6 100644
--- a/cmd/storagenode/cmd_lazy_filewalker.go
+++ b/cmd/storagenode/cmd_lazy_filewalker.go
@@ -13,6 +13,7 @@ import (
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
 
+	"storj.io/common/bloomfilter"
 	"storj.io/private/process"
 	"storj.io/storj/storagenode/iopriority"
 	"storj.io/storj/storagenode/pieces"
@@ -36,13 +37,11 @@ func (config *filewalkerCfg) DatabaseConfig() storagenodedb.Config {
 	}
 }
 
-const usedSpaceFilewalkerCmd = "used-space-filewalker"
-
 func newUsedSpaceFilewalkerCmd() *cobra.Command {
 	var cfg filewalkerCfg
 
 	cmd := &cobra.Command{
-		Use:   usedSpaceFilewalkerCmd,
+		Use:   lazyfilewalker.UsedSpaceFilewalkerCmdName,
 		Short: "An internal subcommand used to run used-space calculation filewalker as a separate subprocess with lower IO priority",
 		RunE: func(cmd *cobra.Command, args []string) error {
 			return cmdUsedSpaceFilewalker(cmd, &cfg)
@@ -56,6 +55,24 @@ func newUsedSpaceFilewalkerCmd() *cobra.Command {
 	return cmd
 }
 
+func newGCFilewalkerCmd() *cobra.Command {
+	var cfg filewalkerCfg
+
+	cmd := &cobra.Command{
+		Use:    lazyfilewalker.GCFilewalkerCmdName,
+		Short:  "An internal subcommand used to run garbage collection filewalker as a separate subprocess with lower IO priority",
+		RunE: func(cmd *cobra.Command, args []string) error {
+			return cmdGCFilewalker(cmd, &cfg)
+		},
+		Hidden: true,
+		Args:   cobra.ExactArgs(0),
+	}
+
+	process.Bind(cmd, &cfg)
+
+	return cmd
+}
+
 func cmdUsedSpaceFilewalker(cmd *cobra.Command, cfg *filewalkerCfg) (err error) {
 	if runtime.GOOS == "linux" {
 		// Pin the current goroutine to the current OS thread, so we can set the IO priority
@@ -72,7 +89,7 @@ func cmdUsedSpaceFilewalker(cmd *cobra.Command, cfg *filewalkerCfg) (err error)
 	}
 
 	ctx, _ := process.Ctx(cmd)
-	log := zap.L().Named("used-space-filewalker")
+	log := zap.L()
 
 	// We still need the DB in this case because we still have to deal with v0 pieces.
 	// Once we drop support for v0 pieces, we can remove this.
@@ -91,6 +108,10 @@ func cmdUsedSpaceFilewalker(cmd *cobra.Command, cfg *filewalkerCfg) (err error)
 		return errs.New("Error decoding data from stdin: %v", err)
 	}
 
+	if req.SatelliteID.IsZero() {
+		return errs.New("SatelliteID is required")
+	}
+
 	filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo())
 
 	total, contentSize, err := filewalker.WalkAndComputeSpaceUsedBySatellite(ctx, req.SatelliteID)
@@ -102,3 +123,68 @@ func cmdUsedSpaceFilewalker(cmd *cobra.Command, cfg *filewalkerCfg) (err error)
 	// encode the response struct and write it to stdout
 	return json.NewEncoder(io.Writer(os.Stdout)).Encode(resp)
 }
+
+func cmdGCFilewalker(cmd *cobra.Command, cfg *filewalkerCfg) (err error) {
+	if runtime.GOOS == "linux" {
+		// Pin the current goroutine to the current OS thread, so we can set the IO priority
+		// for the current thread.
+		// This is necessary because Go does not use CLONE_IO when creating new threads,
+		// so they do not share a single IO context.
+		runtime.LockOSThread()
+		defer runtime.UnlockOSThread()
+	}
+
+	err = iopriority.SetLowIOPriority()
+	if err != nil {
+		return err
+	}
+
+	ctx, _ := process.Ctx(cmd)
+	log := zap.L()
+
+	// We still need the DB in this case because we still have to deal with v0 pieces.
+	// Once we drop support for v0 pieces, we can remove this.
+ db, err := storagenodedb.OpenExisting(ctx, log.Named("db"), cfg.DatabaseConfig()) + if err != nil { + return errs.New("Error starting master database on storage node: %v", err) + } + log.Info("Database started") + defer func() { + err = errs.Combine(err, db.Close()) + }() + + // Decode the data struct received from the main process + var req lazyfilewalker.GCFilewalkerRequest + if err = json.NewDecoder(io.Reader(os.Stdin)).Decode(&req); err != nil { + return errs.New("Error decoding data from stdin: %v", err) + } + + // Validate the request data + switch { + case req.SatelliteID.IsZero(): + return errs.New("SatelliteID is required") + case req.CreatedBefore.IsZero(): + return errs.New("CreatedBefore is required") + } + + // Decode the bloom filter + filter, err := bloomfilter.NewFromBytes(req.BloomFilter) + if err != nil { + return err + } + + filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo()) + pieceIDs, piecesCount, piecesSkippedCount, err := filewalker.WalkSatellitePiecesToTrash(ctx, req.SatelliteID, req.CreatedBefore, filter) + if err != nil { + return err + } + + resp := lazyfilewalker.GCFilewalkerResponse{ + PieceIDs: pieceIDs, + PiecesCount: piecesCount, + PiecesSkippedCount: piecesSkippedCount, + } + + // encode the response struct and write it to stdout + return json.NewEncoder(io.Writer(os.Stdout)).Encode(resp) +} diff --git a/cmd/storagenode/main.go b/cmd/storagenode/main.go index bc7fe48b6..8ebc61ab4 100644 --- a/cmd/storagenode/main.go +++ b/cmd/storagenode/main.go @@ -10,6 +10,7 @@ import ( "storj.io/private/process" _ "storj.io/storj/private/version" // This attaches version information during release builds. + "storj.io/storj/storagenode/pieces/lazyfilewalker" ) func main() { @@ -23,7 +24,7 @@ func main() { rootCmd, _ := newRootCmd(allowDefaults) loggerFunc := func(logger *zap.Logger) *zap.Logger { - return logger.With(zap.String("Process", rootCmd.Use)) + return logger.With(zap.String("process", rootCmd.Use)) } process.ExecWithCustomOptions(rootCmd, process.ExecOptions{ @@ -36,5 +37,5 @@ func main() { } func isFilewalkerCommand() bool { - return len(os.Args) > 1 && os.Args[1] == usedSpaceFilewalkerCmd + return len(os.Args) > 1 && (os.Args[1] == lazyfilewalker.UsedSpaceFilewalkerCmdName || os.Args[1] == lazyfilewalker.GCFilewalkerCmdName) } diff --git a/cmd/storagenode/root.go b/cmd/storagenode/root.go index 4d2d5986a..5b0a23286 100644 --- a/cmd/storagenode/root.go +++ b/cmd/storagenode/root.go @@ -60,6 +60,7 @@ func newRootCmd(setDefaults bool) (*cobra.Command, *Factory) { newGracefulExitStatusCmd(factory), // internal hidden commands newUsedSpaceFilewalkerCmd(), + newGCFilewalkerCmd(), ) return cmd, factory diff --git a/storagenode/iopriority/low_ioprio_linux.go b/storagenode/iopriority/low_ioprio_linux.go index c77726d8f..a6709879f 100644 --- a/storagenode/iopriority/low_ioprio_linux.go +++ b/storagenode/iopriority/low_ioprio_linux.go @@ -8,22 +8,31 @@ import ( ) // These constants come from the definitions in linux's ioprio.h. 
-// See https://github.com/torvalds/linux/blob/master/include/uapi/linux/ioprio.h +// See https://github.com/torvalds/linux/blob/61d325dcbc05d8fef88110d35ef7776f3ac3f68b/include/uapi/linux/ioprio.h const ( - ioprioClassShift = uint32(13) - ioprioPrioMask = (uint32(1) << ioprioClassShift) - 1 + ioprioClassShift uint32 = 13 + ioprioClassMask uint32 = 0x07 + ioprioPrioMask uint32 = (1 << ioprioClassShift) - 1 - ioprioWhoProcess = 1 - ioprioClassIdle = 3 + ioprioWhoProcess uint32 = 1 + ioprioClassBE uint32 = 2 ) // SetLowIOPriority lowers the process I/O priority. +// +// On linux, this sets the I/O priority to "best effort" with a priority class data of 7. func SetLowIOPriority() error { - // from the definition for the IOPRIO_PRIO_VALUE macro in Linux's ioprio.h - ioprioPrioValue := ioprioClassIdle< (CreationTime - MaxTimeSkew)`. We also allow for storage +// node operators doing file system manipulations after a piece has been written. If piece files +// are copied between volumes and their attributes are not preserved, it will be possible for their +// modification times to be changed to something later in time. This still preserves the inequality +// relationship mentioned above, `ModTime > (CreationTime - MaxTimeSkew)`. We only stipulate +// that storage node operators must not artificially change blob file modification times to be in +// the past. +// +// If there is a mismatch: in most cases, a mismatch between ModTime and CreationTime has no +// effect. In certain remaining cases, the only effect is that a piece file which _should_ be +// garbage collected survives until the next round of garbage collection. The only really +// problematic case is when there is a relatively new piece file which was created _after_ this +// node's Retain bloom filter started being built on the satellite, and is recorded in this +// storage node's blob store before the Retain operation has completed. Then, it might be possible +// for that new piece to be garbage collected incorrectly, because it does not show up in the +// bloom filter and the node incorrectly thinks that it was created before the bloom filter. +// But if the uplink is not lying about CreationTime and its clock drift versus the storage node +// is less than `MaxTimeSkew`, and the ModTime on a blob file is correctly set from the +// storage node system time, then it is still true that `ModTime > (CreationTime - +// MaxTimeSkew)`. +// +// The rule that storage node operators need to be aware of is only this: do not artificially set +// mtimes on blob files to be in the past. Let the filesystem manage mtimes. If blob files need to +// be moved or copied between locations, and this updates the mtime, that is ok. A secondary effect +// of this rule is that if the storage node's system clock needs to be changed forward by a +// nontrivial amount, mtimes on existing blobs should also be adjusted (by the same interval, +// ideally, but just running "touch" on all blobs is sufficient to avoid incorrect deletion of +// data). 
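+//
+// For illustration, the gc-filewalker subcommand added in this change invokes the walk as
+// (see cmd/storagenode/cmd_lazy_filewalker.go):
+//
+//	pieceIDs, piecesCount, piecesSkippedCount, err := filewalker.WalkSatellitePiecesToTrash(ctx, req.SatelliteID, req.CreatedBefore, filter)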
+func (fw *FileWalker) WalkSatellitePiecesToTrash(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, filter *bloomfilter.Filter) (pieceIDs []storj.PieceID, piecesCount, piecesSkipped int64, err error) { + defer mon.Task()(&ctx)(&err) + + if filter == nil { + return + } + + err = fw.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error { + piecesCount++ + + // We call Gosched() when done because the GC process is expected to be long and we want to keep it at low priority, + // so other goroutines can continue serving requests. + defer runtime.Gosched() + + pieceID := access.PieceID() + if filter.Contains(pieceID) { + // This piece is explicitly not trash. Move on. + return nil + } + + // If the blob's mtime is at or after the createdBefore line, we can't safely delete it; + // it might not be trash. If it is, we can expect to get it next time. + // + // See the comment above the WalkSatellitePiecesToTrash() function for a discussion on the correctness + // of using ModTime in place of the more precise CreationTime. + mTime, err := access.ModTime(ctx) + if err != nil { + if os.IsNotExist(err) { + // piece was deleted while we were scanning. + return nil + } + + piecesSkipped++ + fw.log.Warn("failed to determine mtime of blob", zap.Error(err)) + // but continue iterating. + return nil + } + if !mTime.Before(createdBefore) { + return nil + } + + pieceIDs = append(pieceIDs, pieceID) + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + return nil + }) + + return pieceIDs, piecesCount, piecesSkipped, errFileWalker.Wrap(err) } diff --git a/storagenode/pieces/lazyfilewalker/lazyfilewalker.go b/storagenode/pieces/lazyfilewalker/lazyfilewalker.go deleted file mode 100644 index f7aeddc4c..000000000 --- a/storagenode/pieces/lazyfilewalker/lazyfilewalker.go +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright (C) 2023 Storj Labs, Inc. -// See LICENSE for copying information. - -package lazyfilewalker - -import ( - "bytes" - "context" - "encoding/json" - "errors" - - "github.com/spacemonkeygo/monkit/v3" - "github.com/zeebo/errs" - "go.uber.org/zap" - "golang.org/x/sys/execabs" - - "storj.io/common/storj" -) - -var ( - errLazyFilewalker = errs.Class("lazyfilewalker") - - mon = monkit.Package() -) - -// Supervisor performs filewalker operations in a subprocess with lower I/O priority. -type Supervisor struct { - log *zap.Logger - - executable string - gcArgs []string - usedSpaceArgs []string -} - -// NewSupervisor creates a new lazy filewalker Supervisor. -func NewSupervisor(log *zap.Logger, executable string, args []string) *Supervisor { - return &Supervisor{ - log: log, - gcArgs: append([]string{"gc-filewalker"}, args...), - usedSpaceArgs: append([]string{"used-space-filewalker"}, args...), - executable: executable, - } -} - -// UsedSpaceRequest is the request struct for the used-space-filewalker process. -type UsedSpaceRequest struct { - SatelliteID storj.NodeID `json:"satelliteID"` -} - -// UsedSpaceResponse is the response struct for the used-space-filewalker process. -type UsedSpaceResponse struct { - PiecesTotal int64 `json:"piecesTotal"` - PiecesContentSize int64 `json:"piecesContentSize"` -} - -// WalkAndComputeSpaceUsedBySatellite returns the total used space by satellite. 
-func (fw *Supervisor) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal int64, piecesContentSize int64, err error) { - defer mon.Task()(&ctx)(&err) - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - req := UsedSpaceRequest{ - SatelliteID: satelliteID, - } - - fw.log.Info("starting subprocess", zap.String("satelliteID", satelliteID.String())) - cmd := execabs.CommandContext(ctx, fw.executable, fw.usedSpaceArgs...) - - var buf, outbuf bytes.Buffer - writer := &zapWrapper{fw.log.Named("subprocess")} - - // encode the struct and write it to the buffer - enc := json.NewEncoder(&buf) - if err := enc.Encode(req); err != nil { - return 0, 0, errLazyFilewalker.Wrap(err) - } - cmd.Stdin = &buf - cmd.Stdout = &outbuf - cmd.Stderr = writer - - if err := cmd.Start(); err != nil { - fw.log.Error("failed to start subprocess", zap.Error(err)) - return 0, 0, err - } - - fw.log.Info("subprocess started", zap.String("satelliteID", satelliteID.String())) - - if err := cmd.Wait(); err != nil { - var exitErr *execabs.ExitError - if errors.As(err, &exitErr) { - fw.log.Info("subprocess exited with status", zap.Int("status", exitErr.ExitCode()), zap.Error(exitErr), zap.String("satelliteID", satelliteID.String())) - } else { - fw.log.Error("subprocess exited with error", zap.Error(err), zap.String("satelliteID", satelliteID.String())) - } - return 0, 0, errLazyFilewalker.Wrap(err) - } - - fw.log.Info("subprocess finished successfully", zap.String("satelliteID", satelliteID.String()), zap.Int64("piecesTotal", piecesTotal), zap.Int64("piecesContentSize", piecesContentSize)) - - // Decode and receive the response data struct from the subprocess - var resp UsedSpaceResponse - decoder := json.NewDecoder(&outbuf) - if err := decoder.Decode(&resp); err != nil { - fw.log.Error("failed to decode response from subprocess", zap.String("satelliteID", satelliteID.String()), zap.Error(err)) - return 0, 0, errLazyFilewalker.Wrap(err) - } - - return resp.PiecesTotal, resp.PiecesContentSize, nil -} diff --git a/storagenode/pieces/lazyfilewalker/process.go b/storagenode/pieces/lazyfilewalker/process.go new file mode 100644 index 000000000..ad60798a3 --- /dev/null +++ b/storagenode/pieces/lazyfilewalker/process.go @@ -0,0 +1,83 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package lazyfilewalker + +import ( + "bytes" + "context" + "encoding/json" + "errors" + + "go.uber.org/zap" + "golang.org/x/sys/execabs" +) + +// process is a subprocess that can be used to perform filewalker operations. +type process struct { + log *zap.Logger + executable string + args []string +} + +func newProcess(log *zap.Logger, executable string, args []string) *process { + return &process{ + log: log, + executable: executable, + args: args, + } +} + +// run runs the process and decodes the response into the value pointed by `resp`. +// It returns an error if the Process fails to start, or if the Process exits with a non-zero status. +// NOTE: the `resp` value must be a pointer to a struct. 
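+//
+// For illustration only, a caller is expected to use it along these lines (the variable
+// names here are placeholders):
+//
+//	var resp UsedSpaceResponse
+//	err := newProcess(log, executable, args).run(ctx, UsedSpaceRequest{SatelliteID: satelliteID}, &resp)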
+func (p *process) run(ctx context.Context, req, resp interface{}) (err error) { + defer mon.Task()(&ctx)(&err) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + p.log.Info("starting subprocess") + + var buf, outbuf bytes.Buffer + writer := &zapWrapper{p.log.Named("subprocess")} + + // encode the struct and write it to the buffer + enc := json.NewEncoder(&buf) + if err := enc.Encode(req); err != nil { + return errLazyFilewalker.Wrap(err) + } + + cmd := execabs.CommandContext(ctx, p.executable, p.args...) + cmd.Stdin = &buf + cmd.Stdout = &outbuf + cmd.Stderr = writer + + if err := cmd.Start(); err != nil { + p.log.Error("failed to start subprocess", zap.Error(err)) + return errLazyFilewalker.Wrap(err) + } + + p.log.Info("subprocess started") + + if err := cmd.Wait(); err != nil { + var exitErr *execabs.ExitError + if errors.As(err, &exitErr) { + p.log.Info("subprocess exited with status", zap.Int("status", exitErr.ExitCode()), zap.Error(exitErr)) + } else { + p.log.Error("subprocess exited with error", zap.Error(err)) + } + return errLazyFilewalker.Wrap(err) + } + + p.log.Info("subprocess finished successfully") + + // Decode and receive the response data struct from the subprocess + decoder := json.NewDecoder(&outbuf) + if err := decoder.Decode(&resp); err != nil { + p.log.Error("failed to decode response from subprocess", zap.Error(err)) + return errLazyFilewalker.Wrap(err) + } + + return nil +} diff --git a/storagenode/pieces/lazyfilewalker/supervisor.go b/storagenode/pieces/lazyfilewalker/supervisor.go new file mode 100644 index 000000000..ab8a52f01 --- /dev/null +++ b/storagenode/pieces/lazyfilewalker/supervisor.go @@ -0,0 +1,120 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package lazyfilewalker + +import ( + "context" + "time" + + "github.com/spacemonkeygo/monkit/v3" + "github.com/zeebo/errs" + "go.uber.org/zap" + + "storj.io/common/bloomfilter" + "storj.io/common/storj" +) + +const ( + // UsedSpaceFilewalkerCmdName is the name of the used-space-filewalker subcommand. + UsedSpaceFilewalkerCmdName = "used-space-filewalker" + // GCFilewalkerCmdName is the name of the gc-filewalker subcommand. + GCFilewalkerCmdName = "gc-filewalker" +) + +var ( + errLazyFilewalker = errs.Class("lazyfilewalker") + + mon = monkit.Package() +) + +// Supervisor manages the lazyfilewalker subprocesses. +// +// TODO: we should keep track of the number of subprocesses we have running and +// limit it to a configurable number, and queue them, since they are run per satellite. +type Supervisor struct { + log *zap.Logger + + executable string + gcArgs []string + usedSpaceArgs []string +} + +// NewSupervisor creates a new lazy filewalker Supervisor. +func NewSupervisor(log *zap.Logger, executable string, args []string) *Supervisor { + return &Supervisor{ + log: log, + gcArgs: append([]string{GCFilewalkerCmdName}, args...), + usedSpaceArgs: append([]string{UsedSpaceFilewalkerCmdName}, args...), + executable: executable, + } +} + +// UsedSpaceRequest is the request struct for the used-space-filewalker process. +type UsedSpaceRequest struct { + SatelliteID storj.NodeID `json:"satelliteID"` +} + +// UsedSpaceResponse is the response struct for the used-space-filewalker process. +type UsedSpaceResponse struct { + PiecesTotal int64 `json:"piecesTotal"` + PiecesContentSize int64 `json:"piecesContentSize"` +} + +// GCFilewalkerRequest is the request struct for the gc-filewalker process. 
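+//
+// The request is JSON-encoded onto the subprocess' stdin. With illustrative placeholder
+// values, the encoded form looks roughly like:
+//
+//	{"satelliteID":"<node ID>","bloomFilter":"<base64-encoded filter bytes>","createdBefore":"2023-04-01T00:00:00Z"}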
+type GCFilewalkerRequest struct {
+	SatelliteID   storj.NodeID `json:"satelliteID"`
+	BloomFilter   []byte       `json:"bloomFilter"`
+	CreatedBefore time.Time    `json:"createdBefore"`
+}
+
+// GCFilewalkerResponse is the response struct for the gc-filewalker process.
+type GCFilewalkerResponse struct {
+	PieceIDs           []storj.PieceID `json:"pieceIDs"`
+	PiecesSkippedCount int64           `json:"piecesSkippedCount"`
+	PiecesCount        int64           `json:"piecesCount"`
+}
+
+// WalkAndComputeSpaceUsedBySatellite returns the total used space by satellite.
+func (fw *Supervisor) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal int64, piecesContentSize int64, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	req := UsedSpaceRequest{
+		SatelliteID: satelliteID,
+	}
+	var resp UsedSpaceResponse
+
+	log := fw.log.Named(UsedSpaceFilewalkerCmdName).With(zap.String("satelliteID", satelliteID.String()))
+
+	err = newProcess(log, fw.executable, fw.usedSpaceArgs).run(ctx, req, &resp)
+	if err != nil {
+		return 0, 0, err
+	}
+
+	return resp.PiecesTotal, resp.PiecesContentSize, nil
+}
+
+// WalkSatellitePiecesToTrash returns a list of pieceIDs that need to be trashed for the given satellite.
+func (fw *Supervisor) WalkSatellitePiecesToTrash(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, filter *bloomfilter.Filter) (pieceIDs []storj.PieceID, piecesCount, piecesSkipped int64, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	if filter == nil {
+		return
+	}
+
+	req := GCFilewalkerRequest{
+		SatelliteID:   satelliteID,
+		BloomFilter:   filter.Bytes(),
+		CreatedBefore: createdBefore,
+	}
+	var resp GCFilewalkerResponse
+
+	log := fw.log.Named(GCFilewalkerCmdName).With(zap.String("satelliteID", satelliteID.String()))
+
+	err = newProcess(log, fw.executable, fw.gcArgs).run(ctx, req, &resp)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+
+	return resp.PieceIDs, resp.PiecesCount, resp.PiecesSkippedCount, nil
+}
diff --git a/storagenode/pieces/store.go b/storagenode/pieces/store.go
index 337867c07..b903aae28 100644
--- a/storagenode/pieces/store.go
+++ b/storagenode/pieces/store.go
@@ -14,6 +14,7 @@ import (
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
 
+	"storj.io/common/bloomfilter"
 	"storj.io/common/memory"
 	"storj.io/common/pb"
 	"storj.io/common/storj"
@@ -534,6 +535,27 @@ func (store *Store) WalkSatellitePieces(ctx context.Context, satellite storj.Nod
 	return store.Filewalker.WalkSatellitePieces(ctx, satellite, walkFunc)
 }
 
+// SatellitePiecesToTrash returns a list of piece IDs that are trash for the given satellite.
+//
+// If the lazy filewalker is enabled, it will be used to find the pieces to trash, otherwise
+// the regular filewalker will be used. If the lazy filewalker fails, the regular filewalker
+// will be used as a fallback.
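+//
+// The retain service calls it like so (see storagenode/retain/retain.go in this change):
+//
+//	pieceIDs, piecesCount, piecesSkipped, err := s.store.SatellitePiecesToTrash(ctx, satelliteID, createdBefore, filter)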
+func (store *Store) SatellitePiecesToTrash(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, filter *bloomfilter.Filter) (pieceIDs []storj.PieceID, piecesCount, piecesSkipped int64, err error) { + defer mon.Task()(&ctx)(&err) + + if store.config.EnableLazyFilewalker && store.lazyFilewalker != nil { + pieceIDs, piecesCount, piecesSkipped, err = store.lazyFilewalker.WalkSatellitePiecesToTrash(ctx, satelliteID, createdBefore, filter) + if err == nil { + return pieceIDs, piecesCount, piecesSkipped, nil + } + store.log.Error("lazyfilewalker failed", zap.Error(err)) + } + // fallback to the regular filewalker + pieceIDs, piecesCount, piecesSkipped, err = store.Filewalker.WalkSatellitePiecesToTrash(ctx, satelliteID, createdBefore, filter) + + return pieceIDs, piecesCount, piecesSkipped, err +} + // GetExpired gets piece IDs that are expired and were created before the given time. func (store *Store) GetExpired(ctx context.Context, expiredAt time.Time, limit int64) (_ []ExpiredInfo, err error) { defer mon.Task()(&ctx)(&err) diff --git a/storagenode/retain/retain.go b/storagenode/retain/retain.go index ebd26c427..b9e0b6349 100644 --- a/storagenode/retain/retain.go +++ b/storagenode/retain/retain.go @@ -5,8 +5,6 @@ package retain import ( "context" - "os" - "runtime" "sync" "time" @@ -298,52 +296,6 @@ func (s *Service) Status() Status { return s.config.Status } -// ------------------------------------------------------------------------------------------------ -// On the correctness of using access.ModTime() in place of the more precise access.CreationTime() -// in retainPieces(): -// ------------------------------------------------------------------------------------------------ -// -// Background: for pieces not stored with storage.FormatV0, the access.CreationTime() value can -// only be retrieved by opening the piece file, and reading and unmarshaling the piece header. -// This is far slower than access.ModTime(), which gets the file modification time from the file -// system and only needs to do a stat(2) on the piece file. If we can make Retain() work with -// ModTime, we should. -// -// Possibility of mismatch: We do not force or require piece file modification times to be equal to -// or close to the CreationTime specified by the uplink, but we do expect that piece files will be -// written to the filesystem _after_ the CreationTime. We make the assumption already that storage -// nodes and satellites and uplinks have system clocks that are very roughly in sync (that is, they -// are out of sync with each other by less than an hour of real time, or whatever is configured as -// MaxTimeSkew). So if an uplink is not lying about CreationTime and it uploads a piece that -// makes it to a storagenode's disk as quickly as possible, even in the worst-synchronized-clocks -// case we can assume that `ModTime > (CreationTime - MaxTimeSkew)`. We also allow for storage -// node operators doing file system manipulations after a piece has been written. If piece files -// are copied between volumes and their attributes are not preserved, it will be possible for their -// modification times to be changed to something later in time. This still preserves the inequality -// relationship mentioned above, `ModTime > (CreationTime - MaxTimeSkew)`. We only stipulate -// that storage node operators must not artificially change blob file modification times to be in -// the past. -// -// If there is a mismatch: in most cases, a mismatch between ModTime and CreationTime has no -// effect. 
In certain remaining cases, the only effect is that a piece file which _should_ be -// garbage collected survives until the next round of garbage collection. The only really -// problematic case is when there is a relatively new piece file which was created _after_ this -// node's Retain bloom filter started being built on the satellite, and is recorded in this -// storage node's blob store before the Retain operation has completed. Then, it might be possible -// for that new piece to be garbage collected incorrectly, because it does not show up in the -// bloom filter and the node incorrectly thinks that it was created before the bloom filter. -// But if the uplink is not lying about CreationTime and its clock drift versus the storage node -// is less than `MaxTimeSkew`, and the ModTime on a blob file is correctly set from the -// storage node system time, then it is still true that `ModTime > (CreationTime - -// MaxTimeSkew)`. -// -// The rule that storage node operators need to be aware of is only this: do not artificially set -// mtimes on blob files to be in the past. Let the filesystem manage mtimes. If blob files need to -// be moved or copied between locations, and this updates the mtime, that is ok. A secondary effect -// of this rule is that if the storage node's system clock needs to be changed forward by a -// nontrivial amount, mtimes on existing blobs should also be adjusted (by the same interval, -// ideally, but just running "touch" on all blobs is sufficient to avoid incorrect deletion of -// data). func (s *Service) retainPieces(ctx context.Context, req Request) (err error) { // if retain status is disabled, return immediately if s.config.Status == Disabled { @@ -352,9 +304,6 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) { defer mon.Task()(&ctx, req.SatelliteID, req.CreatedBefore)(&err) - var piecesCount int64 - var piecesSkipped int64 - var piecesToDeleteCount int64 numDeleted := 0 satelliteID := req.SatelliteID filter := req.Filter @@ -373,48 +322,20 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) { zap.Int64("Filter Size", filter.Size()), zap.Stringer("Satellite ID", satelliteID)) - err = s.store.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) (err error) { - defer mon.Task()(&ctx)(&err) - piecesCount++ + pieceIDs, piecesCount, piecesSkipped, err := s.store.SatellitePiecesToTrash(ctx, satelliteID, createdBefore, filter) + if err != nil { + return Error.Wrap(err) + } - // We call Gosched() when done because the GC process is expected to be long and we want to keep it at low priority, - // so other goroutines can continue serving requests. - defer runtime.Gosched() - - pieceID := access.PieceID() - if filter.Contains(pieceID) { - // This piece is explicitly not trash. Move on. - return nil - } - - // If the blob's mtime is at or after the createdBefore line, we can't safely delete it; - // it might not be trash. If it is, we can expect to get it next time. - // - // See the comment above the retainPieces() function for a discussion on the correctness - // of using ModTime in place of the more precise CreationTime. - mTime, err := access.ModTime(ctx) - if err != nil { - if os.IsNotExist(err) { - // piece was deleted while we were scanning. - return nil - } - - piecesSkipped++ - s.log.Warn("failed to determine mtime of blob", zap.Error(err)) - // but continue iterating. 
- return nil - } - if !mTime.Before(createdBefore) { - return nil - } + piecesToDeleteCount := len(pieceIDs) + for i := range pieceIDs { + pieceID := pieceIDs[i] s.log.Debug("About to move piece to trash", zap.Stringer("Satellite ID", satelliteID), zap.Stringer("Piece ID", pieceID), zap.String("Status", s.config.Status.String())) - piecesToDeleteCount++ - // if retain status is enabled, delete pieceid if s.config.Status == Enabled { if err = s.trash(ctx, satelliteID, pieceID); err != nil { @@ -426,21 +347,10 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) { } } numDeleted++ - - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - return nil - }) - if err != nil { - return Error.Wrap(err) } mon.IntVal("garbage_collection_pieces_count").Observe(piecesCount) mon.IntVal("garbage_collection_pieces_skipped").Observe(piecesSkipped) - mon.IntVal("garbage_collection_pieces_to_delete_count").Observe(piecesToDeleteCount) + mon.IntVal("garbage_collection_pieces_to_delete_count").Observe(int64(piecesToDeleteCount)) mon.IntVal("garbage_collection_pieces_deleted").Observe(int64(numDeleted)) mon.DurationVal("garbage_collection_loop_duration").Observe(time.Now().UTC().Sub(started)) s.log.Info("Moved pieces to trash during retain", zap.Int("num deleted", numDeleted), zap.String("Retain Status", s.config.Status.String()))
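Illustrative sketch (not part of the patch): the new lazy GC path boils down to the JSON round trip below. The supervisor encodes a request onto the subprocess' stdin and decodes the response from its stdout, much as process.run and cmdGCFilewalker above do. The gcRequest and gcResponse types are stand-ins for lazyfilewalker.GCFilewalkerRequest and GCFilewalkerResponse, with plain strings in place of storj.NodeID and storj.PieceID so the sketch stays self-contained and runnable.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"time"
)

// gcRequest mirrors the shape of lazyfilewalker.GCFilewalkerRequest (illustrative stand-in).
type gcRequest struct {
	SatelliteID   string    `json:"satelliteID"`
	BloomFilter   []byte    `json:"bloomFilter"`
	CreatedBefore time.Time `json:"createdBefore"`
}

// gcResponse mirrors the shape of lazyfilewalker.GCFilewalkerResponse (illustrative stand-in).
type gcResponse struct {
	PieceIDs           []string `json:"pieceIDs"`
	PiecesSkippedCount int64    `json:"piecesSkippedCount"`
	PiecesCount        int64    `json:"piecesCount"`
}

func main() {
	// Supervisor side: encode the request, as process.run does before wiring the buffer to cmd.Stdin.
	var stdin bytes.Buffer
	req := gcRequest{
		SatelliteID:   "<satellite id>",
		BloomFilter:   []byte{0x01, 0x02, 0x03},
		CreatedBefore: time.Now().Add(-24 * time.Hour),
	}
	if err := json.NewEncoder(&stdin).Encode(req); err != nil {
		panic(err)
	}

	// Subprocess side: decode the request from stdin, walk the pieces (elided here),
	// and encode the response to stdout, as cmdGCFilewalker does.
	var in gcRequest
	if err := json.NewDecoder(&stdin).Decode(&in); err != nil {
		panic(err)
	}
	var stdout bytes.Buffer
	resp := gcResponse{PieceIDs: []string{"<piece id>"}, PiecesCount: 10, PiecesSkippedCount: 0}
	if err := json.NewEncoder(&stdout).Encode(resp); err != nil {
		panic(err)
	}

	// Supervisor side again: decode the response after the subprocess exits.
	var out gcResponse
	if err := json.NewDecoder(&stdout).Decode(&out); err != nil {
		panic(err)
	}
	fmt.Printf("walked %d pieces, %d to trash, %d skipped\n", out.PiecesCount, len(out.PieceIDs), out.PiecesSkippedCount)
}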