satellite/overlay: use AS OF SYSTEM TIME with Cockroach

Query nodes table using AS OF SYSTEM TIME '-10s' (by default) when on CRDB to alleviate contention on the nodes table and minimize CRDB retries. Queries for standard uploads are already cached, and node lookups for graceful exit uploads has retry logic so it isn't necessary for the nodes returned to be current.
This commit is contained in:
Ethan Adams 2020-12-22 14:07:07 -05:00 committed by GitHub
parent 7cccbdb766
commit 6070018021
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 134 additions and 58 deletions

View File

@ -249,7 +249,10 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
{ // setup overlay
peer.Overlay.DB = peer.DB.OverlayCache()
peer.Overlay.Service = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Services.Add(lifecycle.Item{
Name: "overlay",
Close: peer.Overlay.Service.Close,

View File

@ -221,7 +221,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
{ // setup overlay
peer.Overlay.DB = peer.DB.OverlayCache()
peer.Overlay.Service = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Services.Add(lifecycle.Item{
Name: "overlay",
Close: peer.Overlay.Service.Close,

View File

@ -448,12 +448,13 @@ func BenchmarkNodeSelection(b *testing.B) {
}
})
service := overlay.NewService(zap.NewNop(), overlaydb, overlay.Config{
service, err := overlay.NewService(zap.NewNop(), overlaydb, overlay.Config{
Node: nodeSelectionConfig,
NodeSelectionCache: overlay.CacheConfig{
Staleness: time.Hour,
},
})
require.NoError(b, err)
b.Run("FindStorageNodesWithPreference", func(b *testing.B) {
for i := 0; i < b.N; i++ {

View File

@ -26,6 +26,12 @@ type Config struct {
AuditHistory AuditHistoryConfig
}
// AsOfSystemTimeConfig is a configuration struct to enable 'AS OF SYSTEM TIME' for CRDB queries.
type AsOfSystemTimeConfig struct {
Enabled bool `help:"enables the use of the AS OF SYSTEM TIME feature in CRDB" default:"true"`
DefaultInterval time.Duration `help:"default duration for AS OF SYSTEM TIME" devDefault:"-1µs" releaseDefault:"-10s"`
}
// NodeSelectionConfig is a configuration struct to determine the minimum
// values for nodes to select.
type NodeSelectionConfig struct {
@ -44,6 +50,8 @@ type NodeSelectionConfig struct {
AuditReputationDQ float64 `help:"the reputation cut-off for disqualifying SNs based on audit history" default:"0.6"`
SuspensionGracePeriod time.Duration `help:"the time period that must pass before suspended nodes will be disqualified" releaseDefault:"168h" devDefault:"1h"`
SuspensionDQEnabled bool `help:"whether nodes will be disqualified if they have been suspended for longer than the suspended grace period" releaseDefault:"false" devDefault:"true"`
AsOfSystemTime AsOfSystemTimeConfig
}
// AuditHistoryConfig is a configuration struct defining time periods and thresholds for penalizing nodes for being offline.
@ -55,3 +63,16 @@ type AuditHistoryConfig struct {
OfflineThreshold float64 `help:"The point below which a node is punished for offline audits. Determined by calculating the ratio of online/total audits within each window and finding the average across windows within the tracking period." default:"0.6"`
OfflineDQEnabled bool `help:"whether nodes will be disqualified if they have low online score after a review period" releaseDefault:"false" devDefault:"true"`
}
func (aost *AsOfSystemTimeConfig) isValid() error {
if aost.Enabled {
if aost.DefaultInterval >= 0 {
return errs.New("AS OF SYSTEM TIME interval must be a negative number")
}
if aost.DefaultInterval > -time.Microsecond {
return errs.New("AS OF SYSTEM TIME interval cannot be in nanoseconds")
}
}
return nil
}

View File

@ -131,19 +131,21 @@ type InfoResponse struct {
// FindStorageNodesRequest defines easy request parameters.
type FindStorageNodesRequest struct {
RequestedCount int
ExcludedIDs []storj.NodeID
MinimumVersion string // semver or empty
RequestedCount int
ExcludedIDs []storj.NodeID
MinimumVersion string // semver or empty
AsOfSystemTimeInterval time.Duration // only used for CRDB queries
}
// NodeCriteria are the requirements for selecting nodes.
type NodeCriteria struct {
FreeDisk int64
ExcludedIDs []storj.NodeID
ExcludedNetworks []string // the /24 subnet IPv4 or /64 subnet IPv6 for nodes
MinimumVersion string // semver or empty
OnlineWindow time.Duration
DistinctIP bool
FreeDisk int64
ExcludedIDs []storj.NodeID
ExcludedNetworks []string // the /24 subnet IPv4 or /64 subnet IPv6 for nodes
MinimumVersion string // semver or empty
OnlineWindow time.Duration
DistinctIP bool
AsOfSystemTimeInterval time.Duration // only used for CRDB queries
}
// AuditType is an enum representing the outcome of a particular audit reported to the overlay.
@ -276,7 +278,11 @@ type Service struct {
}
// NewService returns a new Service.
func NewService(log *zap.Logger, db DB, config Config) *Service {
func NewService(log *zap.Logger, db DB, config Config) (*Service, error) {
if err := config.Node.AsOfSystemTime.isValid(); err != nil {
return nil, err
}
return &Service{
log: log,
db: db,
@ -284,7 +290,7 @@ func NewService(log *zap.Logger, db DB, config Config) *Service {
SelectionCache: NewNodeSelectionCache(log, db,
config.NodeSelectionCache.Staleness, config.Node,
),
}
}, nil
}
// Close closes resources.
@ -323,6 +329,9 @@ func (service *Service) IsOnline(node *NodeDossier) bool {
// The main difference between this method and the normal FindStorageNodes is that here we avoid using the cache.
func (service *Service) FindStorageNodesForGracefulExit(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
if service.config.Node.AsOfSystemTime.Enabled && service.config.Node.AsOfSystemTime.DefaultInterval < 0 {
req.AsOfSystemTimeInterval = service.config.Node.AsOfSystemTime.DefaultInterval
}
return service.FindStorageNodesWithPreferences(ctx, req, &service.config.Node)
}
@ -332,6 +341,10 @@ func (service *Service) FindStorageNodesForGracefulExit(ctx context.Context, req
// When the node selection from the cache fails, it falls back to the old implementation.
func (service *Service) FindStorageNodesForUpload(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
if service.config.Node.AsOfSystemTime.Enabled && service.config.Node.AsOfSystemTime.DefaultInterval < 0 {
req.AsOfSystemTimeInterval = service.config.Node.AsOfSystemTime.DefaultInterval
}
if service.config.NodeSelectionCache.Disabled {
return service.FindStorageNodesWithPreferences(ctx, req, &service.config.Node)
}
@ -340,6 +353,7 @@ func (service *Service) FindStorageNodesForUpload(ctx context.Context, req FindS
if err != nil {
service.log.Warn("error selecting from node selection cache", zap.String("err", err.Error()))
}
if len(selectedNodes) < req.RequestedCount {
mon.Event("default_node_selection")
return service.FindStorageNodesWithPreferences(ctx, req, &service.config.Node)
@ -352,7 +366,6 @@ func (service *Service) FindStorageNodesForUpload(ctx context.Context, req FindS
// This does not use a cache.
func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req FindStorageNodesRequest, preferences *NodeSelectionConfig) (nodes []*SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
// TODO: add sanity limits to requested node count
// TODO: add sanity limits to excluded nodes
totalNeededNodes := req.RequestedCount
@ -374,12 +387,13 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req
}
criteria := NodeCriteria{
FreeDisk: preferences.MinimumDiskSpace.Int64(),
ExcludedIDs: excludedIDs,
ExcludedNetworks: excludedNetworks,
MinimumVersion: preferences.MinimumVersion,
OnlineWindow: preferences.OnlineWindow,
DistinctIP: preferences.DistinctIP,
FreeDisk: preferences.MinimumDiskSpace.Int64(),
ExcludedIDs: excludedIDs,
ExcludedNetworks: excludedNetworks,
MinimumVersion: preferences.MinimumVersion,
OnlineWindow: preferences.OnlineWindow,
DistinctIP: preferences.DistinctIP,
AsOfSystemTimeInterval: req.AsOfSystemTimeInterval,
}
nodes, err = service.db.SelectStorageNodes(ctx, totalNeededNodes, newNodeCount, &criteria)
if err != nil {

View File

@ -73,8 +73,8 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
nodeSelectionConfig := testNodeSelectionConfig(0, false)
serviceConfig := overlay.Config{Node: nodeSelectionConfig, UpdateStatsBatchSize: 100, AuditHistory: testAuditHistoryConfig()}
service := overlay.NewService(zaptest.NewLogger(t), store, serviceConfig)
service, err := overlay.NewService(zaptest.NewLogger(t), store, serviceConfig)
require.NoError(t, err)
d := overlay.NodeCheckInInfo{
Address: address,
LastIPPort: address.Address,

View File

@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/common/pb"
@ -21,7 +22,8 @@ func TestReliabilityCache_Concurrent(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
ocache := overlay.NewService(zap.NewNop(), fakeOverlayDB{}, overlay.Config{})
ocache, err := overlay.NewService(zap.NewNop(), fakeOverlayDB{}, overlay.Config{})
require.NoError(t, err)
rcache := NewReliabilityCache(ocache, time.Millisecond)
for i := 0; i < 10; i++ {

View File

@ -132,7 +132,11 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
}
{ // setup overlay
peer.Overlay = overlay.NewService(log.Named("overlay"), overlayCache, config.Overlay)
var err error
peer.Overlay, err = overlay.NewService(log.Named("overlay"), overlayCache, config.Overlay)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Services.Add(lifecycle.Item{
Name: "overlay",
Close: peer.Overlay.Close,

View File

@ -6,6 +6,7 @@ package satellitedb
import (
"context"
"sync"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
@ -164,6 +165,17 @@ func (dbc *satelliteDBCollection) getByName(name string) *satelliteDB {
return dbc.dbs[""]
}
// AsOfSystemTimeClause returns the "AS OF SYSTEM TIME" clause if the DB implementation
// is CockroachDB and the interval is less than 0.
func (db *satelliteDB) AsOfSystemTimeClause(interval time.Duration) (asOf string) {
if db.implementation == dbutil.Cockroach && interval < 0 {
asOf = " AS OF SYSTEM TIME '" + interval.String() + "' "
}
return asOf
}
// TestDBAccess for raw database access,
// should not be used outside of migration tests.
func (db *satelliteDB) TestDBAccess() *dbx.DB { return db.DB }

View File

@ -90,10 +90,7 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, totalNeededNo
return nodes, nil
}
func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputableNodeCount, newNodeCount int,
criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID,
excludedNetworks []string) (reputableNodes, newNodes []*overlay.SelectedNode, err error) {
func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputableNodeCount, newNodeCount int, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID, excludedNetworks []string) (reputableNodes, newNodes []*overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
newNodesCondition, err := nodeSelectionCondition(ctx, criteria, excludedIDs, excludedNetworks, true)
@ -107,33 +104,39 @@ func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputable
var reputableNodeQuery, newNodeQuery partialQuery
asOf := cache.db.AsOfSystemTimeClause(criteria.AsOfSystemTimeInterval)
// Note: the true/false at the end of each selection string indicates if the selection is for new nodes or not.
// Later, the flag allows us to distinguish if a node is new when scanning the db rows.
if !criteria.DistinctIP {
reputableNodeQuery = partialQuery{
selection: `SELECT last_net, id, address, last_ip_port, false FROM nodes`,
condition: reputableNodesCondition,
limit: reputableNodeCount,
selection: `SELECT last_net, id, address, last_ip_port, false FROM nodes ` + asOf,
condition: reputableNodesCondition,
limit: reputableNodeCount,
aostClause: asOf,
}
newNodeQuery = partialQuery{
selection: `SELECT last_net, id, address, last_ip_port, true FROM nodes`,
condition: newNodesCondition,
limit: newNodeCount,
selection: `SELECT last_net, id, address, last_ip_port, true FROM nodes ` + asOf,
condition: newNodesCondition,
limit: newNodeCount,
aostClause: asOf,
}
} else {
reputableNodeQuery = partialQuery{
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, false FROM nodes`,
condition: reputableNodesCondition,
distinct: true,
limit: reputableNodeCount,
orderBy: "last_net",
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, false FROM nodes ` + asOf,
condition: reputableNodesCondition,
distinct: true,
limit: reputableNodeCount,
orderBy: "last_net",
aostClause: asOf,
}
newNodeQuery = partialQuery{
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, true FROM nodes`,
condition: newNodesCondition,
distinct: true,
limit: newNodeCount,
orderBy: "last_net",
selection: `SELECT DISTINCT ON (last_net) last_net, id, address, last_ip_port, true FROM nodes ` + asOf,
condition: newNodesCondition,
distinct: true,
limit: newNodeCount,
orderBy: "last_net",
aostClause: asOf,
}
}
@ -174,9 +177,7 @@ func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputable
}
// nodeSelectionCondition creates a condition with arguments that corresponds to the arguments.
func nodeSelectionCondition(ctx context.Context, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID,
excludedNetworks []string, isNewNodeQuery bool) (condition, error) {
func nodeSelectionCondition(ctx context.Context, criteria *overlay.NodeCriteria, excludedIDs []storj.NodeID, excludedNetworks []string, isNewNodeQuery bool) (condition, error) {
var conds conditions
conds.add(`disqualified IS NULL`)
conds.add(`unknown_audit_suspended IS NULL`)
@ -236,11 +237,12 @@ func nodeSelectionCondition(ctx context.Context, criteria *overlay.NodeCriteria,
// SELECT * FROM ($selection WHERE $condition ORDER BY $orderBy, RANDOM()) filtered ORDER BY RANDOM() LIMIT $limit
//
type partialQuery struct {
selection string
condition condition
distinct bool
orderBy string
limit int
selection string
condition condition
distinct bool
orderBy string
limit int
aostClause string
}
// isEmpty returns whether the result for the query is definitely empty.
@ -271,7 +273,7 @@ func (partial partialQuery) asQuery() query {
fmt.Fprint(&q, " LIMIT ? ")
args = append(args, partial.limit)
} else {
fmt.Fprint(&q, ") filtered ORDER BY RANDOM() LIMIT ?")
fmt.Fprint(&q, ") filtered "+partial.aostClause+" ORDER BY RANDOM() LIMIT ?")
args = append(args, partial.limit)
}

View File

@ -56,9 +56,11 @@ func (cache *overlaycache) SelectAllStorageNodesUpload(ctx context.Context, sele
func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, selectionCfg overlay.NodeSelectionConfig) (reputable, new []*overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
asOf := cache.db.AsOfSystemTimeClause(selectionCfg.AsOfSystemTime.DefaultInterval)
query := `
SELECT id, address, last_net, last_ip_port, vetted_at
FROM nodes
FROM nodes ` + asOf + `
WHERE disqualified IS NULL
AND unknown_audit_suspended IS NULL
AND exit_initiated_at IS NULL
@ -253,10 +255,12 @@ func (cache *overlaycache) knownOffline(ctx context.Context, criteria *overlay.N
return nil, Error.New("no ids provided")
}
asOf := cache.db.AsOfSystemTimeClause(criteria.AsOfSystemTimeInterval)
// get offline nodes
var rows tagsql.Rows
rows, err = cache.db.Query(ctx, cache.db.Rebind(`
SELECT id FROM nodes
SELECT id FROM nodes `+asOf+`
WHERE id = any($1::bytea[])
AND last_contact_success < $2
`), pgutil.NodeIDArray(nodeIds), time.Now().Add(-criteria.OnlineWindow),
@ -300,10 +304,12 @@ func (cache *overlaycache) knownUnreliableOrOffline(ctx context.Context, criteri
return nil, Error.New("no ids provided")
}
asOf := cache.db.AsOfSystemTimeClause(criteria.AsOfSystemTimeInterval)
// get reliable and online nodes
var rows tagsql.Rows
rows, err = cache.db.Query(ctx, cache.db.Rebind(`
SELECT id FROM nodes
SELECT id FROM nodes `+asOf+`
WHERE id = any($1::bytea[])
AND disqualified IS NULL
AND unknown_audit_suspended IS NULL
@ -404,9 +410,11 @@ func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeC
}
func (cache *overlaycache) reliable(ctx context.Context, criteria *overlay.NodeCriteria) (nodes storj.NodeIDList, err error) {
asOf := cache.db.AsOfSystemTimeClause(criteria.AsOfSystemTimeInterval)
// get reliable and online nodes
rows, err := cache.db.Query(ctx, cache.db.Rebind(`
SELECT id FROM nodes
SELECT id FROM nodes `+asOf+`
WHERE disqualified IS NULL
AND unknown_audit_suspended IS NULL
AND exit_finished_at IS NULL

View File

@ -460,6 +460,12 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# how stale the node selection cache can be
# overlay.node-selection-cache.staleness: 3m0s
# default duration for AS OF SYSTEM TIME
# overlay.node.as-of-system-time.default-interval: -10s
# enables the use of the AS OF SYSTEM TIME feature in CRDB
# overlay.node.as-of-system-time.enabled: true
# the number of times a node has been audited to not be considered a New Node
# overlay.node.audit-count: 100