storagenode/{monitor,pieces}, storage/filestore: add loop to check storage directory writability
periodically create and delete a temp file in the storage directory to verify writability. If this check fails, shut the node down. Change-Id: I433e3a8d1d775fc779ae78e7cf3144a05ffd0574
This commit is contained in:
parent
5d21e85529
commit
ca0c1a5f0c
@ -21,6 +21,11 @@ type Deprecated struct {
|
|||||||
Wallet string `default:"" hidden:"true"`
|
Wallet string `default:"" hidden:"true"`
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Storage2 struct {
|
||||||
|
Monitor struct {
|
||||||
|
VerifyDirInterval string `default:"" hidden:"true"`
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// maps deprecated config values to new values if applicable.
|
// maps deprecated config values to new values if applicable.
|
||||||
@ -50,6 +55,12 @@ func mapDeprecatedConfigs(log *zap.Logger) {
|
|||||||
oldValue: runCfg.Deprecated.Kademlia.Operator.Email,
|
oldValue: runCfg.Deprecated.Kademlia.Operator.Email,
|
||||||
oldConfigString: "kademlia.operator.email",
|
oldConfigString: "kademlia.operator.email",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
newValue: &runCfg.Config.Storage2.Monitor.VerifyDirReadableInterval,
|
||||||
|
newConfigString: "storage2.monitor.verify-dir-readable-interval",
|
||||||
|
oldValue: runCfg.Deprecated.Storage2.Monitor.VerifyDirInterval,
|
||||||
|
oldConfigString: "storage2.monitor.verify-dir-interval",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, migration := range migrations {
|
for _, migration := range migrations {
|
||||||
@ -58,7 +69,6 @@ func mapDeprecatedConfigs(log *zap.Logger) {
|
|||||||
override := parseOverride(typ, migration.oldValue)
|
override := parseOverride(typ, migration.oldValue)
|
||||||
|
|
||||||
reflect.ValueOf(migration.newValue).Elem().Set(reflect.ValueOf(override))
|
reflect.ValueOf(migration.newValue).Elem().Set(reflect.ValueOf(override))
|
||||||
|
|
||||||
log.Debug("Found deprecated flag. Migrating value.",
|
log.Debug("Found deprecated flag. Migrating value.",
|
||||||
zap.Stringer("Value", reflect.ValueOf(migration.newValue).Elem()),
|
zap.Stringer("Value", reflect.ValueOf(migration.newValue).Elem()),
|
||||||
zap.String("From", migration.oldConfigString),
|
zap.String("From", migration.oldConfigString),
|
||||||
|
@ -183,6 +183,14 @@ func (bad *BadBlobs) FreeSpace() (int64, error) {
|
|||||||
return bad.blobs.FreeSpace()
|
return bad.blobs.FreeSpace()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CheckWritability tests writability of the storage directory by creating and deleting a file.
|
||||||
|
func (bad *BadBlobs) CheckWritability() error {
|
||||||
|
if bad.err != nil {
|
||||||
|
return bad.err
|
||||||
|
}
|
||||||
|
return bad.blobs.CheckWritability()
|
||||||
|
}
|
||||||
|
|
||||||
// SpaceUsedForBlobs adds up how much is used in all namespaces.
|
// SpaceUsedForBlobs adds up how much is used in all namespaces.
|
||||||
func (bad *BadBlobs) SpaceUsedForBlobs(ctx context.Context) (int64, error) {
|
func (bad *BadBlobs) SpaceUsedForBlobs(ctx context.Context) (int64, error) {
|
||||||
if bad.err != nil {
|
if bad.err != nil {
|
||||||
|
@ -153,6 +153,12 @@ func (slow *SlowBlobs) FreeSpace() (int64, error) {
|
|||||||
return slow.blobs.FreeSpace()
|
return slow.blobs.FreeSpace()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CheckWritability tests writability of the storage directory by creating and deleting a file.
|
||||||
|
func (slow *SlowBlobs) CheckWritability() error {
|
||||||
|
slow.sleep()
|
||||||
|
return slow.blobs.CheckWritability()
|
||||||
|
}
|
||||||
|
|
||||||
// SpaceUsedForBlobs adds up how much is used in all namespaces.
|
// SpaceUsedForBlobs adds up how much is used in all namespaces.
|
||||||
func (slow *SlowBlobs) SpaceUsedForBlobs(ctx context.Context) (int64, error) {
|
func (slow *SlowBlobs) SpaceUsedForBlobs(ctx context.Context) (int64, error) {
|
||||||
slow.sleep()
|
slow.sleep()
|
||||||
|
@ -146,9 +146,10 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
|
|||||||
Path: filepath.Join(storageDir, "orders"),
|
Path: filepath.Join(storageDir, "orders"),
|
||||||
},
|
},
|
||||||
Monitor: monitor.Config{
|
Monitor: monitor.Config{
|
||||||
MinimumDiskSpace: 100 * memory.MB,
|
MinimumDiskSpace: 100 * memory.MB,
|
||||||
NotifyLowDiskCooldown: defaultInterval,
|
NotifyLowDiskCooldown: defaultInterval,
|
||||||
VerifyDirInterval: defaultInterval,
|
VerifyDirReadableInterval: defaultInterval,
|
||||||
|
VerifyDirWritableInterval: defaultInterval,
|
||||||
},
|
},
|
||||||
Trust: trust.Config{
|
Trust: trust.Config{
|
||||||
Sources: sources,
|
Sources: sources,
|
||||||
|
@ -97,6 +97,8 @@ type Blobs interface {
|
|||||||
StatWithStorageFormat(ctx context.Context, ref BlobRef, formatVer FormatVersion) (BlobInfo, error)
|
StatWithStorageFormat(ctx context.Context, ref BlobRef, formatVer FormatVersion) (BlobInfo, error)
|
||||||
// FreeSpace return how much free space is available to the blobstore.
|
// FreeSpace return how much free space is available to the blobstore.
|
||||||
FreeSpace() (int64, error)
|
FreeSpace() (int64, error)
|
||||||
|
// CheckWritability tests writability of the storage directory by creating and deleting a file.
|
||||||
|
CheckWritability() error
|
||||||
// SpaceUsedForTrash returns the total space used by the trash.
|
// SpaceUsedForTrash returns the total space used by the trash.
|
||||||
SpaceUsedForTrash(ctx context.Context) (int64, error)
|
SpaceUsedForTrash(ctx context.Context) (int64, error)
|
||||||
// SpaceUsedForBlobs adds up how much is used in all namespaces.
|
// SpaceUsedForBlobs adds up how much is used in all namespaces.
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
@ -254,6 +255,18 @@ func (store *blobStore) FreeSpace() (int64, error) {
|
|||||||
return info.AvailableSpace, nil
|
return info.AvailableSpace, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CheckWritability tests writability of the storage directory by creating and deleting a file.
|
||||||
|
func (store *blobStore) CheckWritability() error {
|
||||||
|
f, err := ioutil.TempFile(store.dir.Path(), "write-test")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return os.Remove(f.Name())
|
||||||
|
}
|
||||||
|
|
||||||
// ListNamespaces finds all known namespace IDs in use in local storage. They are not
|
// ListNamespaces finds all known namespace IDs in use in local storage. They are not
|
||||||
// guaranteed to contain any blobs.
|
// guaranteed to contain any blobs.
|
||||||
func (store *blobStore) ListNamespaces(ctx context.Context) (ids [][]byte, err error) {
|
func (store *blobStore) ListNamespaces(ctx context.Context) (ids [][]byte, err error) {
|
||||||
|
@ -29,40 +29,43 @@ var (
|
|||||||
|
|
||||||
// Config defines parameters for storage node disk and bandwidth usage monitoring.
|
// Config defines parameters for storage node disk and bandwidth usage monitoring.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Interval time.Duration `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"`
|
Interval time.Duration `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"`
|
||||||
VerifyDirInterval time.Duration `help:"how frequently to verify access to the storage directory" releaseDefault:"1m" devDefault:"30s"`
|
VerifyDirReadableInterval time.Duration `help:"how frequently to verify the location and readability of the storage directory" releaseDefault:"1m" devDefault:"30s"`
|
||||||
MinimumDiskSpace memory.Size `help:"how much disk space a node at minimum has to advertise" default:"500GB"`
|
VerifyDirWritableInterval time.Duration `help:"how frequently to verify writability of storage directory" releaseDefault:"5m" devDefault:"30s"`
|
||||||
MinimumBandwidth memory.Size `help:"how much bandwidth a node at minimum has to advertise (deprecated)" default:"0TB"`
|
MinimumDiskSpace memory.Size `help:"how much disk space a node at minimum has to advertise" default:"500GB"`
|
||||||
NotifyLowDiskCooldown time.Duration `help:"minimum length of time between capacity reports" default:"10m" hidden:"true"`
|
MinimumBandwidth memory.Size `help:"how much bandwidth a node at minimum has to advertise (deprecated)" default:"0TB"`
|
||||||
|
NotifyLowDiskCooldown time.Duration `help:"minimum length of time between capacity reports" default:"10m" hidden:"true"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Service which monitors disk usage
|
// Service which monitors disk usage
|
||||||
//
|
//
|
||||||
// architecture: Service
|
// architecture: Service
|
||||||
type Service struct {
|
type Service struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
store *pieces.Store
|
store *pieces.Store
|
||||||
contact *contact.Service
|
contact *contact.Service
|
||||||
usageDB bandwidth.DB
|
usageDB bandwidth.DB
|
||||||
allocatedDiskSpace int64
|
allocatedDiskSpace int64
|
||||||
cooldown *sync2.Cooldown
|
cooldown *sync2.Cooldown
|
||||||
Loop *sync2.Cycle
|
Loop *sync2.Cycle
|
||||||
VerifyDirLoop *sync2.Cycle
|
VerifyDirReadableLoop *sync2.Cycle
|
||||||
Config Config
|
VerifyDirWritableLoop *sync2.Cycle
|
||||||
|
Config Config
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewService creates a new storage node monitoring service.
|
// NewService creates a new storage node monitoring service.
|
||||||
func NewService(log *zap.Logger, store *pieces.Store, contact *contact.Service, usageDB bandwidth.DB, allocatedDiskSpace int64, interval time.Duration, reportCapacity func(context.Context), config Config) *Service {
|
func NewService(log *zap.Logger, store *pieces.Store, contact *contact.Service, usageDB bandwidth.DB, allocatedDiskSpace int64, interval time.Duration, reportCapacity func(context.Context), config Config) *Service {
|
||||||
return &Service{
|
return &Service{
|
||||||
log: log,
|
log: log,
|
||||||
store: store,
|
store: store,
|
||||||
contact: contact,
|
contact: contact,
|
||||||
usageDB: usageDB,
|
usageDB: usageDB,
|
||||||
allocatedDiskSpace: allocatedDiskSpace,
|
allocatedDiskSpace: allocatedDiskSpace,
|
||||||
cooldown: sync2.NewCooldown(config.NotifyLowDiskCooldown),
|
cooldown: sync2.NewCooldown(config.NotifyLowDiskCooldown),
|
||||||
Loop: sync2.NewCycle(interval),
|
Loop: sync2.NewCycle(interval),
|
||||||
VerifyDirLoop: sync2.NewCycle(config.VerifyDirInterval),
|
VerifyDirReadableLoop: sync2.NewCycle(config.VerifyDirReadableInterval),
|
||||||
Config: config,
|
VerifyDirWritableLoop: sync2.NewCycle(config.VerifyDirWritableInterval),
|
||||||
|
Config: config,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,7 +74,6 @@ func (service *Service) Run(ctx context.Context) (err error) {
|
|||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
// get the disk space details
|
// get the disk space details
|
||||||
|
|
||||||
// The returned path ends in a slash only if it represents a root directory, such as "/" on Unix or `C:\` on Windows.
|
// The returned path ends in a slash only if it represents a root directory, such as "/" on Unix or `C:\` on Windows.
|
||||||
storageStatus, err := service.store.StorageStatus(ctx)
|
storageStatus, err := service.store.StorageStatus(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -118,10 +120,19 @@ func (service *Service) Run(ctx context.Context) (err error) {
|
|||||||
|
|
||||||
group, ctx := errgroup.WithContext(ctx)
|
group, ctx := errgroup.WithContext(ctx)
|
||||||
group.Go(func() error {
|
group.Go(func() error {
|
||||||
return service.VerifyDirLoop.Run(ctx, func(ctx context.Context) error {
|
return service.VerifyDirReadableLoop.Run(ctx, func(ctx context.Context) error {
|
||||||
err := service.store.VerifyStorageDir(service.contact.Local().ID)
|
err := service.store.VerifyStorageDir(service.contact.Local().ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Error.New("error verifying storage directory: %v", err)
|
return Error.New("error verifying location and/or readability of storage directory: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
group.Go(func() error {
|
||||||
|
return service.VerifyDirWritableLoop.Run(ctx, func(ctx context.Context) error {
|
||||||
|
err := service.store.CheckWritability()
|
||||||
|
if err != nil {
|
||||||
|
return Error.New("error verifying writability of storage directory: %v", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
@ -201,7 +212,6 @@ func (service *Service) AvailableSpace(ctx context.Context) (_ int64, err error)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, Error.Wrap(err)
|
return 0, Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if diskStatus.DiskFree < freeSpaceForStorj {
|
if diskStatus.DiskFree < freeSpaceForStorj {
|
||||||
freeSpaceForStorj = diskStatus.DiskFree
|
freeSpaceForStorj = diskStatus.DiskFree
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,8 @@ func TestMonitor(t *testing.T) {
|
|||||||
nodeAssertions := 0
|
nodeAssertions := 0
|
||||||
for _, storageNode := range planet.StorageNodes {
|
for _, storageNode := range planet.StorageNodes {
|
||||||
storageNode.Storage2.Monitor.Loop.TriggerWait()
|
storageNode.Storage2.Monitor.Loop.TriggerWait()
|
||||||
storageNode.Storage2.Monitor.VerifyDirLoop.TriggerWait()
|
storageNode.Storage2.Monitor.VerifyDirReadableLoop.TriggerWait()
|
||||||
|
storageNode.Storage2.Monitor.VerifyDirWritableLoop.TriggerWait()
|
||||||
stats, err := storageNode.Storage2.Inspector.Stats(ctx, &pb.StatsRequest{})
|
stats, err := storageNode.Storage2.Inspector.Stats(ctx, &pb.StatsRequest{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
if stats.UsedSpace > 0 {
|
if stats.UsedSpace > 0 {
|
||||||
|
@ -699,6 +699,11 @@ func (store *Store) StorageStatus(ctx context.Context) (_ StorageStatus, err err
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CheckWritability tests writability of the storage directory by creating and deleting a file.
|
||||||
|
func (store *Store) CheckWritability() error {
|
||||||
|
return store.blobs.CheckWritability()
|
||||||
|
}
|
||||||
|
|
||||||
type storedPieceAccess struct {
|
type storedPieceAccess struct {
|
||||||
storage.BlobInfo
|
storage.BlobInfo
|
||||||
store *Store
|
store *Store
|
||||||
|
@ -501,7 +501,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
|
|||||||
pieceReader, err = endpoint.store.Reader(ctx, limit.SatelliteId, limit.PieceId)
|
pieceReader, err = endpoint.store.Reader(ctx, limit.SatelliteId, limit.PieceId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
endpoint.monitor.VerifyDirLoop.TriggerWait()
|
endpoint.monitor.VerifyDirReadableLoop.TriggerWait()
|
||||||
return rpcstatus.Wrap(rpcstatus.NotFound, err)
|
return rpcstatus.Wrap(rpcstatus.NotFound, err)
|
||||||
}
|
}
|
||||||
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||||
|
Loading…
Reference in New Issue
Block a user