satellite/accounting: Use metabase.AliasPiece with tally observer

We want to eliminate usages of LoopSegmentEntry.Pieces, because it is
costing a lot of CPU time to look up node IDs with every piece of every
segment we read.

In this change, we are eliminating use of LoopSegmentEntry.Pieces in the
node tally observer (both the ranged loop and segments loop variants).
It is not necessary to have a fully resolved nodeID until it is time to
store totals in the database. We can use NodeAliases as the map key
instead, and resolve NodeIDs just before storing totals.

Refs: https://github.com/storj/storj/issues/5622
Change-Id: Iec12aa393072436d7c22cc5a4ae1b63966cbcc18
This commit is contained in:
paul cannon 2023-03-22 13:05:48 -05:00 committed by Storj Robot
parent e750f6f145
commit ae5947327b
11 changed files with 119 additions and 74 deletions

View File

@@ -175,7 +175,7 @@ type Usage struct {
// architecture: Database
type StoragenodeAccounting interface {
// SaveTallies records tallies of data at rest
SaveTallies(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error
SaveTallies(ctx context.Context, latestTally time.Time, nodes []storj.NodeID, tallies []float64) error
// GetTallies retrieves all tallies
GetTallies(ctx context.Context) ([]*StoragenodeStorageTally, error)
// GetTalliesSince retrieves all tallies since latestRollup

View File

@@ -71,7 +71,7 @@ func TestStorageNodeUsage(t *testing.T) {
// save tallies
for latestTally, tallies := range tallies {
err = accountingDB.SaveTallies(ctx, latestTally, tallies)
err = accountingDB.SaveTallies(ctx, latestTally, tallies.nodeIDs, tallies.totals)
require.NoError(t, err)
}
@@ -244,10 +244,15 @@ func createBucketStorageTallies(projectID uuid.UUID) (map[metabase.BucketLocatio
return bucketTallies, expectedTallies, nil
}
// nodesAndTallies pairs node IDs with their at-rest data totals in
// matching positions (nodeIDs[i] corresponds to totals[i]) so both
// slices can be handed to SaveTallies together.
type nodesAndTallies struct {
nodeIDs []storj.NodeID
totals []float64
}
// make rollups and tallies for specified nodes and date range.
func makeRollupsAndStorageNodeStorageTallies(nodes []storj.NodeID, start time.Time, days int) (accounting.RollupStats, map[time.Time]map[storj.NodeID]float64, time.Time) {
func makeRollupsAndStorageNodeStorageTallies(nodes []storj.NodeID, start time.Time, days int) (accounting.RollupStats, map[time.Time]nodesAndTallies, time.Time) {
rollups := make(accounting.RollupStats)
tallies := make(map[time.Time]map[storj.NodeID]float64)
tallies := make(map[time.Time]nodesAndTallies)
const (
hours = 12
@@ -259,7 +264,15 @@ func makeRollupsAndStorageNodeStorageTallies(nodes []storj.NodeID, start time.Ti
rollups[startDay] = make(map[storj.NodeID]*accounting.Rollup)
}
for _, node := range nodes {
for h := 0; h < hours; h++ {
intervalEndTime := startDay.Add(time.Hour * time.Duration(h))
tallies[intervalEndTime] = nodesAndTallies{
nodeIDs: make([]storj.NodeID, len(nodes)),
totals: make([]float64, len(nodes)),
}
}
for nodeIndex, node := range nodes {
rollup := &accounting.Rollup{
NodeID: node,
StartTime: startDay,
@@ -267,12 +280,10 @@ func makeRollupsAndStorageNodeStorageTallies(nodes []storj.NodeID, start time.Ti
for h := 0; h < hours; h++ {
intervalEndTime := startDay.Add(time.Hour * time.Duration(h))
if tallies[intervalEndTime] == nil {
tallies[intervalEndTime] = make(map[storj.NodeID]float64)
}
tallieAtRest := math.Round(testrand.Float64n(1000))
tallies[intervalEndTime][node] = tallieAtRest
tallies[intervalEndTime].nodeIDs[nodeIndex] = node
tallies[intervalEndTime].totals[nodeIndex] = tallieAtRest
rollup.AtRestTotal += tallieAtRest
rollup.IntervalEndTime = intervalEndTime
}

View File

@@ -14,6 +14,7 @@ import (
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/segmentloop"
)
@@ -32,17 +33,19 @@ type Service struct {
segmentLoop *segmentloop.Service
storagenodeAccountingDB accounting.StoragenodeAccounting
metabaseDB *metabase.DB
nowFn func() time.Time
}
// New creates a new node tally Service.
func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, loop *segmentloop.Service, interval time.Duration) *Service {
func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, mdb *metabase.DB, loop *segmentloop.Service, interval time.Duration) *Service {
return &Service{
log: log,
Loop: sync2.NewCycle(interval),
segmentLoop: loop,
storagenodeAccountingDB: sdb,
metabaseDB: mdb,
nowFn: time.Now,
}
}
@@ -106,7 +109,22 @@ func (service *Service) Tally(ctx context.Context) (err error) {
monTally.IntVal("nodetallies.totalsum").Observe(int64(totalSum)) //mon:locked
if len(observer.Node) > 0 {
err = service.storagenodeAccountingDB.SaveTallies(ctx, finishTime, observer.Node)
nodeIDs := make([]storj.NodeID, 0, len(observer.Node))
nodeTotals := make([]float64, 0, len(observer.Node))
nodeAliasMap, err := service.metabaseDB.LatestNodesAliasMap(ctx)
if err != nil {
return Error.Wrap(err)
}
for nodeAlias, total := range observer.Node {
nodeID, ok := nodeAliasMap.Node(nodeAlias)
if !ok {
observer.log.Error("unrecognized node alias in tally", zap.Int32("node-alias", int32(nodeAlias)))
continue
}
nodeIDs = append(nodeIDs, nodeID)
nodeTotals = append(nodeTotals, total)
}
err = service.storagenodeAccountingDB.SaveTallies(ctx, finishTime, nodeIDs, nodeTotals)
if err != nil {
return Error.New("StorageNodeAccounting.SaveTallies failed: %v", err)
}
@@ -122,7 +140,7 @@ type Observer struct {
log *zap.Logger
now time.Time
Node map[storj.NodeID]float64
Node map[metabase.NodeAlias]float64
}
// NewObserver returns an segment loop observer that adds up totals for nodes.
@@ -131,7 +149,7 @@ func NewObserver(log *zap.Logger, now time.Time) *Observer {
log: log,
now: now,
Node: make(map[storj.NodeID]float64),
Node: make(map[metabase.NodeAlias]float64),
}
}
@@ -158,8 +176,8 @@ func (observer *Observer) RemoteSegment(ctx context.Context, segment *segmentloo
pieceSize := float64(segment.EncryptedSize / int32(minimumRequired)) // TODO: Add this as a method to RedundancyScheme
for _, piece := range segment.Pieces {
observer.Node[piece.StorageNode] += pieceSize
for _, piece := range segment.AliasPieces {
observer.Node[piece.Alias] += pieceSize
}
return nil

View File

@@ -12,6 +12,7 @@ import (
"storj.io/common/storj"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
)
@@ -27,18 +28,21 @@ type RangedLoopObserver struct {
log *zap.Logger
accounting accounting.StoragenodeAccounting
metabaseDB *metabase.DB
nowFn func() time.Time
lastTallyTime time.Time
Node map[storj.NodeID]float64
Node map[metabase.NodeAlias]float64
}
// NewRangedLoopObserver creates new RangedLoopObserver.
func NewRangedLoopObserver(log *zap.Logger, accounting accounting.StoragenodeAccounting) *RangedLoopObserver {
func NewRangedLoopObserver(log *zap.Logger, accounting accounting.StoragenodeAccounting, metabaseDB *metabase.DB) *RangedLoopObserver {
return &RangedLoopObserver{
log: log,
accounting: accounting,
metabaseDB: metabaseDB,
nowFn: time.Now,
Node: map[storj.NodeID]float64{},
Node: map[metabase.NodeAlias]float64{},
}
}
@@ -46,7 +50,7 @@ func NewRangedLoopObserver(log *zap.Logger, accounting accounting.StoragenodeAcc
func (observer *RangedLoopObserver) Start(ctx context.Context, time time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
observer.Node = map[storj.NodeID]float64{}
observer.Node = map[metabase.NodeAlias]float64{}
observer.lastTallyTime, err = observer.accounting.LastTimestamp(ctx, accounting.LastAtRestTally)
if err != nil {
return err
@@ -73,8 +77,8 @@ func (observer *RangedLoopObserver) Join(ctx context.Context, partial rangedloop
return Error.New("expected partial type %T but got %T", tallyPartial, partial)
}
for nodeID, val := range tallyPartial.Node {
observer.Node[nodeID] += val
for alias, val := range tallyPartial.Node {
observer.Node[alias] += val
}
return nil
@@ -91,16 +95,28 @@ func (observer *RangedLoopObserver) Finish(ctx context.Context) (err error) {
// calculate byte hours, not just bytes
hours := finishTime.Sub(observer.lastTallyTime).Hours()
byteHours := make(map[storj.NodeID]float64)
var totalSum float64
for id, pieceSize := range observer.Node {
nodeIDs := make([]storj.NodeID, 0, len(observer.Node))
byteHours := make([]float64, 0, len(observer.Node))
nodeAliasMap, err := observer.metabaseDB.LatestNodesAliasMap(ctx)
if err != nil {
return err
}
for alias, pieceSize := range observer.Node {
totalSum += pieceSize
byteHours[id] = pieceSize * hours
nodeID, ok := nodeAliasMap.Node(alias)
if !ok {
observer.log.Error("unrecognized node alias in ranged-loop tally", zap.Int32("node-alias", int32(alias)))
continue
}
nodeIDs = append(nodeIDs, nodeID)
byteHours = append(byteHours, pieceSize*hours)
}
monRangedTally.IntVal("nodetallies.totalsum").Observe(int64(totalSum)) //mon:locked
err = observer.accounting.SaveTallies(ctx, finishTime, byteHours)
err = observer.accounting.SaveTallies(ctx, finishTime, nodeIDs, byteHours)
if err != nil {
return Error.New("StorageNodeAccounting.SaveTallies failed: %v", err)
}
@@ -118,7 +134,7 @@ type RangedLoopPartial struct {
log *zap.Logger
nowFn func() time.Time
Node map[storj.NodeID]float64
Node map[metabase.NodeAlias]float64
}
// NewRangedLoopPartial creates new node tally ranged loop partial.
@@ -126,7 +142,7 @@ func NewRangedLoopPartial(log *zap.Logger, nowFn func() time.Time) *RangedLoopPa
return &RangedLoopPartial{
log: log,
nowFn: nowFn,
Node: map[storj.NodeID]float64{},
Node: map[metabase.NodeAlias]float64{},
}
}
@@ -160,7 +176,7 @@ func (partial *RangedLoopPartial) processSegment(now time.Time, segment segmentl
pieceSize := float64(segment.EncryptedSize / int32(minimumRequired)) // TODO: Add this as a method to RedundancyScheme
for _, piece := range segment.Pieces {
partial.Node[piece.StorageNode] += pieceSize
for _, piece := range segment.AliasPieces {
partial.Node[piece.Alias] += pieceSize
}
}

View File

@@ -84,8 +84,12 @@ func TestSingleObjectNodeTallyRangedLoop(t *testing.T) {
require.LessOrEqual(t, len(tallies), int(rs.TotalShares))
require.GreaterOrEqual(t, len(tallies), int(rs.OptimalShares))
aliasMap, err := planet.Satellites[0].Metabase.DB.LatestNodesAliasMap(ctx)
require.NoError(t, err)
for _, tally := range tallies {
require.Equal(t, obs.Node[tally.NodeID]*float64(timespanHours), tally.DataTotal)
nodeAlias, ok := aliasMap.Alias(tally.NodeID)
require.Truef(t, ok, "could not get node alias for node %s", tally.NodeID)
require.Equal(t, obs.Node[nodeAlias]*float64(timespanHours), tally.DataTotal)
}
thirdNow := secondNow.Add(2 * time.Hour)
@@ -101,8 +105,12 @@ func TestSingleObjectNodeTallyRangedLoop(t *testing.T) {
require.LessOrEqual(t, len(tallies), int(rs.TotalShares))
require.GreaterOrEqual(t, len(tallies), int(rs.OptimalShares))
aliasMap, err = planet.Satellites[0].Metabase.DB.LatestNodesAliasMap(ctx)
require.NoError(t, err)
for _, tally := range tallies {
require.Equal(t, obs.Node[tally.NodeID]*float64(timespanHours), tally.DataTotal)
nodeAlias, ok := aliasMap.Alias(tally.NodeID)
require.Truef(t, ok, "could not get node alias for node %s", tally.NodeID)
require.Equal(t, obs.Node[nodeAlias]*float64(timespanHours), tally.DataTotal)
}
})
}
@@ -126,12 +134,10 @@ func TestManyObjectsNodeTallyRangedLoop(t *testing.T) {
// Set previous accounting run timestamp
err := planet.Satellites[0].DB.StoragenodeAccounting().DeleteTalliesBefore(ctx, now.Add(1*time.Second), 5000)
require.NoError(t, err)
err = planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, lastTally, map[storj.NodeID]float64{
planet.StorageNodes[0].ID(): 0,
planet.StorageNodes[1].ID(): 0,
planet.StorageNodes[2].ID(): 0,
planet.StorageNodes[3].ID(): 0,
})
err = planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, lastTally,
[]storj.NodeID{planet.StorageNodes[0].ID(), planet.StorageNodes[1].ID(), planet.StorageNodes[2].ID(), planet.StorageNodes[3].ID()},
[]float64{0, 0, 0, 0},
)
require.NoError(t, err)
// Setup: create 50KiB of data for the uplink to upload
@@ -208,12 +214,10 @@ func TestExpiredObjectsNotCountedInNodeTally(t *testing.T) {
})
lastTally := now.Add(time.Duration(-1 * timespanHours * int(time.Hour)))
err := planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, lastTally, map[storj.NodeID]float64{
planet.StorageNodes[0].ID(): 0,
planet.StorageNodes[1].ID(): 0,
planet.StorageNodes[2].ID(): 0,
planet.StorageNodes[3].ID(): 0,
})
err := planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, lastTally,
[]storj.NodeID{planet.StorageNodes[0].ID(), planet.StorageNodes[1].ID(), planet.StorageNodes[2].ID(), planet.StorageNodes[3].ID()},
[]float64{0, 0, 0, 0},
)
require.NoError(t, err)
// Setup: create 50KiB of data for the uplink to upload

View File

@@ -58,17 +58,19 @@ func TestRollupNoDeletes(t *testing.T) {
initialTime := time.Now().UTC().AddDate(0, 0, -days)
currentTime := initialTime
nodeData := map[storj.NodeID]float64{}
nodeData := make([]storj.NodeID, len(storageNodes))
bwAmount := make([]float64, len(storageNodes))
bwTotals := make(map[storj.NodeID][]int64)
for _, storageNodeID := range storageNodes {
nodeData[storageNodeID] = float64(atRestAmount)
for i, storageNodeID := range storageNodes {
nodeData[i] = storageNodeID
bwAmount[i] = float64(atRestAmount)
bwTotals[storageNodeID] = []int64{putAmount, getAmount, getAuditAmount, getRepairAmount, putRepairAmount}
}
// Create 5 days worth of tally and rollup data.
// Add one additional day of data since the rollup service will truncate data from the most recent day.
for i := 0; i < days+1; i++ {
require.NoError(t, snAccountingDB.SaveTallies(ctx, currentTime, nodeData))
require.NoError(t, snAccountingDB.SaveTallies(ctx, currentTime, nodeData, bwAmount))
require.NoError(t, saveBWPhase3(ctx, ordersDB, bwTotals, currentTime))
require.NoError(t, rollupService.Rollup(ctx))
@@ -168,17 +170,19 @@ func TestRollupDeletes(t *testing.T) {
currentTime := initialTime
nodeData := map[storj.NodeID]float64{}
nodeData := make([]storj.NodeID, len(storageNodes))
bwAmount := make([]float64, len(storageNodes))
bwTotals := make(map[storj.NodeID][]int64)
for _, storageNodeID := range storageNodes {
nodeData[storageNodeID] = float64(atRestAmount)
for i, storageNodeID := range storageNodes {
nodeData[i] = storageNodeID
bwAmount[i] = float64(atRestAmount)
bwTotals[storageNodeID] = []int64{putAmount, getAmount, getAuditAmount, getRepairAmount, putRepairAmount}
}
// Create 5 days worth of tally and rollup data.
// Add one additional day of data since the rollup service will truncate data from the most recent day.
for i := 0; i < days+1; i++ {
require.NoError(t, snAccountingDB.SaveTallies(ctx, currentTime, nodeData))
require.NoError(t, snAccountingDB.SaveTallies(ctx, currentTime, nodeData, bwAmount))
require.NoError(t, saveBWPhase3(ctx, ordersDB, bwTotals, currentTime))
// Since the config.Rollup.DeleteTallies is set to true, at the end of the Rollup(),
@@ -267,11 +271,11 @@ func TestRollupOldOrders(t *testing.T) {
)
// Phase 1
storageTotalsPhase1 := make(map[storj.NodeID]float64)
storageTotalsPhase1[nodeA.ID()] = float64(AtRestAmount1)
require.NoError(t, snAccountingDB.SaveTallies(ctx, initialTime.Add(2*time.Hour), storageTotalsPhase1))
storageNodesPhase1 := []storj.NodeID{nodeA.ID()}
storageTotalsPhase1 := []float64{AtRestAmount1}
require.NoError(t, snAccountingDB.SaveTallies(ctx, initialTime.Add(2*time.Hour), storageNodesPhase1, storageTotalsPhase1))
// save tallies for the next day too, so that the period we are testing is not truncated by the rollup service.
require.NoError(t, snAccountingDB.SaveTallies(ctx, initialTime.Add(26*time.Hour), storageTotalsPhase1))
require.NoError(t, snAccountingDB.SaveTallies(ctx, initialTime.Add(26*time.Hour), storageNodesPhase1, storageTotalsPhase1))
bwTotalsPhase1 := make(map[storj.NodeID][]int64)
bwTotalsPhase1[nodeA.ID()] = []int64{PutActionAmount1, GetActionAmount1, GetAuditActionAmount1, GetRepairActionAmount1, PutRepairActionAmount1}
@@ -298,10 +302,9 @@ func TestRollupOldOrders(t *testing.T) {
require.EqualValues(t, AtRestAmount1, accountingCSVRow.AtRestTotal)
// Phase 2
storageTotalsPhase2 := make(map[storj.NodeID]float64)
storageTotalsPhase2[nodeA.ID()] = float64(AtRestAmount2)
storageTotalsPhase2[nodeB.ID()] = float64(AtRestAmount2)
require.NoError(t, snAccountingDB.SaveTallies(ctx, initialTime.Add(-2*time.Hour), storageTotalsPhase2))
storageNodesPhase2 := []storj.NodeID{nodeA.ID(), nodeB.ID()}
storageTotalsPhase2 := []float64{AtRestAmount2, AtRestAmount2}
require.NoError(t, snAccountingDB.SaveTallies(ctx, initialTime.Add(-2*time.Hour), storageNodesPhase2, storageTotalsPhase2))
bwTotalsPhase2 := make(map[storj.NodeID][]int64)
bwTotalsPhase2[nodeA.ID()] = []int64{PutActionAmount2, GetActionAmount2, GetAuditActionAmount2, GetRepairActionAmount2, PutRepairActionAmount2}

View File

@@ -46,12 +46,10 @@ func TestDeleteTalliesBefore(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
nodeData := make(map[storj.NodeID]float64)
nodeData[storj.NodeID{1}] = float64(1000)
nodeData[storj.NodeID{2}] = float64(1000)
nodeData[storj.NodeID{3}] = float64(1000)
nodeIDs := []storj.NodeID{{1}, {2}, {3}}
nodeBWAmounts := []float64{1000, 1000, 1000}
err := planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, time.Now(), nodeData)
err := planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, time.Now(), nodeIDs, nodeBWAmounts)
require.NoError(t, err)
err = planet.Satellites[0].DB.StoragenodeAccounting().DeleteTalliesBefore(ctx, test.eraseBefore, 1)

View File

@@ -456,7 +456,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
if config.Tally.UseRangedLoop {
nodeTallyLog.Info("using ranged loop")
} else {
peer.Accounting.NodeTally = nodetally.New(nodeTallyLog, peer.DB.StoragenodeAccounting(), peer.Metainfo.SegmentLoop, config.Tally.Interval)
peer.Accounting.NodeTally = nodetally.New(nodeTallyLog, peer.DB.StoragenodeAccounting(), peer.Metainfo.Metabase, peer.Metainfo.SegmentLoop, config.Tally.Interval)
peer.Services.Add(lifecycle.Item{
Name: "accounting:nodetally",
Run: peer.Accounting.NodeTally.Run,

View File

@@ -406,6 +406,7 @@ func TestAllInOne(t *testing.T) {
metrics.NewObserver(),
nodetally.NewRangedLoopObserver(log.Named("accounting:nodetally"),
satellite.DB.StoragenodeAccounting(),
satellite.Metabase.DB,
),
audit.NewObserver(log.Named("audit"),
satellite.DB.VerifyQueue(),

View File

@@ -124,7 +124,8 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
{ // setup node tally observer
peer.Accounting.NodeTallyObserver = nodetally.NewRangedLoopObserver(
log.Named("accounting:nodetally"),
db.StoragenodeAccounting())
db.StoragenodeAccounting(),
metabaseDB)
}
{ // setup overlay

View File

@@ -25,16 +25,9 @@ type StoragenodeAccounting struct {
}
// SaveTallies records raw tallies of at rest data to the database.
func (db *StoragenodeAccounting) SaveTallies(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) (err error) {
func (db *StoragenodeAccounting) SaveTallies(ctx context.Context, latestTally time.Time, nodeIDs []storj.NodeID, totals []float64) (err error) {
defer mon.Task()(&ctx)(&err)
var nodeIDs []storj.NodeID
var totals []float64
for id, total := range nodeData {
nodeIDs = append(nodeIDs, id)
totals = append(totals, total)
}
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
_, err = tx.Tx.ExecContext(ctx, db.db.Rebind(`
INSERT INTO storagenode_storage_tallies (