discovery: parallelize refresh (#2535)

* parallelize discovery refresh

* add PaginateQualified test, address PR comments

* Remove duplicate uptime update

* Lower concurrency in Testplanet for discovery
Alexander Leitner 2019-07-12 10:35:48 -04:00 committed by GitHub
parent d887ffec62
commit 64b2769de3
9 changed files with 234 additions and 51 deletions

View File

@@ -124,9 +124,10 @@ func (planet *Planet) newSatellites(count int) ([]*satellite.Peer, error) {
 				},
 			},
 			Discovery: discovery.Config{
 				DiscoveryInterval: 1 * time.Second,
 				RefreshInterval:   1 * time.Second,
 				RefreshLimit:      100,
+				RefreshConcurrency: 2,
 			},
 			Metainfo: metainfo.Config{
 				DatabaseURL: "bolt://" + filepath.Join(storageDir, "pointers.db"),

View File

@@ -28,9 +28,10 @@ var (
 // Config loads on the configuration values for the cache
 type Config struct {
 	RefreshInterval   time.Duration `help:"the interval at which the cache refreshes itself in seconds" default:"1s"`
 	DiscoveryInterval time.Duration `help:"the interval at which the satellite attempts to find new nodes via random node ID lookups" default:"1s"`
-	RefreshLimit      int           `help:"the amount of nodes refreshed at each interval" default:"100"`
+	RefreshLimit       int `help:"the amount of nodes read from the overlay cache in a single pagination call" default:"100"`
+	RefreshConcurrency int `help:"the amount of nodes refreshed in parallel" default:"8"`
 }

 // Discovery struct loads on cache, kad
@@ -39,9 +40,8 @@ type Discovery struct {
 	cache *overlay.Cache
 	kad   *kademlia.Kademlia
-	// refreshOffset tracks the offset of the current refresh cycle
-	refreshOffset int64
-	refreshLimit  int
+	refreshLimit       int
+	refreshConcurrency int

 	Refresh   sync2.Cycle
 	Discovery sync2.Cycle
@@ -54,8 +54,8 @@ func New(logger *zap.Logger, ol *overlay.Cache, kad *kademlia.Kademlia, config C
 		cache: ol,
 		kad:   kad,
-		refreshOffset: 0,
-		refreshLimit:  config.RefreshLimit,
+		refreshLimit:       config.RefreshLimit,
+		refreshConcurrency: config.RefreshConcurrency,
 	}

 	discovery.Refresh.SetInterval(config.RefreshInterval)
@@ -97,54 +97,49 @@ func (discovery *Discovery) Run(ctx context.Context) (err error) {
 func (discovery *Discovery) refresh(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)

-	list, more, err := discovery.cache.Paginate(ctx, discovery.refreshOffset, discovery.refreshLimit)
-	if err != nil {
-		return Error.Wrap(err)
-	}
-
-	// more means there are more rows to page through in the cache
-	if more == false {
-		discovery.refreshOffset = 0
-	} else {
-		discovery.refreshOffset += int64(len(list))
-	}
-
-	for _, node := range list {
-		if ctx.Err() != nil {
-			return ctx.Err()
-		}
-
-		if node.Disqualified != nil {
-			discovery.log.Debug("skip disqualified node", zap.Stringer("ID", node.Id))
-			continue
-		}
-
-		info, err := discovery.kad.FetchInfo(ctx, node.Node)
-		if ctx.Err() != nil {
-			return ctx.Err()
-		}
-
-		if err != nil {
-			discovery.log.Info("could not ping node", zap.Stringer("ID", node.Id), zap.Error(err))
-			_, err := discovery.cache.UpdateUptime(ctx, node.Id, false)
-			if err != nil {
-				discovery.log.Error("could not update node uptime in cache", zap.Stringer("ID", node.Id), zap.Error(err))
-			}
-			continue
-		}
-
-		// TODO: combine these into the same db call
-		_, err = discovery.cache.UpdateUptime(ctx, node.Id, true)
-		if err != nil {
-			discovery.log.Error("could not update node uptime in cache", zap.Stringer("ID", node.Id), zap.Error(err))
-		}
-
-		_, err = discovery.cache.UpdateNodeInfo(ctx, node.Id, info)
-		if err != nil {
-			discovery.log.Warn("could not update node info", zap.Stringer("ID", node.GetAddress()))
-		}
-	}
-
+	limiter := sync2.NewLimiter(discovery.refreshConcurrency)
+
+	var offset int64
+
+	for {
+		list, more, err := discovery.cache.PaginateQualified(ctx, offset, discovery.refreshLimit)
+		if err != nil {
+			return Error.Wrap(err)
+		}
+
+		if len(list) == 0 {
+			break
+		}
+
+		offset += int64(len(list))
+
+		for _, node := range list {
+			node := node
+			limiter.Go(ctx, func() {
+				// NB: FetchInfo updates node uptime already
+				info, err := discovery.kad.FetchInfo(ctx, *node)
+				if ctx.Err() != nil {
+					return
+				}
+
+				if err != nil {
+					discovery.log.Info("could not ping node", zap.Stringer("ID", node.Id), zap.Error(err))
+					return
+				}
+
+				if _, err = discovery.cache.UpdateNodeInfo(ctx, node.Id, info); err != nil {
+					discovery.log.Warn("could not update node info", zap.Stringer("ID", node.GetAddress()))
+				}
+			})
+		}
+
+		if !more {
+			break
+		}
+	}
+
+	limiter.Wait()
+
 	return nil
 }
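The new loop pushes each node ping onto a concurrency limiter instead of pinging nodes one at a time. The diff only shows sync2.Limiter being used through Go and Wait, so its exact semantics are assumed here; a minimal standard-library sketch of that bounded-concurrency pattern, with hypothetical names, looks like this:

package main

import (
	"context"
	"fmt"
	"sync"
)

// limiter is a hypothetical stand-in for sync2.Limiter: it runs callbacks
// concurrently while never exceeding a fixed number of goroutines.
type limiter struct {
	sem chan struct{}
	wg  sync.WaitGroup
}

func newLimiter(n int) *limiter { return &limiter{sem: make(chan struct{}, n)} }

// Go schedules fn, blocking only while the concurrency budget is exhausted.
func (l *limiter) Go(ctx context.Context, fn func()) {
	select {
	case l.sem <- struct{}{}: // acquire a slot
	case <-ctx.Done():
		return
	}
	l.wg.Add(1)
	go func() {
		defer func() { <-l.sem; l.wg.Done() }() // release the slot
		fn()
	}()
}

// Wait blocks until every scheduled callback has returned.
func (l *limiter) Wait() { l.wg.Wait() }

func main() {
	nodes := []string{"a", "b", "c", "d"}
	lim := newLimiter(2) // e.g. RefreshConcurrency = 2, as in the testplanet hunk above
	for _, node := range nodes {
		node := node // capture the loop variable, as the refresh loop does with `node := node`
		lim.Go(context.Background(), func() {
			fmt.Println("pinging", node) // stands in for kad.FetchInfo + cache.UpdateNodeInfo
		})
	}
	lim.Wait()
}

Bounding the fan-out this way keeps at most RefreshConcurrency pings in flight per page, so a large overlay table no longer serializes the whole refresh cycle.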

View File

@@ -55,6 +55,8 @@ type DB interface {
 	Reliable(context.Context, *NodeCriteria) (storj.NodeIDList, error)
 	// Paginate will page through the database nodes
 	Paginate(ctx context.Context, offset int64, limit int) ([]*NodeDossier, bool, error)
+	// PaginateQualified will page through the qualified nodes
+	PaginateQualified(ctx context.Context, offset int64, limit int) ([]*pb.Node, bool, error)
 	// IsVetted returns whether or not the node reaches reputable thresholds
 	IsVetted(ctx context.Context, id storj.NodeID, criteria *NodeCriteria) (bool, error)
 	// Update updates node address
@@ -166,6 +168,12 @@ func (cache *Cache) Paginate(ctx context.Context, offset int64, limit int) (_ []
 	return cache.db.Paginate(ctx, offset, limit)
 }

+// PaginateQualified returns a list of `limit` qualified nodes starting from `start` offset.
+func (cache *Cache) PaginateQualified(ctx context.Context, offset int64, limit int) (_ []*pb.Node, _ bool, err error) {
+	defer mon.Task()(&ctx)(&err)
+	return cache.db.PaginateQualified(ctx, offset, limit)
+}
+
 // Get looks up the provided nodeID from the overlay cache
 func (cache *Cache) Get(ctx context.Context, nodeID storj.NodeID) (_ *NodeDossier, err error) {
 	defer mon.Task()(&ctx)(&err)
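PaginateQualified returns one page of nodes plus a `more` flag, and callers drain the table by advancing the offset by the page length until `more` is false or a page comes back empty, which is what the reworked refresh loop does. A self-contained sketch of that contract, using a hypothetical stand-in pager rather than the real overlay cache:

package main

import (
	"context"
	"fmt"
)

// pageFunc is a stand-in for (*overlay.Cache).PaginateQualified: it returns up to
// `limit` items starting at `offset`, plus a flag saying whether more may follow.
type pageFunc func(ctx context.Context, offset int64, limit int) (items []string, more bool, err error)

// drain pages through every qualified item using the offset/limit/more contract.
func drain(ctx context.Context, page pageFunc, limit int) ([]string, error) {
	var all []string
	var offset int64
	for {
		items, more, err := page(ctx, offset, limit)
		if err != nil {
			return nil, err
		}
		if len(items) == 0 {
			break
		}
		all = append(all, items...)
		offset += int64(len(items))
		if !more {
			break
		}
	}
	return all, nil
}

func main() {
	table := []string{"node1", "node2", "node3", "node4", "node5"}
	page := func(ctx context.Context, offset int64, limit int) ([]string, bool, error) {
		if offset >= int64(len(table)) {
			return nil, false, nil
		}
		end := offset + int64(limit)
		if end > int64(len(table)) {
			end = int64(len(table))
		}
		// more mirrors the overlaycache implementation below: false once the page is short.
		return table[offset:end], end-offset == int64(limit), nil
	}
	all, err := drain(context.Background(), page, 2)
	fmt.Println(all, err) // [node1 node2 node3 node4 node5] <nil>
}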

View File

@@ -63,6 +63,7 @@ func testNodeSelectionConfig(auditCount int64, newNodePercentage float64, distin
 func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
 	valid1ID := testrand.NodeID()
 	valid2ID := testrand.NodeID()
+	valid3ID := testrand.NodeID()
 	missingID := testrand.NodeID()

 	address := &pb.NodeAddress{Address: "127.0.0.1:0"}
@@ -75,6 +76,12 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
 		err = cache.Put(ctx, valid2ID, pb.Node{Id: valid2ID, Address: address})
 		require.NoError(t, err)
+
+		err = cache.Put(ctx, valid3ID, pb.Node{Id: valid3ID, Address: address})
+		require.NoError(t, err)
+
+		_, err = cache.UpdateUptime(ctx, valid3ID, false)
+		require.NoError(t, err)
 	}

 	{ // Get
@@ -113,6 +120,15 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
 		assert.NotEqual(t, len(zero), 0)
 	}

+	{ // PaginateQualified
+		// should return two nodes
+		nodes, more, err := cache.PaginateQualified(ctx, 0, 3)
+		assert.NotNil(t, more)
+		assert.NoError(t, err)
+		assert.Equal(t, len(nodes), 2)
+	}
+
 	{ // Reputation
 		valid1, err := cache.Get(ctx, valid1ID)
 		require.NoError(t, err)

View File

@@ -180,6 +180,13 @@ read limitoffset (
 	orderby asc node.id
 )

+read limitoffset (
+	select node.id node.last_net node.address node.protocol
+	where node.id >= ?
+	where node.disqualified = null
+	orderby asc node.id
+)
+
 //--- repairqueue ---//

 model injuredsegment (

View File

@@ -5268,6 +5268,13 @@ func (h *__sqlbundle_Hole) Render() string { return h.SQL.Render() }
 // end runtime support for building sql statements
 //

+type Id_LastNet_Address_Protocol_Row struct {
+	Id       []byte
+	LastNet  string
+	Address  string
+	Protocol int
+}
+
 type Id_Row struct {
 	Id []byte
 }
@@ -6281,6 +6288,42 @@ func (obj *postgresImpl) Limited_Node_By_Id_GreaterOrEqual_OrderBy_Asc_Id(ctx co
 }

+func (obj *postgresImpl) Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx context.Context,
+	node_id_greater_or_equal Node_Id_Field,
+	limit int, offset int64) (
+	rows []*Id_LastNet_Address_Protocol_Row, err error) {
+
+	var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.last_net, nodes.address, nodes.protocol FROM nodes WHERE nodes.id >= ? AND nodes.disqualified is NULL ORDER BY nodes.id LIMIT ? OFFSET ?")
+
+	var __values []interface{}
+	__values = append(__values, node_id_greater_or_equal.value())
+	__values = append(__values, limit, offset)
+
+	var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
+	obj.logStmt(__stmt, __values...)
+
+	__rows, err := obj.driver.Query(__stmt, __values...)
+	if err != nil {
+		return nil, obj.makeErr(err)
+	}
+	defer __rows.Close()
+
+	for __rows.Next() {
+		row := &Id_LastNet_Address_Protocol_Row{}
+		err = __rows.Scan(&row.Id, &row.LastNet, &row.Address, &row.Protocol)
+		if err != nil {
+			return nil, obj.makeErr(err)
+		}
+		rows = append(rows, row)
+	}
+	if err := __rows.Err(); err != nil {
+		return nil, obj.makeErr(err)
+	}
+	return rows, nil
+}
+
 func (obj *postgresImpl) Get_User_By_Email_And_Status_Not_Number(ctx context.Context,
 	user_email User_Email_Field) (
 	user *User, err error) {
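The generated method above is dbx boilerplate around the standard database/sql query-and-scan idiom. For readers who only see this idiom through generated code, here is a minimal hand-written version of the same pattern; the table, row type, and Postgres-style placeholders are hypothetical, not part of the commit:

package main

import (
	"context"
	"database/sql"
)

// nodeRow is a hypothetical struct mirroring Id_LastNet_Address_Protocol_Row.
type nodeRow struct {
	ID       []byte
	LastNet  string
	Address  string
	Protocol int
}

// qualifiedNodes shows the query -> iterate -> Scan -> Err pattern the generated code follows.
func qualifiedNodes(ctx context.Context, db *sql.DB, limit int, offset int64) ([]nodeRow, error) {
	rows, err := db.QueryContext(ctx,
		`SELECT id, last_net, address, protocol FROM nodes
		 WHERE disqualified IS NULL ORDER BY id LIMIT $1 OFFSET $2`, limit, offset)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var out []nodeRow
	for rows.Next() {
		var r nodeRow
		if err := rows.Scan(&r.ID, &r.LastNet, &r.Address, &r.Protocol); err != nil {
			return nil, err
		}
		out = append(out, r)
	}
	// rows.Err reports any error hit during iteration, e.g. a dropped connection.
	return out, rows.Err()
}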
@@ -9978,6 +10021,42 @@ func (obj *sqlite3Impl) Limited_Node_By_Id_GreaterOrEqual_OrderBy_Asc_Id(ctx con
 }

+func (obj *sqlite3Impl) Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx context.Context,
+	node_id_greater_or_equal Node_Id_Field,
+	limit int, offset int64) (
+	rows []*Id_LastNet_Address_Protocol_Row, err error) {
+
+	var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.last_net, nodes.address, nodes.protocol FROM nodes WHERE nodes.id >= ? AND nodes.disqualified is NULL ORDER BY nodes.id LIMIT ? OFFSET ?")
+
+	var __values []interface{}
+	__values = append(__values, node_id_greater_or_equal.value())
+	__values = append(__values, limit, offset)
+
+	var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
+	obj.logStmt(__stmt, __values...)
+
+	__rows, err := obj.driver.Query(__stmt, __values...)
+	if err != nil {
+		return nil, obj.makeErr(err)
+	}
+	defer __rows.Close()
+
+	for __rows.Next() {
+		row := &Id_LastNet_Address_Protocol_Row{}
+		err = __rows.Scan(&row.Id, &row.LastNet, &row.Address, &row.Protocol)
+		if err != nil {
+			return nil, obj.makeErr(err)
+		}
+		rows = append(rows, row)
+	}
+	if err := __rows.Err(); err != nil {
+		return nil, obj.makeErr(err)
+	}
+	return rows, nil
+}
+
 func (obj *sqlite3Impl) Get_User_By_Email_And_Status_Not_Number(ctx context.Context,
 	user_email User_Email_Field) (
 	user *User, err error) {
@@ -14262,6 +14341,17 @@ func (rx *Rx) Limited_Node_By_Id_GreaterOrEqual_OrderBy_Asc_Id(ctx context.Conte
 	return tx.Limited_Node_By_Id_GreaterOrEqual_OrderBy_Asc_Id(ctx, node_id_greater_or_equal, limit, offset)
 }

+func (rx *Rx) Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx context.Context,
+	node_id_greater_or_equal Node_Id_Field,
+	limit int, offset int64) (
+	rows []*Id_LastNet_Address_Protocol_Row, err error) {
+	var tx *Tx
+	if tx, err = rx.getTx(ctx); err != nil {
+		return
+	}
+	return tx.Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx, node_id_greater_or_equal, limit, offset)
+}
+
 func (rx *Rx) Limited_ProjectMember_By_ProjectId(ctx context.Context,
 	project_member_project_id ProjectMember_ProjectId_Field,
 	limit int, offset int64) (
@@ -14899,6 +14989,11 @@ type Methods interface {
 		limit int, offset int64) (
 		rows []*Node, err error)

+	Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx context.Context,
+		node_id_greater_or_equal Node_Id_Field,
+		limit int, offset int64) (
+		rows []*Id_LastNet_Address_Protocol_Row, err error)
+
 	Limited_ProjectMember_By_ProjectId(ctx context.Context,
 		project_member_project_id ProjectMember_ProjectId_Field,
 		limit int, offset int64) (

View File

@@ -886,6 +886,13 @@ func (m *lockedOverlayCache) Paginate(ctx context.Context, offset int64, limit i
 	return m.db.Paginate(ctx, offset, limit)
 }

+// Paginate will page through the database nodes
+func (m *lockedOverlayCache) PaginateQualified(ctx context.Context, offset int64, limit int) ([]*pb.Node, bool, error) {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.PaginateQualified(ctx, offset, limit)
+}
+
 // Reliable returns all nodes that are reliable
 func (m *lockedOverlayCache) Reliable(ctx context.Context, a1 *overlay.NodeCriteria) (storj.NodeIDList, error) {
 	m.Lock()
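locked.go is a generated decorator: every overlay DB call is wrapped in one shared mutex so the underlying implementation does not have to be thread-safe on its own. A minimal hand-written sketch of that pattern, with hypothetical names and a trivial in-memory backend standing in for the real DB:

package main

import (
	"context"
	"fmt"
	"sync"
)

// pager is a hypothetical slice of the overlay DB interface.
type pager interface {
	PaginateQualified(ctx context.Context, offset int64, limit int) ([]string, bool, error)
}

// lockedPager serializes all calls to the wrapped implementation, mirroring lockedOverlayCache.
type lockedPager struct {
	sync.Mutex
	db pager
}

func (m *lockedPager) PaginateQualified(ctx context.Context, offset int64, limit int) ([]string, bool, error) {
	m.Lock()
	defer m.Unlock()
	return m.db.PaginateQualified(ctx, offset, limit)
}

// fakePager is a trivial backend used only to exercise the wrapper.
type fakePager struct{ nodes []string }

func (f *fakePager) PaginateQualified(ctx context.Context, offset int64, limit int) ([]string, bool, error) {
	if offset >= int64(len(f.nodes)) {
		return nil, false, nil
	}
	return f.nodes[offset:], false, nil
}

func main() {
	var db pager = &lockedPager{db: &fakePager{nodes: []string{"a", "b"}}}
	nodes, more, err := db.PaginateQualified(context.Background(), 0, 10)
	fmt.Println(nodes, more, err) // [a b] false <nil>
}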

View File

@@ -566,6 +566,37 @@ func (cache *overlaycache) Paginate(ctx context.Context, offset int64, limit int
 	return infos, more, nil
 }

+// PaginateQualified will retrieve all qualified nodes
+func (cache *overlaycache) PaginateQualified(ctx context.Context, offset int64, limit int) (_ []*pb.Node, _ bool, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	cursor := storj.NodeID{}
+
+	// more represents end of table. If there are more rows in the database, more will be true.
+	more := true
+
+	if limit <= 0 || limit > storage.LookupLimit {
+		limit = storage.LookupLimit
+	}
+
+	dbxInfos, err := cache.db.Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx, dbx.Node_Id(cursor.Bytes()), limit, offset)
+	if err != nil {
+		return nil, false, err
+	}
+
+	if len(dbxInfos) < limit {
+		more = false
+	}
+
+	infos := make([]*pb.Node, len(dbxInfos))
+	for i, dbxInfo := range dbxInfos {
+		infos[i], err = convertDBNodeToPBNode(ctx, dbxInfo)
+		if err != nil {
+			return nil, false, err
+		}
+	}
+	return infos, more, nil
+}
+
 // Update updates node address
 func (cache *overlaycache) UpdateAddress(ctx context.Context, info *pb.Node, defaults overlay.NodeSelectionConfig) (err error) {
 	defer mon.Task()(&ctx)(&err)
@@ -906,6 +937,26 @@ func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier,
 	return node, nil
 }

+func convertDBNodeToPBNode(ctx context.Context, info *dbx.Id_LastNet_Address_Protocol_Row) (_ *pb.Node, err error) {
+	defer mon.Task()(&ctx)(&err)
+	if info == nil {
+		return nil, Error.New("missing info")
+	}
+
+	id, err := storj.NodeIDFromBytes(info.Id)
+	if err != nil {
+		return nil, err
+	}
+
+	return &pb.Node{
+		Id:     id,
+		LastIp: info.LastNet,
+		Address: &pb.NodeAddress{
+			Address:   info.Address,
+			Transport: pb.NodeTransport(info.Protocol),
+		},
+	}, nil
+}
+
 func getNodeStats(dbNode *dbx.Node) *overlay.NodeStats {
 	nodeStats := &overlay.NodeStats{
 		Latency90: dbNode.Latency90,

View File

@@ -64,10 +64,13 @@ defaults: "release"
 # the interval at which the satellite attempts to find new nodes via random node ID lookups
 # discovery.discovery-interval: 1s

+# the amount of nodes refreshed in parallel
+# discovery.refresh-concurrency: 8
+
 # the interval at which the cache refreshes itself in seconds
 # discovery.refresh-interval: 1s

-# the amount of nodes refreshed at each interval
+# the amount of nodes read from the overlay cache in a single pagination call
 # discovery.refresh-limit: 100

 # help for setup