diff --git a/internal/testplanet/satellite.go b/internal/testplanet/satellite.go
index 9885c0e55..699a13666 100644
--- a/internal/testplanet/satellite.go
+++ b/internal/testplanet/satellite.go
@@ -124,9 +124,10 @@ func (planet *Planet) newSatellites(count int) ([]*satellite.Peer, error) {
                 },
             },
             Discovery: discovery.Config{
-                DiscoveryInterval: 1 * time.Second,
-                RefreshInterval:   1 * time.Second,
-                RefreshLimit:      100,
+                DiscoveryInterval:  1 * time.Second,
+                RefreshInterval:    1 * time.Second,
+                RefreshLimit:       100,
+                RefreshConcurrency: 2,
             },
             Metainfo: metainfo.Config{
                 DatabaseURL: "bolt://" + filepath.Join(storageDir, "pointers.db"),
diff --git a/pkg/discovery/service.go b/pkg/discovery/service.go
index faee4d6eb..a6ab731dc 100644
--- a/pkg/discovery/service.go
+++ b/pkg/discovery/service.go
@@ -28,9 +28,10 @@ var (
 
 // Config loads on the configuration values for the cache
 type Config struct {
-    RefreshInterval   time.Duration `help:"the interval at which the cache refreshes itself in seconds" default:"1s"`
-    DiscoveryInterval time.Duration `help:"the interval at which the satellite attempts to find new nodes via random node ID lookups" default:"1s"`
-    RefreshLimit      int           `help:"the amount of nodes refreshed at each interval" default:"100"`
+    RefreshInterval    time.Duration `help:"the interval at which the cache refreshes itself in seconds" default:"1s"`
+    DiscoveryInterval  time.Duration `help:"the interval at which the satellite attempts to find new nodes via random node ID lookups" default:"1s"`
+    RefreshLimit       int           `help:"the amount of nodes read from the overlay cache in a single pagination call" default:"100"`
+    RefreshConcurrency int           `help:"the amount of nodes refreshed in parallel" default:"8"`
 }
 
 // Discovery struct loads on cache, kad
@@ -39,9 +40,8 @@ type Discovery struct {
     cache *overlay.Cache
     kad   *kademlia.Kademlia
 
-    // refreshOffset tracks the offset of the current refresh cycle
-    refreshOffset int64
-    refreshLimit  int
+    refreshLimit       int
+    refreshConcurrency int
 
     Refresh   sync2.Cycle
     Discovery sync2.Cycle
@@ -54,8 +54,8 @@ func New(logger *zap.Logger, ol *overlay.Cache, kad *kademlia.Kademlia, config C
         cache: ol,
         kad:   kad,
 
-        refreshOffset: 0,
-        refreshLimit:  config.RefreshLimit,
+        refreshLimit:       config.RefreshLimit,
+        refreshConcurrency: config.RefreshConcurrency,
     }
 
     discovery.Refresh.SetInterval(config.RefreshInterval)
@@ -97,54 +97,49 @@ func (discovery *Discovery) Run(ctx context.Context) (err error) {
 func (discovery *Discovery) refresh(ctx context.Context) (err error) {
     defer mon.Task()(&ctx)(&err)
 
-    list, more, err := discovery.cache.Paginate(ctx, discovery.refreshOffset, discovery.refreshLimit)
-    if err != nil {
-        return Error.Wrap(err)
-    }
+    limiter := sync2.NewLimiter(discovery.refreshConcurrency)
 
-    // more means there are more rows to page through in the cache
-    if more == false {
-        discovery.refreshOffset = 0
-    } else {
-        discovery.refreshOffset += int64(len(list))
-    }
-
-    for _, node := range list {
-        if ctx.Err() != nil {
-            return ctx.Err()
-        }
-
-        if node.Disqualified != nil {
-            discovery.log.Debug("skip disqualified node", zap.Stringer("ID", node.Id))
-            continue
-        }
-
-        info, err := discovery.kad.FetchInfo(ctx, node.Node)
-
-        if ctx.Err() != nil {
-            return ctx.Err()
-        }
+    var offset int64
 
+    for {
+        list, more, err := discovery.cache.PaginateQualified(ctx, offset, discovery.refreshLimit)
         if err != nil {
-            discovery.log.Info("could not ping node", zap.Stringer("ID", node.Id), zap.Error(err))
-            _, err := discovery.cache.UpdateUptime(ctx, node.Id, false)
-            if err != nil {
-                discovery.log.Error("could not update node uptime in cache", zap.Stringer("ID", node.Id), zap.Error(err))
-            }
-            continue
+            return Error.Wrap(err)
         }
 
-        // TODO: combine these into the same db call
-        _, err = discovery.cache.UpdateUptime(ctx, node.Id, true)
-        if err != nil {
-            discovery.log.Error("could not update node uptime in cache", zap.Stringer("ID", node.Id), zap.Error(err))
+        if len(list) == 0 {
+            break
         }
-        _, err = discovery.cache.UpdateNodeInfo(ctx, node.Id, info)
-        if err != nil {
-            discovery.log.Warn("could not update node info", zap.Stringer("ID", node.GetAddress()))
+
+        offset += int64(len(list))
+
+        for _, node := range list {
+            node := node
+
+            limiter.Go(ctx, func() {
+                // NB: FetchInfo updates node uptime already
+                info, err := discovery.kad.FetchInfo(ctx, *node)
+                if ctx.Err() != nil {
+                    return
+                }
+
+                if err != nil {
+                    discovery.log.Info("could not ping node", zap.Stringer("ID", node.Id), zap.Error(err))
+                    return
+                }
+
+                if _, err = discovery.cache.UpdateNodeInfo(ctx, node.Id, info); err != nil {
+                    discovery.log.Warn("could not update node info", zap.Stringer("ID", node.Id), zap.Error(err))
+                }
+            })
+        }
+
+        if !more {
+            break
         }
     }
 
+    limiter.Wait()
     return nil
 }
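`sync2.Limiter` above is storj's internal helper (`internal/sync2`): `NewLimiter(n)` caps the number of concurrently running callbacks at `n`, `Go` schedules a callback once a slot frees up (bailing out if the context is canceled first), and `Wait` blocks until every started callback returns. For readers without the repo at hand, here is a minimal standard-library sketch of the same bounded fan-out pattern; it is an illustration of the idea, not the actual `sync2` code:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// limiter bounds how many goroutines run at once, mirroring the role
// sync2.Limiter plays in refresh above (illustrative re-implementation).
type limiter struct {
	slots   chan struct{}
	working sync.WaitGroup
}

func newLimiter(n int) *limiter {
	return &limiter{slots: make(chan struct{}, n)}
}

// Go runs fn in a goroutine once a slot is free; it returns false without
// running fn if ctx is canceled first, which lets refresh stop promptly.
func (l *limiter) Go(ctx context.Context, fn func()) bool {
	select {
	case l.slots <- struct{}{}:
	case <-ctx.Done():
		return false
	}
	l.working.Add(1)
	go func() {
		defer func() {
			<-l.slots
			l.working.Done()
		}()
		fn()
	}()
	return true
}

// Wait blocks until every goroutine started via Go has finished,
// matching the limiter.Wait() call at the end of refresh.
func (l *limiter) Wait() { l.working.Wait() }

func main() {
	lim := newLimiter(2) // RefreshConcurrency defaults to 8 in the real config
	for i := 0; i < 5; i++ {
		i := i // capture the loop variable, like `node := node` in refresh
		lim.Go(context.Background(), func() { fmt.Println("refreshing node", i) })
	}
	lim.Wait()
}
```

Bounding the fan-out is what lets refresh walk an arbitrarily large node table without spawning one goroutine per node, while `Wait` guarantees no pings outlive the refresh cycle.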
discovery.log.Error("could not update node uptime in cache", zap.Stringer("ID", node.Id), zap.Error(err)) - } - continue + return Error.Wrap(err) } - // TODO: combine these into the same db call - _, err = discovery.cache.UpdateUptime(ctx, node.Id, true) - if err != nil { - discovery.log.Error("could not update node uptime in cache", zap.Stringer("ID", node.Id), zap.Error(err)) + if len(list) == 0 { + break } - _, err = discovery.cache.UpdateNodeInfo(ctx, node.Id, info) - if err != nil { - discovery.log.Warn("could not update node info", zap.Stringer("ID", node.GetAddress())) + + offset += int64(len(list)) + + for _, node := range list { + node := node + + limiter.Go(ctx, func() { + // NB: FetchInfo updates node uptime already + info, err := discovery.kad.FetchInfo(ctx, *node) + if ctx.Err() != nil { + return + } + + if err != nil { + discovery.log.Info("could not ping node", zap.Stringer("ID", node.Id), zap.Error(err)) + return + } + + if _, err = discovery.cache.UpdateNodeInfo(ctx, node.Id, info); err != nil { + discovery.log.Warn("could not update node info", zap.Stringer("ID", node.GetAddress())) + } + }) + } + + if !more { + break } } + limiter.Wait() return nil } diff --git a/pkg/overlay/cache.go b/pkg/overlay/cache.go index c8d36eac5..6160e7f20 100644 --- a/pkg/overlay/cache.go +++ b/pkg/overlay/cache.go @@ -55,6 +55,8 @@ type DB interface { Reliable(context.Context, *NodeCriteria) (storj.NodeIDList, error) // Paginate will page through the database nodes Paginate(ctx context.Context, offset int64, limit int) ([]*NodeDossier, bool, error) + // PaginateQualified will page through the qualified nodes + PaginateQualified(ctx context.Context, offset int64, limit int) ([]*pb.Node, bool, error) // IsVetted returns whether or not the node reaches reputable thresholds IsVetted(ctx context.Context, id storj.NodeID, criteria *NodeCriteria) (bool, error) // Update updates node address @@ -166,6 +168,12 @@ func (cache *Cache) Paginate(ctx context.Context, offset int64, limit int) (_ [] return cache.db.Paginate(ctx, offset, limit) } +// PaginateQualified returns a list of `limit` qualified nodes starting from `start` offset. 
diff --git a/pkg/overlay/cache_test.go b/pkg/overlay/cache_test.go
index 1ddcc4120..a33330271 100644
--- a/pkg/overlay/cache_test.go
+++ b/pkg/overlay/cache_test.go
@@ -63,6 +63,7 @@ func testNodeSelectionConfig(auditCount int64, newNodePercentage float64, distin
 func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
     valid1ID := testrand.NodeID()
     valid2ID := testrand.NodeID()
+    valid3ID := testrand.NodeID()
     missingID := testrand.NodeID()
     address := &pb.NodeAddress{Address: "127.0.0.1:0"}
 
@@ -75,6 +76,12 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
 
         err = cache.Put(ctx, valid2ID, pb.Node{Id: valid2ID, Address: address})
         require.NoError(t, err)
+
+        err = cache.Put(ctx, valid3ID, pb.Node{Id: valid3ID, Address: address})
+        require.NoError(t, err)
+
+        _, err = cache.UpdateUptime(ctx, valid3ID, false)
+        require.NoError(t, err)
     }
 
     { // Get
@@ -113,6 +120,15 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
         assert.NotEqual(t, len(zero), 0)
     }
 
+    { // PaginateQualified
+
+        // should return only the two qualified nodes
+        nodes, more, err := cache.PaginateQualified(ctx, 0, 3)
+        assert.NoError(t, err)
+        assert.False(t, more)
+        assert.Equal(t, 2, len(nodes))
+    }
+
     { // Reputation
         valid1, err := cache.Get(ctx, valid1ID)
         require.NoError(t, err)
diff --git a/satellite/satellitedb/dbx/satellitedb.dbx b/satellite/satellitedb/dbx/satellitedb.dbx
index b39bd73f8..16a6c0263 100644
--- a/satellite/satellitedb/dbx/satellitedb.dbx
+++ b/satellite/satellitedb/dbx/satellitedb.dbx
@@ -180,6 +180,13 @@ read limitoffset (
     orderby asc node.id
 )
 
+read limitoffset (
+    select node.id node.last_net node.address node.protocol
+    where node.id >= ?
+    where node.disqualified = null
+    orderby asc node.id
+)
+
 //--- repairqueue ---//
 
 model injuredsegment (
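For readers who don't work with dbx: each `read limitoffset` declaration generates a Go method plus an embedded SQL statement, and `where node.disqualified = null` becomes an `IS NULL` predicate. For reference, the statement this declaration produces (it matches the `__embed_stmt` literals in the generated code below, with `?` rendered per dialect, e.g. `$1, $2, ...` on postgres):

```go
// The SQL emitted for the new read, as embedded in the generated code below.
const paginateQualifiedSQL = "SELECT nodes.id, nodes.last_net, nodes.address, nodes.protocol " +
	"FROM nodes WHERE nodes.id >= ? AND nodes.disqualified is NULL " +
	"ORDER BY nodes.id LIMIT ? OFFSET ?"
```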
OFFSET ?") + + var __values []interface{} + __values = append(__values, node_id_greater_or_equal.value()) + + __values = append(__values, limit, offset) + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __values...) + + __rows, err := obj.driver.Query(__stmt, __values...) + if err != nil { + return nil, obj.makeErr(err) + } + defer __rows.Close() + + for __rows.Next() { + row := &Id_LastNet_Address_Protocol_Row{} + err = __rows.Scan(&row.Id, &row.LastNet, &row.Address, &row.Protocol) + if err != nil { + return nil, obj.makeErr(err) + } + rows = append(rows, row) + } + if err := __rows.Err(); err != nil { + return nil, obj.makeErr(err) + } + return rows, nil + +} + func (obj *postgresImpl) Get_User_By_Email_And_Status_Not_Number(ctx context.Context, user_email User_Email_Field) ( user *User, err error) { @@ -9978,6 +10021,42 @@ func (obj *sqlite3Impl) Limited_Node_By_Id_GreaterOrEqual_OrderBy_Asc_Id(ctx con } +func (obj *sqlite3Impl) Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx context.Context, + node_id_greater_or_equal Node_Id_Field, + limit int, offset int64) ( + rows []*Id_LastNet_Address_Protocol_Row, err error) { + + var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.last_net, nodes.address, nodes.protocol FROM nodes WHERE nodes.id >= ? AND nodes.disqualified is NULL ORDER BY nodes.id LIMIT ? OFFSET ?") + + var __values []interface{} + __values = append(__values, node_id_greater_or_equal.value()) + + __values = append(__values, limit, offset) + + var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) + obj.logStmt(__stmt, __values...) + + __rows, err := obj.driver.Query(__stmt, __values...) + if err != nil { + return nil, obj.makeErr(err) + } + defer __rows.Close() + + for __rows.Next() { + row := &Id_LastNet_Address_Protocol_Row{} + err = __rows.Scan(&row.Id, &row.LastNet, &row.Address, &row.Protocol) + if err != nil { + return nil, obj.makeErr(err) + } + rows = append(rows, row) + } + if err := __rows.Err(); err != nil { + return nil, obj.makeErr(err) + } + return rows, nil + +} + func (obj *sqlite3Impl) Get_User_By_Email_And_Status_Not_Number(ctx context.Context, user_email User_Email_Field) ( user *User, err error) { @@ -14262,6 +14341,17 @@ func (rx *Rx) Limited_Node_By_Id_GreaterOrEqual_OrderBy_Asc_Id(ctx context.Conte return tx.Limited_Node_By_Id_GreaterOrEqual_OrderBy_Asc_Id(ctx, node_id_greater_or_equal, limit, offset) } +func (rx *Rx) Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx context.Context, + node_id_greater_or_equal Node_Id_Field, + limit int, offset int64) ( + rows []*Id_LastNet_Address_Protocol_Row, err error) { + var tx *Tx + if tx, err = rx.getTx(ctx); err != nil { + return + } + return tx.Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx, node_id_greater_or_equal, limit, offset) +} + func (rx *Rx) Limited_ProjectMember_By_ProjectId(ctx context.Context, project_member_project_id ProjectMember_ProjectId_Field, limit int, offset int64) ( @@ -14899,6 +14989,11 @@ type Methods interface { limit int, offset int64) ( rows []*Node, err error) + Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx context.Context, + node_id_greater_or_equal Node_Id_Field, + limit int, offset int64) ( + rows []*Id_LastNet_Address_Protocol_Row, err error) 
    project_member_project_id ProjectMember_ProjectId_Field,
     limit int, offset int64) (
diff --git a/satellite/satellitedb/locked.go b/satellite/satellitedb/locked.go
index d87747206..e4b1bbd20 100644
--- a/satellite/satellitedb/locked.go
+++ b/satellite/satellitedb/locked.go
@@ -886,6 +886,13 @@ func (m *lockedOverlayCache) Paginate(ctx context.Context, offset int64, limit i
     return m.db.Paginate(ctx, offset, limit)
 }
 
+// PaginateQualified will page through the qualified nodes
+func (m *lockedOverlayCache) PaginateQualified(ctx context.Context, offset int64, limit int) ([]*pb.Node, bool, error) {
+    m.Lock()
+    defer m.Unlock()
+    return m.db.PaginateQualified(ctx, offset, limit)
+}
+
 // Reliable returns all nodes that are reliable
 func (m *lockedOverlayCache) Reliable(ctx context.Context, a1 *overlay.NodeCriteria) (storj.NodeIDList, error) {
     m.Lock()
diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go
index e6ab90952..69a7bddbd 100644
--- a/satellite/satellitedb/overlaycache.go
+++ b/satellite/satellitedb/overlaycache.go
@@ -566,6 +566,37 @@ func (cache *overlaycache) Paginate(ctx context.Context, offset int64, limit int
     return infos, more, nil
 }
 
+// PaginateQualified will page through the qualified nodes
+func (cache *overlaycache) PaginateQualified(ctx context.Context, offset int64, limit int) (_ []*pb.Node, _ bool, err error) {
+    defer mon.Task()(&ctx)(&err)
+
+    cursor := storj.NodeID{}
+
+    // more indicates whether there are more rows left to page through
+    more := true
+
+    if limit <= 0 || limit > storage.LookupLimit {
+        limit = storage.LookupLimit
+    }
+
+    dbxInfos, err := cache.db.Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx, dbx.Node_Id(cursor.Bytes()), limit, offset)
+    if err != nil {
+        return nil, false, err
+    }
+    if len(dbxInfos) < limit {
+        more = false
+    }
+
+    infos := make([]*pb.Node, len(dbxInfos))
+    for i, dbxInfo := range dbxInfos {
+        infos[i], err = convertDBNodeToPBNode(ctx, dbxInfo)
+        if err != nil {
+            return nil, false, err
+        }
+    }
+    return infos, more, nil
+}
+
 // Update updates node address
 func (cache *overlaycache) UpdateAddress(ctx context.Context, info *pb.Node, defaults overlay.NodeSelectionConfig) (err error) {
     defer mon.Task()(&ctx)(&err)
@@ -906,6 +937,26 @@ func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier,
     return node, nil
 }
 
+func convertDBNodeToPBNode(ctx context.Context, info *dbx.Id_LastNet_Address_Protocol_Row) (_ *pb.Node, err error) {
+    defer mon.Task()(&ctx)(&err)
+    if info == nil {
+        return nil, Error.New("missing info")
+    }
+
+    id, err := storj.NodeIDFromBytes(info.Id)
+    if err != nil {
+        return nil, err
+    }
+    return &pb.Node{
+        Id:     id,
+        LastIp: info.LastNet,
+        Address: &pb.NodeAddress{
+            Address:   info.Address,
+            Transport: pb.NodeTransport(info.Protocol),
+        },
+    }, nil
+}
+
 func getNodeStats(dbNode *dbx.Node) *overlay.NodeStats {
     nodeStats := &overlay.NodeStats{
         Latency90: dbNode.Latency90,
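`convertDBNodeToPBNode` maps the four-column row onto the wire type. A quick hypothetical test sketch for it, not part of this change, assuming the usual `satellitedb` test imports (`context`, `testing`, `require`, `testrand`, `pb`, and the `dbx` package):

```go
func TestConvertDBNodeToPBNode(t *testing.T) {
	ctx := context.Background()
	id := testrand.NodeID()

	// Build a row as the generated pagination query would return it.
	row := &dbx.Id_LastNet_Address_Protocol_Row{
		Id:       id.Bytes(),
		LastNet:  "127.0.0.0",
		Address:  "127.0.0.1:7777",
		Protocol: int(pb.NodeTransport_TCP_TLS_GRPC),
	}

	node, err := convertDBNodeToPBNode(ctx, row)
	require.NoError(t, err)
	require.Equal(t, id, node.Id)
	require.Equal(t, "127.0.0.0", node.LastIp)
	require.Equal(t, "127.0.0.1:7777", node.Address.Address)
	require.Equal(t, pb.NodeTransport_TCP_TLS_GRPC, node.Address.Transport)
}
```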
diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock
index 3cb53bd77..4c86e762c 100644
--- a/scripts/testdata/satellite-config.yaml.lock
+++ b/scripts/testdata/satellite-config.yaml.lock
@@ -64,10 +64,13 @@ defaults: "release"
 # the interval at which the satellite attempts to find new nodes via random node ID lookups
 # discovery.discovery-interval: 1s
 
+# the amount of nodes refreshed in parallel
+# discovery.refresh-concurrency: 8
+
 # the interval at which the cache refreshes itself in seconds
 # discovery.refresh-interval: 1s
 
-# the amount of nodes refreshed at each interval
+# the amount of nodes read from the overlay cache in a single pagination call
 # discovery.refresh-limit: 100
 
 # help for setup
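End to end, the new knob flows from the `help`/`default` struct tags in `discovery.Config` to this lock file and the satellite's configuration surface. Wiring it up by hand mirrors the testplanet change at the top of this diff; a sketch, assuming `log`, `overlayCache`, and `kad` values are already constructed as `discovery.New`'s signature above expects:

```go
cfg := discovery.Config{
	RefreshInterval:    1 * time.Second,
	DiscoveryInterval:  1 * time.Second,
	RefreshLimit:       100, // rows per PaginateQualified call
	RefreshConcurrency: 8,   // concurrent FetchInfo pings per refresh cycle
}
svc := discovery.New(log, overlayCache, kad, cfg)
```

Raising `RefreshConcurrency` trades database and network pressure for a faster refresh cycle; `RefreshLimit` now only controls page size, not how many nodes get refreshed per cycle.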