// Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. package filestore import ( "bytes" "context" "encoding/base32" "errors" "fmt" "io" "io/fs" "math" "os" "path/filepath" "strings" "sync" "time" "github.com/zeebo/errs" "go.uber.org/zap" "storj.io/common/experiment" "storj.io/common/storj" "storj.io/storj/storagenode/blobstore" ) const ( blobPermission = 0600 dirPermission = 0700 v0PieceFileSuffix = "" v1PieceFileSuffix = ".sj1" unknownPieceFileSuffix = "/..error_unknown_format../" verificationFileName = "storage-dir-verification" ) var pathEncoding = base32.NewEncoding("abcdefghijklmnopqrstuvwxyz234567").WithPadding(base32.NoPadding) // Dir represents single folder for storing blobs. type Dir struct { log *zap.Logger path string mu sync.Mutex deleteQueue []string trashnow func() time.Time // the function used by trash to determine "now" } // OpenDir opens existing folder for storing blobs. func OpenDir(log *zap.Logger, path string) (*Dir, error) { dir := &Dir{ log: log, path: path, trashnow: time.Now, } stat := func(path string) error { _, err := os.Stat(path) return err } return dir, errs.Combine( stat(dir.blobsdir()), stat(dir.tempdir()), stat(dir.garbagedir()), stat(dir.trashdir()), ) } // NewDir returns folder for storing blobs. func NewDir(log *zap.Logger, path string) (*Dir, error) { dir := &Dir{ log: log, path: path, trashnow: time.Now, } return dir, errs.Combine( os.MkdirAll(dir.blobsdir(), dirPermission), os.MkdirAll(dir.tempdir(), dirPermission), os.MkdirAll(dir.garbagedir(), dirPermission), os.MkdirAll(dir.trashdir(), dirPermission), ) } // Path returns the directory path. func (dir *Dir) Path() string { return dir.path } // blobsdir is the sub-directory containing the blobs. func (dir *Dir) blobsdir() string { return filepath.Join(dir.path, "blobs") } // tempdir is used for temp files prior to being moved into blobsdir. 
func (dir *Dir) tempdir() string {
	return filepath.Join(dir.path, "temp")
}

// garbagedir contains files that failed to delete but should be deleted.
func (dir *Dir) garbagedir() string {
	return filepath.Join(dir.path, "garbage")
}

// trashdir contains files staged for deletion for a period of time.
func (dir *Dir) trashdir() string {
	return filepath.Join(dir.path, "trash")
}

// CreateVerificationFile creates a file to be used for storage directory verification.
//
// The file is written directly under the Dir root and contains the raw bytes
// of id. An error from writing or from closing the file is returned.
func (dir *Dir) CreateVerificationFile(ctx context.Context, id storj.NodeID) (err error) {
	f, err := os.Create(filepath.Join(dir.path, verificationFileName))
	if err != nil {
		return err
	}
	// err is a named result so that a failing Close is actually reported to
	// the caller; with an unnamed result this assignment would be lost.
	defer func() {
		err = errs.Combine(err, f.Close())
	}()
	_, err = f.Write(id.Bytes())
	return err
}

// Verify verifies that the storage directory is correct by checking for the existence and validity
// of the verification file.
//
// It returns the read error if the file cannot be read, and a descriptive
// error if the content is not a valid node ID or belongs to a different node.
func (dir *Dir) Verify(ctx context.Context, id storj.NodeID) error {
	content, err := os.ReadFile(filepath.Join(dir.path, verificationFileName))
	if err != nil {
		return err
	}
	if bytes.Equal(content, id.Bytes()) {
		return nil
	}

	// Mismatch: try to decode the stored ID to produce a more helpful message.
	verifyID, err := storj.NodeIDFromBytes(content)
	if err != nil {
		return errs.New("content of file is not a valid node ID: %x", content)
	}
	return errs.New("node ID in file (%s) does not match running node's ID (%s)", verifyID, id.String())
}

