// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package reputation

import (
	"container/heap"
	"context"
	"encoding/binary"
	"errors"
	"sync"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/pb"
	"storj.io/common/storj"
	"storj.io/storj/satellite/overlay"
)

var _ DB = (*CachingDB)(nil)

// NewCachingDB creates a new CachingDB instance.
func NewCachingDB(log *zap.Logger, peerID storj.NodeID, backingStore DB, reputationConfig Config) *CachingDB {
	return &CachingDB{
		log:                log,
		peerID:             peerID,
		backingStore:       backingStore,
		nowFunc:            time.Now,
		reputationConfig:   reputationConfig,
		syncInterval:       reputationConfig.FlushInterval,
		errorRetryInterval: reputationConfig.ErrorRetryInterval,
		nextSyncTimer:      time.NewTimer(reputationConfig.FlushInterval),
		requestSyncChannel: make(chan syncRequest),
		pending:            make(map[storj.NodeID]*cachedNodeReputationInfo),
	}
}

// CachingDB acts like a reputation.DB but caches reads and writes, to minimize
// load on the backing store.
type CachingDB struct {
	// These fields must be populated before the cache starts being used.
	// They are not expected to change.
	log                *zap.Logger
	peerID             storj.NodeID
	backingStore       DB
	nowFunc            func() time.Time
	reputationConfig   Config
	syncInterval       time.Duration
	errorRetryInterval time.Duration

	requestSyncChannel chan syncRequest

	// lock must be held when reading or writing to any of the following
	// fields.
	lock sync.Mutex

	nextSyncTimer *time.Timer

	// pending and writeOrderHeap contain the same set of entries, just with
	// different lookup properties. It should be easy to keep them in sync,
	// since we only insert with the lock held, and (for now) we never evict
	// from the cache.
	pending        map[storj.NodeID]*cachedNodeReputationInfo
	writeOrderHeap nodeIDHeap
}
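
// A minimal usage sketch for this cache (hedged: "log", "satelliteID",
// "backing", "cfg", "ctx", and "updateRequest" are placeholders for a
// *zap.Logger, this process's peer ID, a concrete reputation DB, a populated
// Config, a context, and an UpdateRequest; none of them are defined in this
// package):
//
//	cache := NewCachingDB(log, satelliteID, backing, cfg)
//	go func() { _ = cache.Manage(ctx) }() // schedules the periodic flushes
//	info, err := cache.Update(ctx, updateRequest, time.Now())
//	// ... use info, handle err ...
//	_ = cache.FlushAll(ctx) // on shutdown, push any still-pending mutations
//
// Manage must be running for RequestSync (and the forced syncs triggered from
// ApplyUpdates) to make progress, since RequestSync blocks until the managing
// goroutine handles the request.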

type syncRequest struct {
	nodeID   storj.NodeID
	doneChan chan struct{}
}

type cachedNodeReputationInfo struct {
	nodeID storj.NodeID

	// entryLock must be held when reading or writing to the following fields
	// in this structure (except syncAt; for syncAt, the CachingDB.lock must
	// be held instead). When entryLock is released, either info or syncError
	// (or both) must be non-nil.
	entryLock sync.Mutex

	// info is a best-effort copy of information from the database at some
	// point in the recent past (usually less than syncInterval ago) combined
	// with the requested updates which have not yet been synced to the
	// database.
	//
	// note: info has no guaranteed relationship to the set of mutations.
	// In particular, it is not necessarily the same as the base to which
	// the mutations will be applied.
	info *Info

	// syncError is the error that was encountered when trying to sync
	// info with the backing store. If this is set, errorRetryAt should also
	// be set.
	syncError error

	// errorRetryAt is the time at which a sync should be reattempted. It
	// should be set if syncError is set.
	errorRetryAt time.Time

	// syncAt is the time at which the system should try to apply the
	// pending mutations for this entry to the backing store. It should
	// be less than or equal to syncInterval from now.
	//
	// The corresponding CachingDB.lock must be held when reading from or
	// writing to this field.
	syncAt time.Time

	// mutations contains the set of changes to be made to a reputation
	// entry when the next sync operation fires.
	mutations Mutations
}

// Update applies a single update (one audit outcome) to a node's reputation
// record.
//
// If the node (as represented in the returned info) becomes newly vetted,
// disqualified, or suspended as a result of this update, the caller is
// responsible for updating the records in the overlay to match.
func (cdb *CachingDB) Update(ctx context.Context, request UpdateRequest, auditTime time.Time) (info *Info, err error) {
	defer mon.Task()(&ctx)(&err)

	mutations, err := UpdateRequestToMutations(request, auditTime)
	if err != nil {
		return nil, err
	}
	return cdb.ApplyUpdates(ctx, request.NodeID, mutations, request.Config, auditTime)
}
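
// A hedged sketch of how a caller might act on that responsibility (the Info
// field names are the ones used elsewhere in this file; the overlay updates
// themselves are left as placeholder comments, not real calls):
//
//	before, _ := cache.Get(ctx, req.NodeID)
//	after, err := cache.Update(ctx, req, time.Now())
//	if err == nil && after != nil {
//		if (before == nil || before.VettedAt == nil) && after.VettedAt != nil {
//			// newly vetted: update the overlay record accordingly
//		}
//		if (before == nil || before.Disqualified == nil) && after.Disqualified != nil {
//			// newly disqualified: update the overlay record accordingly
//		}
//	}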

// ApplyUpdates applies multiple updates (defined by the updates parameter) to
// a node's reputation record.
//
// If the node (as represented in the returned info) becomes newly vetted,
// disqualified, or suspended as a result of these updates, the caller is
// responsible for updating the records in the overlay to match.
func (cdb *CachingDB) ApplyUpdates(ctx context.Context, nodeID storj.NodeID, updates Mutations, config Config, now time.Time) (info *Info, err error) {
	defer mon.Task()(&ctx)(&err)

	logger := cdb.log.With(zap.Stringer("node-id", nodeID))
	doRequestSync := false

	cdb.getEntry(ctx, nodeID, now, func(nodeEntry *cachedNodeReputationInfo) {
		if nodeEntry.syncError != nil {
			if ErrNodeNotFound.Has(nodeEntry.syncError) || errors.Is(nodeEntry.syncError, notPopulated) {
				// get it added to the database
				info, err = cdb.backingStore.ApplyUpdates(ctx, nodeID, updates, config, now)
				if err != nil {
					nodeEntry.syncError = err
					nodeEntry.errorRetryAt = now.Add(cdb.errorRetryInterval)
					return
				}
				nodeEntry.info = info.Copy()
				nodeEntry.syncError = nil
				return
			}
			err = nodeEntry.syncError
			return
		}

		if updates.OnlineHistory != nil {
			MergeAuditHistories(nodeEntry.mutations.OnlineHistory, updates.OnlineHistory.Windows, config.AuditHistory)
		}
		nodeEntry.mutations.PositiveResults += updates.PositiveResults
		nodeEntry.mutations.FailureResults += updates.FailureResults
		nodeEntry.mutations.OfflineResults += updates.OfflineResults
		nodeEntry.mutations.UnknownResults += updates.UnknownResults

		// We will also mutate the cached reputation info, as a best-effort
		// estimate of what the reputation should be when synced with the
		// backing store.
		cachedInfo := nodeEntry.info

		// We want to return a copy of this entity, after it has been mutated,
		// and the copy has to be done while we still hold the lock.
		defer func() { info = cachedInfo.Copy() }()

		trackingPeriodFull := false
		if updates.OnlineHistory != nil {
			trackingPeriodFull = MergeAuditHistories(cachedInfo.AuditHistory, updates.OnlineHistory.Windows, config.AuditHistory)
		}
		cachedInfo.AuditSuccessCount += int64(updates.PositiveResults)
		cachedInfo.TotalAuditCount += int64(updates.PositiveResults + updates.FailureResults + updates.OfflineResults + updates.UnknownResults)
		cachedInfo.OnlineScore = cachedInfo.AuditHistory.Score

		if cachedInfo.VettedAt == nil && cachedInfo.TotalAuditCount >= config.AuditCount {
			cachedInfo.VettedAt = &now
			// if we think the node is newly vetted, perform a sync to
			// have the best chance of propagating that information to
			// other satellite services.
			doRequestSync = true
		}

		// for audit failure, only update normal alpha/beta
		cachedInfo.AuditReputationBeta, cachedInfo.AuditReputationAlpha = UpdateReputationMultiple(
			updates.FailureResults,
			cachedInfo.AuditReputationBeta,
			cachedInfo.AuditReputationAlpha,
			config.AuditLambda,
			config.AuditWeight,
		)
		// for audit unknown, only update unknown alpha/beta
		cachedInfo.UnknownAuditReputationBeta, cachedInfo.UnknownAuditReputationAlpha = UpdateReputationMultiple(
			updates.UnknownResults,
			cachedInfo.UnknownAuditReputationBeta,
			cachedInfo.UnknownAuditReputationAlpha,
			config.AuditLambda,
			config.AuditWeight,
		)

		// for a successful audit, increase reputation for normal *and* unknown audits
		cachedInfo.AuditReputationAlpha, cachedInfo.AuditReputationBeta = UpdateReputationMultiple(
			updates.PositiveResults,
			cachedInfo.AuditReputationAlpha,
			cachedInfo.AuditReputationBeta,
			config.AuditLambda,
			config.AuditWeight,
		)
		cachedInfo.UnknownAuditReputationAlpha, cachedInfo.UnknownAuditReputationBeta = UpdateReputationMultiple(
			updates.PositiveResults,
			cachedInfo.UnknownAuditReputationAlpha,
			cachedInfo.UnknownAuditReputationBeta,
			config.AuditLambda,
			config.AuditWeight,
		)

		mon.FloatVal("cached_audit_reputation_alpha").Observe(cachedInfo.AuditReputationAlpha)
		mon.FloatVal("cached_audit_reputation_beta").Observe(cachedInfo.AuditReputationBeta)
		mon.FloatVal("cached_unknown_audit_reputation_alpha").Observe(cachedInfo.UnknownAuditReputationAlpha)
		mon.FloatVal("cached_unknown_audit_reputation_beta").Observe(cachedInfo.UnknownAuditReputationBeta)
		mon.FloatVal("cached_audit_online_score").Observe(cachedInfo.OnlineScore)

		// The following code is all meant to keep the cache working
		// similarly to the values in the database. However, the cache
		// is not the "source of truth" and fields like Disqualified,
		// UnknownAuditSuspended, and UnderReview might be different
		// from what is in the backing store. If that happens, the cache
		// will get synced back to the source of truth the next time
		// this node is synchronized.

		// update audit score
		newAuditScore := cachedInfo.AuditReputationAlpha / (cachedInfo.AuditReputationAlpha + cachedInfo.AuditReputationBeta)
		// disqualification case a
		// a) Success/fail audit reputation falls below audit DQ threshold
		if newAuditScore <= config.AuditDQ {
			if cachedInfo.Disqualified == nil {
				cachedInfo.Disqualified = &now
				cachedInfo.DisqualificationReason = overlay.DisqualificationReasonAuditFailure
				logger.Info("Disqualified", zap.String("dq-type", "audit failure"))
				// if we think the node is newly disqualified, perform a sync
				// to have the best chance of propagating that information to
				// other satellite services.
				doRequestSync = true
			}
		}

		// check unknown-audits score
		unknownAuditRep := cachedInfo.UnknownAuditReputationAlpha / (cachedInfo.UnknownAuditReputationAlpha + cachedInfo.UnknownAuditReputationBeta)
		if unknownAuditRep <= config.AuditDQ {
			if cachedInfo.UnknownAuditSuspended == nil {
				logger.Info("Suspended", zap.String("category", "unknown-result audits"))
				cachedInfo.UnknownAuditSuspended = &now
			}

			// disqualification case b
			// b) Node is suspended (success/unknown reputation below audit DQ threshold)
			//    AND the suspension grace period has elapsed
			//    AND the audit outcome is unknown or failed

			// if the suspension grace period has elapsed and the unknown audit rep
			// is still too low, disqualify the node. Set suspended to nil if the
			// node is disqualified.
			if cachedInfo.UnknownAuditSuspended != nil &&
				now.Sub(*cachedInfo.UnknownAuditSuspended) > config.SuspensionGracePeriod &&
				config.SuspensionDQEnabled {
				logger.Info("Disqualified", zap.String("dq-type", "suspension grace period expired for unknown-result audits"))
				cachedInfo.Disqualified = &now
				cachedInfo.DisqualificationReason = overlay.DisqualificationReasonSuspension
				cachedInfo.UnknownAuditSuspended = nil
			}
		} else if cachedInfo.UnknownAuditSuspended != nil {
			logger.Info("Suspension lifted", zap.String("category", "unknown-result audits"))
			cachedInfo.UnknownAuditSuspended = nil
		}

		// if offline suspension is not enabled, skip penalization and unsuspend the node if applicable
		if !config.AuditHistory.OfflineSuspensionEnabled {
			if cachedInfo.OfflineSuspended != nil {
				cachedInfo.OfflineSuspended = nil
			}
			if cachedInfo.UnderReview != nil {
				cachedInfo.UnderReview = nil
			}
			return
		}

		// only penalize the node if its online score is below the threshold and
		// it has enough completed windows to fill a tracking period
		penalizeOfflineNode := false
		if cachedInfo.OnlineScore < config.AuditHistory.OfflineThreshold && trackingPeriodFull {
			penalizeOfflineNode = true
		}

		// Suspension and disqualification for offline nodes
		if cachedInfo.UnderReview != nil {
			// move the node in and out of suspension as needed during the review period
			if !penalizeOfflineNode && cachedInfo.OfflineSuspended != nil {
				cachedInfo.OfflineSuspended = nil
			} else if penalizeOfflineNode && cachedInfo.OfflineSuspended == nil {
				cachedInfo.OfflineSuspended = &now
			}

			gracePeriodEnd := cachedInfo.UnderReview.Add(config.AuditHistory.GracePeriod)
			trackingPeriodEnd := gracePeriodEnd.Add(config.AuditHistory.TrackingPeriod)
			trackingPeriodPassed := now.After(trackingPeriodEnd)

			// after the tracking period has elapsed, if the score is good, clear under review;
			// otherwise, disqualify the node (if the OfflineDQEnabled feature flag is true)
			if trackingPeriodPassed {
				if penalizeOfflineNode {
					if config.AuditHistory.OfflineDQEnabled {
						logger.Info("Disqualified", zap.String("dq-type", "node offline"))
						cachedInfo.Disqualified = &now
						cachedInfo.DisqualificationReason = overlay.DisqualificationReasonNodeOffline
					}
				} else {
					logger.Info("Suspension lifted", zap.String("category", "node offline"))
					cachedInfo.UnderReview = nil
					cachedInfo.OfflineSuspended = nil
				}
			}
		} else if penalizeOfflineNode {
			// suspend the node for being offline and begin the review period
			cachedInfo.UnderReview = &now
			cachedInfo.OfflineSuspended = &now
		}
	})

	if doRequestSync {
		_ = cdb.RequestSync(ctx, nodeID)
	}
	return info, err
}

// UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
func (cdb *CachingDB) UnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error) {
	defer mon.Task()(&ctx)(&err)

	err = cdb.backingStore.UnsuspendNodeUnknownAudit(ctx, nodeID)
	if err != nil {
		return err
	}
	// sync with database (this will get it marked as unsuspended in the cache)
	return cdb.RequestSync(ctx, nodeID)
}

// DisqualifyNode disqualifies a storage node.
func (cdb *CachingDB) DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason overlay.DisqualificationReason) (err error) {
	defer mon.Task()(&ctx)(&err)

	err = cdb.backingStore.DisqualifyNode(ctx, nodeID, disqualifiedAt, reason)
	if err != nil {
		return err
	}
	// sync with database (this will get it marked as disqualified in the cache)
	return cdb.RequestSync(ctx, nodeID)
}

// SuspendNodeUnknownAudit suspends a storage node for unknown audits.
func (cdb *CachingDB) SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)

	err = cdb.backingStore.SuspendNodeUnknownAudit(ctx, nodeID, suspendedAt)
	if err != nil {
		return err
	}
	// sync with database (this will get it marked as suspended in the cache)
	return cdb.RequestSync(ctx, nodeID)
}

// RequestSync requests the managing goroutine to perform a sync of cached info
// about the specified node to the backing store. This involves applying the
// cached mutations and resetting the info attribute to match a snapshot of what
// is in the backing store after the mutations.
func (cdb *CachingDB) RequestSync(ctx context.Context, nodeID storj.NodeID) (err error) {
	defer mon.Task()(&ctx)(&err)

	req := syncRequest{
		nodeID:   nodeID,
		doneChan: make(chan struct{}, 1),
	}
	select {
	case cdb.requestSyncChannel <- req:
	case <-ctx.Done():
		return ctx.Err()
	}
	select {
	case <-req.doneChan:
	case <-ctx.Done():
		return ctx.Err()
	}
	return nil
}

// FlushAll syncs all pending reputation mutations to the backing store.
func (cdb *CachingDB) FlushAll(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	var copyOfEntries []*cachedNodeReputationInfo
	func() {
		cdb.lock.Lock()
		defer cdb.lock.Unlock()

		copyOfEntries = make([]*cachedNodeReputationInfo, 0, len(cdb.pending))
		for _, entry := range cdb.pending {
			copyOfEntries = append(copyOfEntries, entry)
		}
	}()

	var errg errs.Group
	for _, entry := range copyOfEntries {
		errg.Add(func() error {
			entry.entryLock.Lock()
			defer entry.entryLock.Unlock()

			cdb.syncEntry(ctx, entry, cdb.nowFunc())
			return entry.syncError
		}())
	}
	return errg.Err()
}

// Manage should be run in its own goroutine while a CachingDB is in use. This
// will schedule database flushes, trying to avoid too much load all at once.
func (cdb *CachingDB) Manage(ctx context.Context) error {
	for {
		select {
		case <-cdb.nextSyncTimer.C:
			cdb.syncDueEntries(ctx, cdb.nowFunc())
			cdb.updateTimer(cdb.nowFunc(), false)
		case request := <-cdb.requestSyncChannel:
			cdb.syncNode(ctx, request, cdb.nowFunc())
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// updateTimer must not be called while there is a concurrent receive on
// cdb.nextSyncTimer.C (see the docs for time.(*Timer).Reset()).
//
// Here we achieve this requirement by calling this function only
// from the same goroutine that waits on that timer.
func (cdb *CachingDB) updateTimer(now time.Time, drainChannel bool) {
	cdb.lock.Lock()
	defer cdb.lock.Unlock()

	var timeToNextSync time.Duration
	if cdb.writeOrderHeap.Len() == 0 {
		// We could use any large-ish duration here. We just need to
		// keep the timer channel valid and want to avoid spinning on
		// updateTimer() calls.
		timeToNextSync = cdb.syncInterval
	} else {
		nextSync := cdb.writeOrderHeap[0].syncAt
		timeToNextSync = nextSync.Sub(now) // note: may be negative
	}

	if drainChannel {
		if !cdb.nextSyncTimer.Stop() {
			<-cdb.nextSyncTimer.C
		}
	}
	cdb.nextSyncTimer.Reset(timeToNextSync)
}

// getExistingEntry looks up an entry in the pending mutations cache, locks it,
// and calls f with the entry while holding the lock. If there is no entry in
// the cache with the given nodeID, f is not called.
func (cdb *CachingDB) getExistingEntry(nodeID storj.NodeID, f func(entryToSync *cachedNodeReputationInfo)) {
	var entryToSync *cachedNodeReputationInfo
	func() {
		cdb.lock.Lock()
		defer cdb.lock.Unlock()

		entryToSync = cdb.pending[nodeID]
	}()
	if entryToSync == nil {
		mon.Event("writecache-asked-for-unknown-node")
		return
	}

	func() {
		entryToSync.entryLock.Lock()
		defer entryToSync.entryLock.Unlock()

		f(entryToSync)
	}()
}

func (cdb *CachingDB) syncNode(ctx context.Context, request syncRequest, now time.Time) {
	defer close(request.doneChan)
	cdb.getExistingEntry(request.nodeID, func(entryToSync *cachedNodeReputationInfo) {
		cdb.syncEntry(ctx, entryToSync, now)
	})
}

func (cdb *CachingDB) syncDueEntries(ctx context.Context, now time.Time) {
	cdb.getEntriesToSync(now, func(entryToSync *cachedNodeReputationInfo) {
		cdb.syncEntry(ctx, entryToSync, now)
	})
}

// getEntriesToSync constructs a list of all entries due for syncing, updates
// the syncAt time for each, then locks each one individually and calls f()
// once for each entry while holding its lock.
func (cdb *CachingDB) getEntriesToSync(now time.Time, f func(entryToSync *cachedNodeReputationInfo)) {
	var entriesToSync []*cachedNodeReputationInfo

	func() {
		cdb.lock.Lock()
		defer cdb.lock.Unlock()

		for {
			if cdb.writeOrderHeap.Len() == 0 {
				break
			}
			if !cdb.writeOrderHeap[0].syncAt.Before(now) {
				break
			}
			entryToSync := cdb.writeOrderHeap[0]

			// We bump syncAt regardless of whether we are about to sync. If
			// something else is already syncing this entry, it has taken
			// more time than expected, and the next flush is due. We need
			// cdb.writeOrderHeap[0].syncAt.After(now) before we can exit
			// from this loop.
			entryToSync.syncAt = cdb.nextTimeForSync(entryToSync.nodeID, now)

			// move element 0 to its new correct place in the heap. This
			// shouldn't affect entryToSync, which is a pointer to the
			// entry shared by the heap.
			heap.Fix(&cdb.writeOrderHeap, 0)

			entriesToSync = append(entriesToSync, entryToSync)
		}
	}()

	for len(entriesToSync) > 0 {
		entry := entriesToSync[0]
		func() {
			entry.entryLock.Lock()
			defer entry.entryLock.Unlock()

			f(entry)
		}()
		entriesToSync = entriesToSync[1:]
	}
}

func (cdb *CachingDB) nextTimeForSync(nodeID storj.NodeID, now time.Time) time.Time {
	return nextTimeForSync(cdb.peerID, nodeID, now, cdb.syncInterval)
}

// nextTimeForSync decides the time at which the given nodeID should next
// be synchronized with the backing store.
//
// We make an effort to distribute the nodes in time, so that the service
// is not usually trying to retrieve or update many rows at the same time. We
// also make an effort to offset this sync schedule by the instance ID of this
// process so that in most cases, instances will not be trying to update the
// same row at the same time, minimizing contention.
func nextTimeForSync(peerID, nodeID storj.NodeID, now time.Time, syncInterval time.Duration) time.Time {
	// calculate the fraction into the FlushInterval at which this node
	// should always be synchronized.
	initialPosition := binary.BigEndian.Uint64(nodeID[:8])
	offset := binary.BigEndian.Uint64(peerID[:8])
	finalPosition := initialPosition + offset
	positionAsFraction := float64(finalPosition) / (1 << 64)
	// and apply that fraction to the actual interval
	periodStart := now.Truncate(syncInterval)
	offsetFromStart := time.Duration(positionAsFraction * float64(syncInterval))
	syncTime := periodStart.Add(offsetFromStart)
	if syncTime.Before(now) {
		syncTime = syncTime.Add(syncInterval)
	}
	// reapply monotonic time by applying the time delta to 'now'
	timeToNextSync := syncTime.Sub(now)
	return now.Add(timeToNextSync)
}
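
// A small worked example of the arithmetic above (the values are illustrative
// only, not derived from real node IDs):
//
//	// finalPosition = 1 << 62, syncInterval = time.Hour
//	// positionAsFraction = float64(1<<62) / (1 << 64) = 0.25
//	// offsetFromStart    = 0.25 * time.Hour = 15 minutes
//	// so this node is always scheduled 15 minutes into each hourly period,
//	// or at the same offset in the next period if that moment has already passed.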

// syncEntry synchronizes an entry with the backing store. Any pending mutations
// will be applied to the backing store, and the info and syncError attributes
// will be updated according to the results.
//
// syncEntry must be called with the entry already locked.
func (cdb *CachingDB) syncEntry(ctx context.Context, entry *cachedNodeReputationInfo, now time.Time) {
	defer mon.Task()(&ctx)(nil)

	entry.info, entry.syncError = cdb.backingStore.ApplyUpdates(ctx, entry.nodeID, entry.mutations, cdb.reputationConfig, now)

	// NOTE: If another process has been updating the same row in the
	// backing store, it is possible that the node has become newly vetted,
	// disqualified, or suspended without us knowing about it. In this case,
	// the overlay will not know about the change until it next updates the
	// reputation. We may need to add some way for this object to notify the
	// overlay of updates such as this.

	if entry.syncError != nil {
		if ErrNodeNotFound.Has(entry.syncError) {
			entry.errorRetryAt = now
		} else {
			entry.errorRetryAt = now.Add(cdb.errorRetryInterval)
		}
	}
	entry.mutations = Mutations{
		OnlineHistory: &pb.AuditHistory{},
	}
}

// Get retrieves the cached *Info record for the given node ID. If the
// information is not already in the cache, the information is fetched from the
// backing store.
//
// If an error occurred syncing the entry with the backing store, it will be
// returned. In this case, the returned value for 'info' might be nil, or it
// might contain data cached longer than FlushInterval.
func (cdb *CachingDB) Get(ctx context.Context, nodeID storj.NodeID) (info *Info, err error) {
	defer mon.Task()(&ctx)(&err)

	cdb.getEntry(ctx, nodeID, cdb.nowFunc(), func(entry *cachedNodeReputationInfo) {
		if entry.syncError != nil {
			err = entry.syncError
		}
		if entry.info != nil {
			info = entry.info.Copy()
		}
	})
	return info, err
}

// getEntry acquires an entry (a *cachedNodeReputationInfo) in the reputation
// cache, locks it, and supplies the entry to the given callback function for
// access or mutation. The pointer to the entry will not remain valid after the
// callback function returns.
//
// If there is no record for the requested nodeID, a new record will be added
// for it, it will be synced with the backing store, and the new record will be
// supplied to the given callback function.
//
// If there was an error fetching up-to-date info from the backing store, the
// entry supplied to the callback will have entry.syncError != nil. In this
// case, entry.info may be nil, or it may have an out-of-date record. If the
// error occurred long enough ago that it is time to try again, another attempt
// to sync the entry will occur before the callback is made.
func (cdb *CachingDB) getEntry(ctx context.Context, nodeID storj.NodeID, now time.Time, f func(entry *cachedNodeReputationInfo)) {
	defer mon.Task()(&ctx)(nil)

	var nodeEntry *cachedNodeReputationInfo

	func() {
		cdb.lock.Lock()
		defer cdb.lock.Unlock()

		var ok bool
		nodeEntry, ok = cdb.pending[nodeID]
		if !ok {
			nodeEntry = cdb.insertNode(nodeID, now)
		}
	}()

	func() {
		nodeEntry.entryLock.Lock()
		defer nodeEntry.entryLock.Unlock()

		if nodeEntry.syncError != nil && nodeEntry.errorRetryAt.Before(now) {
			cdb.syncEntry(ctx, nodeEntry, now)
		}

		f(nodeEntry)
	}()
}

// insertNode inserts a mostly-empty *cachedNodeReputationInfo record into the
// pending list and the write-order heap.
//
// The syncError is pre-set so that the first caller to acquire the entryLock
// on the new entry should initiate an immediate sync with the backing store.
//
// cdb.lock must be held when calling.
func (cdb *CachingDB) insertNode(nodeID storj.NodeID, now time.Time) *cachedNodeReputationInfo {
	syncTime := cdb.nextTimeForSync(nodeID, now)
	mut := &cachedNodeReputationInfo{
		nodeID:       nodeID,
		syncAt:       syncTime,
		syncError:    notPopulated,
		errorRetryAt: time.Time{}, // sync will be initiated right away
		mutations: Mutations{
			OnlineHistory: &pb.AuditHistory{},
		},
	}

	cdb.pending[nodeID] = mut
	heap.Push(&cdb.writeOrderHeap, mut)

	return mut
}

// SetNowFunc supplies a new function to use for determining the current time,
// for synchronization timing and scheduling purposes. This is frequently useful
// in test scenarios.
func (cdb *CachingDB) SetNowFunc(timeFunc func() time.Time) {
	cdb.nowFunc = timeFunc
}

// notPopulated is an error indicating that a cachedNodeReputationInfo
// structure has not yet been populated. The syncError field is initialized
// to this error, and the first access of the entry should cause an immediate
// lookup to the backing store. Therefore, this error should not normally
// escape outside writecache code.
var notPopulated = Error.New("not populated")

// nodeIDHeap is a heap of cachedNodeReputationInfo entries, ordered by the
// associated syncAt times. It implements heap.Interface.
type nodeIDHeap []*cachedNodeReputationInfo

// Len returns the length of the slice.
func (n nodeIDHeap) Len() int {
	return len(n)
}

// Swap swaps the elements with indices i and j.
func (n nodeIDHeap) Swap(i, j int) {
	n[i], n[j] = n[j], n[i]
}

// Less returns true if the syncAt time for the element with index i comes
// before the syncAt time for the element with index j.
func (n nodeIDHeap) Less(i, j int) bool {
	return n[i].syncAt.Before(n[j].syncAt)
}

// Push appends an element to the slice.
func (n *nodeIDHeap) Push(x interface{}) {
	*n = append(*n, x.(*cachedNodeReputationInfo))
}

// Pop removes and returns the last element in the slice.
func (n *nodeIDHeap) Pop() interface{} {
	oldLen := len(*n)
	item := (*n)[oldLen-1]
	*n = (*n)[:oldLen-1]
	return item
}