Cache refresher (#1171)
* got tests passed * wire up paginate function for cache node retrieval * Add tests for paginate but they're failing * fix the test arguments * Updates paginate function to return more variable * Updates * Some test and logic tweaks * improves config handling in discovery * adds refresh offset to discovery struct
This commit is contained in:
parent
19bc01c19a
commit
5e1ab841cf
@ -273,6 +273,7 @@ func (planet *Planet) newSatellites(count int) ([]*satellite.Peer, error) {
|
||||
},
|
||||
Discovery: discovery.Config{
|
||||
RefreshInterval: 1 * time.Second,
|
||||
RefreshLimit: 100,
|
||||
},
|
||||
PointerDB: pointerdb.Config{
|
||||
DatabaseURL: "bolt://" + filepath.Join(storageDir, "pointers.db"),
|
||||
|
@ -24,9 +24,15 @@ var (
|
||||
Error = errs.Class("discovery error")
|
||||
)
|
||||
|
||||
// Config loads on the configuration values from run flags
|
||||
// Config loads on the configuration values for the cache
|
||||
type Config struct {
|
||||
RefreshInterval time.Duration `help:"the interval at which the cache refreshes itself in seconds" default:"1s"`
|
||||
RefreshLimit int `help:"the amount of nodes refreshed at each interval" default:"100"`
|
||||
}
|
||||
|
||||
// Refresh tracks the offset of the current refresh cycle
|
||||
type Refresh struct {
|
||||
offset int64
|
||||
}
|
||||
|
||||
// Discovery struct loads on cache, kad, and statdb
|
||||
@ -35,28 +41,32 @@ type Discovery struct {
|
||||
cache *overlay.Cache
|
||||
kad *kademlia.Kademlia
|
||||
statdb statdb.DB
|
||||
|
||||
refreshInterval time.Duration
|
||||
config Config
|
||||
refresh Refresh
|
||||
}
|
||||
|
||||
// New returns a new discovery service.
|
||||
func New(logger *zap.Logger, ol *overlay.Cache, kad *kademlia.Kademlia, stat statdb.DB, refreshInterval time.Duration) *Discovery {
|
||||
func New(logger *zap.Logger, ol *overlay.Cache, kad *kademlia.Kademlia, stat statdb.DB, config Config) *Discovery {
|
||||
return &Discovery{
|
||||
log: logger,
|
||||
cache: ol,
|
||||
kad: kad,
|
||||
statdb: stat,
|
||||
refreshInterval: refreshInterval,
|
||||
config: config,
|
||||
refresh: Refresh{
|
||||
offset: 0,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// NewDiscovery Returns a new Discovery instance with cache, kad, and statdb loaded on
|
||||
func NewDiscovery(logger *zap.Logger, ol *overlay.Cache, kad *kademlia.Kademlia, stat statdb.DB) *Discovery {
|
||||
func NewDiscovery(logger *zap.Logger, ol *overlay.Cache, kad *kademlia.Kademlia, stat statdb.DB, config Config) *Discovery {
|
||||
return &Discovery{
|
||||
log: logger,
|
||||
cache: ol,
|
||||
kad: kad,
|
||||
statdb: stat,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
@ -65,7 +75,7 @@ func (discovery *Discovery) Close() error { return nil }
|
||||
|
||||
// Run runs the discovery service
|
||||
func (discovery *Discovery) Run(ctx context.Context) error {
|
||||
ticker := time.NewTicker(discovery.refreshInterval)
|
||||
ticker := time.NewTicker(discovery.config.RefreshInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
@ -91,7 +101,6 @@ func (discovery *Discovery) Run(ctx context.Context) error {
|
||||
// We currently do not penalize nodes that are unresponsive,
|
||||
// but should in the future.
|
||||
func (discovery *Discovery) Refresh(ctx context.Context) error {
|
||||
// TODO(coyle): make refresh work by looking on the network for new ndoes
|
||||
nodes := discovery.kad.Seen()
|
||||
for _, v := range nodes {
|
||||
if err := discovery.cache.Put(ctx, v.Id, *v); err != nil {
|
||||
@ -99,6 +108,40 @@ func (discovery *Discovery) Refresh(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
list, more, err := discovery.cache.Paginate(ctx, discovery.refresh.offset, discovery.config.RefreshLimit)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
// more means there are more rows to page through in the cache
|
||||
if more == false {
|
||||
discovery.refresh.offset = 0
|
||||
} else {
|
||||
discovery.refresh.offset = discovery.refresh.offset + int64(len(list))
|
||||
}
|
||||
|
||||
for _, node := range list {
|
||||
ping, err := discovery.kad.Ping(ctx, *node)
|
||||
if err != nil {
|
||||
discovery.log.Info("could not pinging node")
|
||||
_, err := discovery.statdb.UpdateUptime(ctx, ping.Id, false)
|
||||
if err != nil {
|
||||
discovery.log.Error("error updating node uptime in statdb")
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = discovery.statdb.UpdateUptime(ctx, ping.Id, true)
|
||||
if err != nil {
|
||||
discovery.log.Error("error updating node uptime in statdb")
|
||||
}
|
||||
|
||||
err = discovery.cache.Put(ctx, ping.Id, ping)
|
||||
if err != nil {
|
||||
discovery.log.Error("error putting node into cache")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -43,6 +43,8 @@ type DB interface {
|
||||
GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error)
|
||||
// List lists nodes starting from cursor
|
||||
List(ctx context.Context, cursor storj.NodeID, limit int) ([]*pb.Node, error)
|
||||
// Paginate will page through the database nodes
|
||||
Paginate(ctx context.Context, offset int64, limit int) ([]*pb.Node, bool, error)
|
||||
// Update updates node information
|
||||
Update(ctx context.Context, value *pb.Node) error
|
||||
// Delete deletes node based on id
|
||||
@ -71,6 +73,16 @@ func (cache *Cache) Inspect(ctx context.Context) (storage.Keys, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
// List returns a list of nodes from the cache DB
|
||||
func (cache *Cache) List(ctx context.Context, cursor storj.NodeID, limit int) ([]*pb.Node, error) {
|
||||
return cache.db.List(ctx, cursor, limit)
|
||||
}
|
||||
|
||||
// Paginate returns a list of `limit` nodes starting from `start` offset.
|
||||
func (cache *Cache) Paginate(ctx context.Context, offset int64, limit int) ([]*pb.Node, bool, error) {
|
||||
return cache.db.Paginate(ctx, offset, limit)
|
||||
}
|
||||
|
||||
// Get looks up the provided nodeID from the overlay cache
|
||||
func (cache *Cache) Get(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error) {
|
||||
if nodeID.IsZero() {
|
||||
|
@ -9,7 +9,6 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pb"
|
||||
@ -97,6 +96,27 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB, sdb statdb.D
|
||||
// TODO: add erroring database test
|
||||
}
|
||||
|
||||
{ // List
|
||||
list, err := cache.List(ctx, storj.NodeID{}, 3)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, list)
|
||||
}
|
||||
|
||||
{ // Paginate
|
||||
|
||||
// should return two nodes
|
||||
nodes, more, err := cache.Paginate(ctx, 0, 2)
|
||||
assert.NotNil(t, more)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(nodes), 2)
|
||||
|
||||
// should return no nodes
|
||||
zero, more, err := cache.Paginate(ctx, 0, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, more)
|
||||
assert.NotEqual(t, len(zero), 0)
|
||||
}
|
||||
|
||||
{ // Delete
|
||||
// Test standard delete
|
||||
err := cache.Delete(ctx, valid1ID)
|
||||
|
@ -274,7 +274,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (*
|
||||
|
||||
{ // setup discovery
|
||||
config := config.Discovery
|
||||
peer.Discovery.Service = discovery.New(peer.Log.Named("discovery"), peer.Overlay.Service, peer.Kademlia.Service, peer.DB.StatDB(), config.RefreshInterval)
|
||||
peer.Discovery.Service = discovery.New(peer.Log.Named("discovery"), peer.Overlay.Service, peer.Kademlia.Service, peer.DB.StatDB(), config)
|
||||
}
|
||||
|
||||
{ // setup metainfo
|
||||
|
@ -501,6 +501,13 @@ func (m *lockedOverlayCache) List(ctx context.Context, cursor storj.NodeID, limi
|
||||
return m.db.List(ctx, cursor, limit)
|
||||
}
|
||||
|
||||
// Paginate will page through the database nodes
|
||||
func (m *lockedOverlayCache) Paginate(ctx context.Context, offset int64, limit int) ([]*pb.Node, bool, error) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
return m.db.Paginate(ctx, offset, limit)
|
||||
}
|
||||
|
||||
// Update updates node information
|
||||
func (m *lockedOverlayCache) Update(ctx context.Context, value *pb.Node) error {
|
||||
m.Lock()
|
||||
|
@ -288,6 +288,40 @@ func (cache *overlaycache) List(ctx context.Context, cursor storj.NodeID, limit
|
||||
return infos, nil
|
||||
}
|
||||
|
||||
// Paginate will run through
|
||||
func (cache *overlaycache) Paginate(ctx context.Context, offset int64, limit int) ([]*pb.Node, bool, error) {
|
||||
cursor := storj.NodeID{}
|
||||
|
||||
// more represents end of table. If there are more rows in the database, more will be true.
|
||||
more := true
|
||||
|
||||
if limit <= 0 || limit > storage.LookupLimit {
|
||||
limit = storage.LookupLimit
|
||||
}
|
||||
|
||||
dbxInfos, err := cache.db.Limited_OverlayCacheNode_By_NodeId_GreaterOrEqual(ctx,
|
||||
dbx.OverlayCacheNode_NodeId(cursor.Bytes()),
|
||||
limit, offset,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if len(dbxInfos) < limit {
|
||||
more = false
|
||||
}
|
||||
|
||||
infos := make([]*pb.Node, len(dbxInfos))
|
||||
for i, dbxInfo := range dbxInfos {
|
||||
infos[i], err = convertOverlayNode(dbxInfo)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
}
|
||||
return infos, more, nil
|
||||
}
|
||||
|
||||
// Update updates node information
|
||||
func (cache *overlaycache) Update(ctx context.Context, info *pb.Node) (err error) {
|
||||
if info == nil || info.Id.IsZero() {
|
||||
|
Loading…
Reference in New Issue
Block a user