storj/satellite/reputation/writecache.go
paul cannon 0dcc0a9ee0 satellite/reputation: reconfigure lambda and alpha
This is in response to community feedback that our existing reputation
calculation swings up and down too sharply, making it too likely to
disqualify storage nodes unfairly.

For details and analysis, please see the data_loss_vs_dq_chance_sim.py
tool, the "tuning reputation further.ipynb" Jupyter notebook in the
storj/datascience repository, and the discussion at

    https://forum.storj.io/t/tuning-audit-scoring/14084

In brief: changing the lambda and initial-alpha parameters in this way
causes the swings in reputation to be smaller and less likely to put a
node past the disqualification threshold unfairly.

Note: this change will cause a one-time reset of all (non-disqualified)
node reputations, because the new initial alpha value of 1000 is
dramatically different, and the disqualification threshold is going to
be much higher.
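
For intuition, here is a sketch of the arithmetic (assuming the usual
beta-reputation update applied by UpdateReputationMultiple; v is +1 for a
successful audit and -1 for a failure, and the exact weights are
configuration-dependent):

    alpha = lambda*alpha + weight*(1+v)/2
    beta  = lambda*beta  + weight*(1-v)/2
    score = alpha / (alpha + beta)

Starting alpha at 1000 means a single failed audit moves the score by a
much smaller fraction than before, and a lambda closer to 1 makes any one
outcome count for less relative to the accumulated history.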

Change-Id: Id6dc4ba8fde1be3db4255b72282207bab5491ca3
2022-08-17 18:52:53 +00:00

// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package reputation
import (
"container/heap"
"context"
"encoding/binary"
"errors"
"math/rand"
"sync"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/satellite/overlay"
)
var _ DB = (*CachingDB)(nil)
// NewCachingDB creates a new CachingDB instance.
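//
// A minimal usage sketch (illustrative only; the caller-side names here are
// assumptions, not the satellite's actual wiring):
//
//    cache := reputation.NewCachingDB(log, reputationDB, config)
//    go func() { _ = cache.Manage(ctx) }() // schedules periodic flushes
//    info, err := cache.Update(ctx, updateRequest, time.Now())
//    // on shutdown, push any remaining cached mutations:
//    _ = cache.FlushAll(ctx)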
func NewCachingDB(log *zap.Logger, backingStore DB, reputationConfig Config) *CachingDB {
randSource := rand.New(rand.NewSource(time.Now().UnixNano()))
return &CachingDB{
log: log,
instanceOffset: randSource.Uint64(),
backingStore: backingStore,
nowFunc: time.Now,
reputationConfig: reputationConfig,
syncInterval: reputationConfig.FlushInterval,
errorRetryInterval: reputationConfig.ErrorRetryInterval,
nextSyncTimer: time.NewTimer(reputationConfig.FlushInterval),
requestSyncChannel: make(chan syncRequest),
pending: make(map[storj.NodeID]*cachedNodeReputationInfo),
}
}
// CachingDB acts like a reputation.DB but caches reads and writes, to minimize
// load on the backing store.
type CachingDB struct {
// These fields must be populated before the cache starts being used.
// They are not expected to change.
log *zap.Logger
instanceOffset uint64
backingStore DB
nowFunc func() time.Time
reputationConfig Config
syncInterval time.Duration
errorRetryInterval time.Duration
requestSyncChannel chan syncRequest
// lock must be held when reading or writing to any of the following
// fields.
lock sync.Mutex
nextSyncTimer *time.Timer
// pending and writeOrderHeap contain the same set of entries, just with
// different lookup properties. It should be easy to keep them in sync,
// since we only insert with lock held, and (for now) we never evict
// from the cache.
pending map[storj.NodeID]*cachedNodeReputationInfo
writeOrderHeap nodeIDHeap
}
type syncRequest struct {
nodeID storj.NodeID
doneChan chan struct{}
}
type cachedNodeReputationInfo struct {
nodeID storj.NodeID
// entryLock must be held when reading or writing to the following fields
// in this structure (**except** syncAt; for syncAt, the CachingDB.lock
// must be held instead). When entryLock is released, either info or
// syncError (or both) must be non-nil.
entryLock sync.Mutex
// info is a best-effort copy of information from the database at some
// point in the recent past (usually less than syncInterval ago) combined
// with the requested updates which have not yet been synced to the
// database.
//
// note: info has no guaranteed relationship to the set of mutations.
// In particular, it is not necessarily the same as the base to which
// the mutations will be applied.
info *Info
// syncError is the error that was encountered when trying to sync
// info with the backing store. If this is set, errorRetryAt should also
// be set.
syncError error
// errorRetryAt is the time at which a sync should be reattempted. It
// should be set if syncError is set.
errorRetryAt time.Time
// syncAt is the time at which the system should try to apply the
// pending mutations for this entry to the backing store. It should
// be less than or equal to syncInterval from now.
//
// The corresponding CachingDB.lock must be held when reading from or
// writing to this field.
syncAt time.Time
// mutations contains the set of changes to be made to a reputations
// entry when the next sync operation fires.
mutations Mutations
}
// Update applies a single update (one audit outcome) to a node's reputations
// record.
//
// If the node (as represented in the returned info) becomes newly vetted,
// disqualified, or suspended as a result of this update, the caller is
// responsible for updating the records in the overlay to match.
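//
// A caller-side sketch of that responsibility (hypothetical names; the real
// wiring lives in the services that consume this result):
//
//    info, err := cache.Update(ctx, req, now)
//    if err == nil && info.Disqualified != nil && previous.Disqualified == nil {
//        // notify the overlay of the new disqualification here
//    }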
func (cdb *CachingDB) Update(ctx context.Context, request UpdateRequest, auditTime time.Time) (info *Info, err error) {
defer mon.Task()(&ctx)(&err)
mutations, err := UpdateRequestToMutations(request, auditTime)
if err != nil {
return nil, err
}
return cdb.ApplyUpdates(ctx, request.NodeID, mutations, request.Config, auditTime)
}
// ApplyUpdates applies multiple updates (defined by the updates parameter) to
// a node's reputations record.
//
// If the node (as represented in the returned info) becomes newly vetted,
// disqualified, or suspended as a result of these updates, the caller is
// responsible for updating the records in the overlay to match.
func (cdb *CachingDB) ApplyUpdates(ctx context.Context, nodeID storj.NodeID, updates Mutations, config Config, now time.Time) (info *Info, err error) {
defer mon.Task()(&ctx)(&err)
logger := cdb.log.With(zap.Stringer("node-id", nodeID))
doRequestSync := false
cdb.getEntry(ctx, nodeID, now, func(nodeEntry *cachedNodeReputationInfo) {
if nodeEntry.syncError != nil {
if ErrNodeNotFound.Has(nodeEntry.syncError) || errors.Is(nodeEntry.syncError, notPopulated) {
// get it added to the database
info, err = cdb.backingStore.ApplyUpdates(ctx, nodeID, updates, config, now)
if err != nil {
nodeEntry.syncError = err
nodeEntry.errorRetryAt = now.Add(cdb.errorRetryInterval)
return
}
nodeEntry.info = info.Copy()
nodeEntry.syncError = nil
return
}
err = nodeEntry.syncError
return
}
if updates.OnlineHistory != nil {
MergeAuditHistories(nodeEntry.mutations.OnlineHistory, updates.OnlineHistory.Windows, config.AuditHistory)
}
nodeEntry.mutations.PositiveResults += updates.PositiveResults
nodeEntry.mutations.FailureResults += updates.FailureResults
nodeEntry.mutations.OfflineResults += updates.OfflineResults
nodeEntry.mutations.UnknownResults += updates.UnknownResults
// We will also mutate the cached reputation info, as a best-effort
// estimate of what the reputation should be when synced with the
// backing store.
cachedInfo := nodeEntry.info
// We want to return a copy of this entity, after it has been mutated,
// and the copy has to be done while we still hold the lock.
defer func() { info = cachedInfo.Copy() }()
trackingPeriodFull := false
if updates.OnlineHistory != nil {
trackingPeriodFull = MergeAuditHistories(cachedInfo.AuditHistory, updates.OnlineHistory.Windows, config.AuditHistory)
}
cachedInfo.AuditSuccessCount += int64(updates.PositiveResults)
cachedInfo.TotalAuditCount += int64(updates.PositiveResults + updates.FailureResults + updates.OfflineResults + updates.UnknownResults)
cachedInfo.OnlineScore = cachedInfo.AuditHistory.Score
if cachedInfo.VettedAt == nil && cachedInfo.TotalAuditCount >= config.AuditCount {
cachedInfo.VettedAt = &now
// if we think the node is newly vetted, perform a sync to
// have the best chance of propagating that information to
// other satellite services.
doRequestSync = true
}
// for audit failure, only update normal alpha/beta
cachedInfo.AuditReputationBeta, cachedInfo.AuditReputationAlpha = UpdateReputationMultiple(
updates.FailureResults,
cachedInfo.AuditReputationBeta,
cachedInfo.AuditReputationAlpha,
config.AuditLambda,
config.AuditWeight,
)
// for audit unknown, only update unknown alpha/beta
cachedInfo.UnknownAuditReputationBeta, cachedInfo.UnknownAuditReputationAlpha = UpdateReputationMultiple(
updates.UnknownResults,
cachedInfo.UnknownAuditReputationBeta,
cachedInfo.UnknownAuditReputationAlpha,
config.UnknownAuditLambda,
config.AuditWeight,
)
// for a successful audit, increase reputation for normal *and* unknown audits
cachedInfo.AuditReputationAlpha, cachedInfo.AuditReputationBeta = UpdateReputationMultiple(
updates.PositiveResults,
cachedInfo.AuditReputationAlpha,
cachedInfo.AuditReputationBeta,
config.AuditLambda,
config.AuditWeight,
)
cachedInfo.UnknownAuditReputationAlpha, cachedInfo.UnknownAuditReputationBeta = UpdateReputationMultiple(
updates.PositiveResults,
cachedInfo.UnknownAuditReputationAlpha,
cachedInfo.UnknownAuditReputationBeta,
config.UnknownAuditLambda,
config.AuditWeight,
)
mon.FloatVal("cached_audit_reputation_alpha").Observe(cachedInfo.AuditReputationAlpha)
mon.FloatVal("cached_audit_reputation_beta").Observe(cachedInfo.AuditReputationBeta)
mon.FloatVal("cached_unknown_audit_reputation_alpha").Observe(cachedInfo.UnknownAuditReputationAlpha)
mon.FloatVal("cached_unknown_audit_reputation_beta").Observe(cachedInfo.UnknownAuditReputationBeta)
mon.FloatVal("cached_audit_online_score").Observe(cachedInfo.OnlineScore)
// The following code is all meant to keep the cache working
// similarly to the values in the database. However, the cache
// is not the "source of truth" and fields like Disqualified,
// UnknownAuditSuspended, and UnderReview might be different
// from what is in the backing store. If that happens, the cache
// will get synced back to the source of truth the next time
// this node is synchronized.
// update audit score
newAuditScore := cachedInfo.AuditReputationAlpha / (cachedInfo.AuditReputationAlpha + cachedInfo.AuditReputationBeta)
// disqualification case a
// a) Success/fail audit reputation falls below audit DQ threshold
if newAuditScore <= config.AuditDQ {
if cachedInfo.Disqualified == nil {
cachedInfo.Disqualified = &now
cachedInfo.DisqualificationReason = overlay.DisqualificationReasonAuditFailure
logger.Info("Disqualified", zap.String("dq-type", "audit failure"))
// if we think the node is newly disqualified, perform a sync
// to have the best chance of propagating that information to
// other satellite services.
doRequestSync = true
}
}
// check unknown-audits score
unknownAuditRep := cachedInfo.UnknownAuditReputationAlpha / (cachedInfo.UnknownAuditReputationAlpha + cachedInfo.UnknownAuditReputationBeta)
if unknownAuditRep <= config.UnknownAuditDQ {
if cachedInfo.UnknownAuditSuspended == nil {
logger.Info("Suspended", zap.String("category", "unknown-result audits"))
cachedInfo.UnknownAuditSuspended = &now
}
// disqualification case b
// b) Node is suspended (success/unknown reputation below audit DQ threshold)
// AND the suspended grace period has elapsed
// AND audit outcome is unknown or failed
// if suspended grace period has elapsed and unknown audit rep is still
// too low, disqualify node. Set suspended to nil if node is disqualified
if cachedInfo.UnknownAuditSuspended != nil &&
now.Sub(*cachedInfo.UnknownAuditSuspended) > config.SuspensionGracePeriod &&
config.SuspensionDQEnabled {
logger.Info("Disqualified", zap.String("dq-type", "suspension grace period expired for unknown-result audits"))
cachedInfo.Disqualified = &now
cachedInfo.DisqualificationReason = overlay.DisqualificationReasonSuspension
cachedInfo.UnknownAuditSuspended = nil
}
} else if cachedInfo.UnknownAuditSuspended != nil {
logger.Info("Suspension lifted", zap.String("category", "unknown-result audits"))
cachedInfo.UnknownAuditSuspended = nil
}
// if suspension not enabled, skip penalization and unsuspend node if applicable
if !config.AuditHistory.OfflineSuspensionEnabled {
if cachedInfo.OfflineSuspended != nil {
cachedInfo.OfflineSuspended = nil
}
if cachedInfo.UnderReview != nil {
cachedInfo.UnderReview = nil
}
return
}
// only penalize node if online score is below threshold and
// if it has enough completed windows to fill a tracking period
penalizeOfflineNode := false
if cachedInfo.OnlineScore < config.AuditHistory.OfflineThreshold && trackingPeriodFull {
penalizeOfflineNode = true
}
// Suspension and disqualification for offline nodes
if cachedInfo.UnderReview != nil {
// move node in and out of suspension as needed during review period
if !penalizeOfflineNode && cachedInfo.OfflineSuspended != nil {
cachedInfo.OfflineSuspended = nil
} else if penalizeOfflineNode && cachedInfo.OfflineSuspended == nil {
cachedInfo.OfflineSuspended = &now
}
gracePeriodEnd := cachedInfo.UnderReview.Add(config.AuditHistory.GracePeriod)
trackingPeriodEnd := gracePeriodEnd.Add(config.AuditHistory.TrackingPeriod)
trackingPeriodPassed := now.After(trackingPeriodEnd)
// after tracking period has elapsed, if score is good, clear under review
// otherwise, disqualify node (if OfflineDQEnabled feature flag is true)
if trackingPeriodPassed {
if penalizeOfflineNode {
if config.AuditHistory.OfflineDQEnabled {
logger.Info("Disqualified", zap.String("dq-type", "node offline"))
cachedInfo.Disqualified = &now
cachedInfo.DisqualificationReason = overlay.DisqualificationReasonNodeOffline
}
} else {
logger.Info("Suspension lifted", zap.String("category", "node offline"))
cachedInfo.UnderReview = nil
cachedInfo.OfflineSuspended = nil
}
}
} else if penalizeOfflineNode {
// suspend node for being offline and begin review period
cachedInfo.UnderReview = &now
cachedInfo.OfflineSuspended = &now
}
})
if doRequestSync {
_ = cdb.RequestSync(ctx, nodeID)
}
return info, err
}
// UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
func (cdb *CachingDB) UnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
err = cdb.backingStore.UnsuspendNodeUnknownAudit(ctx, nodeID)
if err != nil {
return err
}
// sync with database (this will get it marked as unsuspended in the cache)
return cdb.RequestSync(ctx, nodeID)
}
// DisqualifyNode disqualifies a storage node.
func (cdb *CachingDB) DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason overlay.DisqualificationReason) (err error) {
defer mon.Task()(&ctx)(&err)
err = cdb.backingStore.DisqualifyNode(ctx, nodeID, disqualifiedAt, reason)
if err != nil {
return err
}
// sync with database (this will get it marked as disqualified in the cache)
return cdb.RequestSync(ctx, nodeID)
}
// SuspendNodeUnknownAudit suspends a storage node for unknown audits.
func (cdb *CachingDB) SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
err = cdb.backingStore.SuspendNodeUnknownAudit(ctx, nodeID, suspendedAt)
if err != nil {
return err
}
// sync with database (this will get it marked as suspended in the cache)
return cdb.RequestSync(ctx, nodeID)
}
// RequestSync requests the managing goroutine to perform a sync of cached info
// about the specified node to the backing store. This involves applying the
// cached mutations and resetting the info attribute to match a snapshot of what
// is in the backing store after the mutations.
func (cdb *CachingDB) RequestSync(ctx context.Context, nodeID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
req := syncRequest{
nodeID: nodeID,
doneChan: make(chan struct{}, 1),
}
select {
case cdb.requestSyncChannel <- req:
case <-ctx.Done():
return ctx.Err()
}
select {
case <-req.doneChan:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// FlushAll syncs all pending reputation mutations to the backing store.
func (cdb *CachingDB) FlushAll(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
var copyOfEntries []*cachedNodeReputationInfo
func() {
cdb.lock.Lock()
defer cdb.lock.Unlock()
copyOfEntries = make([]*cachedNodeReputationInfo, 0, len(cdb.pending))
for _, entry := range cdb.pending {
copyOfEntries = append(copyOfEntries, entry)
}
}()
var errg errs.Group
for _, entry := range copyOfEntries {
errg.Add(func() error {
entry.entryLock.Lock()
defer entry.entryLock.Unlock()
cdb.syncEntry(ctx, entry, cdb.nowFunc())
return entry.syncError
}())
}
return errg.Err()
}
// Manage should be run in its own goroutine while a CachingDB is in use. This
// will schedule database flushes, trying to avoid too much load all at once.
func (cdb *CachingDB) Manage(ctx context.Context) error {
for {
select {
case <-cdb.nextSyncTimer.C:
cdb.syncDueEntries(ctx, cdb.nowFunc())
cdb.updateTimer(cdb.nowFunc(), false)
case request := <-cdb.requestSyncChannel:
cdb.syncNode(ctx, request, cdb.nowFunc())
case <-ctx.Done():
return ctx.Err()
}
}
}
// must not be called while there is a concurrent receive on
// cdb.nextSyncTimer.C (see the docs for time.(*Timer).Reset()).
//
// Here we achieve this requirement by calling this function only
// from the same goroutine that waits on that timer.
func (cdb *CachingDB) updateTimer(now time.Time, drainChannel bool) {
cdb.lock.Lock()
defer cdb.lock.Unlock()
var timeToNextSync time.Duration
if cdb.writeOrderHeap.Len() == 0 {
// We could use any large-ish duration here. We just need to
// keep the timer channel valid and want to avoid spinning on
// updateTimer() calls.
timeToNextSync = cdb.syncInterval
} else {
nextSync := cdb.writeOrderHeap[0].syncAt
timeToNextSync = nextSync.Sub(now) // note: may be negative
}
if drainChannel {
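// Timer.Stop reports false when the timer has already fired; in that
// case the expired value must be drained from the channel so that a
// later receive on nextSyncTimer.C only sees ticks from the Reset
// below. This is the pattern described in the time.Timer.Reset docs.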
if !cdb.nextSyncTimer.Stop() {
<-cdb.nextSyncTimer.C
}
}
cdb.nextSyncTimer.Reset(timeToNextSync)
}
// getExistingEntry looks up an entry in the pending mutations cache, locks it,
// and calls f with the entry while holding the lock. If there is no entry in
// the cache with the given nodeID, f is not called.
func (cdb *CachingDB) getExistingEntry(nodeID storj.NodeID, f func(entryToSync *cachedNodeReputationInfo)) {
var entryToSync *cachedNodeReputationInfo
func() {
cdb.lock.Lock()
defer cdb.lock.Unlock()
entryToSync = cdb.pending[nodeID]
}()
if entryToSync == nil {
mon.Event("writecache-asked-for-unknown-node")
return
}
func() {
entryToSync.entryLock.Lock()
defer entryToSync.entryLock.Unlock()
f(entryToSync)
}()
}
func (cdb *CachingDB) syncNode(ctx context.Context, request syncRequest, now time.Time) {
defer close(request.doneChan)
cdb.getExistingEntry(request.nodeID, func(entryToSync *cachedNodeReputationInfo) {
cdb.syncEntry(ctx, entryToSync, now)
})
}
func (cdb *CachingDB) syncDueEntries(ctx context.Context, now time.Time) {
cdb.getEntriesToSync(now, func(entryToSync *cachedNodeReputationInfo) {
cdb.syncEntry(ctx, entryToSync, now)
})
}
// getEntriesToSync constructs a list of all entries due for syncing, updates
// the syncAt time for each, then locks each one individually and calls f()
// once for each entry while holding its lock.
func (cdb *CachingDB) getEntriesToSync(now time.Time, f func(entryToSync *cachedNodeReputationInfo)) {
var entriesToSync []*cachedNodeReputationInfo
func() {
cdb.lock.Lock()
defer cdb.lock.Unlock()
for {
if cdb.writeOrderHeap.Len() == 0 {
break
}
if !cdb.writeOrderHeap[0].syncAt.Before(now) {
break
}
entryToSync := cdb.writeOrderHeap[0]
// We bump syncAt regardless of whether we are about to sync. If
// something else is already syncing this entry, it has taken
// more time than expected, and the next flush is due. We need
// cdb.writeOrderHeap[0].syncAt.After(now) before we can exit
// from this loop.
entryToSync.syncAt = cdb.nextTimeForSync(entryToSync.nodeID, now)
// move element 0 to its new correct place in the heap. This
// shouldn't affect entryToSync, which is a pointer to the
// entry shared by the heap.
heap.Fix(&cdb.writeOrderHeap, 0)
entriesToSync = append(entriesToSync, entryToSync)
}
}()
for len(entriesToSync) > 0 {
entry := entriesToSync[0]
func() {
entry.entryLock.Lock()
defer entry.entryLock.Unlock()
f(entry)
}()
entriesToSync = entriesToSync[1:]
}
}
func (cdb *CachingDB) nextTimeForSync(nodeID storj.NodeID, now time.Time) time.Time {
return nextTimeForSync(cdb.instanceOffset, nodeID, now, cdb.syncInterval)
}
// nextTimeForSync decides the next time at which the given nodeID should
// be synchronized with the backing store.
//
// We make an effort to distribute the nodes in time, so that the service
// is not usually trying to retrieve or update many rows at the same time. We
// also make an effort to offset this sync schedule by a random value unique
// to this process so that in most cases, instances will not be trying to
// update the same row at the same time, minimizing contention.
func nextTimeForSync(instanceOffset uint64, nodeID storj.NodeID, now time.Time, syncInterval time.Duration) time.Time {
// calculate the fraction into the FlushInterval at which this node
// should always be synchronized.
initialPosition := binary.BigEndian.Uint64(nodeID[:8])
finalPosition := initialPosition + instanceOffset
positionAsFraction := float64(finalPosition) / (1 << 64)
// and apply that fraction to the actual interval
periodStart := now.Truncate(syncInterval)
offsetFromStart := time.Duration(positionAsFraction * float64(syncInterval))
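// Illustrative arithmetic (hypothetical values): with a 2h syncInterval
// and a positionAsFraction of 0.25, offsetFromStart is 30m, so this node
// syncs at 30 minutes past the start of each 2-hour period.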
syncTime := periodStart.Add(offsetFromStart)
if syncTime.Before(now) {
syncTime = syncTime.Add(syncInterval)
}
// reapply monotonic time by applying the time delta to 'now'
timeToNextSync := syncTime.Sub(now)
return now.Add(timeToNextSync)
}
// syncEntry synchronizes an entry with the backing store. Any pending mutations
// will be applied to the backing store, and the info and syncError attributes
// will be updated according to the results.
//
// syncEntry must be called with the entry already locked.
func (cdb *CachingDB) syncEntry(ctx context.Context, entry *cachedNodeReputationInfo, now time.Time) {
defer mon.Task()(&ctx)(nil)
entry.info, entry.syncError = cdb.backingStore.ApplyUpdates(ctx, entry.nodeID, entry.mutations, cdb.reputationConfig, now)
// NOTE: If another process has been updating the same row in the
// backing store, it is possible that the node has become newly vetted,
// disqualified, or suspended without us knowing about it. In this case,
// the overlay will not know about the change until it next updates the
// reputation. We may need to add some way for this object to notify the
// overlay of updates such as this.
if entry.syncError != nil {
if ErrNodeNotFound.Has(entry.syncError) {
entry.errorRetryAt = now
} else {
entry.errorRetryAt = now.Add(cdb.errorRetryInterval)
}
}
entry.mutations = Mutations{
OnlineHistory: &pb.AuditHistory{},
}
}
// Get retrieves the cached *Info record for the given node ID. If the
// information is not already in the cache, the information is fetched from the
// backing store.
//
// If an error occurred syncing the entry with the backing store, it will be
// returned. In this case, the returned value for 'info' might be nil, or it
// might contain data cached longer than FlushInterval.
func (cdb *CachingDB) Get(ctx context.Context, nodeID storj.NodeID) (info *Info, err error) {
defer mon.Task()(&ctx)(&err)
cdb.getEntry(ctx, nodeID, cdb.nowFunc(), func(entry *cachedNodeReputationInfo) {
if entry.syncError != nil {
err = entry.syncError
}
if entry.info != nil {
info = entry.info.Copy()
}
})
return info, err
}
// getEntry acquires an entry (a *cachedNodeReputationInfo) in the reputation
// cache, locks it, and supplies the entry to the given callback function for
// access or mutation. The pointer to the entry will not remain valid after the
// callback function returns.
//
// If there is no record for the requested nodeID, a new record will be added
// for it, it will be synced with the backing store, and the new record will be
// supplied to the given callback function.
//
// If there was an error fetching up-to-date info from the backing store, the
// entry supplied to the callback will have entry.syncError != nil. In this
// case, entry.info may be nil, or it may have an out-of-date record. If the
// error occurred long enough ago that it is time to try again, another attempt
// to sync the entry will occur before the callback is made.
func (cdb *CachingDB) getEntry(ctx context.Context, nodeID storj.NodeID, now time.Time, f func(entry *cachedNodeReputationInfo)) {
defer mon.Task()(&ctx)(nil)
var nodeEntry *cachedNodeReputationInfo
func() {
cdb.lock.Lock()
defer cdb.lock.Unlock()
var ok bool
nodeEntry, ok = cdb.pending[nodeID]
if !ok {
nodeEntry = cdb.insertNode(nodeID, now)
}
}()
func() {
nodeEntry.entryLock.Lock()
defer nodeEntry.entryLock.Unlock()
if nodeEntry.syncError != nil && nodeEntry.errorRetryAt.Before(now) {
cdb.syncEntry(ctx, nodeEntry, now)
}
f(nodeEntry)
}()
}
// Inserts a mostly-empty *cachedNodeReputationInfo record into the pending
// list and the write-order heap.
//
// The syncError is pre-set so that the first caller to acquire the entryLock
// on the new entry should initiate an immediate sync with the backing store.
//
// cdb.lock must be held when calling.
func (cdb *CachingDB) insertNode(nodeID storj.NodeID, now time.Time) *cachedNodeReputationInfo {
syncTime := cdb.nextTimeForSync(nodeID, now)
mut := &cachedNodeReputationInfo{
nodeID: nodeID,
syncAt: syncTime,
syncError: notPopulated,
errorRetryAt: time.Time{}, // sync will be initiated right away
mutations: Mutations{
OnlineHistory: &pb.AuditHistory{},
},
}
cdb.pending[nodeID] = mut
heap.Push(&cdb.writeOrderHeap, mut)
return mut
}
// SetNowFunc supplies a new function to use for determining the current time,
// for synchronization timing and scheduling purposes. This is frequently useful
// in test scenarios.
func (cdb *CachingDB) SetNowFunc(timeFunc func() time.Time) {
cdb.nowFunc = timeFunc
}
// notPopulated is an error indicating that a cachedNodeReputationInfo
// structure has not yet been populated. The syncError field is initialized
// to this error, and the first access of the entry should cause an immediate
// lookup to the backing store. Therefore, this error should not normally
// escape outside writecache code.
var notPopulated = Error.New("not populated")
// nodeIDHeap is a heap of cachedNodeReputationInfo entries, ordered by the
// associated syncAt times. It implements heap.Interface.
type nodeIDHeap []*cachedNodeReputationInfo
// Len returns the length of the slice.
func (n nodeIDHeap) Len() int {
return len(n)
}
// Swap swaps the elements with indices i and j.
func (n nodeIDHeap) Swap(i, j int) {
n[i], n[j] = n[j], n[i]
}
// Less returns true if the syncAt time for the element with index i comes
// before the syncAt time for the element with index j.
func (n nodeIDHeap) Less(i, j int) bool {
return n[i].syncAt.Before(n[j].syncAt)
}
// Push appends an element to the slice.
func (n *nodeIDHeap) Push(x interface{}) {
*n = append(*n, x.(*cachedNodeReputationInfo))
}
// Pop removes and returns the last element in the slice.
func (n *nodeIDHeap) Pop() interface{} {
oldLen := len(*n)
item := (*n)[oldLen-1]
*n = (*n)[:oldLen-1]
return item
}