From 5e1ab841cfb285078d1967dbf077e1ea57450712 Mon Sep 17 00:00:00 2001 From: Dylan Lott Date: Wed, 30 Jan 2019 09:29:18 -0700 Subject: [PATCH] Cache refresher (#1171) * got tests passed * wire up paginate function for cache node retrieval * Add tests for paginate but they're failing * fix the test arguments * Updates paginate function to return more variable * Updates * Some test and logic tweaks * improves config handling in discovery * adds refresh offset to discovery struct --- internal/testplanet/planet.go | 1 + pkg/discovery/service.go | 85 ++++++++++++++++++++------- pkg/overlay/cache.go | 12 ++++ pkg/overlay/cache_test.go | 22 ++++++- satellite/peer.go | 2 +- satellite/satellitedb/locked.go | 7 +++ satellite/satellitedb/overlaycache.go | 34 +++++++++++ 7 files changed, 140 insertions(+), 23 deletions(-) diff --git a/internal/testplanet/planet.go b/internal/testplanet/planet.go index 6e6a36d05..528931167 100644 --- a/internal/testplanet/planet.go +++ b/internal/testplanet/planet.go @@ -273,6 +273,7 @@ func (planet *Planet) newSatellites(count int) ([]*satellite.Peer, error) { }, Discovery: discovery.Config{ RefreshInterval: 1 * time.Second, + RefreshLimit: 100, }, PointerDB: pointerdb.Config{ DatabaseURL: "bolt://" + filepath.Join(storageDir, "pointers.db"), diff --git a/pkg/discovery/service.go b/pkg/discovery/service.go index cf1e5bafd..284d193bf 100644 --- a/pkg/discovery/service.go +++ b/pkg/discovery/service.go @@ -24,39 +24,49 @@ var ( Error = errs.Class("discovery error") ) -// Config loads on the configuration values from run flags +// Config loads on the configuration values for the cache type Config struct { RefreshInterval time.Duration `help:"the interval at which the cache refreshes itself in seconds" default:"1s"` + RefreshLimit int `help:"the amount of nodes refreshed at each interval" default:"100"` +} + +// Refresh tracks the offset of the current refresh cycle +type Refresh struct { + offset int64 } // Discovery struct loads on cache, kad, 
and statdb type Discovery struct { - log *zap.Logger - cache *overlay.Cache - kad *kademlia.Kademlia - statdb statdb.DB - - refreshInterval time.Duration + log *zap.Logger + cache *overlay.Cache + kad *kademlia.Kademlia + statdb statdb.DB + config Config + refresh Refresh } // New returns a new discovery service. -func New(logger *zap.Logger, ol *overlay.Cache, kad *kademlia.Kademlia, stat statdb.DB, refreshInterval time.Duration) *Discovery { - return &Discovery{ - log: logger, - cache: ol, - kad: kad, - statdb: stat, - refreshInterval: refreshInterval, - } -} - -// NewDiscovery Returns a new Discovery instance with cache, kad, and statdb loaded on -func NewDiscovery(logger *zap.Logger, ol *overlay.Cache, kad *kademlia.Kademlia, stat statdb.DB) *Discovery { +func New(logger *zap.Logger, ol *overlay.Cache, kad *kademlia.Kademlia, stat statdb.DB, config Config) *Discovery { return &Discovery{ log: logger, cache: ol, kad: kad, statdb: stat, + config: config, + refresh: Refresh{ + offset: 0, + }, + } +} + +// NewDiscovery Returns a new Discovery instance with cache, kad, and statdb loaded on +func NewDiscovery(logger *zap.Logger, ol *overlay.Cache, kad *kademlia.Kademlia, stat statdb.DB, config Config) *Discovery { + return &Discovery{ + log: logger, + cache: ol, + kad: kad, + statdb: stat, + config: config, } } @@ -65,7 +75,7 @@ func (discovery *Discovery) Close() error { return nil } // Run runs the discovery service func (discovery *Discovery) Run(ctx context.Context) error { - ticker := time.NewTicker(discovery.refreshInterval) + ticker := time.NewTicker(discovery.config.RefreshInterval) defer ticker.Stop() for { @@ -91,7 +101,6 @@ func (discovery *Discovery) Run(ctx context.Context) error { // We currently do not penalize nodes that are unresponsive, // but should in the future. 
func (discovery *Discovery) Refresh(ctx context.Context) error { - // TODO(coyle): make refresh work by looking on the network for new ndoes nodes := discovery.kad.Seen() for _, v := range nodes { if err := discovery.cache.Put(ctx, v.Id, *v); err != nil { @@ -99,6 +108,40 @@ func (discovery *Discovery) Refresh(ctx context.Context) error { } } + list, more, err := discovery.cache.Paginate(ctx, discovery.refresh.offset, discovery.config.RefreshLimit) + if err != nil { + return Error.Wrap(err) + } + + // more means there are more rows to page through in the cache + if more == false { + discovery.refresh.offset = 0 + } else { + discovery.refresh.offset = discovery.refresh.offset + int64(len(list)) + } + + for _, node := range list { + ping, err := discovery.kad.Ping(ctx, *node) + if err != nil { + discovery.log.Info("could not pinging node") + _, err := discovery.statdb.UpdateUptime(ctx, ping.Id, false) + if err != nil { + discovery.log.Error("error updating node uptime in statdb") + } + continue + } + + _, err = discovery.statdb.UpdateUptime(ctx, ping.Id, true) + if err != nil { + discovery.log.Error("error updating node uptime in statdb") + } + + err = discovery.cache.Put(ctx, ping.Id, ping) + if err != nil { + discovery.log.Error("error putting node into cache") + } + } + return nil } diff --git a/pkg/overlay/cache.go b/pkg/overlay/cache.go index ea1a0d41b..47cc5ea43 100644 --- a/pkg/overlay/cache.go +++ b/pkg/overlay/cache.go @@ -43,6 +43,8 @@ type DB interface { GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) // List lists nodes starting from cursor List(ctx context.Context, cursor storj.NodeID, limit int) ([]*pb.Node, error) + // Paginate will page through the database nodes + Paginate(ctx context.Context, offset int64, limit int) ([]*pb.Node, bool, error) // Update updates node information Update(ctx context.Context, value *pb.Node) error // Delete deletes node based on id @@ -71,6 +73,16 @@ func (cache *Cache) Inspect(ctx 
context.Context) (storage.Keys, error) { return nil, errors.New("not implemented") } +// List returns a list of nodes from the cache DB +func (cache *Cache) List(ctx context.Context, cursor storj.NodeID, limit int) ([]*pb.Node, error) { + return cache.db.List(ctx, cursor, limit) +} + +// Paginate returns a list of `limit` nodes starting from `start` offset. +func (cache *Cache) Paginate(ctx context.Context, offset int64, limit int) ([]*pb.Node, bool, error) { + return cache.db.Paginate(ctx, offset, limit) +} + // Get looks up the provided nodeID from the overlay cache func (cache *Cache) Get(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error) { if nodeID.IsZero() { diff --git a/pkg/overlay/cache_test.go b/pkg/overlay/cache_test.go index eb7854802..0382ed931 100644 --- a/pkg/overlay/cache_test.go +++ b/pkg/overlay/cache_test.go @@ -9,7 +9,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - "storj.io/storj/internal/testcontext" "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pb" @@ -97,6 +96,27 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB, sdb statdb.D // TODO: add erroring database test } + { // List + list, err := cache.List(ctx, storj.NodeID{}, 3) + assert.NoError(t, err) + assert.NotNil(t, list) + } + + { // Paginate + + // should return two nodes + nodes, more, err := cache.Paginate(ctx, 0, 2) + assert.NotNil(t, more) + assert.NoError(t, err) + assert.Equal(t, len(nodes), 2) + + // a limit of 0 falls back to the default limit, so nodes are still returned + zero, more, err := cache.Paginate(ctx, 0, 0) + assert.NoError(t, err) + assert.NotNil(t, more) + assert.NotEqual(t, len(zero), 0) + } + { // Delete // Test standard delete err := cache.Delete(ctx, valid1ID) diff --git a/satellite/peer.go b/satellite/peer.go index 558a7cfc8..2b7e6833e 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -274,7 +274,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (* { // setup discovery config := config.Discovery - peer.Discovery.Service = 
discovery.New(peer.Log.Named("discovery"), peer.Overlay.Service, peer.Kademlia.Service, peer.DB.StatDB(), config.RefreshInterval) + peer.Discovery.Service = discovery.New(peer.Log.Named("discovery"), peer.Overlay.Service, peer.Kademlia.Service, peer.DB.StatDB(), config) } { // setup metainfo diff --git a/satellite/satellitedb/locked.go b/satellite/satellitedb/locked.go index c540b2721..4c677b412 100644 --- a/satellite/satellitedb/locked.go +++ b/satellite/satellitedb/locked.go @@ -501,6 +501,13 @@ func (m *lockedOverlayCache) List(ctx context.Context, cursor storj.NodeID, limi return m.db.List(ctx, cursor, limit) } +// Paginate will page through the database nodes +func (m *lockedOverlayCache) Paginate(ctx context.Context, offset int64, limit int) ([]*pb.Node, bool, error) { + m.Lock() + defer m.Unlock() + return m.db.Paginate(ctx, offset, limit) +} + // Update updates node information func (m *lockedOverlayCache) Update(ctx context.Context, value *pb.Node) error { m.Lock() diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index c5243b53c..94e29d07f 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -288,6 +288,40 @@ func (cache *overlaycache) List(ctx context.Context, cursor storj.NodeID, limit return infos, nil } +// Paginate will page through the database nodes, returning up to limit nodes at the given offset +func (cache *overlaycache) Paginate(ctx context.Context, offset int64, limit int) ([]*pb.Node, bool, error) { + cursor := storj.NodeID{} + + // more represents end of table. If there are more rows in the database, more will be true. 
+ more := true + + if limit <= 0 || limit > storage.LookupLimit { + limit = storage.LookupLimit + } + + dbxInfos, err := cache.db.Limited_OverlayCacheNode_By_NodeId_GreaterOrEqual(ctx, + dbx.OverlayCacheNode_NodeId(cursor.Bytes()), + limit, offset, + ) + + if err != nil { + return nil, false, err + } + + if len(dbxInfos) < limit { + more = false + } + + infos := make([]*pb.Node, len(dbxInfos)) + for i, dbxInfo := range dbxInfos { + infos[i], err = convertOverlayNode(dbxInfo) + if err != nil { + return nil, false, err + } + } + return infos, more, nil +} + // Update updates node information func (cache *overlaycache) Update(ctx context.Context, info *pb.Node) (err error) { if info == nil || info.Id.IsZero() {