diff --git a/satellite/satellitedb/dbx/satellitedb.dbx b/satellite/satellitedb/dbx/satellitedb.dbx index 617a7c258..201e172ba 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx +++ b/satellite/satellitedb/dbx/satellitedb.dbx @@ -89,7 +89,7 @@ read scalar ( ) model accounting_rollup ( - key node_id start_time + key node_id start_time index ( fields start_time ) field node_id blob @@ -102,8 +102,6 @@ model accounting_rollup ( field at_rest_total float64 ) -create accounting_rollup ( noreturn, replace ) - //--- overlay cache ---// model node ( diff --git a/satellite/satellitedb/dbx/satellitedb.dbx.go b/satellite/satellitedb/dbx/satellitedb.dbx.go index 93ba86c72..9a064c869 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx.go +++ b/satellite/satellitedb/dbx/satellitedb.dbx.go @@ -9061,42 +9061,6 @@ func (obj *pgxImpl) CreateNoReturn_AccountingTimestamps(ctx context.Context, } -func (obj *pgxImpl) ReplaceNoReturn_AccountingRollup(ctx context.Context, - accounting_rollup_node_id AccountingRollup_NodeId_Field, - accounting_rollup_start_time AccountingRollup_StartTime_Field, - accounting_rollup_put_total AccountingRollup_PutTotal_Field, - accounting_rollup_get_total AccountingRollup_GetTotal_Field, - accounting_rollup_get_audit_total AccountingRollup_GetAuditTotal_Field, - accounting_rollup_get_repair_total AccountingRollup_GetRepairTotal_Field, - accounting_rollup_put_repair_total AccountingRollup_PutRepairTotal_Field, - accounting_rollup_at_rest_total AccountingRollup_AtRestTotal_Field) ( - err error) { - defer mon.Task()(&ctx)(&err) - __node_id_val := accounting_rollup_node_id.value() - __start_time_val := accounting_rollup_start_time.value() - __put_total_val := accounting_rollup_put_total.value() - __get_total_val := accounting_rollup_get_total.value() - __get_audit_total_val := accounting_rollup_get_audit_total.value() - __get_repair_total_val := accounting_rollup_get_repair_total.value() - __put_repair_total_val := accounting_rollup_put_repair_total.value() - __at_rest_total_val := accounting_rollup_at_rest_total.value() - - var __embed_stmt = __sqlbundle_Literal("INSERT INTO accounting_rollups ( node_id, start_time, put_total, get_total, get_audit_total, get_repair_total, put_repair_total, at_rest_total ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ? ) ON CONFLICT ( node_id, start_time ) DO UPDATE SET node_id = EXCLUDED.node_id, start_time = EXCLUDED.start_time, put_total = EXCLUDED.put_total, get_total = EXCLUDED.get_total, get_audit_total = EXCLUDED.get_audit_total, get_repair_total = EXCLUDED.get_repair_total, put_repair_total = EXCLUDED.put_repair_total, at_rest_total = EXCLUDED.at_rest_total") - - var __values []interface{} - __values = append(__values, __node_id_val, __start_time_val, __put_total_val, __get_total_val, __get_audit_total_val, __get_repair_total_val, __put_repair_total_val, __at_rest_total_val) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) - - _, err = obj.driver.ExecContext(ctx, __stmt, __values...) - if err != nil { - return obj.makeErr(err) - } - return nil - -} - func (obj *pgxImpl) Create_AuditHistory(ctx context.Context, audit_history_node_id AuditHistory_NodeId_Field, audit_history_history AuditHistory_History_Field) ( @@ -14484,42 +14448,6 @@ func (obj *pgxcockroachImpl) CreateNoReturn_AccountingTimestamps(ctx context.Con } -func (obj *pgxcockroachImpl) ReplaceNoReturn_AccountingRollup(ctx context.Context, - accounting_rollup_node_id AccountingRollup_NodeId_Field, - accounting_rollup_start_time AccountingRollup_StartTime_Field, - accounting_rollup_put_total AccountingRollup_PutTotal_Field, - accounting_rollup_get_total AccountingRollup_GetTotal_Field, - accounting_rollup_get_audit_total AccountingRollup_GetAuditTotal_Field, - accounting_rollup_get_repair_total AccountingRollup_GetRepairTotal_Field, - accounting_rollup_put_repair_total AccountingRollup_PutRepairTotal_Field, - accounting_rollup_at_rest_total AccountingRollup_AtRestTotal_Field) ( - err error) { - defer mon.Task()(&ctx)(&err) - __node_id_val := accounting_rollup_node_id.value() - __start_time_val := accounting_rollup_start_time.value() - __put_total_val := accounting_rollup_put_total.value() - __get_total_val := accounting_rollup_get_total.value() - __get_audit_total_val := accounting_rollup_get_audit_total.value() - __get_repair_total_val := accounting_rollup_get_repair_total.value() - __put_repair_total_val := accounting_rollup_put_repair_total.value() - __at_rest_total_val := accounting_rollup_at_rest_total.value() - - var __embed_stmt = __sqlbundle_Literal("UPSERT INTO accounting_rollups ( node_id, start_time, put_total, get_total, get_audit_total, get_repair_total, put_repair_total, at_rest_total ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ? )") - - var __values []interface{} - __values = append(__values, __node_id_val, __start_time_val, __put_total_val, __get_total_val, __get_audit_total_val, __get_repair_total_val, __put_repair_total_val, __at_rest_total_val) - - var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt) - obj.logStmt(__stmt, __values...) - - _, err = obj.driver.ExecContext(ctx, __stmt, __values...) - if err != nil { - return obj.makeErr(err) - } - return nil - -} - func (obj *pgxcockroachImpl) Create_AuditHistory(ctx context.Context, audit_history_node_id AuditHistory_NodeId_Field, audit_history_history AuditHistory_History_Field) ( @@ -21053,24 +20981,6 @@ func (rx *Rx) Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStar return tx.Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual(ctx, storagenode_bandwidth_rollup_storagenode_id, storagenode_bandwidth_rollup_interval_start_greater_or_equal, limit, start) } -func (rx *Rx) ReplaceNoReturn_AccountingRollup(ctx context.Context, - accounting_rollup_node_id AccountingRollup_NodeId_Field, - accounting_rollup_start_time AccountingRollup_StartTime_Field, - accounting_rollup_put_total AccountingRollup_PutTotal_Field, - accounting_rollup_get_total AccountingRollup_GetTotal_Field, - accounting_rollup_get_audit_total AccountingRollup_GetAuditTotal_Field, - accounting_rollup_get_repair_total AccountingRollup_GetRepairTotal_Field, - accounting_rollup_put_repair_total AccountingRollup_PutRepairTotal_Field, - accounting_rollup_at_rest_total AccountingRollup_AtRestTotal_Field) ( - err error) { - var tx *Tx - if tx, err = rx.getTx(ctx); err != nil { - return - } - return tx.ReplaceNoReturn_AccountingRollup(ctx, accounting_rollup_node_id, accounting_rollup_start_time, accounting_rollup_put_total, accounting_rollup_get_total, accounting_rollup_get_audit_total, accounting_rollup_get_repair_total, accounting_rollup_put_repair_total, accounting_rollup_at_rest_total) - -} - func (rx *Rx) ReplaceNoReturn_NodeApiVersion(ctx context.Context, node_api_version_id NodeApiVersion_Id_Field, node_api_version_api_version NodeApiVersion_ApiVersion_Field) ( @@ -21846,17 +21756,6 @@ type Methods interface { limit int, start *Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation) ( rows []*StoragenodeBandwidthRollup, next *Paged_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart_GreaterOrEqual_Continuation, err error) - ReplaceNoReturn_AccountingRollup(ctx context.Context, - accounting_rollup_node_id AccountingRollup_NodeId_Field, - accounting_rollup_start_time AccountingRollup_StartTime_Field, - accounting_rollup_put_total AccountingRollup_PutTotal_Field, - accounting_rollup_get_total AccountingRollup_GetTotal_Field, - accounting_rollup_get_audit_total AccountingRollup_GetAuditTotal_Field, - accounting_rollup_get_repair_total AccountingRollup_GetRepairTotal_Field, - accounting_rollup_put_repair_total AccountingRollup_PutRepairTotal_Field, - accounting_rollup_at_rest_total AccountingRollup_AtRestTotal_Field) ( - err error) - ReplaceNoReturn_NodeApiVersion(ctx context.Context, node_api_version_id NodeApiVersion_Id_Field, node_api_version_api_version NodeApiVersion_ApiVersion_Field) ( diff --git a/satellite/satellitedb/storagenodeaccounting.go b/satellite/satellitedb/storagenodeaccounting.go index 89612dd29..a5372c6cf 100644 --- a/satellite/satellitedb/storagenodeaccounting.go +++ b/satellite/satellitedb/storagenodeaccounting.go @@ -256,46 +256,84 @@ func (db *StoragenodeAccounting) SaveRollup(ctx context.Context, latestRollup ti rollups = append(rollups, ar) } } - finished := false - for !finished { - err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { - for i := 0; i < batchSize && len(rollups) > 0; i++ { - ar := rollups[0] - rollups = rollups[1:] + insertBatch := func(ctx context.Context, db *dbx.DB, batch []*accounting.Rollup) (err error) { + defer mon.Task()(&ctx)(&err) + n := len(batch) - nID := dbx.AccountingRollup_NodeId(ar.NodeID.Bytes()) - start := dbx.AccountingRollup_StartTime(ar.StartTime) - put := dbx.AccountingRollup_PutTotal(ar.PutTotal) - get := dbx.AccountingRollup_GetTotal(ar.GetTotal) - audit := dbx.AccountingRollup_GetAuditTotal(ar.GetAuditTotal) - getRepair := dbx.AccountingRollup_GetRepairTotal(ar.GetRepairTotal) - putRepair := dbx.AccountingRollup_PutRepairTotal(ar.PutRepairTotal) - atRest := dbx.AccountingRollup_AtRestTotal(ar.AtRestTotal) + nodeID := make([]storj.NodeID, n) + startTime := make([]time.Time, n) + putTotal := make([]int64, n) + getTotal := make([]int64, n) + getAuditTotal := make([]int64, n) + getRepairTotal := make([]int64, n) + putRepairTotal := make([]int64, n) + atRestTotal := make([]float64, n) - err := tx.ReplaceNoReturn_AccountingRollup(ctx, nID, start, put, get, audit, getRepair, putRepair, atRest) - if err != nil { - return err - } - } + for i, ar := range batch { + nodeID[i] = ar.NodeID + startTime[i] = ar.StartTime + putTotal[i] = ar.PutTotal + getTotal[i] = ar.GetTotal + getAuditTotal[i] = ar.GetAuditTotal + getRepairTotal[i] = ar.GetRepairTotal + putRepairTotal[i] = ar.PutRepairTotal + atRestTotal[i] = ar.AtRestTotal + } - if len(rollups) == 0 { - finished = true - return tx.UpdateNoReturn_AccountingTimestamps_By_Name(ctx, - dbx.AccountingTimestamps_Name(accounting.LastRollup), - dbx.AccountingTimestamps_Update_Fields{ - Value: dbx.AccountingTimestamps_Value(latestRollup), - }, - ) - } + _, err = db.ExecContext(ctx, ` + INSERT INTO accounting_rollups ( + node_id, start_time, + put_total, get_total, + get_audit_total, get_repair_total, put_repair_total, + at_rest_total + ) + SELECT * FROM unnest( + $1::bytea[], $2::timestamptz[], + $3::int8[], $4::int8[], + $5::int8[], $6::int8[], $7::int8[], + $8::float8[] + ) + ON CONFLICT ( node_id, start_time ) + DO UPDATE SET + put_total = EXCLUDED.put_total, + get_total = EXCLUDED.get_total, + get_audit_total = EXCLUDED.get_audit_total, + get_repair_total = EXCLUDED.get_repair_total, + put_repair_total = EXCLUDED.put_repair_total, + at_rest_total = EXCLUDED.at_rest_total + `, pgutil.NodeIDArray(nodeID), pgutil.TimestampTZArray(startTime), + pgutil.Int8Array(putTotal), pgutil.Int8Array(getTotal), + pgutil.Int8Array(getAuditTotal), pgutil.Int8Array(getRepairTotal), pgutil.Int8Array(putRepairTotal), + pgutil.Float8Array(atRestTotal)) - return nil - }) - if err != nil { + return Error.Wrap(err) + } + + // Note: we do not need here a transaction because we will "update" the + // columns when we do not update accounting.LastRollup. We will end up + // with partial data in the database, however in the next runs, we will + // try to fix them. + + for len(rollups) > 0 { + batch := rollups + if len(batch) > batchSize { + batch = batch[:batchSize] + } + rollups = rollups[len(batch):] + + if err := insertBatch(ctx, db.db.DB, batch); err != nil { return Error.Wrap(err) } } - return nil + + err = db.db.UpdateNoReturn_AccountingTimestamps_By_Name(ctx, + dbx.AccountingTimestamps_Name(accounting.LastRollup), + dbx.AccountingTimestamps_Update_Fields{ + Value: dbx.AccountingTimestamps_Value(latestRollup), + }, + ) + return Error.Wrap(err) } // LastTimestamp records the greatest last tallied time.