From 39260b6fdc62c93efdb248d9fc928b28ea82dd37 Mon Sep 17 00:00:00 2001 From: Moby von Briesen Date: Wed, 30 Dec 2020 14:47:14 -0500 Subject: [PATCH] storagenode/storagenodedb: Add audit_history to storagenode reputationdb This change will give us the ability to make audit windows visible via the storagenode api. Change-Id: Ifb3d044ce9b456c16f8ea3b3b162ba894426477a --- storagenode/reputation/reputation_test.go | 44 +++++++++++++++ storagenode/storagenodedb/database.go | 8 +++ storagenode/storagenodedb/reputation.go | 24 +++++++- storagenode/storagenodedb/schema.go | 5 ++ .../storagenodedb/testdata/multidbsnapshot.go | 1 + storagenode/storagenodedb/testdata/v47.go | 56 +++++++++++++++++++ 6 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 storagenode/storagenodedb/testdata/v47.go diff --git a/storagenode/reputation/reputation_test.go b/storagenode/reputation/reputation_test.go index 0721153c0..1bba18d71 100644 --- a/storagenode/reputation/reputation_test.go +++ b/storagenode/reputation/reputation_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "storj.io/common/pb" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/storagenode" @@ -67,6 +68,7 @@ func TestReputationDBGetInsert(t *testing.T) { assert.True(t, res.OfflineSuspendedAt.Equal(*stats.OfflineSuspendedAt)) assert.True(t, res.OfflineUnderReviewAt.Equal(*stats.OfflineUnderReviewAt)) assert.Equal(t, res.OnlineScore, stats.OnlineScore) + assert.Nil(t, res.AuditHistory) compareReputationMetric(t, &res.Uptime, &stats.Uptime) compareReputationMetric(t, &res.Audit, &stats.Audit) @@ -133,6 +135,7 @@ func TestReputationDBGetAll(t *testing.T) { assert.Equal(t, rep.OfflineSuspendedAt, stats[0].OfflineSuspendedAt) assert.Equal(t, rep.OfflineUnderReviewAt, stats[0].OfflineUnderReviewAt) assert.Equal(t, rep.OnlineScore, stats[0].OnlineScore) + assert.Nil(t, rep.AuditHistory) compareReputationMetric(t, &rep.Uptime, &stats[0].Uptime) compareReputationMetric(t, &rep.Audit, &stats[0].Audit) @@ -149,3 +152,44 @@ func compareReputationMetric(t *testing.T, a, b *reputation.Metric) { assert.Equal(t, a.Beta, b.Beta) assert.Equal(t, a.Score, b.Score) } + +func TestReputationDBGetInsertAuditHistory(t *testing.T) { + storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { + timestamp := time.Now() + reputationDB := db.Reputation() + + stats := reputation.Stats{ + SatelliteID: testrand.NodeID(), + Uptime: reputation.Metric{}, + Audit: reputation.Metric{}, + AuditHistory: &pb.AuditHistory{ + Score: 0.5, + Windows: []*pb.AuditWindow{ + { + WindowStart: timestamp, + OnlineCount: 5, + TotalCount: 10, + }, + }, + }, + } + + t.Run("insert", func(t *testing.T) { + err := reputationDB.Store(ctx, stats) + assert.NoError(t, err) + }) + + t.Run("get", func(t *testing.T) { + res, err := reputationDB.Get(ctx, stats.SatelliteID) + assert.NoError(t, err) + + assert.Equal(t, res.AuditHistory.Score, stats.AuditHistory.Score) + assert.Equal(t, len(res.AuditHistory.Windows), len(stats.AuditHistory.Windows)) + resWindow := res.AuditHistory.Windows[0] + statsWindow := stats.AuditHistory.Windows[0] + assert.True(t, resWindow.WindowStart.Equal(statsWindow.WindowStart)) + assert.Equal(t, resWindow.TotalCount, statsWindow.TotalCount) + assert.Equal(t, resWindow.OnlineCount, statsWindow.OnlineCount) + }) + }) +} diff --git a/storagenode/storagenodedb/database.go b/storagenode/storagenodedb/database.go index 2c25740a8..70ac9f348 100644 --- a/storagenode/storagenodedb/database.go +++ b/storagenode/storagenodedb/database.go @@ -1830,6 +1830,14 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration { );`, }, }, + { + DB: &db.reputationDB.DB, + Description: "Add audit_history field to reputation db", + Version: 47, + Action: migrate.SQL{ + `ALTER TABLE reputation ADD COLUMN audit_history BLOB`, + }, + }, }, } } diff --git a/storagenode/storagenodedb/reputation.go b/storagenode/storagenodedb/reputation.go index 74b14172e..df0607673 100644 --- a/storagenode/storagenodedb/reputation.go +++ b/storagenode/storagenodedb/reputation.go @@ -10,6 +10,7 @@ import ( "github.com/zeebo/errs" + "storj.io/common/pb" "storj.io/common/storj" "storj.io/storj/storagenode/reputation" ) @@ -45,13 +46,14 @@ func (db *reputationDB) Store(ctx context.Context, stats reputation.Stats) (err audit_unknown_reputation_beta, audit_unknown_reputation_score, online_score, + audit_history, disqualified_at, suspended_at, offline_suspended_at, offline_under_review_at, updated_at, joined_at - ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)` + ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)` // ensure we insert utc if stats.DisqualifiedAt != nil { @@ -71,6 +73,14 @@ func (db *reputationDB) Store(ctx context.Context, stats reputation.Stats) (err stats.OfflineUnderReviewAt = &utc } + var auditHistoryBytes []byte + if stats.AuditHistory != nil { + auditHistoryBytes, err = pb.Marshal(stats.AuditHistory) + if err != nil { + return ErrReputation.Wrap(err) + } + } + _, err = db.ExecContext(ctx, query, stats.SatelliteID, stats.Uptime.SuccessCount, @@ -87,6 +97,7 @@ func (db *reputationDB) Store(ctx context.Context, stats reputation.Stats) (err stats.Audit.UnknownBeta, stats.Audit.UnknownScore, stats.OnlineScore, + auditHistoryBytes, stats.DisqualifiedAt, stats.SuspendedAt, stats.OfflineSuspendedAt, @@ -121,6 +132,7 @@ func (db *reputationDB) Get(ctx context.Context, satelliteID storj.NodeID) (_ *r audit_unknown_reputation_beta, audit_unknown_reputation_score, online_score, + audit_history, disqualified_at, suspended_at, offline_suspended_at, @@ -131,6 +143,7 @@ func (db *reputationDB) Get(ctx context.Context, satelliteID storj.NodeID) (_ *r satelliteID, ) + var auditHistoryBytes []byte err = row.Scan( &stats.Uptime.SuccessCount, &stats.Uptime.TotalCount, @@ -146,6 +159,7 @@ func (db *reputationDB) Get(ctx context.Context, satelliteID storj.NodeID) (_ *r &stats.Audit.UnknownBeta, &stats.Audit.UnknownScore, &stats.OnlineScore, + &auditHistoryBytes, &stats.DisqualifiedAt, &stats.SuspendedAt, &stats.OfflineSuspendedAt, @@ -156,8 +170,16 @@ func (db *reputationDB) Get(ctx context.Context, satelliteID storj.NodeID) (_ *r if errors.Is(err, sql.ErrNoRows) { err = nil + return &stats, nil + } + if err != nil { + return &stats, ErrReputation.Wrap(err) } + if auditHistoryBytes != nil { + stats.AuditHistory = &pb.AuditHistory{} + err = pb.Unmarshal(auditHistoryBytes, stats.AuditHistory) + } return &stats, ErrReputation.Wrap(err) } diff --git a/storagenode/storagenodedb/schema.go b/storagenode/storagenodedb/schema.go index e1cae71d0..d79494878 100644 --- a/storagenode/storagenodedb/schema.go +++ b/storagenode/storagenodedb/schema.go @@ -521,6 +521,11 @@ func Schema() map[string]*dbschema.Schema { Name: "reputation", PrimaryKey: []string{"satellite_id"}, Columns: []*dbschema.Column{ + &dbschema.Column{ + Name: "audit_history", + Type: "BLOB", + IsNullable: true, + }, &dbschema.Column{ Name: "audit_reputation_alpha", Type: "REAL", diff --git a/storagenode/storagenodedb/testdata/multidbsnapshot.go b/storagenode/storagenodedb/testdata/multidbsnapshot.go index 2a2bca340..7022875dd 100644 --- a/storagenode/storagenodedb/testdata/multidbsnapshot.go +++ b/storagenode/storagenodedb/testdata/multidbsnapshot.go @@ -61,6 +61,7 @@ var States = MultiDBStates{ &v44, &v45, &v46, + &v47, }, } diff --git a/storagenode/storagenodedb/testdata/v47.go b/storagenode/storagenodedb/testdata/v47.go new file mode 100644 index 000000000..50461064b --- /dev/null +++ b/storagenode/storagenodedb/testdata/v47.go @@ -0,0 +1,56 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package testdata + +import "storj.io/storj/storagenode/storagenodedb" + +var v47 = MultiDBState{ + Version: 47, + DBStates: DBStates{ + storagenodedb.UsedSerialsDBName: v46.DBStates[storagenodedb.UsedSerialsDBName], + storagenodedb.StorageUsageDBName: v46.DBStates[storagenodedb.StorageUsageDBName], + storagenodedb.ReputationDBName: &DBState{ + SQL: ` + -- tables to store nodestats cache + CREATE TABLE reputation ( + satellite_id BLOB NOT NULL, + uptime_success_count INTEGER NOT NULL, + uptime_total_count INTEGER NOT NULL, + uptime_reputation_alpha REAL NOT NULL, + uptime_reputation_beta REAL NOT NULL, + uptime_reputation_score REAL NOT NULL, + audit_success_count INTEGER NOT NULL, + audit_total_count INTEGER NOT NULL, + audit_reputation_alpha REAL NOT NULL, + audit_reputation_beta REAL NOT NULL, + audit_reputation_score REAL NOT NULL, + audit_unknown_reputation_alpha REAL NOT NULL, + audit_unknown_reputation_beta REAL NOT NULL, + audit_unknown_reputation_score REAL NOT NULL, + online_score REAL NOT NULL, + audit_history BLOB, + disqualified_at TIMESTAMP, + updated_at TIMESTAMP NOT NULL, + suspended_at TIMESTAMP, + offline_suspended_at TIMESTAMP, + offline_under_review_at TIMESTAMP, + joined_at TIMESTAMP NOT NULL, + PRIMARY KEY (satellite_id) + ); + INSERT INTO reputation VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',1,1,1.0,1.0,1.0,1,1,1.0,1.0,1.0,1.0,1.0,1.0,1.0,NULL,'2019-07-19 20:00:00+00:00','2019-08-23 20:00:00+00:00',NULL,NULL,NULL,'1970-01-01 00:00:00+00:00'); + `, + }, + storagenodedb.PieceSpaceUsedDBName: v46.DBStates[storagenodedb.PieceSpaceUsedDBName], + storagenodedb.PieceInfoDBName: v46.DBStates[storagenodedb.PieceInfoDBName], + storagenodedb.PieceExpirationDBName: v46.DBStates[storagenodedb.PieceExpirationDBName], + storagenodedb.OrdersDBName: v46.DBStates[storagenodedb.OrdersDBName], + storagenodedb.BandwidthDBName: v46.DBStates[storagenodedb.BandwidthDBName], + storagenodedb.SatellitesDBName: v46.DBStates[storagenodedb.SatellitesDBName], + storagenodedb.DeprecatedInfoDBName: v46.DBStates[storagenodedb.DeprecatedInfoDBName], + storagenodedb.NotificationsDBName: v46.DBStates[storagenodedb.NotificationsDBName], + storagenodedb.HeldAmountDBName: v46.DBStates[storagenodedb.HeldAmountDBName], + storagenodedb.PricingDBName: v46.DBStates[storagenodedb.PricingDBName], + storagenodedb.SecretDBName: v46.DBStates[storagenodedb.SecretDBName], + }, +}