diff --git a/satellite/metabase/rangedloop/rangedlooptest/provider.go b/satellite/metabase/rangedloop/rangedlooptest/provider.go index fdd09a10d..9147cddee 100644 --- a/satellite/metabase/rangedloop/rangedlooptest/provider.go +++ b/satellite/metabase/rangedloop/rangedlooptest/provider.go @@ -5,7 +5,6 @@ package rangedlooptest import ( "context" - "fmt" "math" "sort" @@ -74,9 +73,6 @@ func min(x, y int) int { func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment { // Duplicate and sort the segments by stream ID segments = append([]segmentloop.Segment(nil), segments...) - for i, segment := range segments { - fmt.Println("BEFORE:", i, segment.StreamID, segment.Position) - } sort.Slice(segments, func(i int, j int) bool { idcmp := segments[i].StreamID.Compare(segments[j].StreamID) switch { @@ -88,9 +84,6 @@ func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment return segments[i].Position.Less(segments[j].Position) } }) - for i, segment := range segments { - fmt.Println("AFTER:", i, segment.StreamID, segment.Position) - } // Break up the sorted segments into streams var streams [][]segmentloop.Segment var stream []segmentloop.Segment @@ -107,13 +100,6 @@ func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment if len(stream) > 0 { streams = append(streams, stream) } - - for i, stream := range streams { - for j, segment := range stream { - fmt.Println("STREAM:", i, j, segment.StreamID, segment.Position) - } - } - return streams } diff --git a/satellite/metrics/metrics.go b/satellite/metrics/metrics.go new file mode 100644 index 000000000..b5c985d53 --- /dev/null +++ b/satellite/metrics/metrics.go @@ -0,0 +1,40 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package metrics + +// Metrics represents the metrics that are tracked by this package. +type Metrics struct { + // RemoteObjects is the count of objects with at least one remote segment. + RemoteObjects int64 + + // InlineObjects is the count of objects with only inline segments. + InlineObjects int64 + + // TotalInlineBytes is the amount of bytes across all inline segments. + TotalInlineBytes int64 + + // TotalRemoteBytes is the amount of bytes across all remote segments. + TotalRemoteBytes int64 + + // TotalInlineSegments is the count of inline segments across all objects. + TotalInlineSegments int64 + + // TotalRemoteSegments is the count of remote segments across all objects. + TotalRemoteSegments int64 +} + +// Reset resets the invidual metrics back to zero. +func (metrics *Metrics) Reset() { + *metrics = Metrics{} +} + +// Aggregate aggregates the given metrics into the receiver. +func (metrics *Metrics) Aggregate(partial Metrics) { + metrics.RemoteObjects += partial.RemoteObjects + metrics.InlineObjects += partial.InlineObjects + metrics.TotalInlineBytes += partial.TotalInlineBytes + metrics.TotalRemoteBytes += partial.TotalRemoteBytes + metrics.TotalInlineSegments += partial.TotalInlineSegments + metrics.TotalRemoteSegments += partial.TotalRemoteSegments +} diff --git a/satellite/metrics/observer.go b/satellite/metrics/observer.go new file mode 100644 index 000000000..729364e67 --- /dev/null +++ b/satellite/metrics/observer.go @@ -0,0 +1,126 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package metrics + +import ( + "context" + "time" + + "storj.io/common/uuid" + "storj.io/storj/satellite/metabase/rangedloop" + "storj.io/storj/satellite/metabase/segmentloop" +) + +// Observer implements the ranged segment loop observer interface for data +// science metrics collection. +type Observer struct { + metrics Metrics +} + +var _ rangedloop.Observer = (*Observer)(nil) + +// NewObserver instantiates a new rangedloop observer which aggregates +// object statistics from observed segments. +func NewObserver() *Observer { + return &Observer{} +} + +// Start implements the Observer method of the same name by resetting the +// aggregated metrics. +func (obs *Observer) Start(ctx context.Context, startTime time.Time) error { + obs.metrics.Reset() + return nil +} + +// Fork implements the Observer method of the same name by returning a partial +// implementation that aggregates metrics about the observed segments/streams. +// These metrics will be aggregated into the observed totals during Join. +func (obs *Observer) Fork(ctx context.Context) (rangedloop.Partial, error) { + return &observerFork{}, nil +} + +// Join aggregates the partial metrics. +func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) error { + fork, ok := partial.(*observerFork) + if !ok { + return Error.New("expected %T but got %T", fork, partial) + } + + // Flushing to count the stats for the last observed stream. + fork.Flush() + obs.metrics.Aggregate(fork.totals) + return nil +} + +// Finish emits the aggregated metrics. +func (obs *Observer) Finish(ctx context.Context) error { + mon.IntVal("remote_dependent_object_count").Observe(obs.metrics.RemoteObjects) + mon.IntVal("inline_object_count").Observe(obs.metrics.InlineObjects) + + mon.IntVal("total_inline_bytes").Observe(obs.metrics.TotalInlineBytes) //mon:locked + mon.IntVal("total_remote_bytes").Observe(obs.metrics.TotalRemoteBytes) //mon:locked + + mon.IntVal("total_inline_segments").Observe(obs.metrics.TotalInlineSegments) //mon:locked + mon.IntVal("total_remote_segments").Observe(obs.metrics.TotalRemoteSegments) //mon:locked + return nil +} + +// TestingMetrics returns the accumulated metrics. It is intended to be called +// from tests. +func (obs *Observer) TestingMetrics() Metrics { + return obs.metrics +} + +type observerFork struct { + totals Metrics + stream streamMetrics + streamID uuid.UUID +} + +// Process aggregates metrics about a range of metrics provided by the +// segment ranged loop. +func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) error { + for _, segment := range segments { + if fork.streamID != segment.StreamID { + // Stream ID has changed. Flush what we have so far. + fork.Flush() + fork.streamID = segment.StreamID + } + if segment.Inline() { + fork.stream.inlineSegments++ + fork.stream.inlineBytes += int64(segment.EncryptedSize) + } else { + fork.stream.remoteSegments++ + fork.stream.remoteBytes += int64(segment.EncryptedSize) + } + } + return nil +} + +// Flush is called whenever a new stream is observed and when the fork is +// joined to aggregate the accumulated stream stats into the totals. +func (fork *observerFork) Flush() { + fork.totals.TotalInlineSegments += fork.stream.inlineSegments + fork.totals.TotalRemoteSegments += fork.stream.remoteSegments + fork.totals.TotalInlineBytes += fork.stream.inlineBytes + fork.totals.TotalRemoteBytes += fork.stream.remoteBytes + if fork.stream.remoteSegments > 0 { + // At least one remote segment was found for this stream so classify + // as a remote object. + fork.totals.RemoteObjects++ + } else if fork.stream.inlineSegments > 0 { + // Only count an inline object if there is at least one inline segment + // and no remote segments. + fork.totals.InlineObjects++ + } + fork.stream = streamMetrics{} +} + +// streamMetrics tracks the metrics for an individual stream. +type streamMetrics struct { + remoteSegments int64 + remoteBytes int64 + inlineSegments int64 + inlineBytes int64 +} diff --git a/satellite/metrics/observer_test.go b/satellite/metrics/observer_test.go new file mode 100644 index 000000000..a9337d647 --- /dev/null +++ b/satellite/metrics/observer_test.go @@ -0,0 +1,100 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package metrics + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "storj.io/common/testcontext" + "storj.io/common/uuid" + "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/metabase/rangedloop" + "storj.io/storj/satellite/metabase/rangedloop/rangedlooptest" + "storj.io/storj/satellite/metabase/segmentloop" +) + +var ( + inline1 = []segmentloop.Segment{ + {StreamID: uuid.UUID{1}, EncryptedSize: 10}, + } + remote2 = []segmentloop.Segment{ + {StreamID: uuid.UUID{2}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}}, + {StreamID: uuid.UUID{2}, EncryptedSize: 10}, + } + remote3 = []segmentloop.Segment{ + {StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}}, + {StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}}, + {StreamID: uuid.UUID{3}, EncryptedSize: 16, Pieces: metabase.Pieces{{}}}, + {StreamID: uuid.UUID{3}, EncryptedSize: 10}, + } +) + +func TestObserver(t *testing.T) { + ctx := testcontext.New(t) + + loop := func(tb testing.TB, obs *Observer, streams ...[]segmentloop.Segment) Metrics { + service := rangedloop.NewService( + zap.NewNop(), + rangedloop.Config{BatchSize: 2, Parallelism: 2}, + &rangedlooptest.RangeSplitter{Segments: combineSegments(streams...)}, + []rangedloop.Observer{obs}) + err := service.RunOnce(ctx) + require.NoError(tb, err) + return obs.TestingMetrics() + } + + t.Run("stats aggregation", func(t *testing.T) { + obs := NewObserver() + + metrics := loop(t, obs, inline1, remote2, remote3) + + require.Equal(t, Metrics{ + InlineObjects: 1, + RemoteObjects: 2, + TotalInlineSegments: 3, + TotalRemoteSegments: 4, + TotalInlineBytes: 30, + TotalRemoteBytes: 64, + }, metrics) + }) + + t.Run("stats reset by start", func(t *testing.T) { + obs := NewObserver() + + _ = loop(t, obs, inline1) + + // Any metrics gathered during the first loop should be dropped. + metrics := loop(t, obs, remote3) + + require.Equal(t, Metrics{ + InlineObjects: 0, + RemoteObjects: 1, + TotalInlineSegments: 1, + TotalRemoteSegments: 3, + TotalInlineBytes: 10, + TotalRemoteBytes: 48, + }, metrics) + }) + + t.Run("join fails gracefully on bad partial type", func(t *testing.T) { + type wrongPartial struct{ rangedloop.Partial } + obs := NewObserver() + err := obs.Start(ctx, time.Time{}) + require.NoError(t, err) + err = obs.Join(ctx, wrongPartial{}) + require.EqualError(t, err, "metrics: expected *metrics.observerFork but got metrics.wrongPartial") + }) +} + +func combineSegments(ss ...[]segmentloop.Segment) []segmentloop.Segment { + var combined []segmentloop.Segment + for _, s := range ss { + combined = append(combined, s...) + } + return combined +}