satellite/metrics: provide a rangedloop observer

https://github.com/storj/storj/issues/5236

Change-Id: Ic1ed7a5533dccacd58285b64579dbdd6210de4f9
Andrew Harding 2022-12-05 15:23:13 -07:00
parent 633ab8dcf6
commit b562cbf98f
4 changed files with 266 additions and 14 deletions

View File

@@ -5,7 +5,6 @@ package rangedlooptest
import (
"context"
- "fmt"
"math"
"sort"
@@ -74,9 +73,6 @@ func min(x, y int) int {
func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment {
// Duplicate and sort the segments by stream ID
segments = append([]segmentloop.Segment(nil), segments...)
- for i, segment := range segments {
- fmt.Println("BEFORE:", i, segment.StreamID, segment.Position)
- }
sort.Slice(segments, func(i int, j int) bool {
idcmp := segments[i].StreamID.Compare(segments[j].StreamID)
switch {
@@ -88,9 +84,6 @@ func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment
return segments[i].Position.Less(segments[j].Position)
}
})
- for i, segment := range segments {
- fmt.Println("AFTER:", i, segment.StreamID, segment.Position)
- }
// Break up the sorted segments into streams
var streams [][]segmentloop.Segment
var stream []segmentloop.Segment
@@ -107,13 +100,6 @@ func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment
if len(stream) > 0 {
streams = append(streams, stream)
}
- for i, stream := range streams {
- for j, segment := range stream {
- fmt.Println("STREAM:", i, j, segment.StreamID, segment.Position)
- }
- }
return streams
}

View File

@@ -0,0 +1,40 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package metrics
// Metrics represents the metrics that are tracked by this package.
type Metrics struct {
// RemoteObjects is the count of objects with at least one remote segment.
RemoteObjects int64
// InlineObjects is the count of objects with only inline segments.
InlineObjects int64
// TotalInlineBytes is the total number of bytes across all inline segments.
TotalInlineBytes int64
// TotalRemoteBytes is the total number of bytes across all remote segments.
TotalRemoteBytes int64
// TotalInlineSegments is the count of inline segments across all objects.
TotalInlineSegments int64
// TotalRemoteSegments is the count of remote segments across all objects.
TotalRemoteSegments int64
}
// Reset resets the individual metrics back to zero.
func (metrics *Metrics) Reset() {
*metrics = Metrics{}
}
// Aggregate aggregates the given metrics into the receiver.
func (metrics *Metrics) Aggregate(partial Metrics) {
metrics.RemoteObjects += partial.RemoteObjects
metrics.InlineObjects += partial.InlineObjects
metrics.TotalInlineBytes += partial.TotalInlineBytes
metrics.TotalRemoteBytes += partial.TotalRemoteBytes
metrics.TotalInlineSegments += partial.TotalInlineSegments
metrics.TotalRemoteSegments += partial.TotalRemoteSegments
}
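
Not part of the commit: a minimal sketch of how Reset and Aggregate are meant to compose. Each ranged-loop fork produces a partial Metrics value and the observer folds the partials into one total. The import path storj.io/storj/satellite/metrics is assumed from the commit title and is not shown in this diff.

package main

import (
	"fmt"

	"storj.io/storj/satellite/metrics"
)

func main() {
	// Running totals, as the observer would keep them across a loop iteration.
	var totals metrics.Metrics
	totals.Reset() // drop anything left over from a previous iteration

	// Partial results as they might come back from two parallel forks.
	partials := []metrics.Metrics{
		{RemoteObjects: 2, TotalRemoteSegments: 5, TotalRemoteBytes: 80},
		{InlineObjects: 1, TotalInlineSegments: 1, TotalInlineBytes: 10},
	}

	// Aggregate adds every field of the partial into the receiver.
	for _, partial := range partials {
		totals.Aggregate(partial)
	}

	fmt.Printf("%+v\n", totals)
	// {RemoteObjects:2 InlineObjects:1 TotalInlineBytes:10 TotalRemoteBytes:80 TotalInlineSegments:1 TotalRemoteSegments:5}
}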

View File

@@ -0,0 +1,126 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package metrics
import (
"context"
"time"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
)
// Observer implements the ranged segment loop observer interface for data
// science metrics collection.
type Observer struct {
metrics Metrics
}
var _ rangedloop.Observer = (*Observer)(nil)
// NewObserver instantiates a new rangedloop observer which aggregates
// object statistics from observed segments.
func NewObserver() *Observer {
return &Observer{}
}
// Start implements the Observer method of the same name by resetting the
// aggregated metrics.
func (obs *Observer) Start(ctx context.Context, startTime time.Time) error {
obs.metrics.Reset()
return nil
}
// Fork implements the Observer method of the same name by returning a partial
// implementation that aggregates metrics about the observed segments/streams.
// These metrics will be aggregated into the observed totals during Join.
func (obs *Observer) Fork(ctx context.Context) (rangedloop.Partial, error) {
return &observerFork{}, nil
}
// Join aggregates the partial metrics.
func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) error {
fork, ok := partial.(*observerFork)
if !ok {
return Error.New("expected %T but got %T", fork, partial)
}
// Flushing to count the stats for the last observed stream.
fork.Flush()
obs.metrics.Aggregate(fork.totals)
return nil
}
// Finish emits the aggregated metrics.
func (obs *Observer) Finish(ctx context.Context) error {
mon.IntVal("remote_dependent_object_count").Observe(obs.metrics.RemoteObjects)
mon.IntVal("inline_object_count").Observe(obs.metrics.InlineObjects)
mon.IntVal("total_inline_bytes").Observe(obs.metrics.TotalInlineBytes) //mon:locked
mon.IntVal("total_remote_bytes").Observe(obs.metrics.TotalRemoteBytes) //mon:locked
mon.IntVal("total_inline_segments").Observe(obs.metrics.TotalInlineSegments) //mon:locked
mon.IntVal("total_remote_segments").Observe(obs.metrics.TotalRemoteSegments) //mon:locked
return nil
}
// TestingMetrics returns the accumulated metrics. It is intended to be called
// from tests.
func (obs *Observer) TestingMetrics() Metrics {
return obs.metrics
}
type observerFork struct {
totals Metrics
stream streamMetrics
streamID uuid.UUID
}
// Process aggregates metrics about a range of segments provided by the
// ranged segment loop.
func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) error {
for _, segment := range segments {
if fork.streamID != segment.StreamID {
// Stream ID has changed. Flush what we have so far.
fork.Flush()
fork.streamID = segment.StreamID
}
if segment.Inline() {
fork.stream.inlineSegments++
fork.stream.inlineBytes += int64(segment.EncryptedSize)
} else {
fork.stream.remoteSegments++
fork.stream.remoteBytes += int64(segment.EncryptedSize)
}
}
return nil
}
// Flush is called whenever a new stream is observed and when the fork is
// joined to aggregate the accumulated stream stats into the totals.
func (fork *observerFork) Flush() {
fork.totals.TotalInlineSegments += fork.stream.inlineSegments
fork.totals.TotalRemoteSegments += fork.stream.remoteSegments
fork.totals.TotalInlineBytes += fork.stream.inlineBytes
fork.totals.TotalRemoteBytes += fork.stream.remoteBytes
if fork.stream.remoteSegments > 0 {
// At least one remote segment was found for this stream so classify
// as a remote object.
fork.totals.RemoteObjects++
} else if fork.stream.inlineSegments > 0 {
// Only count an inline object if there is at least one inline segment
// and no remote segments.
fork.totals.InlineObjects++
}
fork.stream = streamMetrics{}
}
// streamMetrics tracks the metrics for an individual stream.
type streamMetrics struct {
remoteSegments int64
remoteBytes int64
inlineSegments int64
inlineBytes int64
}
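
Not part of the commit: a hand-driven sketch of the observer lifecycle that the ranged loop is expected to follow (Start, Fork, Process, Join, Finish), mirroring the segment literals used by the test below. It assumes the package lives at storj.io/storj/satellite/metrics and that rangedloop.Partial exposes the Process method implemented by observerFork above.

package main

import (
	"context"
	"fmt"
	"time"

	"storj.io/common/uuid"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/metabase/segmentloop"
	"storj.io/storj/satellite/metrics"
)

func main() {
	ctx := context.Background()
	obs := metrics.NewObserver()

	// Start resets totals left over from a previous loop iteration.
	if err := obs.Start(ctx, time.Now()); err != nil {
		panic(err)
	}

	// The ranged loop creates one fork per segment range; one is enough here.
	partial, err := obs.Fork(ctx)
	if err != nil {
		panic(err)
	}

	// One inline segment (no pieces) and one remote segment from another stream.
	segments := []segmentloop.Segment{
		{StreamID: uuid.UUID{1}, EncryptedSize: 10},
		{StreamID: uuid.UUID{2}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}},
	}
	if err := partial.Process(ctx, segments); err != nil {
		panic(err)
	}

	// Join flushes the fork's last stream and folds its totals into the observer.
	if err := obs.Join(ctx, partial); err != nil {
		panic(err)
	}

	// Finish publishes the totals to monkit; TestingMetrics exposes them directly.
	if err := obs.Finish(ctx); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", obs.TestingMetrics())
}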

View File

@@ -0,0 +1,100 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package metrics
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/common/testcontext"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/rangedloop/rangedlooptest"
"storj.io/storj/satellite/metabase/segmentloop"
)
var (
inline1 = []segmentloop.Segment{
{StreamID: uuid.UUID{1}, EncryptedSize: 10},
}
remote2 = []segmentloop.Segment{
{StreamID: uuid.UUID{2}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}},
{StreamID: uuid.UUID{2}, EncryptedSize: 10},
}
remote3 = []segmentloop.Segment{
{StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}},
{StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}},
{StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}},
{StreamID: uuid.UUID{3}, EncryptedSize: 10},
}
)
func TestObserver(t *testing.T) {
ctx := testcontext.New(t)
loop := func(tb testing.TB, obs *Observer, streams ...[]segmentloop.Segment) Metrics {
service := rangedloop.NewService(
zap.NewNop(),
rangedloop.Config{BatchSize: 2, Parallelism: 2},
&rangedlooptest.RangeSplitter{Segments: combineSegments(streams...)},
[]rangedloop.Observer{obs})
err := service.RunOnce(ctx)
require.NoError(tb, err)
return obs.TestingMetrics()
}
t.Run("stats aggregation", func(t *testing.T) {
obs := NewObserver()
metrics := loop(t, obs, inline1, remote2, remote3)
require.Equal(t, Metrics{
InlineObjects: 1,
RemoteObjects: 2,
TotalInlineSegments: 3,
TotalRemoteSegments: 4,
TotalInlineBytes: 30,
TotalRemoteBytes: 64,
}, metrics)
})
t.Run("stats reset by start", func(t *testing.T) {
obs := NewObserver()
_ = loop(t, obs, inline1)
// Any metrics gathered during the first loop should be dropped.
metrics := loop(t, obs, remote3)
require.Equal(t, Metrics{
InlineObjects: 0,
RemoteObjects: 1,
TotalInlineSegments: 1,
TotalRemoteSegments: 3,
TotalInlineBytes: 10,
TotalRemoteBytes: 48,
}, metrics)
})
t.Run("join fails gracefully on bad partial type", func(t *testing.T) {
type wrongPartial struct{ rangedloop.Partial }
obs := NewObserver()
err := obs.Start(ctx, time.Time{})
require.NoError(t, err)
err = obs.Join(ctx, wrongPartial{})
require.EqualError(t, err, "metrics: expected *metrics.observerFork but got metrics.wrongPartial")
})
}
func combineSegments(ss ...[]segmentloop.Segment) []segmentloop.Segment {
var combined []segmentloop.Segment
for _, s := range ss {
combined = append(combined, s...)
}
return combined
}