storagenode/{contact, piecestore}: implement low disk notification with cooldown

When a storagenode begins to run low on capacity, we want to notify
its satellites before it completely runs out of space. To achieve this,
at the end of an upload request the node checks whether its available
space has fallen below a certain threshold; if so, it triggers a
notification to the satellites.
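
For reference, the check added at the end of the upload path reduces
to a deferred threshold comparison; a minimal sketch, mirroring the
piecestore hunk below (availableSpace is computed earlier in doUpload):

    // End of doUpload: if free space fell below the configured
    // threshold, wake the monitor's low-disk notifier. The cooldown
    // inside the monitor decides whether satellites actually get
    // contacted.
    defer func() {
        if availableSpace < endpoint.config.ReportCapacityThreshold.Int64() {
            endpoint.monitor.NotifyLowDisk()
        }
    }()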

The new NotifyLowDisk method on the monitor chore is implemented using
the common/sync2.Cooldown type, which ensures contact is executed at
most once within a given timeframe, avoiding hammering the satellites
with requests. This PR also moves the methods in storagenode/contact
that perform the actual satellite communication out of Chore and into
Service, which allows us to ping satellites from the monitor chore.
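
A minimal sketch of how the cooldown gates satellite contact, using
the sync2.Cooldown calls as they appear in the monitor hunk below
(NewCooldown, Start, Trigger, Close); the surrounding wiring is
condensed and illustrative:

    cooldown := sync2.NewCooldown(config.NotifyLowDiskCooldown)

    var group errgroup.Group
    cooldown.Start(ctx, &group, func(ctx context.Context) error {
        // Runs when Trigger fires, then ignores further triggers
        // until NotifyLowDiskCooldown has elapsed, so each satellite
        // sees at most one capacity report per cooldown window.
        return service.contact.PingSatellites(ctx, config.NotifyLowDiskCooldown)
    })

    // Hot path, called from doUpload via NotifyLowDisk; repeated
    // calls during the cooldown window are cheap no-ops.
    cooldown.Trigger()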

Change-Id: I668455748cdc6741291b61130d8ef9feece86458
Cameron Ayer 2020-02-25 21:39:44 -05:00
parent d384e48ad7
commit 7244a6a84e
8 changed files with 171 additions and 116 deletions

go.sum

@@ -613,6 +613,10 @@ honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXe
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
storj.io/common v0.0.0-20200226144507-3fe9f7839df5/go.mod h1:lCc2baFO7GQlKsPTri8xwCsCPO2LsEUUiAGeRQuXY48=
storj.io/common v0.0.0-20200226185531-a823b4a45116 h1:lyAJ2vNPzmE+5b5mUjOid6ZhXTCvcCyjbScR+EG7F8A=
storj.io/common v0.0.0-20200226185531-a823b4a45116/go.mod h1:lCc2baFO7GQlKsPTri8xwCsCPO2LsEUUiAGeRQuXY48=
+storj.io/common v0.0.0-20200228160146-7414c4db24c1 h1:C0MjiumuQ2f1yWYTH/csYGKfONFv7WKwjr3syN2R7ys=
+storj.io/common v0.0.0-20200228160146-7414c4db24c1/go.mod h1:c9228xUKEg/sqWSOiVLoKQ3DiUqm1WrUAd9autjYfQc=
+storj.io/common v0.0.0-20200303092706-429875361e5d h1:TNUV5+Nc77VV0nRpDWXsGEnxopsyOBknO6vMtrUiRbU=
+storj.io/common v0.0.0-20200303092706-429875361e5d/go.mod h1:c9228xUKEg/sqWSOiVLoKQ3DiUqm1WrUAd9autjYfQc=
storj.io/drpc v0.0.7-0.20191115031725-2171c57838d2/go.mod h1:/ascUDbzNAv0A3Jj7wUIKFBH2JdJ2uJIBO/b9+2yHgQ=

private/testplanet/storagenode.go

@@ -128,8 +128,9 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
MaxSleep: 0,
},
Monitor: monitor.Config{
-MinimumBandwidth: 100 * memory.MB,
-MinimumDiskSpace: 100 * memory.MB,
+MinimumBandwidth:      100 * memory.MB,
+MinimumDiskSpace:      100 * memory.MB,
+NotifyLowDiskCooldown: defaultInterval,
},
Trust: trust.Config{
Sources: sources,

storagenode/contact/chore.go

@@ -8,15 +8,11 @@ import (
"sync"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/storagenode/trust"
)
// Chore is the contact chore for nodes announcing themselves to their trusted satellites
@@ -25,9 +21,6 @@ import (
type Chore struct {
log *zap.Logger
service *Service
-dialer rpc.Dialer
-trust *trust.Pool
mu sync.Mutex
cycles map[storj.NodeID]*sync2.Cycle
@@ -35,20 +28,11 @@ type Chore struct {
interval time.Duration
}
-var (
-errPingSatellite = errs.Class("ping satellite error")
-)
-const initialBackOff = time.Second
// NewChore creates a new contact chore
-func NewChore(log *zap.Logger, interval time.Duration, trust *trust.Pool, dialer rpc.Dialer, service *Service) *Chore {
+func NewChore(log *zap.Logger, interval time.Duration, service *Service) *Chore {
return &Chore{
log: log,
service: service,
-dialer: dialer,
-trust: trust,
cycles: make(map[storj.NodeID]*sync2.Cycle),
interval: interval,
@@ -65,12 +49,12 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
}
// configure the satellite ping cycles
-chore.updateCycles(ctx, &group, chore.trust.GetSatellites(ctx))
+chore.updateCycles(ctx, &group, chore.service.trust.GetSatellites(ctx))
// set up a cycle to update ping cycles on a frequent interval
refreshCycle := sync2.NewCycle(time.Minute)
refreshCycle.Start(ctx, &group, func(ctx context.Context) error {
-chore.updateCycles(ctx, &group, chore.trust.GetSatellites(ctx))
+chore.updateCycles(ctx, &group, chore.service.trust.GetSatellites(ctx))
return nil
})
@@ -100,7 +84,7 @@ func (chore *Chore) updateCycles(ctx context.Context, group *errgroup.Group, sat
cycle := sync2.NewCycle(chore.interval)
chore.cycles[satellite] = cycle
cycle.Start(ctx, group, func(ctx context.Context) error {
-return chore.pingSatellite(ctx, satellite)
+return chore.service.pingSatellite(ctx, satellite, chore.interval)
})
}
@@ -114,61 +98,6 @@ func (chore *Chore) updateCycles(ctx context.Context, group *errgroup.Group, sat
}
}
-func (chore *Chore) pingSatellite(ctx context.Context, satellite storj.NodeID) error {
-interval := initialBackOff
-attempts := 0
-for {
-mon.Meter("satellite_contact_request").Mark(1) //locked
-err := chore.pingSatelliteOnce(ctx, satellite)
-attempts++
-if err == nil {
-return nil
-}
-chore.log.Error("ping satellite failed ", zap.Stringer("Satellite ID", satellite), zap.Int("attempts", attempts), zap.Error(err))
-// Sleeps until interval times out, then continue. Returns if context is cancelled.
-if !sync2.Sleep(ctx, interval) {
-chore.log.Info("context cancelled", zap.Stringer("Satellite ID", satellite))
-return nil
-}
-interval *= 2
-if interval >= chore.interval {
-chore.log.Info("retries timed out for this cycle", zap.Stringer("Satellite ID", satellite))
-return nil
-}
-}
-}
-func (chore *Chore) pingSatelliteOnce(ctx context.Context, id storj.NodeID) (err error) {
-defer mon.Task()(&ctx, id)(&err)
-self := chore.service.Local()
-address, err := chore.trust.GetAddress(ctx, id)
-if err != nil {
-return errPingSatellite.Wrap(err)
-}
-conn, err := chore.dialer.DialAddressID(ctx, address, id)
-if err != nil {
-return errPingSatellite.Wrap(err)
-}
-defer func() { err = errs.Combine(err, conn.Close()) }()
-_, err = pb.NewDRPCNodeClient(conn.Raw()).CheckIn(ctx, &pb.CheckInRequest{
-Address: self.Address.GetAddress(),
-Version: &self.Version,
-Capacity: &self.Capacity,
-Operator: &self.Operator,
-})
-if err != nil {
-return errPingSatellite.Wrap(err)
-}
-return nil
-}
// Pause stops all the cycles in the contact chore.
func (chore *Chore) Pause(ctx context.Context) {
chore.started.Wait(ctx)

storagenode/contact/contact_test.go

@@ -76,6 +76,35 @@ func TestNodeInfoUpdated(t *testing.T) {
})
}
+func TestServicePingSatellites(t *testing.T) {
+testplanet.Run(t, testplanet.Config{
+SatelliteCount: 2, StorageNodeCount: 1, UplinkCount: 0,
+}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+node := planet.StorageNodes[0]
+node.Contact.Chore.Pause(ctx)
+newCapacity := pb.NodeCapacity{
+FreeBandwidth: 0,
+FreeDisk: 0,
+}
+for _, satellite := range planet.Satellites {
+info, err := satellite.Overlay.Service.Get(ctx, node.ID())
+require.NoError(t, err)
+require.NotEqual(t, newCapacity, info.Capacity)
+}
+node.Contact.Service.UpdateSelf(&newCapacity)
+err := node.Contact.Service.PingSatellites(ctx, 10*time.Second)
+require.NoError(t, err)
+for _, satellite := range planet.Satellites {
+info, err := satellite.Overlay.Service.Get(ctx, node.ID())
+require.NoError(t, err)
+require.Equal(t, newCapacity, info.Capacity)
+}
+})
+}
func TestLocalAndUpdateSelf(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,

storagenode/contact/service.go

@@ -4,22 +4,33 @@
package contact
import (
"context"
"sync"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/overlay"
"storj.io/storj/storagenode/trust"
)
-// Error is the default error class for contact package
-var Error = errs.Class("contact")
-var mon = monkit.Package()
+var (
+mon = monkit.Package()
+// Error is the default error class for contact package
+Error = errs.Class("contact")
+errPingSatellite = errs.Class("ping satellite error")
+)
+const initialBackOff = time.Second
// Config contains configurable values for contact service
type Config struct {
@@ -31,22 +42,96 @@ type Config struct {
// Service is the contact service between storage nodes and satellites
type Service struct {
-log *zap.Logger
+log    *zap.Logger
+dialer rpc.Dialer
mu sync.Mutex
self *overlay.NodeDossier
+trust *trust.Pool
initialized sync2.Fence
}
// NewService creates a new contact service
-func NewService(log *zap.Logger, self *overlay.NodeDossier) *Service {
+func NewService(log *zap.Logger, dialer rpc.Dialer, self *overlay.NodeDossier, trust *trust.Pool) *Service {
return &Service{
-log: log,
-self: self,
+log:    log,
+dialer: dialer,
+trust:  trust,
+self:   self,
}
}
+// PingSatellites attempts to ping all satellites in trusted list until backoff reaches maxInterval
+func (service *Service) PingSatellites(ctx context.Context, maxInterval time.Duration) error {
+defer mon.Task()(&ctx)
+satellites := service.trust.GetSatellites(ctx)
+var group errgroup.Group
+for _, satellite := range satellites {
+satellite := satellite
+group.Go(func() error {
+return service.pingSatellite(ctx, satellite, maxInterval)
+})
+}
+return group.Wait()
+}
+func (service *Service) pingSatellite(ctx context.Context, satellite storj.NodeID, maxInterval time.Duration) error {
+interval := initialBackOff
+attempts := 0
+for {
+mon.Meter("satellite_contact_request").Mark(1) //locked
+err := service.pingSatelliteOnce(ctx, satellite)
+attempts++
+if err == nil {
+return nil
+}
+service.log.Error("ping satellite failed ", zap.Stringer("Satellite ID", satellite), zap.Int("attempts", attempts), zap.Error(err))
+// Sleeps until interval times out, then continue. Returns if context is cancelled.
+if !sync2.Sleep(ctx, interval) {
+service.log.Info("context cancelled", zap.Stringer("Satellite ID", satellite))
+return nil
+}
+interval *= 2
+if interval >= maxInterval {
+service.log.Info("retries timed out for this cycle", zap.Stringer("Satellite ID", satellite))
+return nil
+}
+}
+}
+func (service *Service) pingSatelliteOnce(ctx context.Context, id storj.NodeID) (err error) {
+defer mon.Task()(&ctx, id)(&err)
+self := service.Local()
+address, err := service.trust.GetAddress(ctx, id)
+if err != nil {
+return errPingSatellite.Wrap(err)
+}
+conn, err := service.dialer.DialAddressID(ctx, address, id)
+if err != nil {
+return errPingSatellite.Wrap(err)
+}
+defer func() { err = errs.Combine(err, conn.Close()) }()
+_, err = pb.NewDRPCNodeClient(conn.Raw()).CheckIn(ctx, &pb.CheckInRequest{
+Address: self.Address.GetAddress(),
+Version: &self.Version,
+Capacity: &self.Capacity,
+Operator: &self.Operator,
+})
+if err != nil {
+return errPingSatellite.Wrap(err)
+}
+return nil
+}
// Local returns the storagenode node-dossier
func (service *Service) Local() overlay.NodeDossier {
service.mu.Lock()

storagenode/monitor/monitor.go

@@ -10,6 +10,7 @@ import (
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/memory"
"storj.io/common/pb"
@@ -28,9 +29,10 @@ var (
// Config defines parameters for storage node disk and bandwidth usage monitoring.
type Config struct {
-Interval time.Duration `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"`
-MinimumDiskSpace memory.Size `help:"how much disk space a node at minimum has to advertise" default:"500GB"`
-MinimumBandwidth memory.Size `help:"how much bandwidth a node at minimum has to advertise" default:"500GB"`
+Interval              time.Duration `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"`
+MinimumDiskSpace      memory.Size   `help:"how much disk space a node at minimum has to advertise" default:"500GB"`
+MinimumBandwidth      memory.Size   `help:"how much bandwidth a node at minimum has to advertise" default:"500GB"`
+NotifyLowDiskCooldown time.Duration `help:"minimum length of time between capacity reports" default:"10m" hidden:"true"`
}
// Service which monitors disk usage
@@ -43,6 +45,7 @@ type Service struct {
usageDB bandwidth.DB
allocatedDiskSpace int64
allocatedBandwidth int64
+cooldown *sync2.Cooldown
Loop *sync2.Cycle
Config Config
}
@@ -50,7 +53,7 @@ type Service struct {
// TODO: should it be responsible for monitoring actual bandwidth as well?
// NewService creates a new storage node monitoring service.
-func NewService(log *zap.Logger, store *pieces.Store, contact *contact.Service, usageDB bandwidth.DB, allocatedDiskSpace, allocatedBandwidth int64, interval time.Duration, config Config) *Service {
+func NewService(log *zap.Logger, store *pieces.Store, contact *contact.Service, usageDB bandwidth.DB, allocatedDiskSpace, allocatedBandwidth int64, interval time.Duration, reportCapacity func(context.Context), config Config) *Service {
return &Service{
log: log,
store: store,
@@ -58,6 +61,7 @@ func NewService(log *zap.Logger, store *pieces.Store, contact *contact.Service,
usageDB: usageDB,
allocatedDiskSpace: allocatedDiskSpace,
allocatedBandwidth: allocatedBandwidth,
+cooldown: sync2.NewCooldown(config.NotifyLowDiskCooldown),
Loop: sync2.NewCycle(interval),
Config: config,
}
@@ -124,18 +128,42 @@ func (service *Service) Run(ctx context.Context) (err error) {
return Error.New("bandwidth requirement not met")
}
-return service.Loop.Run(ctx, func(ctx context.Context) error {
-err := service.updateNodeInformation(ctx)
-if err != nil {
-service.log.Error("error during updating node information: ", zap.Error(err))
-}
-return err
-})
+var group errgroup.Group
+group.Go(func() error {
+return service.Loop.Run(ctx, func(ctx context.Context) error {
+err := service.updateNodeInformation(ctx)
+if err != nil {
+service.log.Error("error during updating node information: ", zap.Error(err))
+}
+return nil
+})
+})
+service.cooldown.Start(ctx, &group, func(ctx context.Context) error {
+err := service.updateNodeInformation(ctx)
+if err != nil {
+service.log.Error("error during updating node information: ", zap.Error(err))
+return nil
+}
+err = service.contact.PingSatellites(ctx, service.Config.NotifyLowDiskCooldown)
+if err != nil {
+service.log.Error("error notifying satellites: ", zap.Error(err))
+}
+return nil
+})
+return group.Wait()
}
+// NotifyLowDisk reports disk space to satellites if cooldown timer has expired
+func (service *Service) NotifyLowDisk() {
+service.cooldown.Trigger()
+}
// Close stops the monitor service.
func (service *Service) Close() (err error) {
service.Loop.Close()
+service.cooldown.Close()
return nil
}

storagenode/peer.go

@@ -349,9 +349,9 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
Version: *pbVersion,
}
peer.Contact.PingStats = new(contact.PingStats)
-peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self)
+peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), peer.Dialer, self, peer.Storage2.Trust)
-peer.Contact.Chore = contact.NewChore(peer.Log.Named("contact:chore"), config.Contact.Interval, peer.Storage2.Trust, peer.Dialer, peer.Contact.Service)
+peer.Contact.Chore = contact.NewChore(peer.Log.Named("contact:chore"), config.Contact.Interval, peer.Contact.Service)
peer.Services.Add(lifecycle.Item{
Name: "contact:chore",
Run: peer.Contact.Chore.Run,
@@ -410,6 +410,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
config.Storage.AllocatedBandwidth.Int64(),
//TODO use config.Storage.Monitor.Interval, but for some reason is not set
config.Storage.KBucketRefreshInterval,
+peer.Contact.Chore.Trigger,
config.Storage2.Monitor,
)
peer.Services.Add(lifecycle.Item{
@@ -442,17 +443,11 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.DB.Orders(),
peer.DB.Bandwidth(),
peer.DB.UsedSerials(),
-peer.Contact.Chore.Trigger,
config.Storage2,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
-// TODO remove this once workgroup is removed from piecestore endpoint
-peer.Services.Add(lifecycle.Item{
-Name: "piecestore",
-Close: peer.Storage2.Endpoint.Close,
-})
pbgrpc.RegisterPiecestoreServer(peer.Server.GRPC(), peer.Storage2.Endpoint)
pb.DRPCRegisterPiecestore(peer.Server.DRPC(), peer.Storage2.Endpoint.DRPC())

storagenode/piecestore/piecestore.go

@@ -89,10 +89,6 @@ type Endpoint struct {
usage bandwidth.DB
usedSerials UsedSerials
-group sync2.WorkGroup // temporary fix for uncontrolled goroutine at end of doUpload
-reportCapacity func(context.Context)
// liveRequests tracks the total number of incoming rpc requests. For gRPC
// requests only, this number is compared to config.MaxConcurrentRequests
// and limits the number of gRPC requests. dRPC requests are tracked but
@@ -107,7 +103,7 @@ type drpcEndpoint struct{ *Endpoint }
func (endpoint *Endpoint) DRPC() pb.DRPCPiecestoreServer { return &drpcEndpoint{Endpoint: endpoint} }
// NewEndpoint creates a new piecestore endpoint.
-func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, reportCapacity func(context.Context), config Config) (*Endpoint, error) {
+func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, config Config) (*Endpoint, error) {
// If config.MaxConcurrentRequests is set we want to repsect it for grpc.
// However, if it is 0 (unlimited) we force a limit.
grpcReqLimit := config.MaxConcurrentRequests
@@ -126,8 +122,6 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, moni
retain: retain,
pingStats: pingStats,
-reportCapacity: reportCapacity,
store: store,
orders: orders,
usage: usage,
@@ -288,11 +282,7 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err e
// if availableSpace has fallen below ReportCapacityThreshold, report capacity to satellites
defer func() {
if availableSpace < endpoint.config.ReportCapacityThreshold.Int64() {
-// workgroup is a temporary fix to clean up goroutine when peer shuts down
-endpoint.group.Go(func() {
-endpoint.monitor.Loop.TriggerWait()
-endpoint.reportCapacity(ctx)
-})
+endpoint.monitor.NotifyLowDisk()
}
}()
@@ -827,9 +817,3 @@ func min(a, b int64) int64 {
}
return b
}
-// Close is a temporary fix to clean up uncontrolled goroutine in doUpload
-func (endpoint *Endpoint) Close() error {
-endpoint.group.Close()
-return nil
-}