storagenode/reputation: offline_under_review_at added

Change-Id: Ia7ec79b2d6f20fe29de0c36223f9485380d2845c
This commit is contained in:
Qweder93 2020-09-02 18:37:54 +03:00
parent 7d9897b7af
commit 36d752e92d
11 changed files with 194 additions and 27 deletions

2
go.mod
View File

@ -42,7 +42,7 @@ require (
golang.org/x/sys v0.0.0-20200808120158-1030fc2bf1d9
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20200428211428-0c9eba77bc32 // indirect
storj.io/common v0.0.0-20200902103030-f5a3ffb9dfaf
storj.io/common v0.0.0-20200902145110-08513ed10a7d
storj.io/drpc v0.0.14
storj.io/monkit-jaeger v0.0.0-20200518165323-80778fc3f91b
storj.io/private v0.0.0-20200818170340-c2963305092f

2
go.sum
View File

@ -732,6 +732,8 @@ storj.io/common v0.0.0-20200831132053-0001821ca995 h1:6Kdfm+kjIBZjuixAyyZIixgAJS
storj.io/common v0.0.0-20200831132053-0001821ca995/go.mod h1:ILr54ISCqCQ6MmIwT7eaR/fEGrBfgfxiPt8nmpWqnUM=
storj.io/common v0.0.0-20200902103030-f5a3ffb9dfaf h1:nAwoJql2NF89sN+k3EplJzP582T1VKBcmBT/R/z8haM=
storj.io/common v0.0.0-20200902103030-f5a3ffb9dfaf/go.mod h1:ILr54ISCqCQ6MmIwT7eaR/fEGrBfgfxiPt8nmpWqnUM=
storj.io/common v0.0.0-20200902145110-08513ed10a7d h1:4wHqMxq39SW2LaUveSwUIt7GY/kM+ANeqeoKmkuAJ7k=
storj.io/common v0.0.0-20200902145110-08513ed10a7d/go.mod h1:ILr54ISCqCQ6MmIwT7eaR/fEGrBfgfxiPt8nmpWqnUM=
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
storj.io/drpc v0.0.14 h1:GCBdymTt1BRw4oHmmUZZlxYXLVRxxYj6x3Ivide2J+I=

View File

@ -81,11 +81,12 @@ func (e *Endpoint) GetStats(ctx context.Context, req *pb.GetStatsRequest) (_ *pb
ReputationScore: auditScore,
UnknownReputationScore: unknownScore,
},
OnlineScore: node.Reputation.OnlineScore,
Disqualified: node.Disqualified,
Suspended: node.UnknownAuditSuspended,
OfflineSuspended: node.OfflineSuspended,
JoinedAt: node.CreatedAt,
OnlineScore: node.Reputation.OnlineScore,
Disqualified: node.Disqualified,
Suspended: node.UnknownAuditSuspended,
OfflineSuspended: node.OfflineSuspended,
OfflineUnderReview: node.OfflineUnderReview,
JoinedAt: node.CreatedAt,
}, nil
}

View File

@ -93,12 +93,13 @@ func (s *Service) GetReputationStats(ctx context.Context, satelliteID storj.Node
UnknownBeta: audit.GetUnknownReputationBeta(),
UnknownScore: audit.GetUnknownReputationScore(),
},
OnlineScore: resp.OnlineScore,
DisqualifiedAt: resp.GetDisqualified(),
SuspendedAt: resp.GetSuspended(),
OfflineSuspendedAt: resp.GetOfflineSuspended(),
UpdatedAt: time.Now(),
JoinedAt: resp.JoinedAt,
OnlineScore: resp.OnlineScore,
DisqualifiedAt: resp.GetDisqualified(),
SuspendedAt: resp.GetSuspended(),
OfflineSuspendedAt: resp.GetOfflineSuspended(),
OfflineUnderReviewAt: resp.GetOfflineUnderReview(),
UpdatedAt: time.Now(),
JoinedAt: resp.JoinedAt,
}, nil
}

View File

