storagenode/monitor: add timeout to storage dir verification
Resolves https://github.com/storj/storj/issues/4567 Change-Id: Ia071c476bcd1f5c99a9874801c94db86d1e105c6
This commit is contained in:
parent
e0577eedf5
commit
c3d5965ef2
@ -183,6 +183,8 @@ func (planet *Planet) newStorageNode(ctx context.Context, prefix string, index,
|
||||
NotifyLowDiskCooldown: defaultInterval,
|
||||
VerifyDirReadableInterval: defaultInterval,
|
||||
VerifyDirWritableInterval: defaultInterval,
|
||||
VerifyDirReadableTimeout: 10 * time.Second,
|
||||
VerifyDirWritableTimeout: 10 * time.Second,
|
||||
},
|
||||
Trust: trust.Config{
|
||||
Sources: sources,
|
||||
|
@ -44,6 +44,8 @@ type Config struct {
|
||||
Interval time.Duration `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"`
|
||||
VerifyDirReadableInterval time.Duration `help:"how frequently to verify the location and readability of the storage directory" releaseDefault:"1m" devDefault:"30s"`
|
||||
VerifyDirWritableInterval time.Duration `help:"how frequently to verify writability of storage directory" releaseDefault:"5m" devDefault:"30s"`
|
||||
VerifyDirReadableTimeout time.Duration `help:"how long to wait for a storage directory readability verification to complete" releaseDefault:"1m" devDefault:"10s"`
|
||||
VerifyDirWritableTimeout time.Duration `help:"how long to wait for a storage directory writability verification to complete" releaseDefault:"1m" devDefault:"10s"`
|
||||
MinimumDiskSpace memory.Size `help:"how much disk space a node at minimum has to advertise" default:"500GB"`
|
||||
MinimumBandwidth memory.Size `help:"how much bandwidth a node at minimum has to advertise (deprecated)" default:"0TB"`
|
||||
NotifyLowDiskCooldown time.Duration `help:"minimum length of time between capacity reports" default:"10m" hidden:"true"`
|
||||
@ -127,18 +129,26 @@ func (service *Service) Run(ctx context.Context) (err error) {
|
||||
|
||||
group, ctx := errgroup.WithContext(ctx)
|
||||
group.Go(func() error {
|
||||
timeout := service.Config.VerifyDirReadableTimeout
|
||||
return service.VerifyDirReadableLoop.Run(ctx, func(ctx context.Context) error {
|
||||
err := service.store.VerifyStorageDir(ctx, service.contact.Local().ID)
|
||||
err := service.store.VerifyStorageDirWithTimeout(ctx, service.contact.Local().ID, timeout)
|
||||
if err != nil {
|
||||
if errs.Is(err, context.DeadlineExceeded) {
|
||||
return Error.New("timed out after %v while verifying readability of storage directory", timeout)
|
||||
}
|
||||
return Error.New("error verifying location and/or readability of storage directory: %v", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
group.Go(func() error {
|
||||
timeout := service.Config.VerifyDirWritableTimeout
|
||||
return service.VerifyDirWritableLoop.Run(ctx, func(ctx context.Context) error {
|
||||
err := service.store.CheckWritability(ctx)
|
||||
err := service.store.CheckWritabilityWithTimeout(ctx, timeout)
|
||||
if err != nil {
|
||||
if errs.Is(err, context.DeadlineExceeded) {
|
||||
return Error.New("timed out after %v while verifying writability of storage directory", timeout)
|
||||
}
|
||||
return Error.New("error verifying writability of storage directory: %v", err)
|
||||
}
|
||||
return nil
|
||||
|
@ -208,6 +208,25 @@ func (store *Store) VerifyStorageDir(ctx context.Context, id storj.NodeID) error
|
||||
return store.blobs.VerifyStorageDir(ctx, id)
|
||||
}
|
||||
|
||||
// VerifyStorageDirWithTimeout verifies that the storage directory is correct by checking for the existence and validity
|
||||
// of the verification file. It uses the provided timeout for the operation.
|
||||
func (store *Store) VerifyStorageDirWithTimeout(ctx context.Context, id storj.NodeID, timeout time.Duration) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
ch := make(chan error, 1)
|
||||
go func() {
|
||||
ch <- store.VerifyStorageDir(ctx, id)
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-ch:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Writer returns a new piece writer.
|
||||
func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, hashAlgorithm pb.PieceHashAlgorithm) (_ *Writer, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -734,6 +753,24 @@ func (store *Store) CheckWritability(ctx context.Context) error {
|
||||
return store.blobs.CheckWritability(ctx)
|
||||
}
|
||||
|
||||
// CheckWritabilityWithTimeout tests writability of the storage directory by creating and deleting a file with a timeout.
|
||||
func (store *Store) CheckWritabilityWithTimeout(ctx context.Context, timeout time.Duration) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
ch := make(chan error, 1)
|
||||
go func() {
|
||||
ch <- store.CheckWritability(ctx)
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-ch:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Stat looks up disk metadata on the blob file.
|
||||
func (store *Store) Stat(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (storage.BlobInfo, error) {
|
||||
return store.blobs.Stat(ctx, storage.BlobRef{
|
||||
|
Loading…
Reference in New Issue
Block a user