storage/filestore: avoid stat() during walkNamespaceInPath

Calling stat() (really, lstat()) on every file during a directory walk
is the step that takes up the most time. Furthermore, not all users of
directory walks _need_ to have a stat done on every file. Therefore, in this
commit we avoid doing the stat at the lowest level of
walkNamespaceInPath. The stat will still be done when it is requested,
with the Stat() method on the blobInfo object.

The major upside of this is that we can avoid the stat call on most
files during a Retain operation. This should speed up garbage collection
considerably.

The major downside is that walkNamespaceInPath will no longer
automatically skip over directories that are named like blob files, or
blob files which are deleted between readdir() and stat(). Callers to
walkNamespaceInPath and its variants (WalkNamespace,
WalkSatellitePieces, etc) are now expected to handle these cases
individually.

Thanks to forum member Toyoo for the insight that this would speed up
garbage collection.

Refs: https://github.com/storj/storj/issues/5454
Change-Id: I72930573d58928fa25057ed89cd4ec474b884199
This commit is contained in:
paul cannon 2023-01-29 15:16:19 -06:00 committed by Storj Robot
parent facbd65882
commit ed7c82439d
5 changed files with 80 additions and 51 deletions

View File

@ -424,17 +424,20 @@ func (dir *Dir) ReplaceTrashnow(trashnow func() time.Time) {
// RestoreTrash moves every piece in the trash folder back into blobsdir. // RestoreTrash moves every piece in the trash folder back into blobsdir.
func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (keysRestored [][]byte, err error) { func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (keysRestored [][]byte, err error) {
var errorsEncountered errs.Group
err = dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(info storage.BlobInfo) error { err = dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(info storage.BlobInfo) error {
blobsBasePath, err := dir.blobToBasePath(info.BlobRef()) blobsBasePath, err := dir.blobToBasePath(info.BlobRef())
if err != nil { if err != nil {
return err errorsEncountered.Add(err)
return nil
} }
blobsVerPath := blobPathForFormatVersion(blobsBasePath, info.StorageFormatVersion()) blobsVerPath := blobPathForFormatVersion(blobsBasePath, info.StorageFormatVersion())
trashBasePath, err := dir.refToDirPath(info.BlobRef(), dir.trashdir()) trashBasePath, err := dir.refToDirPath(info.BlobRef(), dir.trashdir())
if err != nil { if err != nil {
return err errorsEncountered.Add(err)
return nil
} }
trashVerPath := blobPathForFormatVersion(trashBasePath, info.StorageFormatVersion()) trashVerPath := blobPathForFormatVersion(trashBasePath, info.StorageFormatVersion())
@ -442,7 +445,8 @@ func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (keysRestore
// ensure the dirs exist for blobs path // ensure the dirs exist for blobs path
err = os.MkdirAll(filepath.Dir(blobsVerPath), dirPermission) err = os.MkdirAll(filepath.Dir(blobsVerPath), dirPermission)
if err != nil && !os.IsExist(err) { if err != nil && !os.IsExist(err) {
return err errorsEncountered.Add(err)
return nil
} }
// move back to blobsdir // move back to blobsdir
@ -454,13 +458,15 @@ func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (keysRestore
return nil return nil
} }
if err != nil { if err != nil {
return err errorsEncountered.Add(err)
return nil
} }
keysRestored = append(keysRestored, info.BlobRef().Key) keysRestored = append(keysRestored, info.BlobRef().Key)
return nil return nil
}) })
return keysRestored, err errorsEncountered.Add(err)
return keysRestored, errorsEncountered.Err()
} }
// EmptyTrash walks the trash files for the given namespace and deletes any // EmptyTrash walks the trash files for the given namespace and deletes any
@ -468,27 +474,33 @@ func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (keysRestore
// Trash is called. // Trash is called.
func (dir *Dir) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (bytesEmptied int64, deletedKeys [][]byte, err error) { func (dir *Dir) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (bytesEmptied int64, deletedKeys [][]byte, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
var errorsEncountered errs.Group
err = dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(blobInfo storage.BlobInfo) error { err = dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(blobInfo storage.BlobInfo) error {
fileInfo, err := blobInfo.Stat(ctx) fileInfo, err := blobInfo.Stat(ctx)
if err != nil { if err != nil {
return err if os.IsNotExist(err) {
return nil
}
if !errors.Is(err, ErrIsDir) {
errorsEncountered.Add(Error.New("%s: %s", fileInfo.Name(), err))
}
return nil
} }
mtime := fileInfo.ModTime() mtime := fileInfo.ModTime()
if mtime.Before(trashedBefore) { if mtime.Before(trashedBefore) {
err = dir.deleteWithStorageFormatInPath(ctx, dir.trashdir(), blobInfo.BlobRef(), blobInfo.StorageFormatVersion()) err = dir.deleteWithStorageFormatInPath(ctx, dir.trashdir(), blobInfo.BlobRef(), blobInfo.StorageFormatVersion())
if err != nil { if err != nil {
return err errorsEncountered.Add(err)
return nil
} }
deletedKeys = append(deletedKeys, blobInfo.BlobRef().Key) deletedKeys = append(deletedKeys, blobInfo.BlobRef().Key)
bytesEmptied += fileInfo.Size() bytesEmptied += fileInfo.Size()
} }
return nil return nil
}) })
if err != nil { errorsEncountered.Add(err)
return 0, nil, err return bytesEmptied, deletedKeys, errorsEncountered.Err()
}
return bytesEmptied, deletedKeys, nil
} }
// iterateStorageFormatVersions executes f for all storage format versions, // iterateStorageFormatVersions executes f for all storage format versions,
@ -735,8 +747,8 @@ func (dir *Dir) walkNamespaceInPath(ctx context.Context, namespace []byte, path
} }
} }
func decodeBlobInfo(namespace []byte, keyPrefix, keyDir string, keyInfo os.FileInfo) (info storage.BlobInfo, ok bool) { func decodeBlobInfo(namespace []byte, keyPrefix, keyDir, name string) (info storage.BlobInfo, ok bool) {
blobFileName := keyInfo.Name() blobFileName := name
encodedKey := keyPrefix + blobFileName encodedKey := keyPrefix + blobFileName
formatVer := FormatV0 formatVer := FormatV0
if strings.HasSuffix(blobFileName, v1PieceFileSuffix) { if strings.HasSuffix(blobFileName, v1PieceFileSuffix) {
@ -751,7 +763,7 @@ func decodeBlobInfo(namespace []byte, keyPrefix, keyDir string, keyInfo os.FileI
Namespace: namespace, Namespace: namespace,
Key: key, Key: key,
} }
return newBlobInfo(ref, filepath.Join(keyDir, blobFileName), keyInfo, formatVer), true return newBlobInfo(ref, filepath.Join(keyDir, blobFileName), nil, formatVer), true
} }
func walkNamespaceWithPrefix(ctx context.Context, log *zap.Logger, namespace []byte, nsDir, keyPrefix string, walkFunc func(storage.BlobInfo) error) (err error) { func walkNamespaceWithPrefix(ctx context.Context, log *zap.Logger, namespace []byte, nsDir, keyPrefix string, walkFunc func(storage.BlobInfo) error) (err error) {
@ -777,25 +789,7 @@ func walkNamespaceWithPrefix(ctx context.Context, log *zap.Logger, namespace []b
return err return err
} }
for _, name := range names { for _, name := range names {
info, err := os.Lstat(keyDir + "/" + name) blobInfo, ok := decodeBlobInfo(namespace, keyPrefix, keyDir, name)
if err != nil {
if os.IsNotExist(err) {
continue
}
// convert to lowercase the perr.Op because Go reports inconsistently
// "lstat" in Linux and "Lstat" in Windows
var perr *os.PathError
if errors.As(err, &perr) && strings.ToLower(perr.Op) == "lstat" {
log.Error("Unable to read the disk, please verify the disk is not corrupt")
}
return errs.Wrap(err)
}
if info.Mode().IsDir() {
continue
}
blobInfo, ok := decodeBlobInfo(namespace, keyPrefix, keyDir, info)
if !ok { if !ok {
continue continue
} }
@ -874,6 +868,16 @@ func (info *blobInfo) StorageFormatVersion() storage.FormatVersion {
} }
func (info *blobInfo) Stat(ctx context.Context) (os.FileInfo, error) { func (info *blobInfo) Stat(ctx context.Context) (os.FileInfo, error) {
if info.fileInfo == nil {
fileInfo, err := os.Lstat(info.path)
if err != nil {
return nil, err
}
if fileInfo.Mode().IsDir() {
return fileInfo, ErrIsDir
}
info.fileInfo = fileInfo
}
return info.fileInfo, nil return info.fileInfo, nil
} }

View File

@ -24,6 +24,9 @@ import (
var ( var (
// Error is the default filestore error class. // Error is the default filestore error class.
Error = errs.Class("filestore error") Error = errs.Class("filestore error")
// ErrIsDir is the error returned when we encounter a directory named like a blob file
// while traversing a blob namespace.
ErrIsDir = Error.New("file is a directory")
mon = monkit.Package() mon = monkit.Package()

View File

@ -5,6 +5,7 @@ package gracefulexit
import ( import (
"context" "context"
"os"
"time" "time"
"github.com/zeebo/errs" "github.com/zeebo/errs"
@ -118,6 +119,9 @@ func (c *Service) deleteSatellitePieces(ctx context.Context, satelliteID storj.N
err = c.store.WalkSatellitePieces(ctx, satelliteID, func(piece pieces.StoredPieceAccess) error { err = c.store.WalkSatellitePieces(ctx, satelliteID, func(piece pieces.StoredPieceAccess) error {
err := c.store.Delete(ctx, satelliteID, piece.PieceID()) err := c.store.Delete(ctx, satelliteID, piece.PieceID())
if err != nil { if err != nil {
if os.IsNotExist(err) {
return nil
}
logger.Error("failed to delete piece", logger.Error("failed to delete piece",
zap.Stringer("Piece ID", piece.PieceID()), zap.Error(err)) zap.Stringer("Piece ID", piece.PieceID()), zap.Error(err))
// but continue // but continue

View File

@ -645,6 +645,9 @@ func (store *Store) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.
err = store.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error { err = store.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error {
pieceTotal, pieceContentSize, statErr := access.Size(ctx) pieceTotal, pieceContentSize, statErr := access.Size(ctx)
if statErr != nil { if statErr != nil {
if os.IsNotExist(statErr) {
return nil
}
store.log.Error("failed to stat", zap.Error(statErr), zap.Stringer("Piece ID", access.PieceID()), zap.Stringer("Satellite ID", satelliteID)) store.log.Error("failed to stat", zap.Error(statErr), zap.Stringer("Piece ID", access.PieceID()), zap.Stringer("Satellite ID", satelliteID))
// keep iterating; we want a best effort total here. // keep iterating; we want a best effort total here.
return nil return nil
@ -678,6 +681,9 @@ func (store *Store) SpaceUsedTotalAndBySatellite(ctx context.Context) (piecesTot
err := store.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error { err := store.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error {
pieceTotal, pieceContentSize, err := access.Size(ctx) pieceTotal, pieceContentSize, err := access.Size(ctx)
if err != nil { if err != nil {
if os.IsNotExist(err) {
return nil
}
return err return err
} }
satPiecesTotal += pieceTotal satPiecesTotal += pieceTotal

View File

@ -5,6 +5,7 @@ package retain
import ( import (
"context" "context"
"os"
"runtime" "runtime"
"sync" "sync"
"time" "time"
@ -379,40 +380,51 @@ func (s *Service) retainPieces(ctx context.Context, req Request) (err error) {
// We call Gosched() when done because the GC process is expected to be long and we want to keep it at low priority, // We call Gosched() when done because the GC process is expected to be long and we want to keep it at low priority,
// so other goroutines can continue serving requests. // so other goroutines can continue serving requests.
defer runtime.Gosched() defer runtime.Gosched()
pieceID := access.PieceID()
if filter.Contains(pieceID) {
// This piece is explicitly not trash. Move on.
return nil
}
// If the blob's mtime is at or after the createdBefore line, we can't safely delete it;
// it might not be trash. If it is, we can expect to get it next time.
//
// See the comment above the retainPieces() function for a discussion on the correctness // See the comment above the retainPieces() function for a discussion on the correctness
// of using ModTime in place of the more precise CreationTime. // of using ModTime in place of the more precise CreationTime.
mTime, err := access.ModTime(ctx) mTime, err := access.ModTime(ctx)
if err != nil { if err != nil {
if os.IsNotExist(err) {
// piece was deleted while we were scanning.
return nil
}
piecesSkipped++ piecesSkipped++
s.log.Warn("failed to determine mtime of blob", zap.Error(err)) s.log.Warn("failed to determine mtime of blob", zap.Error(err))
// but continue iterating. // but continue iterating.
return nil return nil
} }
if !mTime.Before(createdBefore) { if !mTime.Before(createdBefore) {
return nil return nil
} }
pieceID := access.PieceID()
if !filter.Contains(pieceID) {
s.log.Debug("About to move piece to trash",
zap.Stringer("Satellite ID", satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.String("Status", s.config.Status.String()))
piecesToDeleteCount++ s.log.Debug("About to move piece to trash",
zap.Stringer("Satellite ID", satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.String("Status", s.config.Status.String()))
// if retain status is enabled, delete pieceid piecesToDeleteCount++
if s.config.Status == Enabled {
if err = s.trash(ctx, satelliteID, pieceID); err != nil { // if retain status is enabled, delete pieceid
s.log.Warn("failed to delete piece", if s.config.Status == Enabled {
zap.Stringer("Satellite ID", satelliteID), if err = s.trash(ctx, satelliteID, pieceID); err != nil {
zap.Stringer("Piece ID", pieceID), s.log.Warn("failed to delete piece",
zap.Error(err)) zap.Stringer("Satellite ID", satelliteID),
return nil zap.Stringer("Piece ID", pieceID),
} zap.Error(err))
return nil
} }
numDeleted++
} }
numDeleted++
select { select {
case <-ctx.Done(): case <-ctx.Done():