@ -30,9 +30,10 @@ type Stats struct {
Audit Metric
OnlineScore float64
DisqualifiedAt *time.Time
SuspendedAt *time.Time
OfflineSuspendedAt *time.Time
DisqualifiedAt *time.Time
SuspendedAt *time.Time
OfflineSuspendedAt *time.Time
OfflineUnderReviewAt *time.Time
UpdatedAt time.Time
JoinedAt time.Time

View File

@ -41,10 +41,13 @@ func TestReputationDBGetInsert(t *testing.T) {
UnknownBeta: 12,
UnknownScore: 13,
},
DisqualifiedAt: &timestamp,
SuspendedAt: &timestamp,
UpdatedAt: timestamp,
JoinedAt: timestamp,
OnlineScore: 14,
OfflineUnderReviewAt: &timestamp,
OfflineSuspendedAt: &timestamp,
DisqualifiedAt: &timestamp,
SuspendedAt: &timestamp,
UpdatedAt: timestamp,
JoinedAt: timestamp,
}
t.Run("insert", func(t *testing.T) {
@ -61,6 +64,9 @@ func TestReputationDBGetInsert(t *testing.T) {
assert.True(t, res.SuspendedAt.Equal(*stats.SuspendedAt))
assert.True(t, res.UpdatedAt.Equal(stats.UpdatedAt))
assert.True(t, res.JoinedAt.Equal(stats.JoinedAt))
assert.True(t, res.OfflineSuspendedAt.Equal(*stats.OfflineSuspendedAt))
assert.True(t, res.OfflineUnderReviewAt.Equal(*stats.OfflineUnderReviewAt))
assert.Equal(t, res.OnlineScore, stats.OnlineScore)
compareReputationMetric(t, &res.Uptime, &stats.Uptime)
compareReputationMetric(t, &res.Audit, &stats.Audit)
@ -96,10 +102,13 @@ func TestReputationDBGetAll(t *testing.T) {
UnknownBeta: float64(i + 12),
UnknownScore: float64(i + 13),
},
DisqualifiedAt: &timestamp,
SuspendedAt: &timestamp,
UpdatedAt: timestamp,
JoinedAt: timestamp,
OnlineScore: float64(i + 14),
OfflineUnderReviewAt: &timestamp,
OfflineSuspendedAt: &timestamp,
DisqualifiedAt: &timestamp,
SuspendedAt: &timestamp,
UpdatedAt: timestamp,
JoinedAt: timestamp,
}
err := reputationDB.Store(ctx, rep)
@ -121,6 +130,9 @@ func TestReputationDBGetAll(t *testing.T) {
assert.Equal(t, rep.SuspendedAt, stats[0].SuspendedAt)
assert.Equal(t, rep.UpdatedAt, stats[0].UpdatedAt)
assert.Equal(t, rep.JoinedAt, stats[0].JoinedAt)
assert.Equal(t, rep.OfflineSuspendedAt, stats[0].OfflineSuspendedAt)
assert.Equal(t, rep.OfflineUnderReviewAt, stats[0].OfflineUnderReviewAt)
assert.Equal(t, rep.OnlineScore, stats[0].OnlineScore)
compareReputationMetric(t, &rep.Uptime, &stats[0].Uptime)
compareReputationMetric(t, &rep.Audit, &stats[0].Audit)

View File

@ -1378,7 +1378,7 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration {
},
{
DB: db.reputationDB,
Description: "Add unknown_audit_reputation_alpha and unknown_audit_reputation_beta fields to satellites db",
Description: "Add unknown_audit_reputation_alpha and unknown_audit_reputation_beta fields to reputation db",
Version: 39,
Action: migrate.Func(func(ctx context.Context, _ *zap.Logger, rdb tagsql.DB, rtx tagsql.Tx) (err error) {
_, err = rtx.Exec(ctx, `ALTER TABLE reputation ADD COLUMN audit_unknown_reputation_alpha REAL`)
@ -1449,7 +1449,7 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration {
},
{
DB: db.reputationDB,
Description: "Add unknown_audit_reputation_score field to satellites db",
Description: "Add unknown_audit_reputation_score field to reputation db",
Version: 40,
Action: migrate.Func(func(ctx context.Context, _ *zap.Logger, rdb tagsql.DB, rtx tagsql.Tx) (err error) {
stx, err := db.satellitesDB.Begin(ctx)
@ -1597,7 +1597,7 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration {
},
{
DB: db.reputationDB,
Description: "Add online_score and offline_suspended fields to satellites db, rename disqualified and suspended to disqualified_at and suspended_at",
Description: "Add online_score and offline_suspended fields to reputation db, rename disqualified and suspended to disqualified_at and suspended_at",
Version: 44,
Action: migrate.Func(func(ctx context.Context, _ *zap.Logger, rdb tagsql.DB, rtx tagsql.Tx) (err error) {
stx, err := db.satellitesDB.Begin(ctx)
@ -1691,6 +1691,86 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration {
return errs.Wrap(err)
}
return nil
}),
},
{
DB: db.reputationDB,
Description: "Add offline_under_review_at field to reputation db",
Version: 45,
Action: migrate.Func(func(ctx context.Context, _ *zap.Logger, rdb tagsql.DB, rtx tagsql.Tx) (err error) {
stx, err := db.satellitesDB.Begin(ctx)
if err != nil {
return errs.Wrap(err)
}
defer func() {
if err != nil {
err = errs.Combine(err, stx.Rollback())
} else {
err = errs.Wrap(stx.Commit())
}
}()
_, err = rtx.Exec(ctx, `ALTER TABLE reputation ADD COLUMN offline_under_review_at TIMESTAMP`)
if err != nil {
return errs.Wrap(err)
}
_, err = rtx.Exec(ctx, `
CREATE TABLE reputation_new (
satellite_id BLOB NOT NULL,
uptime_success_count INTEGER NOT NULL,
uptime_total_count INTEGER NOT NULL,
uptime_reputation_alpha REAL NOT NULL,
uptime_reputation_beta REAL NOT NULL,
uptime_reputation_score REAL NOT NULL,
audit_success_count INTEGER NOT NULL,
audit_total_count INTEGER NOT NULL,
audit_reputation_alpha REAL NOT NULL,
audit_reputation_beta REAL NOT NULL,
audit_reputation_score REAL NOT NULL,
audit_unknown_reputation_alpha REAL NOT NULL,
audit_unknown_reputation_beta REAL NOT NULL,
audit_unknown_reputation_score REAL NOT NULL,
online_score REAL NOT NULL,
disqualified_at TIMESTAMP,
updated_at TIMESTAMP NOT NULL,
suspended_at TIMESTAMP,
offline_suspended_at TIMESTAMP,
offline_under_review_at TIMESTAMP,
joined_at TIMESTAMP NOT NULL,
PRIMARY KEY (satellite_id)
);
INSERT INTO reputation_new SELECT
satellite_id,
uptime_success_count,
uptime_total_count,
uptime_reputation_alpha,
uptime_reputation_beta,
uptime_reputation_score,
audit_success_count,
audit_total_count,
audit_reputation_alpha,
audit_reputation_beta,
audit_reputation_score,
audit_unknown_reputation_alpha,
audit_unknown_reputation_beta,
audit_unknown_reputation_score,
online_score,
disqualified_at,
updated_at,
suspended_at,
offline_suspended_at,
offline_under_review_at,
joined_at
FROM reputation;
DROP TABLE reputation;
ALTER TABLE reputation_new RENAME TO reputation;
`)
if err != nil {
return errs.Wrap(err)
}
return nil
}),
},

View File

@ -48,9 +48,10 @@ func (db *reputationDB) Store(ctx context.Context, stats reputation.Stats) (err
disqualified_at,
suspended_at,
offline_suspended_at,
offline_under_review_at,
updated_at,
joined_at
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`
// ensure we insert utc
if stats.DisqualifiedAt != nil {
@ -65,6 +66,10 @@ func (db *reputationDB) Store(ctx context.Context, stats reputation.Stats) (err
utc := stats.OfflineSuspendedAt.UTC()
stats.OfflineSuspendedAt = &utc
}
if stats.OfflineUnderReviewAt != nil {
utc := stats.OfflineUnderReviewAt.UTC()
stats.OfflineUnderReviewAt = &utc
}
_, err = db.ExecContext(ctx, query,
stats.SatelliteID,
@ -85,6 +90,7 @@ func (db *reputationDB) Store(ctx context.Context, stats reputation.Stats) (err
stats.DisqualifiedAt,
stats.SuspendedAt,
stats.OfflineSuspendedAt,
stats.OfflineUnderReviewAt,
stats.UpdatedAt.UTC(),
stats.JoinedAt.UTC(),
)
@ -118,6 +124,7 @@ func (db *reputationDB) Get(ctx context.Context, satelliteID storj.NodeID) (_ *r
disqualified_at,
suspended_at,
offline_suspended_at,
offline_under_review_at,
updated_at,
joined_at
FROM reputation WHERE satellite_id = ?`,
@ -142,6 +149,7 @@ func (db *reputationDB) Get(ctx context.Context, satelliteID storj.NodeID) (_ *r
&stats.DisqualifiedAt,
&stats.SuspendedAt,
&stats.OfflineSuspendedAt,
&stats.OfflineUnderReviewAt,
&stats.UpdatedAt,
&stats.JoinedAt,
)
@ -175,6 +183,7 @@ func (db *reputationDB) All(ctx context.Context) (_ []reputation.Stats, err erro
disqualified_at,
suspended_at,
offline_suspended_at,
offline_under_review_at,
updated_at,
joined_at
FROM reputation`
@ -208,6 +217,7 @@ func (db *reputationDB) All(ctx context.Context) (_ []reputation.Stats, err erro
&stats.DisqualifiedAt,
&stats.SuspendedAt,
&stats.OfflineSuspendedAt,
&stats.OfflineUnderReviewAt,
&stats.UpdatedAt,
&stats.JoinedAt,
)

View File

@ -576,6 +576,11 @@ func Schema() map[string]*dbschema.Schema {
Type: "TIMESTAMP",
IsNullable: true,
},
&dbschema.Column{
Name: "offline_under_review_at",
Type: "TIMESTAMP",
IsNullable: true,
},
&dbschema.Column{
Name: "online_score",
Type: "REAL",

View File

@ -59,6 +59,7 @@ var States = MultiDBStates{
&v42,
&v43,
&v44,
&v45,
},
}

View File

@ -0,0 +1,54 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v45 = MultiDBState{
Version: 45,
DBStates: DBStates{
storagenodedb.UsedSerialsDBName: v43.DBStates[storagenodedb.UsedSerialsDBName],
storagenodedb.StorageUsageDBName: v43.DBStates[storagenodedb.StorageUsageDBName],
storagenodedb.ReputationDBName: &DBState{
SQL: `
-- tables to store nodestats cache
CREATE TABLE reputation (
satellite_id BLOB NOT NULL,
uptime_success_count INTEGER NOT NULL,
uptime_total_count INTEGER NOT NULL,
uptime_reputation_alpha REAL NOT NULL,
uptime_reputation_beta REAL NOT NULL,
uptime_reputation_score REAL NOT NULL,
audit_success_count INTEGER NOT NULL,
audit_total_count INTEGER NOT NULL,
audit_reputation_alpha REAL NOT NULL,
audit_reputation_beta REAL NOT NULL,
audit_reputation_score REAL NOT NULL,
audit_unknown_reputation_alpha REAL NOT NULL,
audit_unknown_reputation_beta REAL NOT NULL,
audit_unknown_reputation_score REAL NOT NULL,
online_score REAL NOT NULL,
disqualified_at TIMESTAMP,
updated_at TIMESTAMP NOT NULL,
suspended_at TIMESTAMP,
offline_suspended_at TIMESTAMP,
offline_under_review_at TIMESTAMP,
joined_at TIMESTAMP NOT NULL,
PRIMARY KEY (satellite_id)
);
INSERT INTO reputation VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000',1,1,1.0,1.0,1.0,1,1,1.0,1.0,1.0,1.0,1.0,1.0,1.0,'2019-07-19 20:00:00+00:00','2019-08-23 20:00:00+00:00',NULL,NULL,NULL,'1970-01-01 00:00:00+00:00');
`,
},
storagenodedb.PieceSpaceUsedDBName: v43.DBStates[storagenodedb.PieceSpaceUsedDBName],
storagenodedb.PieceInfoDBName: v43.DBStates[storagenodedb.PieceInfoDBName],
storagenodedb.PieceExpirationDBName: v43.DBStates[storagenodedb.PieceExpirationDBName],
storagenodedb.OrdersDBName: v43.DBStates[storagenodedb.OrdersDBName],
storagenodedb.BandwidthDBName: v43.DBStates[storagenodedb.BandwidthDBName],
storagenodedb.SatellitesDBName: v43.DBStates[storagenodedb.SatellitesDBName],
storagenodedb.DeprecatedInfoDBName: v43.DBStates[storagenodedb.DeprecatedInfoDBName],
storagenodedb.NotificationsDBName: v43.DBStates[storagenodedb.NotificationsDBName],
storagenodedb.HeldAmountDBName: v43.DBStates[storagenodedb.HeldAmountDBName],
storagenodedb.PricingDBName: v43.DBStates[storagenodedb.PricingDBName],
},
}