storagenode/storagenodedb: Add audit_history to storagenode reputationdb

This change will give us the ability to make audit windows visible via
the storagenode api.

Change-Id: Ifb3d044ce9b456c16f8ea3b3b162ba894426477a
This commit is contained in:
Moby von Briesen 2020-12-30 14:47:14 -05:00
parent edbee53888
commit 39260b6fdc
6 changed files with 137 additions and 1 deletions

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/storagenode"
@ -67,6 +68,7 @@ func TestReputationDBGetInsert(t *testing.T) {
assert.True(t, res.OfflineSuspendedAt.Equal(*stats.OfflineSuspendedAt))
assert.True(t, res.OfflineUnderReviewAt.Equal(*stats.OfflineUnderReviewAt))
assert.Equal(t, res.OnlineScore, stats.OnlineScore)
assert.Nil(t, res.AuditHistory)
compareReputationMetric(t, &res.Uptime, &stats.Uptime)
compareReputationMetric(t, &res.Audit, &stats.Audit)
@ -133,6 +135,7 @@ func TestReputationDBGetAll(t *testing.T) {
assert.Equal(t, rep.OfflineSuspendedAt, stats[0].OfflineSuspendedAt)
assert.Equal(t, rep.OfflineUnderReviewAt, stats[0].OfflineUnderReviewAt)
assert.Equal(t, rep.OnlineScore, stats[0].OnlineScore)
assert.Nil(t, rep.AuditHistory)
compareReputationMetric(t, &rep.Uptime, &stats[0].Uptime)
compareReputationMetric(t, &rep.Audit, &stats[0].Audit)
@ -149,3 +152,44 @@ func compareReputationMetric(t *testing.T, a, b *reputation.Metric) {
assert.Equal(t, a.Beta, b.Beta)
assert.Equal(t, a.Score, b.Score)
}
func TestReputationDBGetInsertAuditHistory(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
timestamp := time.Now()
reputationDB := db.Reputation()
stats := reputation.Stats{
SatelliteID: testrand.NodeID(),
Uptime: reputation.Metric{},
Audit: reputation.Metric{},
AuditHistory: &pb.AuditHistory{
Score: 0.5,
Windows: []*pb.AuditWindow{
{
WindowStart: timestamp,
OnlineCount: 5,
TotalCount: 10,
},
},
},
}
t.Run("insert", func(t *testing.T) {
err := reputationDB.Store(ctx, stats)
assert.NoError(t, err)
})
t.Run("get", func(t *testing.T) {
res, err := reputationDB.Get(ctx, stats.SatelliteID)
assert.NoError(t, err)
assert.Equal(t, res.AuditHistory.Score, stats.AuditHistory.Score)
assert.Equal(t, len(res.AuditHistory.Windows), len(stats.AuditHistory.Windows))
resWindow := res.AuditHistory.Windows[0]
statsWindow := stats.AuditHistory.Windows[0]
assert.True(t, resWindow.WindowStart.Equal(statsWindow.WindowStart))
assert.Equal(t, resWindow.TotalCount, statsWindow.TotalCount)
assert.Equal(t, resWindow.OnlineCount, statsWindow.OnlineCount)
})
})
}

View File