// CreateTemporaryFile creates a preallocated temporary file in the temp directory
// prealloc preallocates file to make writing faster.
//
// The preallocation is capped at 5<<20 bytes; a negative prealloc skips the
// truncate call entirely. If preallocation fails, the file is closed and
// removed so that no orphaned temporary file is left behind.
func (dir *Dir) CreateTemporaryFile(ctx context.Context, prealloc int64) (_ *os.File, err error) {
	const preallocLimit = 5 << 20 // 5 MB
	if prealloc > preallocLimit {
		prealloc = preallocLimit
	}

	file, err := os.CreateTemp(dir.tempdir(), "blob-*.partial")
	if err != nil {
		return nil, err
	}

	if prealloc >= 0 {
		if err := file.Truncate(prealloc); err != nil {
			// The caller never sees this file, so clean it up here.
			return nil, errs.Combine(err, file.Close(), os.Remove(file.Name()))
		}
	}
	return file, nil
}

// DeleteTemporary deletes a temporary file.
func (dir *Dir) DeleteTemporary(ctx context.Context, file *os.File) (err error) { defer mon.Task()(&ctx)(&err) closeErr := file.Close() return errs.Combine(closeErr, os.Remove(file.Name())) } // blobToBasePath converts a blob reference to a filepath in permanent storage. This may not be the // entire path; blobPathForFormatVersion() must also be used. This is a separate call because this // part of the filepath is constant, and blobPathForFormatVersion may need to be called multiple // times with different storage.FormatVersion values. func (dir *Dir) blobToBasePath(ref blobstore.BlobRef) (string, error) { return dir.refToDirPath(ref, dir.blobsdir()) } // refToDirPath converts a blob reference to a filepath in the specified sub-directory. func (dir *Dir) refToDirPath(ref blobstore.BlobRef, subDir string) (string, error) { if !ref.IsValid() { return "", blobstore.ErrInvalidBlobRef.New("") } namespace := pathEncoding.EncodeToString(ref.Namespace) key := pathEncoding.EncodeToString(ref.Key) if len(key) < 3 { // ensure we always have enough characters to split [:2] and [2:] key = "11" + key } return filepath.Join(subDir, namespace, key[:2], key[2:]), nil } // fileConfirmedInTrash returns true if it is able to confirm the file is in // the trash. On errors, or if the file is not in the trash, it returns false. func (dir *Dir) fileConfirmedInTrash(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) bool { trashBasePath, err := dir.refToDirPath(ref, dir.trashdir()) if err != nil { return false } trashVerPath := blobPathForFormatVersion(trashBasePath, formatVer) _, err = os.Stat(trashVerPath) return err == nil } // blobPathForFormatVersion adjusts a bare blob path (as might have been generated by a call to // blobToBasePath()) to what it should be for the given storage format version. 
func blobPathForFormatVersion(path string, formatVersion blobstore.FormatVersion) string { switch formatVersion { case FormatV0: return path + v0PieceFileSuffix case FormatV1: return path + v1PieceFileSuffix } return path + unknownPieceFileSuffix } // blobToGarbagePath converts a blob reference to a filepath in transient // storage. The files in garbage are deleted on an interval (in case the // initial deletion didn't work for some reason). func (dir *Dir) blobToGarbagePath(ref blobstore.BlobRef) string { var name []byte name = append(name, ref.Namespace...) name = append(name, ref.Key...) return filepath.Join(dir.garbagedir(), pathEncoding.EncodeToString(name)) } // Commit commits the temporary file to permanent storage. func (dir *Dir) Commit(ctx context.Context, file *os.File, ref blobstore.BlobRef, formatVersion blobstore.FormatVersion) (err error) { defer mon.Task()(&ctx)(&err) position, seekErr := file.Seek(0, io.SeekCurrent) truncErr := file.Truncate(position) var syncErr error if !experiment.Has(ctx, "nosync") { syncErr = file.Sync() } chmodErr := os.Chmod(file.Name(), blobPermission) closeErr := file.Close() if seekErr != nil || truncErr != nil || syncErr != nil || chmodErr != nil || closeErr != nil { removeErr := os.Remove(file.Name()) return errs.Combine(seekErr, truncErr, syncErr, chmodErr, closeErr, removeErr) } path, err := dir.blobToBasePath(ref) if err != nil { removeErr := os.Remove(file.Name()) return errs.Combine(err, removeErr) } path = blobPathForFormatVersion(path, formatVersion) mkdirErr := os.MkdirAll(filepath.Dir(path), dirPermission) if os.IsExist(mkdirErr) { mkdirErr = nil } if mkdirErr != nil { removeErr := os.Remove(file.Name()) return errs.Combine(mkdirErr, removeErr) } renameErr := rename(file.Name(), path) if renameErr != nil { removeErr := os.Remove(file.Name()) return errs.Combine(renameErr, removeErr) } return nil } // Open opens the file with the specified ref. 
It may need to check in more than one location in // order to find the blob, if it was stored with an older version of the storage node software. // In cases where the storage format version of a blob is already known, OpenWithStorageFormat() // will generally be a better choice. func (dir *Dir) Open(ctx context.Context, ref blobstore.BlobRef) (_ *os.File, _ blobstore.FormatVersion, err error) { defer mon.Task()(&ctx)(&err) path, err := dir.blobToBasePath(ref) if err != nil { return nil, FormatV0, err } for formatVer := MaxFormatVersionSupported; formatVer >= MinFormatVersionSupported; formatVer-- { vPath := blobPathForFormatVersion(path, formatVer) file, err := openFileReadOnly(vPath, blobPermission) if err == nil { return file, formatVer, nil } if os.IsNotExist(err) { // Check and monitor if the file is in the trash if dir.fileConfirmedInTrash(ctx, ref, formatVer) { monFileInTrash(ref.Namespace).Mark(1) } } else { return nil, FormatV0, Error.New("unable to open %q: %v", vPath, err) } } return nil, FormatV0, os.ErrNotExist } // OpenWithStorageFormat opens an already-located blob file with a known storage format version, // which avoids the potential need to search through multiple storage formats to find the blob. func (dir *Dir) OpenWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (_ *os.File, err error) { defer mon.Task()(&ctx)(&err) path, err := dir.blobToBasePath(ref) if err != nil { return nil, err } vPath := blobPathForFormatVersion(path, formatVer) file, err := openFileReadOnly(vPath, blobPermission) if err == nil { return file, nil } if os.IsNotExist(err) { // Check and monitor if the file is in the trash if dir.fileConfirmedInTrash(ctx, ref, formatVer) { monFileInTrash(ref.Namespace).Mark(1) } return nil, err } return nil, Error.New("unable to open %q: %v", vPath, err) } // Stat looks up disk metadata on the blob file. 
It may need to check in more than one location // in order to find the blob, if it was stored with an older version of the storage node software. // In cases where the storage format version of a blob is already known, StatWithStorageFormat() // will generally be a better choice. func (dir *Dir) Stat(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobInfo, err error) { defer mon.Task()(&ctx)(&err) path, err := dir.blobToBasePath(ref) if err != nil { return nil, err } for formatVer := MaxFormatVersionSupported; formatVer >= MinFormatVersionSupported; formatVer-- { vPath := blobPathForFormatVersion(path, formatVer) stat, err := os.Stat(vPath) if err == nil { return newBlobInfo(ref, vPath, stat, formatVer), nil } if !os.IsNotExist(err) { return nil, Error.New("unable to stat %q: %v", vPath, err) } } return nil, os.ErrNotExist } // StatWithStorageFormat looks up disk metadata on the blob file with the given storage format // version. This avoids the need for checking for the file in multiple different storage format // types. func (dir *Dir) StatWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (_ blobstore.BlobInfo, err error) { defer mon.Task()(&ctx)(&err) path, err := dir.blobToBasePath(ref) if err != nil { return nil, err } vPath := blobPathForFormatVersion(path, formatVer) stat, err := os.Stat(vPath) if err == nil { return newBlobInfo(ref, vPath, stat, formatVer), nil } if os.IsNotExist(err) { return nil, err } return nil, Error.New("unable to stat %q: %v", vPath, err) } // Trash moves the piece specified by ref to the trashdir for every format version. func (dir *Dir) Trash(ctx context.Context, ref blobstore.BlobRef) (err error) { defer mon.Task()(&ctx)(&err) return dir.iterateStorageFormatVersions(ctx, ref, dir.TrashWithStorageFormat) } // TrashWithStorageFormat moves the piece specified by ref to the trashdir for the specified format version. 
func (dir *Dir) TrashWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (err error) { // Ensure trashdir exists so that we know any os.IsNotExist errors below // are not from a missing trash dir _, err = os.Stat(dir.trashdir()) if err != nil { return err } blobsBasePath, err := dir.blobToBasePath(ref) if err != nil { return err } blobsVerPath := blobPathForFormatVersion(blobsBasePath, formatVer) trashBasePath, err := dir.refToDirPath(ref, dir.trashdir()) if err != nil { return err } trashVerPath := blobPathForFormatVersion(trashBasePath, formatVer) // ensure the dirs exist for trash path err = os.MkdirAll(filepath.Dir(trashVerPath), dirPermission) if err != nil && !os.IsExist(err) { return err } // Change mtime to now. This allows us to check the mtime to know how long // the file has been in the trash. If the file is restored this may make it // take longer to be trashed again, but the simplicity is worth the // trade-off. // // We change the mtime prior to moving the file so that if this call fails // the file will not be in the trash with an unmodified mtime, which could // result in its permanent deletion too soon. now := dir.trashnow() err = os.Chtimes(blobsVerPath, now, now) if os.IsNotExist(err) { return nil } if err != nil { return err } // move to trash err = rename(blobsVerPath, trashVerPath) if os.IsNotExist(err) { // no piece at that path; either it has a different storage format // version or there was a concurrent call. (This function is expected // by callers to return a nil error in the case of concurrent calls.) return nil } return err } // ReplaceTrashnow is a helper for tests to replace the trashnow function used // when moving files to the trash. func (dir *Dir) ReplaceTrashnow(trashnow func() time.Time) { dir.trashnow = trashnow } // RestoreTrash moves every piece in the trash folder back into blobsdir. 
func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (keysRestored [][]byte, err error) { var errorsEncountered errs.Group err = dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(info blobstore.BlobInfo) error { blobsBasePath, err := dir.blobToBasePath(info.BlobRef()) if err != nil { errorsEncountered.Add(err) return nil } blobsVerPath := blobPathForFormatVersion(blobsBasePath, info.StorageFormatVersion()) trashBasePath, err := dir.refToDirPath(info.BlobRef(), dir.trashdir()) if err != nil { errorsEncountered.Add(err) return nil } trashVerPath := blobPathForFormatVersion(trashBasePath, info.StorageFormatVersion()) // ensure the dirs exist for blobs path err = os.MkdirAll(filepath.Dir(blobsVerPath), dirPermission) if err != nil && !os.IsExist(err) { errorsEncountered.Add(err) return nil } // move back to blobsdir err = rename(trashVerPath, blobsVerPath) if os.IsNotExist(err) { // no piece at that path; either it has a different storage format // version or there was a concurrent call. (This function is expected // by callers to return a nil error in the case of concurrent calls.) return nil } if err != nil { errorsEncountered.Add(err) return nil } keysRestored = append(keysRestored, info.BlobRef().Key) return nil }) errorsEncountered.Add(err) return keysRestored, errorsEncountered.Err() } // TryRestoreTrashPiece attempts to restore a piece from the trash if it exists. // It returns nil if the piece was restored, or an error if the piece was not // in the trash or could not be restored. 
func (dir *Dir) TryRestoreTrashPiece(ctx context.Context, ref blobstore.BlobRef) (err error) { defer mon.Task()(&ctx)(&err) blobsBasePath, err := dir.blobToBasePath(ref) if err != nil { return err } trashBasePath, err := dir.refToDirPath(ref, dir.trashdir()) if err != nil { return err } // ensure the dirs exist for blobs path blobsVerPath := blobPathForFormatVersion(blobsBasePath, MaxFormatVersionSupported) err = os.MkdirAll(filepath.Dir(blobsVerPath), dirPermission) if err != nil && !errors.Is(err, fs.ErrExist) { return err } trashVerPath := blobPathForFormatVersion(trashBasePath, MaxFormatVersionSupported) // move back to blobsdir return rename(trashVerPath, blobsVerPath) } // EmptyTrash walks the trash files for the given namespace and deletes any // file whose mtime is older than trashedBefore. The mtime is modified when // Trash is called. func (dir *Dir) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (bytesEmptied int64, deletedKeys [][]byte, err error) { defer mon.Task()(&ctx)(&err) var errorsEncountered errs.Group err = dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(info blobstore.BlobInfo) error { fileInfo, err := info.Stat(ctx) if err != nil { if os.IsNotExist(err) { return nil } if errors.Is(err, ErrIsDir) { return nil } // it would be best if we could report the actual problematic path if thisBlobInfo, ok := info.(*blobInfo); ok { errorsEncountered.Add(Error.New("%s: %s", thisBlobInfo.path, err)) } else { // this is probably a v0PieceAccess; do what we can errorsEncountered.Add(Error.New("blobRef %+v: %s", info.BlobRef(), err)) } return nil } mtime := fileInfo.ModTime() if mtime.Before(trashedBefore) { err = dir.deleteWithStorageFormatInPath(ctx, dir.trashdir(), info.BlobRef(), info.StorageFormatVersion()) if err != nil { errorsEncountered.Add(err) return nil } deletedKeys = append(deletedKeys, info.BlobRef().Key) bytesEmptied += fileInfo.Size() } return nil }) errorsEncountered.Add(err) return bytesEmptied, 
deletedKeys, errorsEncountered.Err() } // DeleteTrashNamespace deletes the entire trash namespace. func (dir *Dir) DeleteTrashNamespace(ctx context.Context, namespace []byte) (err error) { mon.Task()(&ctx)(&err) return dir.deleteNamespace(ctx, dir.trashdir(), namespace) } // iterateStorageFormatVersions executes f for all storage format versions, // starting with the oldest format version. It is more likely, in the general // case, that we will find the piece with the newest format version instead, // but if we iterate backward here then we run the risk of a race condition: // the piece might have existed with _SomeOldVer before the call, and could // then have been updated atomically with _MaxVer concurrently while we were // iterating. If we iterate _forwards_, this race should not occur because it // is assumed that pieces are never rewritten with an _older_ storage format // version. // // f will be executed for every storage formate version regardless of the // result, and will aggregate errors into a single returned error. func (dir *Dir) iterateStorageFormatVersions(ctx context.Context, ref blobstore.BlobRef, f func(ctx context.Context, ref blobstore.BlobRef, i blobstore.FormatVersion) error) (err error) { defer mon.Task()(&ctx)(&err) var combinedErrors errs.Group for i := MinFormatVersionSupported; i <= MaxFormatVersionSupported; i++ { combinedErrors.Add(f(ctx, ref, i)) } return combinedErrors.Err() } // Delete deletes blobs with the specified ref (in all supported storage formats). // // It doesn't return an error if the blob is not found for any reason or it // cannot be deleted at this moment and it's delayed. func (dir *Dir) Delete(ctx context.Context, ref blobstore.BlobRef) (err error) { defer mon.Task()(&ctx)(&err) return dir.iterateStorageFormatVersions(ctx, ref, dir.DeleteWithStorageFormat) } // DeleteWithStorageFormat deletes the blob with the specified ref for one // specific format version. 
The method tries the following strategies, in order // of preference until one succeeds: // // * moves the blob to garbage dir. // * directly deletes the blob. // * push the blobs to queue for retrying later. // // It doesn't return an error if the piece isn't found for any reason. func (dir *Dir) DeleteWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (err error) { defer mon.Task()(&ctx)(&err) return dir.deleteWithStorageFormatInPath(ctx, dir.blobsdir(), ref, formatVer) } // DeleteNamespace deletes blobs folder for a specific namespace. func (dir *Dir) DeleteNamespace(ctx context.Context, ref []byte) (err error) { defer mon.Task()(&ctx)(&err) return dir.deleteNamespace(ctx, dir.blobsdir(), ref) } func (dir *Dir) deleteWithStorageFormatInPath(ctx context.Context, path string, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (err error) { defer mon.Task()(&ctx)(&err) // Ensure garbage dir exists so that we know any os.IsNotExist errors below // are not from a missing garbage dir _, err = os.Stat(dir.garbagedir()) if err != nil { return err } pathBase, err := dir.refToDirPath(ref, path) if err != nil { return err } garbagePath := dir.blobToGarbagePath(ref) verPath := blobPathForFormatVersion(pathBase, formatVer) // move to garbage folder, this is allowed for some OS-es moveErr := rename(verPath, garbagePath) if os.IsNotExist(moveErr) { // no piece at that path; either it has a different storage format // version or there was a concurrent delete. (this function is expected // by callers to return a nil error in the case of concurrent deletes.) return nil } if moveErr != nil { // piece could not be moved into the garbage dir; we'll try removing it // directly garbagePath = verPath } // try removing the file err = os.Remove(garbagePath) // ignore concurrent deletes if os.IsNotExist(err) { // something is happening at the same time as this; possibly a // concurrent delete, or possibly a rewrite of the blob. 
return nil } // the remove may have failed because of an open file handle. put it in a // queue to be retried later. if err != nil { dir.mu.Lock() dir.deleteQueue = append(dir.deleteQueue, garbagePath) dir.mu.Unlock() mon.Event("delete_deferred_to_queue") } // ignore is-busy errors, they are still in the queue but no need to notify if isBusy(err) { err = nil } return err } // deleteNamespace deletes folder with everything inside. func (dir *Dir) deleteNamespace(ctx context.Context, path string, ref []byte) (err error) { defer mon.Task()(&ctx)(&err) namespace := pathEncoding.EncodeToString(ref) folderPath := filepath.Join(path, namespace) err = os.RemoveAll(folderPath) return err } // GarbageCollect collects files that are pending deletion. func (dir *Dir) GarbageCollect(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) offset := int(math.MaxInt32) // limited deletion loop to avoid blocking `Delete` for too long for offset >= 0 { dir.mu.Lock() limit := 100 if offset >= len(dir.deleteQueue) { offset = len(dir.deleteQueue) - 1 } for offset >= 0 && limit > 0 { path := dir.deleteQueue[offset] err := os.Remove(path) if os.IsNotExist(err) { err = nil } if err == nil { dir.deleteQueue = append(dir.deleteQueue[:offset], dir.deleteQueue[offset+1:]...) } offset-- limit-- } dir.mu.Unlock() } // remove anything left in the garbagedir _ = removeAllContent(ctx, dir.garbagedir()) return nil } const nameBatchSize = 1024 // ListNamespaces finds all known namespace IDs in use in local storage. They are not // guaranteed to contain any blobs. 
func (dir *Dir) ListNamespaces(ctx context.Context) (ids [][]byte, err error) { defer mon.Task()(&ctx)(&err) return dir.listNamespacesInPath(ctx, dir.blobsdir()) } func (dir *Dir) listNamespacesInPath(ctx context.Context, path string) (ids [][]byte, err error) { defer mon.Task()(&ctx)(&err) openDir, err := os.Open(path) if err != nil { return nil, err } defer func() { err = errs.Combine(err, openDir.Close()) }() for { dirNames, err := openDir.Readdirnames(nameBatchSize) if err != nil { if errors.Is(err, io.EOF) || os.IsNotExist(err) { return ids, nil } return ids, err } if len(dirNames) == 0 { return ids, nil } for _, name := range dirNames { namespace, err := pathEncoding.DecodeString(name) if err != nil { // just an invalid directory entry, and not a namespace. probably // don't need to pass on this error continue } ids = append(ids, namespace) } } } // WalkNamespace executes walkFunc for each locally stored blob, stored with storage format V1 or // greater, in the given namespace. If walkFunc returns a non-nil error, WalkNamespace will stop // iterating and return the error immediately. The ctx parameter is intended specifically to allow // canceling iteration early. func (dir *Dir) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(blobstore.BlobInfo) error) (err error) { defer mon.Task()(&ctx)(&err) return dir.walkNamespaceInPath(ctx, namespace, dir.blobsdir(), walkFunc) } func (dir *Dir) walkNamespaceInPath(ctx context.Context, namespace []byte, path string, walkFunc func(blobstore.BlobInfo) error) (err error) { defer mon.Task()(&ctx)(&err) namespaceDir := pathEncoding.EncodeToString(namespace) nsDir := filepath.Join(path, namespaceDir) openDir, err := os.Open(nsDir) if err != nil { if os.IsNotExist(err) { dir.log.Debug("directory not found", zap.String("dir", nsDir)) // job accomplished: there are no blobs in this namespace! 
return nil } return err } defer func() { err = errs.Combine(err, openDir.Close()) }() for { // check for context done both before and after our readdir() call if err := ctx.Err(); err != nil { return err } subdirNames, err := openDir.Readdirnames(nameBatchSize) if err != nil { if errors.Is(err, io.EOF) || os.IsNotExist(err) { return nil } return err } if len(subdirNames) == 0 { return nil } if err := ctx.Err(); err != nil { return err } for _, keyPrefix := range subdirNames { if len(keyPrefix) != 2 { // just an invalid subdir; could be garbage of many kinds. probably // don't need to pass on this error continue } err := walkNamespaceWithPrefix(ctx, dir.log, namespace, nsDir, keyPrefix, walkFunc) if err != nil { return err } } } } func decodeBlobInfo(namespace []byte, keyPrefix, keyDir, name string) (info blobstore.BlobInfo, ok bool) { blobFileName := name encodedKey := keyPrefix + blobFileName formatVer := FormatV0 if strings.HasSuffix(blobFileName, v1PieceFileSuffix) { formatVer = FormatV1 encodedKey = encodedKey[0 : len(encodedKey)-len(v1PieceFileSuffix)] } key, err := pathEncoding.DecodeString(encodedKey) if err != nil { return nil, false } ref := blobstore.BlobRef{ Namespace: namespace, Key: key, } return newBlobInfo(ref, filepath.Join(keyDir, blobFileName), nil, formatVer), true } func walkNamespaceWithPrefix(ctx context.Context, log *zap.Logger, namespace []byte, nsDir, keyPrefix string, walkFunc func(blobstore.BlobInfo) error) (err error) { keyDir := filepath.Join(nsDir, keyPrefix) openDir, err := os.Open(keyDir) if err != nil { return err } defer func() { err = errs.Combine(err, openDir.Close()) }() for { // check for context done both before and after our readdir() call if err := ctx.Err(); err != nil { return err } names, err := openDir.Readdirnames(nameBatchSize) if err != nil && !errors.Is(err, io.EOF) { return err } if os.IsNotExist(err) || len(names) == 0 { return nil } if err := ctx.Err(); err != nil { return err } for _, name := range names { 
blobInfo, ok := decodeBlobInfo(namespace, keyPrefix, keyDir, name) if !ok { continue } err = walkFunc(blobInfo) if err != nil { return err } // also check for context done between every walkFunc callback. if err := ctx.Err(); err != nil { return err } } } } // removeAllContent deletes everything in the folder. func removeAllContent(ctx context.Context, path string) (err error) { defer mon.Task()(&ctx)(&err) dir, err := os.Open(path) if err != nil { return err } for { files, err := dir.Readdirnames(100) for _, file := range files { // the file might be still in use, so ignore the error _ = os.RemoveAll(filepath.Join(path, file)) } if errors.Is(err, io.EOF) || len(files) == 0 { return dir.Close() } if err != nil { return err } } } // DiskInfo contains statistics about this dir. type DiskInfo struct { ID string AvailableSpace int64 } // Info returns information about the current state of the dir. func (dir *Dir) Info(ctx context.Context) (DiskInfo, error) { path, err := filepath.Abs(dir.path) if err != nil { return DiskInfo{}, err } return diskInfoFromPath(path) } type blobInfo struct { ref blobstore.BlobRef path string fileInfo os.FileInfo formatVersion blobstore.FormatVersion } func newBlobInfo(ref blobstore.BlobRef, path string, fileInfo os.FileInfo, formatVer blobstore.FormatVersion) blobstore.BlobInfo { return &blobInfo{ ref: ref, path: path, fileInfo: fileInfo, formatVersion: formatVer, } } func (info *blobInfo) BlobRef() blobstore.BlobRef { return info.ref } func (info *blobInfo) StorageFormatVersion() blobstore.FormatVersion { return info.formatVersion } func (info *blobInfo) Stat(ctx context.Context) (os.FileInfo, error) { if info.fileInfo == nil { fileInfo, err := os.Lstat(info.path) if err != nil { if os.IsNotExist(err) { return nil, err } if isLowLevelCorruptionError(err) { return nil, &CorruptDataError{path: info.path, error: err} } return nil, err } if fileInfo.Mode().IsDir() { return fileInfo, ErrIsDir } info.fileInfo = fileInfo } return info.fileInfo, 
nil } func (info *blobInfo) FullPath(ctx context.Context) (string, error) { return info.path, nil } // CorruptDataError represents a filesystem or disk error which indicates data corruption. // // We use a custom error type here so that we can add explanatory information and wrap the original // error at the same time. type CorruptDataError struct { path string error error } // Unwrap unwraps the error. func (cde CorruptDataError) Unwrap() error { return cde.error } // Path returns the path at which the error was encountered. func (cde CorruptDataError) Path() string { return cde.path } // Error returns an error string describing the condition. func (cde CorruptDataError) Error() string { return fmt.Sprintf("unrecoverable error accessing data on the storage file system (path=%v; error=%v). This is most likely due to disk bad sectors or a corrupted file system. Check your disk for bad sectors and integrity", cde.path, cde.error) }