storagenode/blobstore/filestore: add tracking of blobs
We've had issues with forgetting to close readers and writers. Add leak tracking to find those pesky issues. Change-Id: If6b0ad6e9958318a7e0affee9c6d0a1ece412b6d
This commit is contained in:
parent
569c639cd5
commit
8b82dba602
2
go.mod
2
go.mod
@ -59,7 +59,7 @@ require (
|
||||
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
|
||||
gopkg.in/segmentio/analytics-go.v3 v3.1.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
storj.io/common v0.0.0-20230428200501-099739f940f7
|
||||
storj.io/common v0.0.0-20230504204616-8b62322ba410
|
||||
storj.io/drpc v0.0.33-0.20230417171205-2ca712ef4ab5
|
||||
storj.io/monkit-jaeger v0.0.0-20220915074555-d100d7589f41
|
||||
storj.io/private v0.0.0-20230405095015-9e5bbc1c7ca8
|
||||
|
4
go.sum
4
go.sum
@ -974,8 +974,8 @@ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8
|
||||
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
|
||||
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
|
||||
storj.io/common v0.0.0-20220719163320-cd2ef8e1b9b0/go.mod h1:mCYV6Ud5+cdbuaxdPD5Zht/HYaIn0sffnnws9ErkrMQ=
|
||||
storj.io/common v0.0.0-20230428200501-099739f940f7 h1:2EIBMx6+bv52uY5eWE6GDFLOm5zwqkxu00jEDCTGp/E=
|
||||
storj.io/common v0.0.0-20230428200501-099739f940f7/go.mod h1:j5YdcshmpJL+oW1+3IyBnCsv/HGbFkbzNDtuZg24KF0=
|
||||
storj.io/common v0.0.0-20230504204616-8b62322ba410 h1:P4wWUrAx86rL7xbdvc3jj8ExnBGWtKYD1hDLkEzDwsE=
|
||||
storj.io/common v0.0.0-20230504204616-8b62322ba410/go.mod h1:j5YdcshmpJL+oW1+3IyBnCsv/HGbFkbzNDtuZg24KF0=
|
||||
storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
|
||||
storj.io/drpc v0.0.33-0.20230417171205-2ca712ef4ab5 h1:4iOQovjXb6oAMLrjf0Qc4MuRRLd9hXC7+CWqOt+AzCw=
|
||||
storj.io/drpc v0.0.33-0.20230417171205-2ca712ef4ab5/go.mod h1:vR804UNzhBa49NOJ6HeLjd2H3MakC1j5Gv8bsOQT6N4=
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/leak"
|
||||
"storj.io/storj/storagenode/blobstore"
|
||||
)
|
||||
|
||||
@ -43,10 +44,12 @@ const (
|
||||
type blobReader struct {
|
||||
*os.File
|
||||
formatVersion blobstore.FormatVersion
|
||||
|
||||
track leak.Ref
|
||||
}
|
||||
|
||||
func newBlobReader(file *os.File, formatVersion blobstore.FormatVersion) *blobReader {
|
||||
return &blobReader{file, formatVersion}
|
||||
func newBlobReader(track leak.Ref, file *os.File, formatVersion blobstore.FormatVersion) *blobReader {
|
||||
return &blobReader{file, formatVersion, track}
|
||||
}
|
||||
|
||||
// Size returns how large is the blob.
|
||||
@ -63,6 +66,11 @@ func (blob *blobReader) StorageFormatVersion() blobstore.FormatVersion {
|
||||
return blob.formatVersion
|
||||
}
|
||||
|
||||
// Close closes the reader.
|
||||
func (blob *blobReader) Close() error {
|
||||
return errs.Combine(blob.File.Close(), blob.track.Close())
|
||||
}
|
||||
|
||||
// blobWriter implements writing blobs.
|
||||
type blobWriter struct {
|
||||
ref blobstore.BlobRef
|
||||
@ -71,9 +79,11 @@ type blobWriter struct {
|
||||
formatVersion blobstore.FormatVersion
|
||||
buffer *bufio.Writer
|
||||
fh *os.File
|
||||
|
||||
track leak.Ref
|
||||
}
|
||||
|
||||
func newBlobWriter(ref blobstore.BlobRef, store *blobStore, formatVersion blobstore.FormatVersion, file *os.File, bufferSize int) *blobWriter {
|
||||
func newBlobWriter(track leak.Ref, ref blobstore.BlobRef, store *blobStore, formatVersion blobstore.FormatVersion, file *os.File, bufferSize int) *blobWriter {
|
||||
return &blobWriter{
|
||||
ref: ref,
|
||||
store: store,
|
||||
@ -81,6 +91,8 @@ func newBlobWriter(ref blobstore.BlobRef, store *blobStore, formatVersion blobst
|
||||
formatVersion: formatVersion,
|
||||
buffer: bufio.NewWriterSize(file, bufferSize),
|
||||
fh: file,
|
||||
|
||||
track: track,
|
||||
}
|
||||
}
|
||||
|
||||
@ -100,7 +112,7 @@ func (blob *blobWriter) Cancel(ctx context.Context) (err error) {
|
||||
|
||||
err = blob.fh.Close()
|
||||
removeErr := os.Remove(blob.fh.Name())
|
||||
return Error.Wrap(errs.Combine(err, removeErr))
|
||||
return Error.Wrap(errs.Combine(err, removeErr, blob.track.Close()))
|
||||
}
|
||||
|
||||
// Commit moves the file to the target location.
|
||||
@ -113,11 +125,12 @@ func (blob *blobWriter) Commit(ctx context.Context) (err error) {
|
||||
blob.closed = true
|
||||
|
||||
if err := blob.buffer.Flush(); err != nil {
|
||||
// TODO: when flush fails, it looks like we don't close the file handle
|
||||
return err
|
||||
}
|
||||
|
||||
err = blob.store.dir.Commit(ctx, blob.fh, blob.ref, blob.formatVersion)
|
||||
return Error.Wrap(err)
|
||||
return Error.Wrap(errs.Combine(err, blob.track.Close()))
|
||||
}
|
||||
|
||||
// Seek flushes any buffer and seeks the underlying file.
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/leak"
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/storagenode/blobstore"
|
||||
@ -54,11 +55,13 @@ type blobStore struct {
|
||||
log *zap.Logger
|
||||
dir *Dir
|
||||
config Config
|
||||
|
||||
track leak.Ref
|
||||
}
|
||||
|
||||
// New creates a new disk blob store in the specified directory.
|
||||
func New(log *zap.Logger, dir *Dir, config Config) blobstore.Blobs {
|
||||
return &blobStore{dir: dir, log: log, config: config}
|
||||
return &blobStore{dir: dir, log: log, config: config, track: leak.Root(1)}
|
||||
}
|
||||
|
||||
// NewAt creates a new disk blob store in the specified directory.
|
||||
@ -67,11 +70,11 @@ func NewAt(log *zap.Logger, path string, config Config) (blobstore.Blobs, error)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
return &blobStore{dir: dir, log: log, config: config}, nil
|
||||
return &blobStore{dir: dir, log: log, config: config, track: leak.Root(1)}, nil
|
||||
}
|
||||
|
||||
// Close closes the store.
|
||||
func (store *blobStore) Close() error { return nil }
|
||||
func (store *blobStore) Close() error { return store.track.Close() }
|
||||
|
||||
// Open loads blob with the specified hash.
|
||||
func (store *blobStore) Open(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobReader, err error) {
|
||||
@ -83,7 +86,7 @@ func (store *blobStore) Open(ctx context.Context, ref blobstore.BlobRef) (_ blob
|
||||
}
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
return newBlobReader(file, formatVer), nil
|
||||
return newBlobReader(store.track.Child("blobReader", 1), file, formatVer), nil
|
||||
}
|
||||
|
||||
// OpenWithStorageFormat loads the already-located blob, avoiding the potential need to check multiple
|
||||
@ -97,7 +100,7 @@ func (store *blobStore) OpenWithStorageFormat(ctx context.Context, blobRef blobs
|
||||
}
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
return newBlobReader(file, formatVer), nil
|
||||
return newBlobReader(store.track.Child("blobReader", 1), file, formatVer), nil
|
||||
}
|
||||
|
||||
// Stat looks up disk metadata on the blob file.
|
||||
@ -173,7 +176,7 @@ func (store *blobStore) Create(ctx context.Context, ref blobstore.BlobRef, size
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
return newBlobWriter(ref, store, MaxFormatVersionSupported, file, store.config.WriteBufferSize.Int()), nil
|
||||
return newBlobWriter(store.track.Child("blobWriter", 1), ref, store, MaxFormatVersionSupported, file, store.config.WriteBufferSize.Int()), nil
|
||||
}
|
||||
|
||||
// SpaceUsedForBlobs adds up the space used in all namespaces for blob storage.
|
||||
@ -296,7 +299,7 @@ func (store *blobStore) TestCreateV0(ctx context.Context, ref blobstore.BlobRef)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
return newBlobWriter(ref, store, FormatV0, file, store.config.WriteBufferSize.Int()), nil
|
||||
return newBlobWriter(store.track.Child("blobWriter", 1), ref, store, FormatV0, file, store.config.WriteBufferSize.Int()), nil
|
||||
}
|
||||
|
||||
// CreateVerificationFile creates a file to be used for storage directory verification.
|
||||
|
@ -817,6 +817,7 @@ func TestTrashAndRestore(t *testing.T) {
|
||||
func requireFileMatches(ctx context.Context, t *testing.T, store blobstore.Blobs, data []byte, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) {
|
||||
r, err := store.OpenWithStorageFormat(ctx, ref, formatVer)
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, err, r.Close()) }()
|
||||
|
||||
buf, err := io.ReadAll(r)
|
||||
require.NoError(t, err)
|
||||
@ -861,6 +862,7 @@ func TestBlobMemoryBuffer(t *testing.T) {
|
||||
|
||||
reader, err := store.Open(ctx, ref)
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, reader.Close()) }()
|
||||
|
||||
buf, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
|
@ -617,6 +617,7 @@ func TestPieceVersionMigrate(t *testing.T) {
|
||||
// manually read v1 piece
|
||||
reader, err := tStore.ReaderWithStorageFormat(ctx, satelliteID, pieceID, filestore.FormatV1)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(reader.Close)
|
||||
|
||||
// generate v1 pieceHash and verify signature is still valid
|
||||
v1Header, err := reader.GetPieceHeader()
|
||||
|
@ -9,7 +9,7 @@ require (
|
||||
github.com/zeebo/errs v1.3.0
|
||||
go.uber.org/zap v1.21.0
|
||||
golang.org/x/sync v0.1.0
|
||||
storj.io/common v0.0.0-20230428200501-099739f940f7
|
||||
storj.io/common v0.0.0-20230504204616-8b62322ba410
|
||||
storj.io/private v0.0.0-20230405095015-9e5bbc1c7ca8
|
||||
storj.io/storj v1.63.1
|
||||
storj.io/storjscan v0.0.0-20220926140643-1623c3b391b0
|
||||
|
@ -1229,8 +1229,8 @@ rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
|
||||
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
|
||||
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
|
||||
storj.io/common v0.0.0-20220719163320-cd2ef8e1b9b0/go.mod h1:mCYV6Ud5+cdbuaxdPD5Zht/HYaIn0sffnnws9ErkrMQ=
|
||||
storj.io/common v0.0.0-20230428200501-099739f940f7 h1:2EIBMx6+bv52uY5eWE6GDFLOm5zwqkxu00jEDCTGp/E=
|
||||
storj.io/common v0.0.0-20230428200501-099739f940f7/go.mod h1:j5YdcshmpJL+oW1+3IyBnCsv/HGbFkbzNDtuZg24KF0=
|
||||
storj.io/common v0.0.0-20230504204616-8b62322ba410 h1:P4wWUrAx86rL7xbdvc3jj8ExnBGWtKYD1hDLkEzDwsE=
|
||||
storj.io/common v0.0.0-20230504204616-8b62322ba410/go.mod h1:j5YdcshmpJL+oW1+3IyBnCsv/HGbFkbzNDtuZg24KF0=
|
||||
storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
|
||||
storj.io/drpc v0.0.33-0.20230417171205-2ca712ef4ab5 h1:4iOQovjXb6oAMLrjf0Qc4MuRRLd9hXC7+CWqOt+AzCw=
|
||||
storj.io/drpc v0.0.33-0.20230417171205-2ca712ef4ab5/go.mod h1:vR804UNzhBa49NOJ6HeLjd2H3MakC1j5Gv8bsOQT6N4=
|
||||
|
@ -10,7 +10,7 @@ require (
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/stretchr/testify v1.8.2
|
||||
go.uber.org/zap v1.23.0
|
||||
storj.io/common v0.0.0-20230428200501-099739f940f7
|
||||
storj.io/common v0.0.0-20230504204616-8b62322ba410
|
||||
storj.io/gateway-mt v1.51.1-0.20230417204402-7d9bb25bc297
|
||||
storj.io/private v0.0.0-20230405095015-9e5bbc1c7ca8
|
||||
storj.io/storj v0.12.1-0.20221125175451-ef4b564b82f7
|
||||
|
@ -1933,8 +1933,8 @@ sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1
|
||||
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
|
||||
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
|
||||
storj.io/common v0.0.0-20220719163320-cd2ef8e1b9b0/go.mod h1:mCYV6Ud5+cdbuaxdPD5Zht/HYaIn0sffnnws9ErkrMQ=
|
||||
storj.io/common v0.0.0-20230428200501-099739f940f7 h1:2EIBMx6+bv52uY5eWE6GDFLOm5zwqkxu00jEDCTGp/E=
|
||||
storj.io/common v0.0.0-20230428200501-099739f940f7/go.mod h1:j5YdcshmpJL+oW1+3IyBnCsv/HGbFkbzNDtuZg24KF0=
|
||||
storj.io/common v0.0.0-20230504204616-8b62322ba410 h1:P4wWUrAx86rL7xbdvc3jj8ExnBGWtKYD1hDLkEzDwsE=
|
||||
storj.io/common v0.0.0-20230504204616-8b62322ba410/go.mod h1:j5YdcshmpJL+oW1+3IyBnCsv/HGbFkbzNDtuZg24KF0=
|
||||
storj.io/dotworld v0.0.0-20210324183515-0d11aeccd840 h1:oqMwoF6vaOrCe92SKRyr8cc2WSjLYAd8fjpAHA7rNqY=
|
||||
storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
|
||||
storj.io/drpc v0.0.33-0.20230417171205-2ca712ef4ab5 h1:4iOQovjXb6oAMLrjf0Qc4MuRRLd9hXC7+CWqOt+AzCw=
|
||||
|
Loading…
Reference in New Issue
Block a user