satellite/satellitedb/reputation: replace audit_histories table with

reputation

Change-Id: I18417eaa0d146cec876020dc6a358d13992e1d5f
This commit is contained in:
Yingrong Zhao 2021-07-28 18:29:46 -04:00
parent b35b07081e
commit d3e51353ab
5 changed files with 27 additions and 96 deletions

View File

@ -4,23 +4,11 @@
package reputation
import (
"context"
"time"
"storj.io/common/pb"
"storj.io/common/storj"
)
// AuditHistoryDB implements the database for audit history.
//
// architecture: Database
type AuditHistoryDB interface {
// UpdateAuditHistory updates a node's audit history with an online or offline audit.
UpdateAuditHistory(ctx context.Context, nodeID storj.NodeID, auditTime time.Time, online bool, config AuditHistoryConfig) (*UpdateAuditHistoryResponse, error)
// GetAuditHistory gets a node's audit history.
GetAuditHistory(ctx context.Context, nodeID storj.NodeID) (*AuditHistory, error)
}
// AuditHistory represents a node's audit history for the most recent tracking period.
type AuditHistory struct {
Score float64
@ -31,6 +19,7 @@ type AuditHistory struct {
type UpdateAuditHistoryResponse struct {
NewScore float64
TrackingPeriodFull bool
History []byte
}
// AuditWindow represents the number of online and total audits a node received for a specific time period.
@ -40,20 +29,6 @@ type AuditWindow struct {
OnlineCount int32
}
// UpdateAuditHistory updates a node's audit history with an online or offline audit.
func (service *Service) UpdateAuditHistory(ctx context.Context, nodeID storj.NodeID, auditTime time.Time, online bool) (res *UpdateAuditHistoryResponse, err error) {
defer mon.Task()(&ctx)(&err)
return service.db.UpdateAuditHistory(ctx, nodeID, auditTime, online, service.config.AuditHistory)
}
// GetAuditHistory gets a node's audit history.
func (service *Service) GetAuditHistory(ctx context.Context, nodeID storj.NodeID) (auditHistory *AuditHistory, err error) {
defer mon.Task()(&ctx)(&err)
return service.db.GetAuditHistory(ctx, nodeID)
}
// AuditHistoryToPB converts an overlay.AuditHistory to a pb.AuditHistory.
func AuditHistoryToPB(auditHistory AuditHistory) (historyPB *pb.AuditHistory) {
historyPB = &pb.AuditHistory{

View File

@ -10,9 +10,11 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/internalpb"
)
func TestAuditHistoryBasic(t *testing.T) {
@ -28,20 +30,23 @@ func TestAuditHistoryBasic(t *testing.T) {
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
node := planet.StorageNodes[0]
service := planet.Satellites[0].Reputation.Service
db := planet.Satellites[0].DB.Reputation()
startingWindow := time.Now().Truncate(time.Hour)
windowsInTrackingPeriod := int(trackingPeriod.Seconds() / windowSize.Seconds())
currentWindow := startingWindow
config := planet.Satellites[0].Config.Reputation.AuditHistory
newHistory := &internalpb.AuditHistory{}
historyBytes, err := pb.Marshal(newHistory)
require.NoError(t, err)
// online score should be 1 until the first window is finished
res, err := service.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(2*time.Minute), false)
res, err := db.UpdateAuditHistory(ctx, historyBytes, currentWindow.Add(2*time.Minute), false, config)
require.NoError(t, err)
require.EqualValues(t, 1, res.NewScore)
require.False(t, res.TrackingPeriodFull)
res, err = service.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(20*time.Minute), true)
res, err = db.UpdateAuditHistory(ctx, res.History, currentWindow.Add(20*time.Minute), true, config)
require.NoError(t, err)
require.EqualValues(t, 1, res.NewScore)
require.False(t, res.TrackingPeriodFull)
@ -50,12 +55,12 @@ func TestAuditHistoryBasic(t *testing.T) {
currentWindow = currentWindow.Add(time.Hour)
// online score should be now be 0.5 since the first window is complete with one online audit and one offline audit
res, err = service.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(2*time.Minute), false)
res, err = db.UpdateAuditHistory(ctx, res.History, currentWindow.Add(2*time.Minute), false, config)
require.NoError(t, err)
require.EqualValues(t, 0.5, res.NewScore)
require.False(t, res.TrackingPeriodFull)
res, err = service.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(20*time.Minute), true)
res, err = db.UpdateAuditHistory(ctx, res.History, currentWindow.Add(20*time.Minute), true, config)
require.NoError(t, err)
require.EqualValues(t, 0.5, res.NewScore)
require.False(t, res.TrackingPeriodFull)
@ -64,17 +69,17 @@ func TestAuditHistoryBasic(t *testing.T) {
currentWindow = currentWindow.Add(time.Hour)
// try to add an audit for an old window, expect error
_, err = service.UpdateAuditHistory(ctx, node.ID(), startingWindow, true)
_, err = db.UpdateAuditHistory(ctx, res.History, startingWindow, true, config)
require.Error(t, err)
// add another online audit for the latest window; score should still be 0.5
res, err = service.UpdateAuditHistory(ctx, node.ID(), currentWindow, true)
res, err = db.UpdateAuditHistory(ctx, res.History, currentWindow, true, config)
require.NoError(t, err)
require.EqualValues(t, 0.5, res.NewScore)
// now that we have two full windows other than the current one, tracking period should be considered full.
require.True(t, res.TrackingPeriodFull)
// add another online audit for the latest window; score should still be 0.5
res, err = service.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(45*time.Minute), true)
res, err = db.UpdateAuditHistory(ctx, res.History, currentWindow.Add(45*time.Minute), true, config)
require.NoError(t, err)
require.EqualValues(t, 0.5, res.NewScore)
require.True(t, res.TrackingPeriodFull)
@ -85,7 +90,7 @@ func TestAuditHistoryBasic(t *testing.T) {
// window gets included in the tracking period, and the earliest 0.5 window gets dropped.
expectedScore := (0.5*float64(windowsInTrackingPeriod-1) + 1) / float64(windowsInTrackingPeriod)
// add online audit for next window; score should now be expectedScore
res, err = service.UpdateAuditHistory(ctx, node.ID(), currentWindow.Add(time.Minute), true)
res, err = db.UpdateAuditHistory(ctx, res.History, currentWindow.Add(time.Minute), true, config)
require.NoError(t, err)
require.EqualValues(t, expectedScore, res.NewScore)
require.True(t, res.TrackingPeriodFull)

View File

@ -29,8 +29,8 @@ type DB interface {
DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error)
// SuspendNodeUnknownAudit suspends a storage node for unknown audits.
SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
AuditHistoryDB
// UpdateAuditHistory updates a node's audit history
UpdateAuditHistory(ctx context.Context, oldHistory []byte, auditTime time.Time, online bool, config AuditHistoryConfig) (res *UpdateAuditHistoryResponse, err error)
}
// Info contains all reputation data to be stored in DB.

View File

@ -77,26 +77,7 @@ func addAudit(a *internalpb.AuditHistory, auditTime time.Time, online bool, conf
return nil
}
// UpdateAuditHistory updates a node's audit history with an online or offline audit.
func (reputations *reputations) UpdateAuditHistory(ctx context.Context, nodeID storj.NodeID, auditTime time.Time, online bool, config reputation.AuditHistoryConfig) (res *reputation.UpdateAuditHistoryResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = reputations.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
_, err = tx.Tx.ExecContext(ctx, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
if err != nil {
return err
}
res, err = reputations.updateAuditHistoryWithTx(ctx, tx, nodeID, auditTime, online, config)
if err != nil {
return err
}
return nil
})
return res, err
}
func (reputations *reputations) updateAuditHistoryWithTx(ctx context.Context, tx *dbx.Tx, nodeID storj.NodeID, auditTime time.Time, online bool, config reputation.AuditHistoryConfig) (res *reputation.UpdateAuditHistoryResponse, err error) {
func (reputations *reputations) UpdateAuditHistory(ctx context.Context, oldHistory []byte, auditTime time.Time, online bool, config reputation.AuditHistoryConfig) (res *reputation.UpdateAuditHistoryResponse, err error) {
defer mon.Task()(&ctx)(&err)
res = &reputation.UpdateAuditHistoryResponse{
@ -104,24 +85,9 @@ func (reputations *reputations) updateAuditHistoryWithTx(ctx context.Context, tx
TrackingPeriodFull: false,
}
// get and deserialize node audit history
historyBytes := []byte{}
newEntry := false
dbAuditHistory, err := tx.Get_AuditHistory_By_NodeId(
ctx,
dbx.AuditHistory_NodeId(nodeID.Bytes()),
)
if errs.Is(err, sql.ErrNoRows) {
// set flag to true so we know to create rather than update later
newEntry = true
} else if err != nil {
return res, Error.Wrap(err)
} else {
historyBytes = dbAuditHistory.History
}
// deserialize node audit history
history := &internalpb.AuditHistory{}
err = pb.Unmarshal(historyBytes, history)
err = pb.Unmarshal(oldHistory, history)
if err != nil {
return res, err
}
@ -131,29 +97,11 @@ func (reputations *reputations) updateAuditHistoryWithTx(ctx context.Context, tx
return res, err
}
historyBytes, err = pb.Marshal(history)
res.History, err = pb.Marshal(history)
if err != nil {
return res, err
}
// if the entry did not exist at the beginning, create a new one. Otherwise update
if newEntry {
_, err = tx.Create_AuditHistory(
ctx,
dbx.AuditHistory_NodeId(nodeID.Bytes()),
dbx.AuditHistory_History(historyBytes),
)
return res, Error.Wrap(err)
}
_, err = tx.Update_AuditHistory_By_NodeId(
ctx,
dbx.AuditHistory_NodeId(nodeID.Bytes()),
dbx.AuditHistory_Update_Fields{
History: dbx.AuditHistory_History(historyBytes),
},
)
windowsPerTrackingPeriod := int(config.TrackingPeriod.Seconds() / config.WindowSize.Seconds())
res.TrackingPeriodFull = len(history.Windows)-1 >= windowsPerTrackingPeriod
res.NewScore = history.Score

View File

@ -76,7 +76,7 @@ func (reputations *reputations) Update(ctx context.Context, updateReq reputation
}
isUp := updateReq.AuditOutcome != reputation.AuditOffline
auditHistoryResponse, err := reputations.updateAuditHistoryWithTx(ctx, tx, nodeID, now, isUp, updateReq.AuditHistory)
auditHistoryResponse, err := reputations.UpdateAuditHistory(ctx, dbNode.AuditHistory, now, isUp, updateReq.AuditHistory)
if err != nil {
return err
}
@ -283,7 +283,10 @@ func (reputations *reputations) Init(ctx context.Context, nodeID storj.NodeID) e
func (reputations *reputations) populateUpdateFields(dbNode *dbx.Reputation, updateReq reputation.UpdateRequest, auditHistoryResponse *reputation.UpdateAuditHistoryResponse, now time.Time) dbx.Reputation_Update_Fields {
update := reputations.populateUpdateNodeStats(dbNode, updateReq, auditHistoryResponse, now)
updateFields := dbx.Reputation_Update_Fields{}
updateFields := dbx.Reputation_Update_Fields{
AuditHistory: dbx.Reputation_AuditHistory(auditHistoryResponse.History),
}
if update.VettedAt.set {
updateFields.VettedAt = dbx.Reputation_VettedAt(update.VettedAt.value)
}