satellite/satellitedb: persist piece counts to/from db (#2803)

This commit is contained in:
Bryan White 2019-08-27 14:37:42 +02:00 committed by GitHub
parent d0ab3c03ec
commit a33106df1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 367 additions and 10 deletions

View File

@ -20,7 +20,8 @@ type PieceTracker struct {
log *zap.Logger
config Config
creationDate time.Time
pieceCounts map[storj.NodeID]int
// TODO: should we use int or int64 consistently for piece count (db type is int64)?
pieceCounts map[storj.NodeID]int
retainInfos map[storj.NodeID]*RetainInfo
}

View File

@ -75,8 +75,14 @@ func (service *Service) Run(ctx context.Context) (err error) {
return nil
}
// TODO retrieve piece counts from overlay (when there is a column for them)
lastPieceCounts := make(map[storj.NodeID]int)
// load last piece counts from overlay db
lastPieceCounts, err := service.overlay.AllPieceCounts(ctx)
if err != nil {
service.log.Error("error getting last piece counts", zap.Error(err))
}
if lastPieceCounts == nil {
lastPieceCounts = make(map[storj.NodeID]int)
}
return service.Loop.Run(ctx, func(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
@ -90,7 +96,7 @@ func (service *Service) Run(ctx context.Context) (err error) {
return nil
}
// save piece counts for next iteration
// save piece counts in memory for next iteration
for id := range lastPieceCounts {
delete(lastPieceCounts, id)
}
@ -98,6 +104,12 @@ func (service *Service) Run(ctx context.Context) (err error) {
lastPieceCounts[id] = info.Count
}
// save piece counts to db for next satellite restart
err = service.overlay.UpdatePieceCounts(ctx, lastPieceCounts)
if err != nil {
service.log.Error("error updating piece counts", zap.Error(err))
}
// monitor information
for _, info := range pieceTracker.retainInfos {
mon.IntVal("node_piece_count").Observe(int64(info.Count))

View File

@ -66,6 +66,11 @@ type DB interface {
UpdateNodeInfo(ctx context.Context, node storj.NodeID, nodeInfo *pb.InfoResponse) (stats *NodeDossier, err error)
// UpdateUptime updates a single storagenode's uptime stats.
UpdateUptime(ctx context.Context, nodeID storj.NodeID, isUp bool, lambda, weight, uptimeDQ float64) (stats *NodeStats, err error)
// AllPieceCounts returns a map of node IDs to piece counts from the db.
AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int, err error)
// UpdatePieceCounts sets the piece count field for the given node IDs.
UpdatePieceCounts(ctx context.Context, pieceCounts map[storj.NodeID]int) (err error)
}
// FindStorageNodesRequest defines easy request parameters.

View File

@ -190,6 +190,11 @@ read limitoffset (
orderby asc node.id
)
read all (
select node.id node.piece_count
where node.piece_count != 0
)
//--- repairqueue ---//
model injuredsegment (

View File

@ -5544,6 +5544,11 @@ type Id_LastNet_Address_Protocol_Row struct {
Protocol int
}
// Id_PieceCount_Row holds one (node id, piece count) pair as returned by the
// piece-count queries. NOTE(review): this appears to be dbx-generated code —
// regenerate rather than hand-edit.
type Id_PieceCount_Row struct {
// Id is the raw node ID bytes as stored in the nodes table.
Id []byte
// PieceCount mirrors the db column type (int64).
PieceCount int64
}
type Id_Row struct {
Id []byte
}
@ -6614,6 +6619,38 @@ func (obj *postgresImpl) Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol
}
// All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number returns the
// (id, piece_count) pair of every node whose piece_count is not zero.
// NOTE(review): dbx-generated query method — regenerate rather than hand-edit.
func (obj *postgresImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
rows []*Id_PieceCount_Row, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.piece_count FROM nodes WHERE nodes.piece_count != 0")
var __values []interface{}
__values = append(__values)
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
// run the query and collect every matching row
__rows, err := obj.driver.Query(__stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
row := &Id_PieceCount_Row{}
err = __rows.Scan(&row.Id, &row.PieceCount)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, row)
}
// surface any iteration error that ended the loop early
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *postgresImpl) Get_User_By_Email_And_Status_Not_Number(ctx context.Context,
user_email User_Email_Field) (
user *User, err error) {
@ -10460,6 +10497,38 @@ func (obj *sqlite3Impl) Limited_Node_Id_Node_LastNet_Node_Address_Node_Protocol_
}
// All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number returns the
// (id, piece_count) pair of every node whose piece_count is not zero.
// NOTE(review): dbx-generated query method — regenerate rather than hand-edit.
func (obj *sqlite3Impl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
rows []*Id_PieceCount_Row, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.piece_count FROM nodes WHERE nodes.piece_count != 0")
var __values []interface{}
__values = append(__values)
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
// run the query and collect every matching row
__rows, err := obj.driver.Query(__stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
row := &Id_PieceCount_Row{}
err = __rows.Scan(&row.Id, &row.PieceCount)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, row)
}
// surface any iteration error that ended the loop early
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *sqlite3Impl) Get_User_By_Email_And_Status_Not_Number(ctx context.Context,
user_email User_Email_Field) (
user *User, err error) {
@ -13817,6 +13886,15 @@ func (rx *Rx) All_Node_Id(ctx context.Context) (
return tx.All_Node_Id(ctx)
}
// All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number delegates to the
// transaction-scoped implementation, opening a tx if one is not active.
// NOTE(review): dbx-generated wrapper — regenerate rather than hand-edit.
func (rx *Rx) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
rows []*Id_PieceCount_Row, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx)
}
func (rx *Rx) All_Offer_OrderBy_Asc_Id(ctx context.Context) (
rows []*Offer, err error) {
var tx *Tx
@ -15032,6 +15110,9 @@ type Methods interface {
All_Node_Id(ctx context.Context) (
rows []*Id_Row, err error)
All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
rows []*Id_PieceCount_Row, err error)
All_Offer_OrderBy_Asc_Id(ctx context.Context) (
rows []*Offer, err error)

View File

@ -774,6 +774,13 @@ func (m *lockedOrders) GetStorageNodeBandwidth(ctx context.Context, nodeID storj
return m.db.GetStorageNodeBandwidth(ctx, nodeID, from, to)
}
// ProcessOrders takes a list of order requests and processes them in a batch
// (serialized under the shared database mutex).
func (m *lockedOrders) ProcessOrders(ctx context.Context, requests []*orders.ProcessOrderRequest) (responses []*orders.ProcessOrderResponse, err error) {
m.Lock()
defer m.Unlock()
return m.db.ProcessOrders(ctx, requests)
}
// UnuseSerialNumber removes pair serial number -> storage node id from database
func (m *lockedOrders) UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error {
m.Lock()
@ -823,12 +830,6 @@ func (m *lockedOrders) UseSerialNumber(ctx context.Context, serialNumber storj.S
return m.db.UseSerialNumber(ctx, serialNumber, storageNodeID)
}
// ProcessOrders takes a list of order requests and processes them in a batch
// (serialized under the shared database mutex).
func (m *lockedOrders) ProcessOrders(ctx context.Context, requests []*orders.ProcessOrderRequest) (responses []*orders.ProcessOrderResponse, err error) {
m.Lock()
defer m.Unlock()
return m.db.ProcessOrders(ctx, requests)
}
// OverlayCache returns database for caching overlay information
func (m *locked) OverlayCache() overlay.DB {
m.Lock()
@ -842,6 +843,13 @@ type lockedOverlayCache struct {
db overlay.DB
}
// AllPieceCounts returns a map of node IDs to piece counts from the db
// (serialized under the shared database mutex).
func (m *lockedOverlayCache) AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int, err error) {
m.Lock()
defer m.Unlock()
return m.db.AllPieceCounts(ctx)
}
// BatchUpdateStats updates multiple storagenode's stats in one transaction
func (m *lockedOverlayCache) BatchUpdateStats(ctx context.Context, updateRequests []*overlay.UpdateRequest, batchSize int) (failed storj.NodeIDList, err error) {
m.Lock()
@ -926,6 +934,13 @@ func (m *lockedOverlayCache) UpdateNodeInfo(ctx context.Context, node storj.Node
return m.db.UpdateNodeInfo(ctx, node, nodeInfo)
}
// UpdatePieceCounts sets the piece count field for the given node IDs
// (serialized under the shared database mutex).
func (m *lockedOverlayCache) UpdatePieceCounts(ctx context.Context, pieceCounts map[storj.NodeID]int) (err error) {
m.Lock()
defer m.Unlock()
return m.db.UpdatePieceCounts(ctx, pieceCounts)
}
// UpdateStats all parts of single storagenode's stats.
func (m *lockedOverlayCache) UpdateStats(ctx context.Context, request *overlay.UpdateRequest) (stats *overlay.NodeStats, err error) {
m.Lock()

View File

@ -902,6 +902,78 @@ func (cache *overlaycache) UpdateUptime(ctx context.Context, nodeID storj.NodeID
return getNodeStats(dbNode), Error.Wrap(tx.Commit())
}
// AllPieceCounts returns a map of node IDs to piece counts from the db.
// NB: a valid, partial piece map can be returned even if node ID parsing error(s) are returned.
func (cache *overlaycache) AllPieceCounts(ctx context.Context) (_ map[storj.NodeID]int, err error) {
	defer mon.Task()(&ctx)(&err)

	// fetch (id, piece_count) for every node whose stored piece count is non-zero
	rows, err := cache.db.All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	var group errs.Group
	counts := make(map[storj.NodeID]int, len(rows))
	for _, row := range rows {
		id, parseErr := storj.NodeIDFromBytes(row.Id)
		if parseErr != nil {
			// collect the parse failure but keep going; the partial map is still useful
			group.Add(parseErr)
			continue
		}
		counts[id] = int(row.PieceCount)
	}
	return counts, group.Err()
}
// UpdatePieceCounts sets the piece count field for the given node IDs.
func (cache *overlaycache) UpdatePieceCounts(ctx context.Context, pieceCounts map[storj.NodeID]int) (err error) {
defer mon.Task()(&ctx)(&err)
// nothing to persist
if len(pieceCounts) == 0 {
return nil
}
// dispatch on the active SQL driver: postgres gets a single bulk UPDATE,
// anything else (sqlite) gets a batch of per-node statements
switch cache.db.Driver().(type) {
case *pq.Driver:
return Error.Wrap(cache.postgresUpdatePieceCounts(ctx, pieceCounts))
default:
return Error.Wrap(cache.sqliteUpdatePieceCounts(ctx, pieceCounts))
}
}
// postgresUpdatePieceCounts persists all piece counts in one statement by
// joining the nodes table against an inline VALUES list.
// NB: callers must guarantee pieceCounts is non-empty (UpdatePieceCounts does).
func (cache *overlaycache) postgresUpdatePieceCounts(ctx context.Context, pieceCounts map[storj.NodeID]int) (err error) {
	defer mon.Task()(&ctx)(&err)

	query := `UPDATE nodes SET piece_count = newvals.piece_count FROM ( VALUES `
	args := make([]interface{}, 0, len(pieceCounts)*2)
	first := true
	for id, count := range pieceCounts {
		// separator-first concatenation avoids trimming a trailing comma afterwards
		if !first {
			query += `, `
		}
		first = false
		query += `(?::BYTEA, ?::BIGINT)`
		args = append(args, id, count)
	}
	query += `) newvals(nodeid, piece_count) WHERE nodes.id = newvals.nodeid`

	_, err = cache.db.DB.ExecContext(ctx, cache.db.Rebind(query), args...)
	return err
}
// sqliteUpdatePieceCounts persists piece counts as a batch of one-row UPDATE
// statements executed in a single ExecContext call.
func (cache *overlaycache) sqliteUpdatePieceCounts(ctx context.Context, pieceCounts map[storj.NodeID]int) (err error) {
	defer mon.Task()(&ctx)(&err)

	const updateSQL = "UPDATE nodes SET ( piece_count ) = ( ? ) WHERE id == ?;"

	query := ""
	args := make([]interface{}, 0, len(pieceCounts)*2)
	for id, count := range pieceCounts {
		// one statement (and one count/id argument pair) per node
		query += updateSQL
		args = append(args, count, id)
	}

	_, err = cache.db.DB.ExecContext(ctx, cache.db.Rebind(query), args...)
	return err
}
func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier, err error) {
defer mon.Task()(&ctx)(&err)
if info == nil {

View File

@ -0,0 +1,166 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb_test
import (
"math"
"testing"
"time"
"github.com/lib/pq"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testcontext"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite"
dbx "storj.io/storj/satellite/satellitedb/dbx"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
// TestOverlaycache_AllPieceCounts verifies that counts written directly into
// the nodes table are read back correctly via OverlayCache.AllPieceCounts.
func TestOverlaycache_AllPieceCounts(t *testing.T) {
	satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
		ctx := testcontext.New(t)
		defer ctx.Cleanup()

		// overlay cache under test
		cache := db.OverlayCache()

		// raw dbx access for seeding fixture data
		dbAccess := db.(interface{ TestDBAccess() *dbx.DB }).TestDBAccess()
		require.NotNil(t, dbAccess)

		// seed nodes and write their piece counts straight into the db
		nodes := newTestNodes(ctx, 10, t, dbAccess)
		expected := newTestPieceCounts(t, nodes)
		setTestNodePieceCounts(t, dbAccess, expected)

		// reading through the overlay cache must yield the seeded counts
		actual, err := cache.AllPieceCounts(ctx)
		require.NoError(t, err)
		require.Equal(t, expected, actual)
	})
}
// TestOverlaycache_UpdatePieceCounts verifies that counts written via
// OverlayCache.UpdatePieceCounts land in the nodes table.
func TestOverlaycache_UpdatePieceCounts(t *testing.T) {
	satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
		ctx := testcontext.New(t)
		defer ctx.Cleanup()

		// get overlay db
		overlay := db.OverlayCache()

		// get dbx db access
		dbAccess := db.(interface{ TestDBAccess() *dbx.DB }).TestDBAccess()
		require.NotNil(t, dbAccess)

		// create test nodes in overlay db
		testNodes := newTestNodes(ctx, 10, t, dbAccess)

		// build and set expected piece counts
		expectedPieceCounts := newTestPieceCounts(t, testNodes)
		err := overlay.UpdatePieceCounts(ctx, expectedPieceCounts)
		require.NoError(t, err)

		// read actual piece counts back directly from the db;
		// check the query error BEFORE using rows (the original checked it
		// only after iterating, which could mask a failed query)
		rows, err := dbAccess.All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx)
		require.NoError(t, err)

		actualPieceCounts := make(map[storj.NodeID]int)
		for _, row := range rows {
			var nodeID storj.NodeID
			copy(nodeID[:], row.Id)
			actualPieceCounts[nodeID] = int(row.PieceCount)
		}

		// expected and actual piece count maps should match
		require.Equal(t, expectedPieceCounts, actualPieceCounts)
	})
}
// setTestNodePieceCounts writes the given piece counts directly into the nodes
// table, bypassing the overlay cache, so reads can be tested independently.
func setTestNodePieceCounts(t *testing.T, dbAccess *dbx.DB, pieceCounts map[storj.NodeID]int) {
	// pick the placeholder style for the active driver once, outside the loop
	query := "UPDATE nodes SET piece_count = ( ? ) WHERE id = ?;"
	if _, ok := dbAccess.Driver().(*pq.Driver); ok {
		query = "UPDATE nodes SET piece_count = ( ?::BIGINT ) WHERE id = ?::BYTEA;"
	}

	for nodeID, pieceCount := range pieceCounts {
		if _, err := dbAccess.Exec(dbAccess.Rebind(query), pieceCount, nodeID); err != nil {
			t.Fatal(err)
		}
	}
}
// newTestPieceCounts builds a piece count map from the node ids of `testNodes`,
// incrementing the piece count exponentially from one node to the next
// (10, 100, 1000, ...).
func newTestPieceCounts(t *testing.T, testNodes []*dbx.Node) map[storj.NodeID]int {
	counts := make(map[storj.NodeID]int, len(testNodes))
	for i, node := range testNodes {
		id, err := storj.NodeIDFromBytes(node.Id)
		if err != nil {
			t.Fatal(err)
		}
		// 10^(i+1) pieces for the i-th node
		counts[id] = int(math.Pow10(i + 1))
	}
	return counts
}
// newTestNodes inserts `count` minimal placeholder nodes into the nodes table
// (ids 0x01.., 0x02.., ...) and returns the created rows. It fails the test on
// any insert error and asserts exactly `count` rows exist afterwards.
func newTestNodes(ctx *testcontext.Context, count int, t *testing.T, db *dbx.DB) (nodes []*dbx.Node) {
for i := 0; i < count; i++ {
// deterministic, distinct node IDs: first byte is i+1, rest zero
nodeID := storj.NodeID{byte(i + 1)}
// all remaining fields are zero/placeholder values — these nodes only
// need to exist so piece counts can be attached to them
node, err := db.Create_Node(
ctx,
dbx.Node_Id(nodeID.Bytes()),
dbx.Node_Address("0.0.0.0:0"),
dbx.Node_LastNet("0.0.0.0"),
dbx.Node_Protocol(0),
dbx.Node_Type(int(pb.NodeType_INVALID)),
dbx.Node_Email(""),
dbx.Node_Wallet(""),
dbx.Node_FreeBandwidth(-1),
dbx.Node_FreeDisk(-1),
dbx.Node_Major(0),
dbx.Node_Minor(0),
dbx.Node_Patch(0),
dbx.Node_Hash(""),
dbx.Node_Timestamp(time.Time{}),
dbx.Node_Release(false),
dbx.Node_Latency90(0),
dbx.Node_AuditSuccessCount(0),
dbx.Node_TotalAuditCount(0),
dbx.Node_UptimeSuccessCount(0),
dbx.Node_TotalUptimeCount(0),
dbx.Node_LastContactSuccess(time.Now()),
dbx.Node_LastContactFailure(time.Time{}),
dbx.Node_Contained(false),
dbx.Node_AuditReputationAlpha(0),
dbx.Node_AuditReputationBeta(0),
dbx.Node_UptimeReputationAlpha(0),
dbx.Node_UptimeReputationBeta(0),
dbx.Node_Create_Fields{
Disqualified: dbx.Node_Disqualified_Null(),
},
)
require.NoError(t, err)
nodes = append(nodes, node)
}
// sanity-check that exactly the created nodes are present
rows, err := db.All_Node_Id(ctx)
require.NoError(t, err)
require.NotEmpty(t, rows)
require.Len(t, rows, count)
return nodes
}