storj/storagenode/storagenodedb/reputation.go

232 lines
5.2 KiB
Go
Raw Normal View History

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package storagenodedb
import (
"context"
"database/sql"
"errors"
"github.com/zeebo/errs"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/storagenode/reputation"
)
// ErrReputation represents errors from the reputation database.
// It is the error class that the reputationDB methods in this file use to
// wrap failures coming from SQL execution, row scanning and audit-history
// (un)marshalling.
var ErrReputation = errs.Class("reputation")
storagenode/storagenodedb: refactor both data access objects and migrations to support multiple DB connections (#3057) * Split the info.db database into multiple DBs using Backup API. * Remove location. Prev refactor assumed we would need this but don't. * Added VACUUM to reclaim space after splitting storage node databases. * Added unique names to SQLite3 connection hooks to fix testplanet. * Moving DB closing to the migration step. * Removing the closing of the versions DB. It's already getting closed. * Swapping the database connection references on reconnect. * Moved sqlite closing logic away from the boltdb closing logic. * Moved sqlite closing logic away from the boltdb closing logic. * Remove certificate and vouchers from DB split migration. * Removed vouchers and bumped up the migration version. * Use same constructor in tests for storage node databases. * Use same constructor in tests for storage node databases. * Adding method to access underlining SQL database connections and cleanup * Adding logging for migration diagnostics. * Moved migration closing database logic to minimize disk usage. * Cleaning up error handling. * Fix missing copyright. * Fix linting error. * Add test for migration 21 (#3012) * Refactoring migration code into a nicer to use object. * Refactoring migration code into a nicer to use object. * Fixing broken migration test. * Removed unnecessary code that is no longer needed now that we close DBs. * Removed unnecessary code that is no longer needed now that we close DBs. * Fixed bug where an invalid database path was being opened. * Fixed linting errors. * Renamed VersionsDB to LegacyInfoDB and refactored DB lookup keys. * Renamed VersionsDB to LegacyInfoDB and refactored DB lookup keys. * Fix migration test. NOTE: This change does not address new tables satellites and satellite_exit_progress * Removing v22 migration to move into it's own PR. * Removing v22 migration to move into it's own PR. 
* Refactored schema, rebind and configure functions to be re-useable. * Renamed LegacyInfoDB to DeprecatedInfoDB. * Cleaned up closeDatabase function. * Renamed storageNodeSQLDB to migratableDB. * Switched from using errs.Combine() to errs.Group in closeDatabases func. * Removed constructors from storage node data access objects. * Reformatted usage of const. * Fixed broken test snapshots. * Fixed linting error.
2019-09-18 17:17:28 +01:00
// ReputationDBName is the name used to identify the reputation database.
const ReputationDBName = "reputation"
// reputationDB works with the node reputation DB.
// It has no fields of its own: the ExecContext, QueryContext and
// QueryRowContext calls made by its methods are promoted from the embedded
// dbContainerImpl.
type reputationDB struct {
dbContainerImpl
}
// Store inserts or updates reputation stats into the db.
//
// The row is written with INSERT OR REPLACE, so an existing row that
// conflicts with the new one is overwritten (presumably satellite_id is the
// unique key; the schema is not visible here).
//
// Every timestamp is converted to UTC before being written. stats is received
// by value, so re-pointing its optional timestamp fields at UTC copies does
// not modify the caller's values. A nil AuditHistory is written as a nil byte
// slice; otherwise it is serialized with pb.Marshal.
//
// Any marshalling or database error is returned wrapped in ErrReputation.
func (db *reputationDB) Store(ctx context.Context, stats reputation.Stats) (err error) {
	defer mon.Task()(&ctx)(&err)

	query := `INSERT OR REPLACE INTO reputation (
satellite_id,
audit_success_count,
audit_total_count,
audit_reputation_alpha,
audit_reputation_beta,
audit_reputation_score,
audit_unknown_reputation_alpha,
audit_unknown_reputation_beta,
audit_unknown_reputation_score,
online_score,
audit_history,
disqualified_at,
suspended_at,
offline_suspended_at,
offline_under_review_at,
vetted_at,
updated_at,
joined_at
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`

	// ensure we insert utc
	if stats.DisqualifiedAt != nil {
		utc := stats.DisqualifiedAt.UTC()
		stats.DisqualifiedAt = &utc
	}
	if stats.SuspendedAt != nil {
		utc := stats.SuspendedAt.UTC()
		stats.SuspendedAt = &utc
	}
	if stats.OfflineSuspendedAt != nil {
		utc := stats.OfflineSuspendedAt.UTC()
		stats.OfflineSuspendedAt = &utc
	}
	if stats.OfflineUnderReviewAt != nil {
		utc := stats.OfflineUnderReviewAt.UTC()
		stats.OfflineUnderReviewAt = &utc
	}
	// VettedAt is normalized like the other optional timestamps so that every
	// time column in the row is stored in the same zone.
	if stats.VettedAt != nil {
		utc := stats.VettedAt.UTC()
		stats.VettedAt = &utc
	}

	var auditHistoryBytes []byte
	if stats.AuditHistory != nil {
		auditHistoryBytes, err = pb.Marshal(stats.AuditHistory)
		if err != nil {
			return ErrReputation.Wrap(err)
		}
	}

	// Argument order must match the column list in the query above.
	_, err = db.ExecContext(ctx, query,
		stats.SatelliteID,
		stats.Audit.SuccessCount,
		stats.Audit.TotalCount,
		stats.Audit.Alpha,
		stats.Audit.Beta,
		stats.Audit.Score,
		stats.Audit.UnknownAlpha,
		stats.Audit.UnknownBeta,
		stats.Audit.UnknownScore,
		stats.OnlineScore,
		auditHistoryBytes,
		stats.DisqualifiedAt,
		stats.SuspendedAt,
		stats.OfflineSuspendedAt,
		stats.OfflineUnderReviewAt,
		stats.VettedAt,
		stats.UpdatedAt.UTC(),
		stats.JoinedAt.UTC(),
	)
	return ErrReputation.Wrap(err)
}
// Get retrieves stats for specific satellite.
//
// If no row exists for satelliteID, the result carries only the SatelliteID
// and the error is nil. On any other failure the (possibly partially
// populated) stats are returned together with an ErrReputation-wrapped error.
// The audit history is decoded only when the stored blob is non-nil.
func (db *reputationDB) Get(ctx context.Context, satelliteID storj.NodeID) (_ *reputation.Stats, err error) {
	defer mon.Task()(&ctx)(&err)

	const query = `SELECT audit_success_count,
audit_total_count,
audit_reputation_alpha,
audit_reputation_beta,
audit_reputation_score,
audit_unknown_reputation_alpha,
audit_unknown_reputation_beta,
audit_unknown_reputation_score,
online_score,
audit_history,
disqualified_at,
suspended_at,
offline_suspended_at,
offline_under_review_at,
vetted_at,
updated_at,
joined_at
FROM reputation WHERE satellite_id = ?`

	result := &reputation.Stats{SatelliteID: satelliteID}

	// Scan targets follow the column order of the SELECT above.
	var historyBlob []byte
	err = db.QueryRowContext(ctx, query, satelliteID).Scan(
		&result.Audit.SuccessCount,
		&result.Audit.TotalCount,
		&result.Audit.Alpha,
		&result.Audit.Beta,
		&result.Audit.Score,
		&result.Audit.UnknownAlpha,
		&result.Audit.UnknownBeta,
		&result.Audit.UnknownScore,
		&result.OnlineScore,
		&historyBlob,
		&result.DisqualifiedAt,
		&result.SuspendedAt,
		&result.OfflineSuspendedAt,
		&result.OfflineUnderReviewAt,
		&result.VettedAt,
		&result.UpdatedAt,
		&result.JoinedAt,
	)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// A missing row is not an error: report empty stats.
		return result, nil
	case err != nil:
		return result, ErrReputation.Wrap(err)
	}

	if historyBlob == nil {
		return result, nil
	}

	result.AuditHistory = &pb.AuditHistory{}
	err = pb.Unmarshal(historyBlob, result.AuditHistory)
	return result, ErrReputation.Wrap(err)
}
// All retrieves all stats from DB.
//
// The audit_history column is not selected, so AuditHistory is left nil in
// every returned entry. An empty table yields a nil slice and a nil error.
//
// Every failure (query, scan, iteration, or closing the rows) is returned
// wrapped in ErrReputation, and no partial result is returned on error.
func (db *reputationDB) All(ctx context.Context) (_ []reputation.Stats, err error) {
	defer mon.Task()(&ctx)(&err)

	query := `SELECT satellite_id,
audit_success_count,
audit_total_count,
audit_reputation_alpha,
audit_reputation_beta,
audit_reputation_score,
audit_unknown_reputation_alpha,
audit_unknown_reputation_beta,
audit_unknown_reputation_score,
online_score,
disqualified_at,
suspended_at,
offline_suspended_at,
offline_under_review_at,
vetted_at,
updated_at,
joined_at
FROM reputation`

	rows, err := db.QueryContext(ctx, query)
	if err != nil {
		return nil, ErrReputation.Wrap(err)
	}
	// Fold a Close failure into the named result so it is not lost.
	defer func() { err = errs.Combine(err, ErrReputation.Wrap(rows.Close())) }()

	var statsList []reputation.Stats
	for rows.Next() {
		var stats reputation.Stats
		// Scan targets follow the column order of the SELECT above.
		err = rows.Scan(&stats.SatelliteID,
			&stats.Audit.SuccessCount,
			&stats.Audit.TotalCount,
			&stats.Audit.Alpha,
			&stats.Audit.Beta,
			&stats.Audit.Score,
			&stats.Audit.UnknownAlpha,
			&stats.Audit.UnknownBeta,
			&stats.Audit.UnknownScore,
			&stats.OnlineScore,
			&stats.DisqualifiedAt,
			&stats.SuspendedAt,
			&stats.OfflineSuspendedAt,
			&stats.OfflineUnderReviewAt,
			&stats.VettedAt,
			&stats.UpdatedAt,
			&stats.JoinedAt,
		)
		if err != nil {
			return nil, ErrReputation.Wrap(err)
		}
		statsList = append(statsList, stats)
	}
	// rows.Next returning false may mean an iteration error rather than the
	// end of the result set.
	if err = rows.Err(); err != nil {
		return nil, ErrReputation.Wrap(err)
	}
	return statsList, nil
}