storage/filestore: log potential disk corruption

In walkNamespaceWithPrefix log in case of "lstat" error, because this may indicate an underlying disk corruption.

SG-50

Change-Id: I867c3ffc47cfac325ae90658ec4780d213ff3e63
This commit is contained in:
Qweder93 2020-05-22 21:28:26 +03:00 committed by Nikolai Siedov
parent 8db848791f
commit f2a0c64425
6 changed files with 28 additions and 16 deletions

View File

@ -16,6 +16,7 @@ import (
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/storage"
)
@ -33,6 +34,7 @@ var pathEncoding = base32.NewEncoding("abcdefghijklmnopqrstuvwxyz234567").WithPa
// Dir represents single folder for storing blobs
type Dir struct {
log *zap.Logger
path string
mu sync.Mutex
@ -41,8 +43,9 @@ type Dir struct {
}
// NewDir returns folder for storing blobs
func NewDir(path string) (*Dir, error) {
func NewDir(log *zap.Logger, path string) (*Dir, error) {
dir := &Dir{
log: log,
path: path,
trashnow: time.Now,
}
@ -646,7 +649,7 @@ func (dir *Dir) walkNamespaceInPath(ctx context.Context, namespace []byte, path
// don't need to pass on this error
continue
}
err := walkNamespaceWithPrefix(ctx, namespace, nsDir, keyPrefix, walkFunc)
err := walkNamespaceWithPrefix(ctx, dir.log, namespace, nsDir, keyPrefix, walkFunc)
if err != nil {
return err
}
@ -673,7 +676,7 @@ func decodeBlobInfo(namespace []byte, keyPrefix, keyDir string, keyInfo os.FileI
return newBlobInfo(ref, filepath.Join(keyDir, blobFileName), keyInfo, formatVer), true
}
func walkNamespaceWithPrefix(ctx context.Context, namespace []byte, nsDir, keyPrefix string, walkFunc func(storage.BlobInfo) error) (err error) {
func walkNamespaceWithPrefix(ctx context.Context, log *zap.Logger, namespace []byte, nsDir, keyPrefix string, walkFunc func(storage.BlobInfo) error) (err error) {
keyDir := filepath.Join(nsDir, keyPrefix)
openDir, err := os.Open(keyDir)
if err != nil {
@ -685,25 +688,34 @@ func walkNamespaceWithPrefix(ctx context.Context, namespace []byte, nsDir, keyPr
if err := ctx.Err(); err != nil {
return err
}
keyInfos, err := openDir.Readdir(nameBatchSize)
names, err := openDir.Readdirnames(nameBatchSize)
if err != nil && err != io.EOF {
return err
}
if os.IsNotExist(err) || len(keyInfos) == 0 {
if os.IsNotExist(err) || len(names) == 0 {
return nil
}
if err := ctx.Err(); err != nil {
return err
}
for _, keyInfo := range keyInfos {
if keyInfo.Mode().IsDir() {
for _, name := range names {
info, err := os.Lstat(keyDir + "/" + name)
if err != nil {
if pErr, ok := err.(*os.PathError); ok {
if pErr.Err.Error() == "lstat" {
log.Error("Unable to read the disk, please verify the disk is not corrupt")
}
}
return err
}
if info.Mode().IsDir() {
continue
}
info, ok := decodeBlobInfo(namespace, keyPrefix, keyDir, keyInfo)
blobInfo, ok := decodeBlobInfo(namespace, keyPrefix, keyDir, info)
if !ok {
continue
}
err = walkFunc(info)
err = walkFunc(blobInfo)
if err != nil {
return err
}

View File

@ -52,7 +52,7 @@ func New(log *zap.Logger, dir *Dir, config Config) storage.Blobs {
// NewAt creates a new disk blob store in the specified directory
func NewAt(log *zap.Logger, path string, config Config) (storage.Blobs, error) {
dir, err := NewDir(path)
dir, err := NewDir(log, path)
if err != nil {
return nil, Error.Wrap(err)
}

View File

@ -25,7 +25,7 @@ func TestDeleter(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
dir, err := filestore.NewDir(ctx.Dir("piecedeleter"))
dir, err := filestore.NewDir(zaptest.NewLogger(t), ctx.Dir("piecedeleter"))
require.NoError(t, err)
blobs := filestore.New(zaptest.NewLogger(t), dir, filestore.DefaultConfig)

View File

@ -27,7 +27,7 @@ func BenchmarkReadWrite(b *testing.B) {
ctx := testcontext.New(b)
defer ctx.Cleanup()
dir, err := filestore.NewDir(ctx.Dir("pieces"))
dir, err := filestore.NewDir(zap.NewNop(), ctx.Dir("pieces"))
require.NoError(b, err)
blobs := filestore.New(zap.NewNop(), dir, filestore.DefaultConfig)
defer ctx.Check(blobs.Close)
@ -93,7 +93,7 @@ func readAndWritePiece(t *testing.T, content []byte) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
dir, err := filestore.NewDir(ctx.Dir("pieces"))
dir, err := filestore.NewDir(zaptest.NewLogger(t), ctx.Dir("pieces"))
require.NoError(t, err)
blobs := filestore.New(zaptest.NewLogger(t), dir, filestore.DefaultConfig)
defer ctx.Check(blobs.Close)

View File

@ -38,7 +38,7 @@ func TestPieces(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
dir, err := filestore.NewDir(ctx.Dir("pieces"))
dir, err := filestore.NewDir(zaptest.NewLogger(t), ctx.Dir("pieces"))
require.NoError(t, err)
blobs := filestore.New(zaptest.NewLogger(t), dir, filestore.DefaultConfig)
@ -310,7 +310,7 @@ func TestTrashAndRestore(t *testing.T) {
}
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
dir, err := filestore.NewDir(ctx.Dir("store"))
dir, err := filestore.NewDir(zaptest.NewLogger(t), ctx.Dir("store"))
require.NoError(t, err)
blobs := filestore.New(zaptest.NewLogger(t), dir, filestore.DefaultConfig)

View File

@ -111,7 +111,7 @@ type DB struct {
// New creates a new master database for storage node
func New(log *zap.Logger, config Config) (*DB, error) {
piecesDir, err := filestore.NewDir(config.Pieces)
piecesDir, err := filestore.NewDir(log, config.Pieces)
if err != nil {
return nil, err
}