satellite/metrics: allow for multipart objects
We have multipart objects so we may get multiple inline segments sequences or no segments at all for objects. Change-Id: Ic19150efe2ca2b1c60ddd5d30b1317922221c221
This commit is contained in:
parent
adf687aebb
commit
8093c666a6
@ -62,8 +62,8 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
|
||||
return nil
|
||||
}
|
||||
mon.IntVal("remote_dependent_object_count").Observe(chore.Counter.RemoteDependent)
|
||||
mon.IntVal("inline_object_count").Observe(chore.Counter.Inline)
|
||||
mon.IntVal("total_object_count").Observe(chore.Counter.Total)
|
||||
mon.IntVal("inline_object_count").Observe(chore.Counter.InlineObjectCount())
|
||||
mon.IntVal("total_object_count").Observe(chore.Counter.ObjectCount)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
@ -14,10 +14,10 @@ import (
|
||||
//
|
||||
// architecture: Observer
|
||||
type Counter struct {
|
||||
ObjectCount int64
|
||||
RemoteDependent int64
|
||||
Inline int64
|
||||
Total int64
|
||||
streamIDCursor uuid.UUID
|
||||
|
||||
checkObjectRemoteness uuid.UUID
|
||||
}
|
||||
|
||||
// NewCounter instantiates a new counter to be subscribed to the metainfo loop.
|
||||
@ -29,19 +29,8 @@ func NewCounter() *Counter {
|
||||
func (counter *Counter) Object(ctx context.Context, object *metainfo.Object) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
counter.Total++
|
||||
|
||||
if object.SegmentCount == 0 {
|
||||
counter.Inline++
|
||||
return nil
|
||||
}
|
||||
|
||||
if !counter.streamIDCursor.IsZero() {
|
||||
return Error.New("unexpected cursor: wants zero, got %s", counter.streamIDCursor.String())
|
||||
}
|
||||
|
||||
counter.streamIDCursor = object.StreamID
|
||||
|
||||
counter.ObjectCount++
|
||||
counter.checkObjectRemoteness = object.StreamID
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -49,38 +38,20 @@ func (counter *Counter) Object(ctx context.Context, object *metainfo.Object) (er
|
||||
func (counter *Counter) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if counter.streamIDCursor.IsZero() {
|
||||
return nil
|
||||
if counter.checkObjectRemoteness == segment.StreamID {
|
||||
counter.RemoteDependent++
|
||||
// we need to count this only once
|
||||
counter.checkObjectRemoteness = uuid.UUID{}
|
||||
}
|
||||
|
||||
if counter.streamIDCursor != segment.StreamID {
|
||||
return Error.New("unexpected cursor: wants %s, got %s", segment.StreamID.String(), counter.streamIDCursor.String())
|
||||
}
|
||||
|
||||
counter.RemoteDependent++
|
||||
|
||||
// reset the cursor to ensure we don't count multi-segment objects more than once.
|
||||
counter.streamIDCursor = uuid.UUID{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// InlineSegment increments the count for inline objects.
|
||||
func (counter *Counter) InlineSegment(ctx context.Context, segment *metainfo.Segment) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if counter.streamIDCursor.IsZero() {
|
||||
return nil
|
||||
}
|
||||
|
||||
if counter.streamIDCursor != segment.StreamID {
|
||||
return Error.New("unexpected cursor: wants %s, got %s", segment.StreamID.String(), counter.streamIDCursor.String())
|
||||
}
|
||||
|
||||
counter.Inline++
|
||||
|
||||
// reset the cursor to ensure we don't count multi-segment objects more than once.
|
||||
counter.streamIDCursor = uuid.UUID{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// InlineObjectCount returns the count of objects that are inline only.
|
||||
func (counter *Counter) InlineObjectCount() int64 {
|
||||
return counter.ObjectCount - counter.RemoteDependent
|
||||
}
|
||||
|
@ -17,10 +17,7 @@ import (
|
||||
|
||||
func TestCounterInlineAndRemote(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: testplanet.ReconfigureRS(3, 4, 5, 5),
|
||||
},
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
satellite := planet.Satellites[0]
|
||||
ul := planet.Uplinks[0]
|
||||
@ -46,15 +43,15 @@ func TestCounterInlineAndRemote(t *testing.T) {
|
||||
}
|
||||
|
||||
metricsChore.Loop.TriggerWait()
|
||||
require.EqualValues(t, 2, metricsChore.Counter.Inline)
|
||||
require.EqualValues(t, 2, metricsChore.Counter.InlineObjectCount())
|
||||
require.EqualValues(t, 2, metricsChore.Counter.RemoteDependent)
|
||||
require.EqualValues(t, 4, metricsChore.Counter.Total)
|
||||
require.EqualValues(t, 4, metricsChore.Counter.ObjectCount)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCounterInlineOnly(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
satellite := planet.Satellites[0]
|
||||
ul := planet.Uplinks[0]
|
||||
@ -70,17 +67,17 @@ func TestCounterInlineOnly(t *testing.T) {
|
||||
}
|
||||
|
||||
metricsChore.Loop.TriggerWait()
|
||||
require.EqualValues(t, 2, metricsChore.Counter.Inline)
|
||||
require.EqualValues(t, 2, metricsChore.Counter.InlineObjectCount())
|
||||
require.EqualValues(t, 0, metricsChore.Counter.RemoteDependent)
|
||||
require.EqualValues(t, 2, metricsChore.Counter.Total)
|
||||
require.EqualValues(t, 2, metricsChore.Counter.ObjectCount)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCounterRemoteOnly(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: testplanet.ReconfigureRS(3, 4, 5, 5),
|
||||
Satellite: testplanet.MaxSegmentSize(16 * memory.KiB),
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
satellite := planet.Satellites[0]
|
||||
@ -88,17 +85,18 @@ func TestCounterRemoteOnly(t *testing.T) {
|
||||
metricsChore := satellite.Metrics.Chore
|
||||
metricsChore.Loop.Pause()
|
||||
|
||||
// upload 2 remote files with 1 segment
|
||||
// upload 2 remote files with multiple segments
|
||||
for i := 0; i < 2; i++ {
|
||||
testData := testrand.Bytes(8 * memory.KiB)
|
||||
testData := testrand.Bytes(32 * memory.KiB)
|
||||
path := "/some/remote/path/" + strconv.Itoa(i)
|
||||
err := ul.Upload(ctx, satellite, "testbucket", path, testData)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
metricsChore.Loop.TriggerWait()
|
||||
require.EqualValues(t, 0, metricsChore.Counter.Inline)
|
||||
t.Log(metricsChore.Counter.ObjectCount, metricsChore.Counter.RemoteDependent)
|
||||
require.EqualValues(t, 0, metricsChore.Counter.InlineObjectCount())
|
||||
require.EqualValues(t, 2, metricsChore.Counter.RemoteDependent)
|
||||
require.EqualValues(t, 2, metricsChore.Counter.Total)
|
||||
require.EqualValues(t, 2, metricsChore.Counter.ObjectCount)
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user