@ -1830,6 +1830,14 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration {
);`,
},
},
{
DB: &db.reputationDB.DB,
Description: "Add audit_history field to reputation db",
Version: 47,
Action: migrate.SQL{
`ALTER TABLE reputation ADD COLUMN audit_history BLOB`,
},
},
},
}
}

View File

@ -10,6 +10,7 @@ import (
"github.com/zeebo/errs"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/storagenode/reputation"
)
@ -45,13 +46,14 @@ func (db *reputationDB) Store(ctx context.Context, stats reputation.Stats) (err
audit_unknown_reputation_beta,
audit_unknown_reputation_score,
online_score,
audit_history,
disqualified_at,
suspended_at,
offline_suspended_at,
offline_under_review_at,
updated_at,
joined_at
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`
// ensure we insert utc
if stats.DisqualifiedAt != nil {
@ -71,6 +73,14 @@ func (db *reputationDB) Store(ctx context.Context, stats reputation.Stats) (err
stats.OfflineUnderReviewAt = &utc
}
var auditHistoryBytes []byte
if stats.AuditHistory != nil {
auditHistoryBytes, err = pb.Marshal(stats.AuditHistory)
if err != nil {
return ErrReputation.Wrap(err)
}
}
_, err = db.ExecContext(ctx, query,
stats.SatelliteID,
stats.Uptime.SuccessCount,
@ -87,6 +97,7 @@ func (db *reputationDB) Store(ctx context.Context, stats reputation.Stats) (err
stats.Audit.UnknownBeta,
stats.Audit.UnknownScore,
stats.OnlineScore,
auditHistoryBytes,
stats.DisqualifiedAt,
stats.SuspendedAt,
stats.OfflineSuspendedAt,
@ -121,6 +132,7 @@ func (db *reputationDB) Get(ctx context.Context, satelliteID storj.NodeID) (_ *r
audit_unknown_reputation_beta,
audit_unknown_reputation_score,
online_score,
audit_history,
disqualified_at,
suspended_at,
offline_suspended_at,
@ -131,6 +143,7 @@ func (db *reputationDB) Get(ctx context.Context, satelliteID storj.NodeID) (_ *r
satelliteID,
)
var auditHistoryBytes []byte
err = row.Scan(
&stats.Uptime.SuccessCount,
&stats.Uptime.TotalCount,
@ -146,6 +159,7 @@ func (db *reputationDB) Get(ctx context.Context, satelliteID storj.NodeID) (_ *r
&stats.Audit.UnknownBeta,
&stats.Audit.UnknownScore,
&stats.OnlineScore,
&auditHistoryBytes,
&stats.DisqualifiedAt,
&stats.SuspendedAt,
&stats.OfflineSuspendedAt,
@ -156,8 +170,16 @@ func (db *reputationDB) Get(ctx context.Context, satelliteID storj.NodeID) (_ *r
if errors.Is(err, sql.ErrNoRows) {
err = nil
return &stats, nil
}
if err != nil {
return &stats, ErrReputation.Wrap(err)
}
if auditHistoryBytes != nil {
stats.AuditHistory = &pb.AuditHistory{}
err = pb.Unmarshal(auditHistoryBytes, stats.AuditHistory)
}
return &stats, ErrReputation.Wrap(err)
}

View File

@ -521,6 +521,11 @@ func Schema() map[string]*dbschema.Schema {
Name: "reputation",
PrimaryKey: []string{"satellite_id"},
Columns: []*dbschema.Column{
&dbschema.Column{
Name: "audit_history",
Type: "BLOB",
IsNullable: true,
},
&dbschema.Column{
Name: "audit_reputation_alpha",
Type: "REAL",

View File

@ -61,6 +61,7 @@ var States = MultiDBStates{
&v44,
&v45,
&v46,
&v47,
},
}

View File

@ -0,0 +1,56 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v47 = MultiDBState{
Version: 47,
DBStates: DBStates{
storagenodedb.UsedSerialsDBName: v46.DBStates[storagenodedb.UsedSerialsDBName],
storagenodedb.StorageUsageDBName: v46.DBStates[storagenodedb.StorageUsageDBName],
storagenodedb.ReputationDBName: &DBState{
SQL: `
-- tables to store nodestats cache
CREATE TABLE reputation (
satellite_id BLOB NOT NULL,
uptime_success_count INTEGER NOT NULL,
uptime_total_count INTEGER NOT NULL,
uptime_reputation_alpha REAL NOT NULL,
uptime_reputation_beta REAL NOT NULL,
uptime_reputation_score REAL NOT NULL,
audit_success_count INTEGER NOT NULL,
audit_total_count INTEGER NOT NULL,
audit_reputation_alpha REAL NOT NULL,
audit_reputation_beta REAL NOT NULL,
audit_reputation_score REAL NOT NULL,
audit_unknown_reputation_alpha REAL NOT NULL,
audit_unknown_reputation_beta REAL NOT NULL,
audit_unknown_reputation_score REAL NOT NULL,
online_score REAL NOT NULL,
audit_history BLOB,
disqualified_at TIMESTAMP,
updated_at TIMESTAMP NOT NULL,
suspended_at TIMESTAMP,
offline_suspended_at TIMESTAMP,
offline_under_review_at TIMESTAMP,
joined_at TIMESTAMP NOT NULL,
PRIMARY KEY (satellite_id)
);
INSERT INTO reputation VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',1,1,1.0,1.0,1.0,1,1,1.0,1.0,1.0,1.0,1.0,1.0,1.0,NULL,'2019-07-19 20:00:00+00:00','2019-08-23 20:00:00+00:00',NULL,NULL,NULL,'1970-01-01 00:00:00+00:00');
`,
},
storagenodedb.PieceSpaceUsedDBName: v46.DBStates[storagenodedb.PieceSpaceUsedDBName],
storagenodedb.PieceInfoDBName: v46.DBStates[storagenodedb.PieceInfoDBName],
storagenodedb.PieceExpirationDBName: v46.DBStates[storagenodedb.PieceExpirationDBName],
storagenodedb.OrdersDBName: v46.DBStates[storagenodedb.OrdersDBName],
storagenodedb.BandwidthDBName: v46.DBStates[storagenodedb.BandwidthDBName],
storagenodedb.SatellitesDBName: v46.DBStates[storagenodedb.SatellitesDBName],
storagenodedb.DeprecatedInfoDBName: v46.DBStates[storagenodedb.DeprecatedInfoDBName],
storagenodedb.NotificationsDBName: v46.DBStates[storagenodedb.NotificationsDBName],
storagenodedb.HeldAmountDBName: v46.DBStates[storagenodedb.HeldAmountDBName],
storagenodedb.PricingDBName: v46.DBStates[storagenodedb.PricingDBName],
storagenodedb.SecretDBName: v46.DBStates[storagenodedb.SecretDBName],
},
}