storagenode: run garbage collection filewalker as a low I/O subprocess
Updates https://github.com/storj/storj/issues/5349 Change-Id: I7d810d737b17f0b74943765f7f7cc30b9fcf1425
This commit is contained in:
parent
25f2305e00
commit
e0542c2d24
@ -13,6 +13,7 @@ import (
|
|||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/common/bloomfilter"
|
||||||
"storj.io/private/process"
|
"storj.io/private/process"
|
||||||
"storj.io/storj/storagenode/iopriority"
|
"storj.io/storj/storagenode/iopriority"
|
||||||
"storj.io/storj/storagenode/pieces"
|
"storj.io/storj/storagenode/pieces"
|
||||||
@ -36,13 +37,11 @@ func (config *filewalkerCfg) DatabaseConfig() storagenodedb.Config {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const usedSpaceFilewalkerCmd = "used-space-filewalker"
|
|
||||||
|
|
||||||
func newUsedSpaceFilewalkerCmd() *cobra.Command {
|
func newUsedSpaceFilewalkerCmd() *cobra.Command {
|
||||||
var cfg filewalkerCfg
|
var cfg filewalkerCfg
|
||||||
|
|
||||||
cmd := &cobra.Command{
|
cmd := &cobra.Command{
|
||||||
Use: usedSpaceFilewalkerCmd,
|
Use: lazyfilewalker.UsedSpaceFilewalkerCmdName,
|
||||||
Short: "An internal subcommand used to run used-space calculation filewalker as a separate subprocess with lower IO priority",
|
Short: "An internal subcommand used to run used-space calculation filewalker as a separate subprocess with lower IO priority",
|
||||||
RunE: func(cmd *cobra.Command, args []string) error {
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
return cmdUsedSpaceFilewalker(cmd, &cfg)
|
return cmdUsedSpaceFilewalker(cmd, &cfg)
|
||||||
@ -56,6 +55,24 @@ func newUsedSpaceFilewalkerCmd() *cobra.Command {
|
|||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newGCFilewalkerCmd() *cobra.Command {
|
||||||
|
var cfg filewalkerCfg
|
||||||
|
|
||||||
|
cmd := &cobra.Command{
|
||||||
|
Use: lazyfilewalker.GCFilewalkerCmdName,
|
||||||
|
Short: "An internal subcommand used to run garbage collection filewalker as a separate subprocess with lower IO priority",
|
||||||
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
|
return cmdGCFilewalker(cmd, &cfg)
|
||||||
|
},
|
||||||
|
Hidden: true,
|
||||||
|
Args: cobra.ExactArgs(0),
|
||||||
|
}
|
||||||
|
|
||||||
|
process.Bind(cmd, &cfg)
|
||||||
|
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
func cmdUsedSpaceFilewalker(cmd *cobra.Command, cfg *filewalkerCfg) (err error) {
|
func cmdUsedSpaceFilewalker(cmd *cobra.Command, cfg *filewalkerCfg) (err error) {
|
||||||
if runtime.GOOS == "linux" {
|
if runtime.GOOS == "linux" {
|
||||||
// Pin the current goroutine to the current OS thread, so we can set the IO priority
|
// Pin the current goroutine to the current OS thread, so we can set the IO priority
|
||||||
@ -72,7 +89,7 @@ func cmdUsedSpaceFilewalker(cmd *cobra.Command, cfg *filewalkerCfg) (err error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
ctx, _ := process.Ctx(cmd)
|
ctx, _ := process.Ctx(cmd)
|
||||||
log := zap.L().Named("used-space-filewalker")
|
log := zap.L()
|
||||||
|
|
||||||
// We still need the DB in this case because we still have to deal with v0 pieces.
|
// We still need the DB in this case because we still have to deal with v0 pieces.
|
||||||
// Once we drop support for v0 pieces, we can remove this.
|
// Once we drop support for v0 pieces, we can remove this.
|
||||||
@ -91,6 +108,10 @@ func cmdUsedSpaceFilewalker(cmd *cobra.Command, cfg *filewalkerCfg) (err error)
|
|||||||
return errs.New("Error decoding data from stdin: %v", err)
|
return errs.New("Error decoding data from stdin: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if req.SatelliteID.IsZero() {
|
||||||
|
return errs.New("SatelliteID is required")
|
||||||
|
}
|
||||||
|
|
||||||
filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo())
|
filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo())
|
||||||
|
|
||||||
total, contentSize, err := filewalker.WalkAndComputeSpaceUsedBySatellite(ctx, req.SatelliteID)
|
total, contentSize, err := filewalker.WalkAndComputeSpaceUsedBySatellite(ctx, req.SatelliteID)
|
||||||
@ -102,3 +123,68 @@ func cmdUsedSpaceFilewalker(cmd *cobra.Command, cfg *filewalkerCfg) (err error)
|
|||||||
// encode the response struct and write it to stdout
|
// encode the response struct and write it to stdout
|
||||||
return json.NewEncoder(io.Writer(os.Stdout)).Encode(resp)
|
return json.NewEncoder(io.Writer(os.Stdout)).Encode(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func cmdGCFilewalker(cmd *cobra.Command, cfg *filewalkerCfg) (err error) {
|
||||||
|
if runtime.GOOS == "linux" {
|
||||||
|
// Pin the current goroutine to the current OS thread, so we can set the IO priority
|
||||||
|
// for the current thread.
|
||||||
|
// This is necessary because Go does use CLONE_IO when creating new threads,
|
||||||
|
// so they do not share a single IO context.
|
||||||
|
runtime.LockOSThread()
|
||||||
|
defer runtime.UnlockOSThread()
|
||||||
|
}
|
||||||
|
|
||||||
|
err = iopriority.SetLowIOPriority()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, _ := process.Ctx(cmd)
|
||||||
|
log := zap.L()
|
||||||
|
|
||||||
|
// We still need the DB in this case because we still have to deal with v0 pieces.
|
||||||
|
// Once we drop support for v0 pieces, we can remove this.
|
||||||
|
db, err := storagenodedb.OpenExisting(ctx, log.Named("db"), cfg.DatabaseConfig())
|
||||||
|
if err != nil {
|
||||||
|
return errs.New("Error starting master database on storage node: %v", err)
|
||||||
|
}
|
||||||
|
log.Info("Database started")
|
||||||
|
defer func() {
|
||||||
|
err = errs.Combine(err, db.Close())
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Decode the data struct received from the main process
|
||||||
|
var req lazyfilewalker.GCFilewalkerRequest
|
||||||
|
if err = json.NewDecoder(io.Reader(os.Stdin)).Decode(&req); err != nil {
|
||||||
|
return errs.New("Error decoding data from stdin: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate the request data
|
||||||
|
switch {
|
||||||
|
case req.SatelliteID.IsZero():
|
||||||
|
return errs.New("SatelliteID is required")
|
||||||
|
case req.CreatedBefore.IsZero():
|
||||||
|
return errs.New("CreatedBefore is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decode the bloom filter
|
||||||
|
filter, err := bloomfilter.NewFromBytes(req.BloomFilter)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
filewalker := pieces.NewFileWalker(log, db.Pieces(), db.V0PieceInfo())
|
||||||
|
pieceIDs, piecesCount, piecesSkippedCount, err := filewalker.WalkSatellitePiecesToTrash(ctx, req.SatelliteID, req.CreatedBefore, filter)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := lazyfilewalker.GCFilewalkerResponse{
|
||||||
|
PieceIDs: pieceIDs,
|
||||||
|
PiecesCount: piecesCount,
|
||||||
|
PiecesSkippedCount: piecesSkippedCount,
|
||||||
|
}
|
||||||
|
|
||||||
|
// encode the response struct and write it to stdout
|
||||||
|
return json.NewEncoder(io.Writer(os.Stdout)).Encode(resp)
|
||||||
|
}
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
|
|
||||||
"storj.io/private/process"
|
"storj.io/private/process"
|
||||||
_ "storj.io/storj/private/version" // This attaches version information during release builds.
|
_ "storj.io/storj/private/version" // This attaches version information during release builds.
|
||||||
|
"storj.io/storj/storagenode/pieces/lazyfilewalker"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -23,7 +24,7 @@ func main() {
|
|||||||
rootCmd, _ := newRootCmd(allowDefaults)
|
rootCmd, _ := newRootCmd(allowDefaults)
|
||||||
|
|
||||||
loggerFunc := func(logger *zap.Logger) *zap.Logger {
|
loggerFunc := func(logger *zap.Logger) *zap.Logger {
|
||||||
return logger.With(zap.String("Process", rootCmd.Use))
|
return logger.With(zap.String("process", rootCmd.Use))
|
||||||
}
|
}
|
||||||
|
|
||||||
process.ExecWithCustomOptions(rootCmd, process.ExecOptions{
|
process.ExecWithCustomOptions(rootCmd, process.ExecOptions{
|
||||||
@ -36,5 +37,5 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func isFilewalkerCommand() bool {
|
func isFilewalkerCommand() bool {
|
||||||
return len(os.Args) > 1 && os.Args[1] == usedSpaceFilewalkerCmd
|
return len(os.Args) > 1 && (os.Args[1] == lazyfilewalker.UsedSpaceFilewalkerCmdName || os.Args[1] == lazyfilewalker.GCFilewalkerCmdName)
|
||||||
}
|
}
|
||||||
|
@ -60,6 +60,7 @@ func newRootCmd(setDefaults bool) (*cobra.Command, *Factory) {
|
|||||||
newGracefulExitStatusCmd(factory),
|
newGracefulExitStatusCmd(factory),
|
||||||
// internal hidden commands
|
// internal hidden commands
|
||||||
newUsedSpaceFilewalkerCmd(),
|
newUsedSpaceFilewalkerCmd(),
|
||||||
|
newGCFilewalkerCmd(),
|
||||||
)
|
)
|
||||||
|
|
||||||
return cmd, factory
|
return cmd, factory
|
||||||
|
@ -8,22 +8,31 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// These constants come from the definitions in linux's ioprio.h.
|
// These constants come from the definitions in linux's ioprio.h.
|
||||||
// See https://github.com/torvalds/linux/blob/master/include/uapi/linux/ioprio.h
|
// See https://github.com/torvalds/linux/blob/61d325dcbc05d8fef88110d35ef7776f3ac3f68b/include/uapi/linux/ioprio.h
|
||||||
const (
|
const (
|
||||||
ioprioClassShift = uint32(13)
|
ioprioClassShift uint32 = 13
|
||||||
ioprioPrioMask = (uint32(1) << ioprioClassShift) - 1
|
ioprioClassMask uint32 = 0x07
|
||||||
|
ioprioPrioMask uint32 = (1 << ioprioClassShift) - 1
|
||||||
|
|
||||||
ioprioWhoProcess = 1
|
ioprioWhoProcess uint32 = 1
|
||||||
ioprioClassIdle = 3
|
ioprioClassBE uint32 = 2
|
||||||
)
|
)
|
||||||
|
|
||||||
// SetLowIOPriority lowers the process I/O priority.
|
// SetLowIOPriority lowers the process I/O priority.
|
||||||
|
//
|
||||||
|
// On linux, this sets the I/O priority to "best effort" with a priority class data of 7.
|
||||||
func SetLowIOPriority() error {
|
func SetLowIOPriority() error {
|
||||||
// from the definition for the IOPRIO_PRIO_VALUE macro in Linux's ioprio.h
|
ioprioPrioValue := ioprioPrioClassValue(ioprioClassBE, 7)
|
||||||
ioprioPrioValue := ioprioClassIdle<<ioprioClassShift | (0 & ioprioPrioMask)
|
|
||||||
_, _, err := syscall.Syscall(syscall.SYS_IOPRIO_SET, uintptr(ioprioWhoProcess), 0, uintptr(ioprioPrioValue))
|
_, _, err := syscall.Syscall(syscall.SYS_IOPRIO_SET, uintptr(ioprioWhoProcess), 0, uintptr(ioprioPrioValue))
|
||||||
if err != 0 {
|
if err != 0 {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ioprioPrioClassValue returns the class value based on the definition for the IOPRIO_PRIO_VALUE
|
||||||
|
// macro in Linux's ioprio.h
|
||||||
|
// See https://github.com/torvalds/linux/blob/61d325dcbc05d8fef88110d35ef7776f3ac3f68b/include/uapi/linux/ioprio.h#L15-L17
|
||||||
|
func ioprioPrioClassValue(class, data uint32) uint32 {
|
||||||
|
return (((class) & ioprioClassMask) << ioprioClassShift) | ((data) & ioprioPrioMask)
|
||||||
|
}
|
||||||
|
@ -6,14 +6,20 @@ package pieces
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"os"
|
"os"
|
||||||
|
"runtime"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/zeebo/errs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/common/bloomfilter"
|
||||||
"storj.io/common/storj"
|
"storj.io/common/storj"
|
||||||
"storj.io/storj/storagenode/blobstore"
|
"storj.io/storj/storagenode/blobstore"
|
||||||
"storj.io/storj/storagenode/blobstore/filestore"
|
"storj.io/storj/storagenode/blobstore/filestore"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var errFileWalker = errs.Class("filewalker")
|
||||||
|
|
||||||
// FileWalker implements methods to walk over pieces in a storage directory.
|
// FileWalker implements methods to walk over pieces in a storage directory.
|
||||||
type FileWalker struct {
|
type FileWalker struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
@ -60,7 +66,7 @@ func (fw *FileWalker) WalkSatellitePieces(ctx context.Context, satellite storj.N
|
|||||||
err = fw.v0PieceInfo.WalkSatelliteV0Pieces(ctx, fw.blobs, satellite, fn)
|
err = fw.v0PieceInfo.WalkSatelliteV0Pieces(ctx, fw.blobs, satellite, fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return errFileWalker.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WalkAndComputeSpaceUsedBySatellite walks over all pieces for a given satellite, adds up and returns the total space used.
|
// WalkAndComputeSpaceUsedBySatellite walks over all pieces for a given satellite, adds up and returns the total space used.
|
||||||
@ -78,5 +84,109 @@ func (fw *FileWalker) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, sa
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
return satPiecesTotal, satPiecesContentSize, err
|
return satPiecesTotal, satPiecesContentSize, errFileWalker.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WalkSatellitePiecesToTrash returns a list of piece IDs that need to be trashed for the given satellite.
|
||||||
|
//
|
||||||
|
// ------------------------------------------------------------------------------------------------
|
||||||
|
//
|
||||||
|
// On the correctness of using access.ModTime() in place of the more precise access.CreationTime():
|
||||||
|
//
|
||||||
|
// ------------------------------------------------------------------------------------------------
|
||||||
|
//
|
||||||
|
// Background: for pieces not stored with storage.FormatV0, the access.CreationTime() value can
|
||||||
|
// only be retrieved by opening the piece file, and reading and unmarshaling the piece header.
|
||||||
|
// This is far slower than access.ModTime(), which gets the file modification time from the file
|
||||||
|
// system and only needs to do a stat(2) on the piece file. If we can make Retain() work with
|
||||||
|
// ModTime, we should.
|
||||||
|
//
|
||||||
|
// Possibility of mismatch: We do not force or require piece file modification times to be equal to
|
||||||
|
// or close to the CreationTime specified by the uplink, but we do expect that piece files will be
|
||||||
|
// written to the filesystem _after_ the CreationTime. We make the assumption already that storage
|
||||||
|
// nodes and satellites and uplinks have system clocks that are very roughly in sync (that is, they
|
||||||
|
// are out of sync with each other by less than an hour of real time, or whatever is configured as
|
||||||
|
// MaxTimeSkew). So if an uplink is not lying about CreationTime and it uploads a piece that
|
||||||
|
// makes it to a storagenode's disk as quickly as possible, even in the worst-synchronized-clocks
|
||||||
|
// case we can assume that `ModTime > (CreationTime - MaxTimeSkew)`. We also allow for storage
|
||||||
|
// node operators doing file system manipulations after a piece has been written. If piece files
|
||||||
|
// are copied between volumes and their attributes are not preserved, it will be possible for their
|
||||||
|
// modification times to be changed to something later in time. This still preserves the inequality
|
||||||
|
// relationship mentioned above, `ModTime > (CreationTime - MaxTimeSkew)`. We only stipulate
|
||||||
|
// that storage node operators must not artificially change blob file modification times to be in
|
||||||
|
// the past.
|
||||||
|
//
|
||||||
|
// If there is a mismatch: in most cases, a mismatch between ModTime and CreationTime has no
|
||||||
|
// effect. In certain remaining cases, the only effect is that a piece file which _should_ be
|
||||||
|
// garbage collected survives until the next round of garbage collection. The only really
|
||||||
|
// problematic case is when there is a relatively new piece file which was created _after_ this
|
||||||
|
// node's Retain bloom filter started being built on the satellite, and is recorded in this
|
||||||
|
// storage node's blob store before the Retain operation has completed. Then, it might be possible
|
||||||
|
// for that new piece to be garbage collected incorrectly, because it does not show up in the
|
||||||
|
// bloom filter and the node incorrectly thinks that it was created before the bloom filter.
|
||||||
|
// But if the uplink is not lying about CreationTime and its clock drift versus the storage node
|
||||||
|
// is less than `MaxTimeSkew`, and the ModTime on a blob file is correctly set from the
|
||||||
|
// storage node system time, then it is still true that `ModTime > (CreationTime -
|
||||||
|
// MaxTimeSkew)`.
|
||||||
|
//
|
||||||
|
// The rule that storage node operators need to be aware of is only this: do not artificially set
|
||||||
|
// mtimes on blob files to be in the past. Let the filesystem manage mtimes. If blob files need to
|
||||||
|
// be moved or copied between locations, and this updates the mtime, that is ok. A secondary effect
|
||||||
|
// of this rule is that if the storage node's system clock needs to be changed forward by a
|
||||||
|
// nontrivial amount, mtimes on existing blobs should also be adjusted (by the same interval,
|
||||||
|
// ideally, but just running "touch" on all blobs is sufficient to avoid incorrect deletion of
|
||||||
|
// data).
|
||||||
|
func (fw *FileWalker) WalkSatellitePiecesToTrash(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, filter *bloomfilter.Filter) (pieceIDs []storj.PieceID, piecesCount, piecesSkipped int64, err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
if filter == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = fw.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error {
|
||||||
|
piecesCount++
|
||||||
|
|
||||||
|
// We call Gosched() when done because the GC process is expected to be long and we want to keep it at low priority,
|
||||||
|
// so other goroutines can continue serving requests.
|
||||||
|
defer runtime.Gosched()
|
||||||
|
|
||||||
|
pieceID := access.PieceID()
|
||||||
|
if filter.Contains(pieceID) {
|
||||||
|
// This piece is explicitly not trash. Move on.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the blob's mtime is at or after the createdBefore line, we can't safely delete it;
|
||||||
|
// it might not be trash. If it is, we can expect to get it next time.
|
||||||
|
//
|
||||||
|
// See the comment above the WalkSatellitePiecesToTrash() function for a discussion on the correctness
|
||||||
|
// of using ModTime in place of the more precise CreationTime.
|
||||||
|
mTime, err := access.ModTime(ctx)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
// piece was deleted while we were scanning.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
piecesSkipped++
|
||||||
|
fw.log.Warn("failed to determine mtime of blob", zap.Error(err))
|
||||||
|
// but continue iterating.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if !mTime.Before(createdBefore) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
pieceIDs = append(pieceIDs, pieceID)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return pieceIDs, piecesCount, piecesSkipped, errFileWalker.Wrap(err)
|
||||||
}
|
}
|
||||||
|
@ -1,110 +0,0 @@
|
|||||||
// Copyright (C) 2023 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information.
|
|
||||||
|
|
||||||
package lazyfilewalker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
|
|
||||||
"github.com/spacemonkeygo/monkit/v3"
|
|
||||||
"github.com/zeebo/errs"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"golang.org/x/sys/execabs"
|
|
||||||
|
|
||||||
"storj.io/common/storj"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
errLazyFilewalker = errs.Class("lazyfilewalker")
|
|
||||||
|
|
||||||
mon = monkit.Package()
|
|
||||||
)
|
|
||||||
|
|
||||||
// Supervisor performs filewalker operations in a subprocess with lower I/O priority.
|
|
||||||
type Supervisor struct {
|
|
||||||
log *zap.Logger
|
|
||||||
|
|
||||||
executable string
|
|
||||||
gcArgs []string
|
|
||||||
usedSpaceArgs []string
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewSupervisor creates a new lazy filewalker Supervisor.
|
|
||||||
func NewSupervisor(log *zap.Logger, executable string, args []string) *Supervisor {
|
|
||||||
return &Supervisor{
|
|
||||||
log: log,
|
|
||||||
gcArgs: append([]string{"gc-filewalker"}, args...),
|
|
||||||
usedSpaceArgs: append([]string{"used-space-filewalker"}, args...),
|
|
||||||
executable: executable,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// UsedSpaceRequest is the request struct for the used-space-filewalker process.
|
|
||||||
type UsedSpaceRequest struct {
|
|
||||||
SatelliteID storj.NodeID `json:"satelliteID"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// UsedSpaceResponse is the response struct for the used-space-filewalker process.
|
|
||||||
type UsedSpaceResponse struct {
|
|
||||||
PiecesTotal int64 `json:"piecesTotal"`
|
|
||||||
PiecesContentSize int64 `json:"piecesContentSize"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// WalkAndComputeSpaceUsedBySatellite returns the total used space by satellite.
|
|
||||||
func (fw *Supervisor) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal int64, piecesContentSize int64, err error) {
|
|
||||||
defer mon.Task()(&ctx)(&err)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
req := UsedSpaceRequest{
|
|
||||||
SatelliteID: satelliteID,
|
|
||||||
}
|
|
||||||
|
|
||||||
fw.log.Info("starting subprocess", zap.String("satelliteID", satelliteID.String()))
|
|
||||||
cmd := execabs.CommandContext(ctx, fw.executable, fw.usedSpaceArgs...)
|
|
||||||
|
|
||||||
var buf, outbuf bytes.Buffer
|
|
||||||
writer := &zapWrapper{fw.log.Named("subprocess")}
|
|
||||||
|
|
||||||
// encode the struct and write it to the buffer
|
|
||||||
enc := json.NewEncoder(&buf)
|
|
||||||
if err := enc.Encode(req); err != nil {
|
|
||||||
return 0, 0, errLazyFilewalker.Wrap(err)
|
|
||||||
}
|
|
||||||
cmd.Stdin = &buf
|
|
||||||
cmd.Stdout = &outbuf
|
|
||||||
cmd.Stderr = writer
|
|
||||||
|
|
||||||
if err := cmd.Start(); err != nil {
|
|
||||||
fw.log.Error("failed to start subprocess", zap.Error(err))
|
|
||||||
return 0, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
fw.log.Info("subprocess started", zap.String("satelliteID", satelliteID.String()))
|
|
||||||
|
|
||||||
if err := cmd.Wait(); err != nil {
|
|
||||||
var exitErr *execabs.ExitError
|
|
||||||
if errors.As(err, &exitErr) {
|
|
||||||
fw.log.Info("subprocess exited with status", zap.Int("status", exitErr.ExitCode()), zap.Error(exitErr), zap.String("satelliteID", satelliteID.String()))
|
|
||||||
} else {
|
|
||||||
fw.log.Error("subprocess exited with error", zap.Error(err), zap.String("satelliteID", satelliteID.String()))
|
|
||||||
}
|
|
||||||
return 0, 0, errLazyFilewalker.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fw.log.Info("subprocess finished successfully", zap.String("satelliteID", satelliteID.String()), zap.Int64("piecesTotal", piecesTotal), zap.Int64("piecesContentSize", piecesContentSize))
|
|
||||||
|
|
||||||
// Decode and receive the response data struct from the subprocess
|
|
||||||
var resp UsedSpaceResponse
|
|
||||||
decoder := json.NewDecoder(&outbuf)
|
|
||||||
if err := decoder.Decode(&resp); err != nil {
|
|
||||||
fw.log.Error("failed to decode response from subprocess", zap.String("satelliteID", satelliteID.String()), zap.Error(err))
|
|
||||||
return 0, 0, errLazyFilewalker.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp.PiecesTotal, resp.PiecesContentSize, nil
|
|
||||||
}
|
|
83
storagenode/pieces/lazyfilewalker/process.go
Normal file
83
storagenode/pieces/lazyfilewalker/process.go
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
// Copyright (C) 2023 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
package lazyfilewalker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/sys/execabs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// process is a subprocess that can be used to perform filewalker operations.
|
||||||
|
type process struct {
|
||||||
|
log *zap.Logger
|
||||||
|
executable string
|
||||||
|
args []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func newProcess(log *zap.Logger, executable string, args []string) *process {
|
||||||
|
return &process{
|
||||||
|
log: log,
|
||||||
|
executable: executable,
|
||||||
|
args: args,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// run runs the process and decodes the response into the value pointed by `resp`.
|
||||||
|
// It returns an error if the Process fails to start, or if the Process exits with a non-zero status.
|
||||||
|
// NOTE: the `resp` value must be a pointer to a struct.
|
||||||
|
func (p *process) run(ctx context.Context, req, resp interface{}) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
p.log.Info("starting subprocess")
|
||||||
|
|
||||||
|
var buf, outbuf bytes.Buffer
|
||||||
|
writer := &zapWrapper{p.log.Named("subprocess")}
|
||||||
|
|
||||||
|
// encode the struct and write it to the buffer
|
||||||
|
enc := json.NewEncoder(&buf)
|
||||||
|
if err := enc.Encode(req); err != nil {
|
||||||
|
return errLazyFilewalker.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := execabs.CommandContext(ctx, p.executable, p.args...)
|
||||||
|
cmd.Stdin = &buf
|
||||||
|
cmd.Stdout = &outbuf
|
||||||
|
cmd.Stderr = writer
|
||||||
|
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
p.log.Error("failed to start subprocess", zap.Error(err))
|
||||||
|
return errLazyFilewalker.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
p.log.Info("subprocess started")
|
||||||
|
|
||||||
|
if err := cmd.Wait(); err != nil {
|
||||||
|
var exitErr *execabs.ExitError
|
||||||
|
if errors.As(err, &exitErr) {
|
||||||
|
p.log.Info("subprocess exited with status", zap.Int("status", exitErr.ExitCode()), zap.Error(exitErr))
|
||||||
|
} else {
|
||||||
|
p.log.Error("subprocess exited with error", zap.Error(err))
|
||||||
|
}
|
||||||
|
return errLazyFilewalker.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
p.log.Info("subprocess finished successfully")
|
||||||
|
|
||||||
|
// Decode and receive the response data struct from the subprocess
|
||||||
|
decoder := json.NewDecoder(&outbuf)
|
||||||
|
if err := decoder.Decode(&resp); err != nil {
|
||||||
|
p.log.Error("failed to decode response from subprocess", zap.Error(err))
|
||||||
|
return errLazyFilewalker.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
120
storagenode/pieces/lazyfilewalker/supervisor.go
Normal file
120
storagenode/pieces/lazyfilewalker/supervisor.go
Normal file
@ -0,0 +1,120 @@
|
|||||||
|
// Copyright (C) 2023 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
package lazyfilewalker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/spacemonkeygo/monkit/v3"
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/common/bloomfilter"
|
||||||
|
"storj.io/common/storj"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// UsedSpaceFilewalkerCmdName is the name of the used-space-filewalker subcommand.
|
||||||
|
UsedSpaceFilewalkerCmdName = "used-space-filewalker"
|
||||||
|
// GCFilewalkerCmdName is the name of the gc-filewalker subcommand.
|
||||||
|
GCFilewalkerCmdName = "gc-filewalker"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
errLazyFilewalker = errs.Class("lazyfilewalker")
|
||||||
|
|
||||||
|
mon = monkit.Package()
|
||||||
|
)
|
||||||
|
|
||||||
|
// Supervisor manages the lazyfilewalker subprocesses.
|
||||||
|
//
|
||||||
|
// TODO: we should keep track of the number of subprocesses we have running and
|
||||||
|
// limit it to a configurable number, and queue them, since they are run per satellite.
|
||||||
|
type Supervisor struct {
|
||||||
|
log *zap.Logger
|
||||||
|
|
||||||
|
executable string
|
||||||
|
gcArgs []string
|
||||||
|
usedSpaceArgs []string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSupervisor creates a new lazy filewalker Supervisor.
|
||||||
|
func NewSupervisor(log *zap.Logger, executable string, args []string) *Supervisor {
|
||||||
|
return &Supervisor{
|
||||||
|
log: log,
|
||||||
|
gcArgs: append([]string{GCFilewalkerCmdName}, args...),
|
||||||
|
usedSpaceArgs: append([]string{UsedSpaceFilewalkerCmdName}, args...),
|
||||||
|
executable: executable,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// UsedSpaceRequest is the request struct for the used-space-filewalker process.
|
||||||
|
type UsedSpaceRequest struct {
|
||||||
|
SatelliteID storj.NodeID `json:"satelliteID"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UsedSpaceResponse is the response struct for the used-space-filewalker process.
|
||||||
|
type UsedSpaceResponse struct {
|
||||||
|
PiecesTotal int64 `json:"piecesTotal"`
|
||||||
|
PiecesContentSize int64 `json:"piecesContentSize"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// GCFilewalkerRequest is the request struct for the gc-filewalker process.
|
||||||
|
type GCFilewalkerRequest struct {
|
||||||
|
SatelliteID storj.NodeID `json:"satelliteID"`
|
||||||
|
BloomFilter []byte `json:"bloomFilter"`
|
||||||
|
CreatedBefore time.Time `json:"createdBefore"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// GCFilewalkerResponse is the response struct for the gc-filewalker process.
|
||||||
|
type GCFilewalkerResponse struct {
|
||||||
|
PieceIDs []storj.PieceID `json:"pieceIDs"`
|
||||||
|
PiecesSkippedCount int64 `json:"piecesSkippedCount"`
|
||||||
|
PiecesCount int64 `json:"piecesCount"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// WalkAndComputeSpaceUsedBySatellite returns the total used space by satellite.
|
||||||
|
func (fw *Supervisor) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal int64, piecesContentSize int64, err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
req := UsedSpaceRequest{
|
||||||
|
SatelliteID: satelliteID,
|
||||||
|
}
|
||||||
|
var resp UsedSpaceResponse
|
||||||
|
|
||||||
|
log := fw.log.Named(UsedSpaceFilewalkerCmdName).With(zap.String("satelliteID", satelliteID.String()))
|
||||||
|
|
||||||
|
err = newProcess(log, fw.executable, fw.usedSpaceArgs).run(ctx, req, &resp)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.PiecesTotal, resp.PiecesContentSize, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WalkSatellitePiecesToTrash returns a list of pieceIDs that need to be trashed for the given satellite.
|
||||||
|
func (fw *Supervisor) WalkSatellitePiecesToTrash(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, filter *bloomfilter.Filter) (pieceIDs []storj.PieceID, piecesCount, piecesSkipped int64, err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
if filter == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
req := GCFilewalkerRequest{
|
||||||
|
SatelliteID: satelliteID,
|
||||||
|
BloomFilter: filter.Bytes(),
|
||||||
|
CreatedBefore: createdBefore,
|
||||||
|
}
|
||||||
|
var resp GCFilewalkerResponse
|
||||||
|
|
||||||
|
log := fw.log.Named(GCFilewalkerCmdName).With(zap.String("satelliteID", satelliteID.String()))
|
||||||
|
|
||||||
|
err = newProcess(log, fw.executable, fw.gcArgs).run(ctx, req, &resp)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.PieceIDs, resp.PiecesSkippedCount, resp.PiecesCount, nil
|
||||||
|
}
|
@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/common/bloomfilter"
|
||||||
"storj.io/common/memory"
|
"storj.io/common/memory"
|
||||||
"storj.io/common/pb"
|
"storj.io/common/pb"
|
||||||
"storj.io/common/storj"
|
"storj.io/common/storj"
|
||||||
@ -534,6 +535,27 @@ func (store *Store) WalkSatellitePieces(ctx context.Context, satellite storj.Nod
|
|||||||
return store.Filewalker.WalkSatellitePieces(ctx, satellite, walkFunc)
|
return store.Filewalker.WalkSatellitePieces(ctx, satellite, walkFunc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SatellitePiecesToTrash returns a list of piece IDs that are trash for the given satellite.
|
||||||
|
//
|
||||||
|
// If the lazy filewalker is enabled, it will be used to find the pieces to trash, otherwise
|
||||||
|
// the regular filewalker will be used. If the lazy filewalker fails, the regular filewalker
|
||||||
|
// will be used as a fallback.
|
||||||
|
func (store *Store) SatellitePiecesToTrash(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, filter *bloomfilter.Filter) (pieceIDs []storj.PieceID, piecesCount, piecesSkipped int64, err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
if store.config.EnableLazyFilewalker && store.lazyFilewalker != nil {
|
||||||
|
pieceIDs, piecesCount, piecesSkipped, err = store.lazyFilewalker.WalkSatellitePiecesToTrash(ctx, satelliteID, createdBefore, filter)
|
||||||
|
if err == nil {
|
||||||
|
return pieceIDs, piecesCount, piecesSkipped, nil
|
||||||
|
}
|
||||||
|
store.log.Error("lazyfilewalker failed", zap.Error(err))
|
||||||
|
}
|
||||||
|
// fallback to the regular filewalker
|
||||||
|
pieceIDs, piecesCount, piecesSkipped, err = store.Filewalker.WalkSatellitePiecesToTrash(ctx, satelliteID, createdBefore, filter)
|
||||||
|
|
||||||
|
return pieceIDs, piecesCount, piecesSkipped, err
|
||||||
|
}
|
||||||
|
|
||||||
// GetExpired gets piece IDs that are expired and were created before the given time.
|
// GetExpired gets piece IDs that are expired and were created before the given time.
|
||||||
func (store *Store) GetExpired(ctx context.Context, expiredAt time.Time, limit int64) (_ []ExpiredInfo, err error) {
|
func (store *Store) GetExpired(ctx context.Context, expiredAt time.Time, limit int64) (_ []ExpiredInfo, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
@ -5,8 +5,6 @@ package retain
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"os"
|
|
||||||
"runtime"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -298,52 +296,6 @@ func (s *Service) Status() Status {
|
|||||||
return s.config.Status
|
return s.config.Status
|
||||||
}
|
}
|
||||||
|
|
||||||
// ------------------------------------------------------------------------------------------------
|
|
||||||
// On the correctness of using access.ModTime() in place of the more precise access.CreationTime()
|
|
||||||
// in retainPieces():
|
|
||||||
// ------------------------------------------------------------------------------------------------
|
|
||||||
//
|
|
||||||
// Background: for pieces not stored with storage.FormatV0, the access.CreationTime() value can
|
|
||||||
// only be retrieved by opening the piece file, and reading and unmarshaling the piece header.
|
|
||||||
// This is far slower than access.ModTime(), which gets the file modification time from the file
|
|
||||||
// system and only needs to do a stat(2) on the piece file. If we can make Retain() work with
|
|
||||||
// ModTime, we should.
|
|
||||||
//
|
|
||||||
// Possibility of mismatch: We do not force or require piece file modification times to be equal to
|
|
||||||
// or close to the CreationTime specified by the uplink, but we do expect that piece files will be
|
|
||||||
// written to the filesystem _after_ the CreationTime. We make the assumption already that storage
|
|
||||||
// nodes and satellites and uplinks have system clocks that are very roughly in sync (that is, they
|
|
||||||
// are out of sync with each other by less than an hour of real time, or whatever is configured as
|
|
||||||
// MaxTimeSkew). So if an uplink is not lying about CreationTime and it uploads a piece that
|
|
||||||
// makes it to a storagenode's disk as quickly as possible, even in the worst-synchronized-clocks
|
|
||||||
// case we can assume that `ModTime > (CreationTime - MaxTimeSkew)`. We also allow for storage
|
|
||||||
// node operators doing file system manipulations after a piece has been written. If piece files
|
|
||||||
// are copied between volumes and their attributes are not preserved, it will be possible for their
|
|
||||||
// modification times to be changed to something later in time. This still preserves the inequality
|
|
||||||
// relationship mentioned above, `ModTime > (CreationTime - MaxTimeSkew)`. We only stipulate
|
|
||||||
// that storage node operators must not artificially change blob file modification times to be in
|
|
||||||
// the past.
|
|
||||||
//
|
|
||||||
// If there is a mismatch: in most cases, a mismatch between ModTime and CreationTime has no
|
|
||||||
// effect. In certain remaining cases, the only effect is that a piece file which _should_ be
|
|
||||||
// garbage collected survives until the next round of garbage collection. The only really
|
|
||||||
// problematic case is when there is a relatively new piece file which was created _after_ this
|
|
||||||
// node's Retain bloom filter started being built on the satellite, and is recorded in this
|
|
||||||
// storage node's blob store before the Retain operation has completed. Then, it might be possible
|
|
||||||
// for that new piece to be garbage collected incorrectly, because it does not show up in the
|
|
||||||
// bloom filter and the node incorrectly thinks that it was created before the bloom filter.
|
|
||||||
// But if the uplink is not lying about CreationTime and its clock drift versus the storage node
|
|
||||||
// is less than `MaxTimeSkew`, and the ModTime on a blob file is correctly set from the
|
|
||||||
// storage node system time, then it is still true that `ModTime > (CreationTime -
|
|
||||||
// MaxTimeSkew)`.
|
|
||||||
//
|
|
||||||
// The rule that storage node operators need to be aware of is only this: do not artificially set
|
|
||||||
// mtimes on blob files to be in the past. Let the filesystem manage mtimes. If blob files need to
|
|
||||||
// be moved or copied between locations, and this updates the mtime, that is ok. A secondary effect
|
|
||||||
// of this rule is that if the storage node's system clock needs to be changed forward by a
|
|
||||||
// nontrivial amount, mtimes on existing blobs should also be adjusted (by the same interval,
|
|
||||||
// ideally, but just running "touch" on all blobs is sufficient to avoid incorrect deletion of
|
|
||||||
// data).
|
|
||||||
func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
|
func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
|
||||||
// if retain status is disabled, return immediately
|
// if retain status is disabled, return immediately
|
||||||
if s.config.Status == Disabled {
|
if s.config.Status == Disabled {
|
||||||
@ -352,9 +304,6 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
|
|||||||
|
|
||||||
defer mon.Task()(&ctx, req.SatelliteID, req.CreatedBefore)(&err)
|
defer mon.Task()(&ctx, req.SatelliteID, req.CreatedBefore)(&err)
|
||||||
|
|
||||||
var piecesCount int64
|
|
||||||
var piecesSkipped int64
|
|
||||||
var piecesToDeleteCount int64
|
|
||||||
numDeleted := 0
|
numDeleted := 0
|
||||||
satelliteID := req.SatelliteID
|
satelliteID := req.SatelliteID
|
||||||
filter := req.Filter
|
filter := req.Filter
|
||||||
@ -373,48 +322,20 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
|
|||||||
zap.Int64("Filter Size", filter.Size()),
|
zap.Int64("Filter Size", filter.Size()),
|
||||||
zap.Stringer("Satellite ID", satelliteID))
|
zap.Stringer("Satellite ID", satelliteID))
|
||||||
|
|
||||||
err = s.store.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) (err error) {
|
pieceIDs, piecesCount, piecesSkipped, err := s.store.SatellitePiecesToTrash(ctx, satelliteID, createdBefore, filter)
|
||||||
defer mon.Task()(&ctx)(&err)
|
|
||||||
piecesCount++
|
|
||||||
|
|
||||||
// We call Gosched() when done because the GC process is expected to be long and we want to keep it at low priority,
|
|
||||||
// so other goroutines can continue serving requests.
|
|
||||||
defer runtime.Gosched()
|
|
||||||
|
|
||||||
pieceID := access.PieceID()
|
|
||||||
if filter.Contains(pieceID) {
|
|
||||||
// This piece is explicitly not trash. Move on.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the blob's mtime is at or after the createdBefore line, we can't safely delete it;
|
|
||||||
// it might not be trash. If it is, we can expect to get it next time.
|
|
||||||
//
|
|
||||||
// See the comment above the retainPieces() function for a discussion on the correctness
|
|
||||||
// of using ModTime in place of the more precise CreationTime.
|
|
||||||
mTime, err := access.ModTime(ctx)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
return Error.Wrap(err)
|
||||||
// piece was deleted while we were scanning.
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
piecesSkipped++
|
piecesToDeleteCount := len(pieceIDs)
|
||||||
s.log.Warn("failed to determine mtime of blob", zap.Error(err))
|
|
||||||
// but continue iterating.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if !mTime.Before(createdBefore) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
|
for i := range pieceIDs {
|
||||||
|
pieceID := pieceIDs[i]
|
||||||
s.log.Debug("About to move piece to trash",
|
s.log.Debug("About to move piece to trash",
|
||||||
zap.Stringer("Satellite ID", satelliteID),
|
zap.Stringer("Satellite ID", satelliteID),
|
||||||
zap.Stringer("Piece ID", pieceID),
|
zap.Stringer("Piece ID", pieceID),
|
||||||
zap.String("Status", s.config.Status.String()))
|
zap.String("Status", s.config.Status.String()))
|
||||||
|
|
||||||
piecesToDeleteCount++
|
|
||||||
|
|
||||||
// if retain status is enabled, delete pieceid
|
// if retain status is enabled, delete pieceid
|
||||||
if s.config.Status == Enabled {
|
if s.config.Status == Enabled {
|
||||||
if err = s.trash(ctx, satelliteID, pieceID); err != nil {
|
if err = s.trash(ctx, satelliteID, pieceID); err != nil {
|
||||||
@ -426,21 +347,10 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
numDeleted++
|
numDeleted++
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return Error.Wrap(err)
|
|
||||||
}
|
}
|
||||||
mon.IntVal("garbage_collection_pieces_count").Observe(piecesCount)
|
mon.IntVal("garbage_collection_pieces_count").Observe(piecesCount)
|
||||||
mon.IntVal("garbage_collection_pieces_skipped").Observe(piecesSkipped)
|
mon.IntVal("garbage_collection_pieces_skipped").Observe(piecesSkipped)
|
||||||
mon.IntVal("garbage_collection_pieces_to_delete_count").Observe(piecesToDeleteCount)
|
mon.IntVal("garbage_collection_pieces_to_delete_count").Observe(int64(piecesToDeleteCount))
|
||||||
mon.IntVal("garbage_collection_pieces_deleted").Observe(int64(numDeleted))
|
mon.IntVal("garbage_collection_pieces_deleted").Observe(int64(numDeleted))
|
||||||
mon.DurationVal("garbage_collection_loop_duration").Observe(time.Now().UTC().Sub(started))
|
mon.DurationVal("garbage_collection_loop_duration").Observe(time.Now().UTC().Sub(started))
|
||||||
s.log.Info("Moved pieces to trash during retain", zap.Int("num deleted", numDeleted), zap.String("Retain Status", s.config.Status.String()))
|
s.log.Info("Moved pieces to trash during retain", zap.Int("num deleted", numDeleted), zap.String("Retain Status", s.config.Status.String()))
|
||||||
|
Loading…
Reference in New Issue
Block a user