satellite/satellitedb: optimize StoragenodeAccounting.SaveRollup

Change-Id: I758049872b4ea1ae22657dd9fcb47c228468b1d0
This commit is contained in:
Egon Elbre 2021-03-16 16:02:53 +02:00
parent 6a553ec9c5
commit 9491df76e7
3 changed files with 71 additions and 136 deletions

View File

@ -89,7 +89,7 @@ read scalar (
) )
model accounting_rollup ( model accounting_rollup (
key node_id start_time key node_id start_time
index ( fields start_time ) index ( fields start_time )
field node_id blob field node_id blob
@ -102,8 +102,6 @@ model accounting_rollup (
field at_rest_total float64 field at_rest_total float64
) )
create accounting_rollup ( noreturn, replace )
//--- overlay cache ---// //--- overlay cache ---//
model node ( model node (

View File

@ -9061,42 +9061,6 @@ func (obj *pgxImpl) CreateNoReturn_AccountingTimestamps(ctx context.Context,
} }
func (obj *pgxImpl) ReplaceNoReturn_AccountingRollup(ctx context.Context,
accounting_rollup_node_id AccountingRollup_NodeId_Field,
accounting_rollup_start_time AccountingRollup_StartTime_Field,
accounting_rollup_put_total AccountingRollup_PutTotal_Field,
accounting_rollup_get_total AccountingRollup_GetTotal_Field,
accounting_rollup_get_audit_total AccountingRollup_GetAuditTotal_Field,
accounting_rollup_get_repair_total AccountingRollup_GetRepairTotal_Field,
accounting_rollup_put_repair_total AccountingRollup_PutRepairTotal_Field,
accounting_rollup_at_rest_total AccountingRollup_AtRestTotal_Field) (
err error) {
defer mon.Task()(&ctx)(&err)
__node_id_val := accounting_rollup_node_id.value()
__start_time_val := accounting_rollup_start_time.value()
__put_total_val := accounting_rollup_put_total.value()
__get_total_val := accounting_rollup_get_total.value()
__get_audit_total_val := accounting_rollup_get_audit_total.value()
__get_repair_total_val := accounting_rollup_get_repair_total.value()
__put_repair_total_val := accounting_rollup_put_repair_total.value()
__at_rest_total_val := accounting_rollup_at_rest_total.value()
var __embed_stmt = __sqlbundle_Literal("INSERT INTO accounting_rollups ( node_id, start_time, put_total, get_total, get_audit_total, get_repair_total, put_repair_total, at_rest_total ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ? ) ON CONFLICT ( node_id, start_time ) DO UPDATE SET node_id = EXCLUDED.node_id, start_time = EXCLUDED.start_time, put_total = EXCLUDED.put_total, get_total = EXCLUDED.get_total, get_audit_total = EXCLUDED.get_audit_total, get_repair_total = EXCLUDED.get_repair_total, put_repair_total = EXCLUDED.put_repair_total, at_rest_total = EXCLUDED.at_rest_total")
var __values []interface{}
__values = append(__values, __node_id_val, __start_time_val, __put_total_val, __get_total_val, __get_audit_total_val, __get_repair_total_val, __put_repair_total_val, __at_rest_total_val)
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
_, err = obj.driver.ExecContext(ctx, __stmt, __values...)
if err != nil {
return obj.makeErr(err)
}
return nil
}
func (obj *pgxImpl) Create_AuditHistory(ctx context.Context, func (obj *pgxImpl) Create_AuditHistory(ctx context.Context,
audit_history_node_id AuditHistory_NodeId_Field, audit_history_node_id AuditHistory_NodeId_Field,
audit_history_history AuditHistory_History_Field) ( audit_history_history AuditHistory_History_Field) (
@ -14484,42 +14448,6 @@ func (obj *pgxcockroachImpl) CreateNoReturn_AccountingTimestamps(ctx context.Con
} }
func (obj *pgxcockroachImpl) ReplaceNoReturn_AccountingRollup(ctx context.Context,
accounting_rollup_node_id AccountingRollup_NodeId_Field,
accounting_rollup_start_time AccountingRollup_StartTime_Field,
accounting_rollup_put_total AccountingRollup_PutTotal_Field,
accounting_rollup_get_total AccountingRollup_GetTotal_Field,
accounting_rollup_get_audit_total AccountingRollup_GetAuditTotal_Field,
accounting_rollup_get_repair_total AccountingRollup_GetRepairTotal_Field,
accounting_rollup_put_repair_total AccountingRollup_PutRepairTotal_Field,
accounting_rollup_at_rest_total AccountingRollup_AtRestTotal_Field) (
err error) {
defer mon.Task()(&ctx)(&err)
__node_id_val := accounting_rollup_node_id.value()
__start_time_val := accounting_rollup_start_time.value()
__put_total_val := accounting_rollup_put_total.value()
__get_total_val := accounting_rollup_get_total.value()
__get_audit_total_val := accounting_rollup_get_audit_total.value()
__get_repair_total_val := accounting_rollup_get_repair_total.value()
__put_repair_total_val := accounting_rollup_put_repair_total.value()
__at_rest_total_val := accounting_rollup_at_rest_total.value()
var __embed_stmt = __sqlbundle_Literal("UPSERT INTO accounting_rollups ( node_id, start_time, put_total, get_total, get_audit_total, get_repair_total, put_repair_total, at_rest_total ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ? )")
var __values []interface{}
__values = append(__values, __node_id_val, __start_time_val, __put_total_val, __get_total_val, __get_audit_total_val, __get_repair_total_val, __put_repair_total_val, __at_rest_total_val)
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
_, err = obj.driver.ExecContext(ctx, __stmt, __values...)
if err != nil {
return obj.makeErr(err)
}
return nil
}
func (obj *pgxcockroachImpl) Create_AuditHistory(ctx context.Context, func (obj *pgxcockroachImpl) Create_AuditHistory(ctx context.Context,
audit_history_node_id AuditHistory_NodeId_Field, audit_history_node_id AuditHistory_NodeId_Field,
audit_history_history AuditHistory_History_Field) ( audit_history_history AuditHistory_History_Field) (
@ -21053,24 +20981,6 @@ func (rx *Rx) Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStar
return tx.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx, storagenode_bandwidth_rollup_storagenode_id, storagenode_bandwidth_rollup_interval_start_greater_or_equal, limit, start) return tx.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx, storagenode_bandwidth_rollup_storagenode_id, storagenode_bandwidth_rollup_interval_start_greater_or_equal, limit, start)
} }
func (rx *Rx) ReplaceNoReturn_AccountingRollup(ctx context.Context,
accounting_rollup_node_id AccountingRollup_NodeId_Field,
accounting_rollup_start_time AccountingRollup_StartTime_Field,
accounting_rollup_put_total AccountingRollup_PutTotal_Field,
accounting_rollup_get_total AccountingRollup_GetTotal_Field,
accounting_rollup_get_audit_total AccountingRollup_GetAuditTotal_Field,
accounting_rollup_get_repair_total AccountingRollup_GetRepairTotal_Field,
accounting_rollup_put_repair_total AccountingRollup_PutRepairTotal_Field,
accounting_rollup_at_rest_total AccountingRollup_AtRestTotal_Field) (
err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.ReplaceNoReturn_AccountingRollup(ctx, accounting_rollup_node_id, accounting_rollup_start_time, accounting_rollup_put_total, accounting_rollup_get_total, accounting_rollup_get_audit_total, accounting_rollup_get_repair_total, accounting_rollup_put_repair_total, accounting_rollup_at_rest_total)
}
func (rx *Rx) ReplaceNoReturn_NodeApiVersion(ctx context.Context, func (rx *Rx) ReplaceNoReturn_NodeApiVersion(ctx context.Context,
node_api_version_id NodeApiVersion_Id_Field, node_api_version_id NodeApiVersion_Id_Field,
node_api_version_api_version NodeApiVersion_ApiVersion_Field) ( node_api_version_api_version NodeApiVersion_ApiVersion_Field) (
@ -21846,17 +21756,6 @@ type Methods interface {
limit int, start *Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation) ( limit int, start *Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation) (
rows []*StoragenodeBandwidthRollup, next *Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation, err error) rows []*StoragenodeBandwidthRollup, next *Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation, err error)
ReplaceNoReturn_AccountingRollup(ctx context.Context,
accounting_rollup_node_id AccountingRollup_NodeId_Field,
accounting_rollup_start_time AccountingRollup_StartTime_Field,
accounting_rollup_put_total AccountingRollup_PutTotal_Field,
accounting_rollup_get_total AccountingRollup_GetTotal_Field,
accounting_rollup_get_audit_total AccountingRollup_GetAuditTotal_Field,
accounting_rollup_get_repair_total AccountingRollup_GetRepairTotal_Field,
accounting_rollup_put_repair_total AccountingRollup_PutRepairTotal_Field,
accounting_rollup_at_rest_total AccountingRollup_AtRestTotal_Field) (
err error)
ReplaceNoReturn_NodeApiVersion(ctx context.Context, ReplaceNoReturn_NodeApiVersion(ctx context.Context,
node_api_version_id NodeApiVersion_Id_Field, node_api_version_id NodeApiVersion_Id_Field,
node_api_version_api_version NodeApiVersion_ApiVersion_Field) ( node_api_version_api_version NodeApiVersion_ApiVersion_Field) (

View File

@ -256,46 +256,84 @@ func (db *StoragenodeAccounting) SaveRollup(ctx context.Context, latestRollup ti
rollups = append(rollups, ar) rollups = append(rollups, ar)
} }
} }
finished := false
for !finished { insertBatch := func(ctx context.Context, db *dbx.DB, batch []*accounting.Rollup) (err error) {
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { defer mon.Task()(&ctx)(&err)
for i := 0; i < batchSize && len(rollups) > 0; i++ { n := len(batch)
ar := rollups[0]
rollups = rollups[1:]
nID := dbx.AccountingRollup_NodeId(ar.NodeID.Bytes()) nodeID := make([]storj.NodeID, n)
start := dbx.AccountingRollup_StartTime(ar.StartTime) startTime := make([]time.Time, n)
put := dbx.AccountingRollup_PutTotal(ar.PutTotal) putTotal := make([]int64, n)
get := dbx.AccountingRollup_GetTotal(ar.GetTotal) getTotal := make([]int64, n)
audit := dbx.AccountingRollup_GetAuditTotal(ar.GetAuditTotal) getAuditTotal := make([]int64, n)
getRepair := dbx.AccountingRollup_GetRepairTotal(ar.GetRepairTotal) getRepairTotal := make([]int64, n)
putRepair := dbx.AccountingRollup_PutRepairTotal(ar.PutRepairTotal) putRepairTotal := make([]int64, n)
atRest := dbx.AccountingRollup_AtRestTotal(ar.AtRestTotal) atRestTotal := make([]float64, n)
err := tx.ReplaceNoReturn_AccountingRollup(ctx, nID, start, put, get, audit, getRepair, putRepair, atRest) for i, ar := range batch {
if err != nil { nodeID[i] = ar.NodeID
return err startTime[i] = ar.StartTime
} putTotal[i] = ar.PutTotal
} getTotal[i] = ar.GetTotal
getAuditTotal[i] = ar.GetAuditTotal
getRepairTotal[i] = ar.GetRepairTotal
putRepairTotal[i] = ar.PutRepairTotal
atRestTotal[i] = ar.AtRestTotal
}
if len(rollups) == 0 { _, err = db.ExecContext(ctx, `
finished = true INSERT INTO accounting_rollups (
return tx.UpdateNoReturn_AccountingTimestamps_By_Name(ctx, node_id, start_time,
dbx.AccountingTimestamps_Name(accounting.LastRollup), put_total, get_total,
dbx.AccountingTimestamps_Update_Fields{ get_audit_total, get_repair_total, put_repair_total,
Value: dbx.AccountingTimestamps_Value(latestRollup), at_rest_total
}, )
) SELECT * FROM unnest(
} $1::bytea[], $2::timestamptz[],
$3::int8[], $4::int8[],
$5::int8[], $6::int8[], $7::int8[],
$8::float8[]
)
ON CONFLICT ( node_id, start_time )
DO UPDATE SET
put_total = EXCLUDED.put_total,
get_total = EXCLUDED.get_total,
get_audit_total = EXCLUDED.get_audit_total,
get_repair_total = EXCLUDED.get_repair_total,
put_repair_total = EXCLUDED.put_repair_total,
at_rest_total = EXCLUDED.at_rest_total
`, pgutil.NodeIDArray(nodeID), pgutil.TimestampTZArray(startTime),
pgutil.Int8Array(putTotal), pgutil.Int8Array(getTotal),
pgutil.Int8Array(getAuditTotal), pgutil.Int8Array(getRepairTotal), pgutil.Int8Array(putRepairTotal),
pgutil.Float8Array(atRestTotal))
return nil return Error.Wrap(err)
}) }
if err != nil {
// Note: we do not need here a transaction because we will "update" the
// columns when we do not update accounting.LastRollup. We will end up
// with partial data in the database, however in the next runs, we will
// try to fix them.
for len(rollups) > 0 {
batch := rollups
if len(batch) > batchSize {
batch = batch[:batchSize]
}
rollups = rollups[len(batch):]
if err := insertBatch(ctx, db.db.DB, batch); err != nil {
return Error.Wrap(err) return Error.Wrap(err)
} }
} }
return nil
err = db.db.UpdateNoReturn_AccountingTimestamps_By_Name(ctx,
dbx.AccountingTimestamps_Name(accounting.LastRollup),
dbx.AccountingTimestamps_Update_Fields{
Value: dbx.AccountingTimestamps_Value(latestRollup),
},
)
return Error.Wrap(err)
} }
// LastTimestamp records the greatest last tallied time. // LastTimestamp records the greatest last tallied time.