storagenode/pieces: more granular io and hashing statistics

This patch adds two new monkit metrics:
 * piece_writer_io: the sum of the time, which is spent with io.Write during a piece upload (excluding the fs sync of the commit)
 * piece_writer_hash: the sum of the time, which is spent with hashing

The second is especially important. My storagenode (hosted on a cloud server) spends ~30 ms on hashing data; piece_writer_io time is usually 5 ms for me.

These metrics can help us identify the reason for slowness on the storagenode side.

Both of these depend on the size of the piece. To make it more meaningful without exploding the cardinality, I created a few size categories and classified the pieces based on these. Measurements show that it can provide useful results (>2MB uploads are usually 23-28 ms).

Change-Id: Ifa1c205a490046655bcc34891003e7b43ed9c0bc
This commit is contained in:
Márton Elek 2023-09-28 12:39:37 +02:00 committed by Elek, Márton
parent 6308da2cc0
commit 9186365507
3 changed files with 224 additions and 2 deletions

View File

@ -88,9 +88,9 @@ func NewWriter(log *zap.Logger, blobWriter blobstore.BlobWriter, blobs blobstore
return nil, Error.Wrap(err)
}
}
w.blob = blobWriter
w.blob = MonitorBlobWriter("pieces_writer_io", blobWriter)
w.hash = pb.NewHashFromAlgorithm(hashAlgorithm)
w.hash = MonitorHash("pieces_writer_hash", pb.NewHashFromAlgorithm(hashAlgorithm))
w.blobs = blobs
w.satellite = satellite

142
storagenode/pieces/stat.go Normal file
View File

@ -0,0 +1,142 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces
import (
"context"
"hash"
"time"
"github.com/spacemonkeygo/monkit/v3"
"storj.io/storj/storagenode/blobstore"
)
// sizes classifies a byte count into one of a small, fixed set of size
// tags (see sizeCategory), keeping the metric cardinality bounded.
var sizes = sizeCategory()

// canceledTag marks duration observations for uploads that were canceled.
var canceledTag = monkit.SeriesTag{
	Key: "cancelled",
	Val: "true",
}

// committedTag marks duration observations for uploads that reached Commit.
var committedTag = monkit.SeriesTag{
	Key: "cancelled",
	Val: "false",
}
// MonitoredBlobWriter is a blobstore.BlobWriter wrapper with additional Monkit metrics.
type MonitoredBlobWriter struct {
	// name is the Monkit measurement name used when observing the write time.
	name string
	blobstore.BlobWriter

	// writeTime accumulates the wall-clock time spent inside Write calls.
	writeTime time.Duration
	// failed records whether any Write returned an error.
	// NOTE(review): set in Write but never read in this file — confirm it
	// is consumed elsewhere or consider dropping it.
	failed bool
	// writtenBytes is the total number of bytes reported written by the
	// underlying writer; used to pick the size category tag.
	writtenBytes int
}
// MonitorBlobWriter wraps writer so that the total time spent in Write
// calls is reported as a Monkit metric under the given name.
func MonitorBlobWriter(name string, writer blobstore.BlobWriter) blobstore.BlobWriter {
	monitored := &MonitoredBlobWriter{
		name:       name,
		BlobWriter: writer,
	}
	return monitored
}
// Write implements io.Writer, accumulating the bytes written and the
// time spent in the underlying writer.
func (m *MonitoredBlobWriter) Write(p []byte) (n int, err error) {
	begin := time.Now()
	n, err = m.BlobWriter.Write(p)
	elapsed := time.Since(begin)

	m.writeTime += elapsed
	m.writtenBytes += n
	if err != nil {
		m.failed = true
	}
	return n, err
}
// Cancel implements blobstore.BlobWriter: it aborts the upload and then
// observes the accumulated write time, tagged as cancelled and with a
// size category based on the bytes written so far.
func (m *MonitoredBlobWriter) Cancel(ctx context.Context) error {
	err := m.BlobWriter.Cancel(ctx)
	// the metric is observed even if Cancel itself returned an error
	mon.DurationVal(m.name, canceledTag, sizes(m.writtenBytes)).Observe(m.writeTime)
	return err
}
// Commit implements blobstore.BlobWriter: it finalizes the upload and then
// observes the accumulated write time, tagged as committed and with a
// size category based on the total bytes written.
func (m *MonitoredBlobWriter) Commit(ctx context.Context) error {
	err := m.BlobWriter.Commit(ctx)
	// the metric is observed even if Commit itself returned an error
	mon.DurationVal(m.name, committedTag, sizes(m.writtenBytes)).Observe(m.writeTime)
	return err
}
// MonitoredHash is a hash.Hash wrapper with additional Monkit metrics.
type MonitoredHash struct {
	// name is the Monkit measurement name used when observing the hash time.
	name string
	hash.Hash

	// writeTime accumulates the wall-clock time spent in Write and Sum;
	// cleared after each observation (in Sum) and on Reset.
	writeTime time.Duration
	// writtenBytes is the total number of bytes hashed since the last
	// Sum/Reset; used to pick the size category tag.
	writtenBytes int
}
// MonitorHash wraps the original Hash with an instance which also measures the hashing time.
func MonitorHash(name string, hash hash.Hash) hash.Hash {
	monitored := &MonitoredHash{
		name: name,
		Hash: hash,
	}
	return monitored
}
// Sum implements hash.Hash. It includes the time of the final Sum call
// in the measurement, observes the metric (only if any data was hashed),
// and clears the accumulated counters.
func (m *MonitoredHash) Sum(b []byte) []byte {
	begin := time.Now()
	digest := m.Hash.Sum(b)
	m.writeTime += time.Since(begin)

	if m.writtenBytes > 0 {
		mon.DurationVal(m.name, sizes(m.writtenBytes)).Observe(m.writeTime)
	}
	m.writeTime = 0
	m.writtenBytes = 0
	return digest
}
// Reset implements hash.Hash, clearing both the wrapped hash state and
// the accumulated measurements.
func (m *MonitoredHash) Reset() {
	m.writeTime = 0
	m.writtenBytes = 0
	m.Hash.Reset()
}
// Write implements io.Writer, timing how long the wrapped hash takes to
// absorb the data.
func (m *MonitoredHash) Write(p []byte) (n int, err error) {
	begin := time.Now()
	n, err = m.Hash.Write(p)
	elapsed := time.Since(begin)

	m.writtenBytes += n
	m.writeTime += elapsed
	return n, err
}
// sizeCategory returns a classifier mapping a byte count to one of a
// fixed set of size tags, so metrics stay useful without exploding the
// series cardinality. The tags are created once and reused.
func sizeCategory() func(int) monkit.SeriesTag {
	var (
		s2m   = monkit.NewSeriesTag("size", "2m")
		s1m   = monkit.NewSeriesTag("size", "1m")
		s500k = monkit.NewSeriesTag("size", "500k")
		s50k  = monkit.NewSeriesTag("size", "50k")
		small = monkit.NewSeriesTag("size", "small")
	)
	return func(byteNo int) monkit.SeriesTag {
		switch {
		case byteNo > 2000000:
			return s2m
		case byteNo > 1000000:
			return s1m
		case byteNo > 500000:
			return s500k
		case byteNo > 50000:
			return s50k
		default:
			return small
		}
	}
}

View File

@ -0,0 +1,80 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces
import (
"crypto/sha256"
"encoding/hex"
"testing"
"github.com/spacemonkeygo/monkit/v3"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/testcontext"
"storj.io/storj/storagenode/blobstore"
"storj.io/storj/storagenode/blobstore/filestore"
)
func TestMonitoredBlobWriter(t *testing.T) {
dir, err := filestore.NewDir(zaptest.NewLogger(t), t.TempDir())
require.NoError(t, err)
blobs := filestore.New(zaptest.NewLogger(t), dir, filestore.Config{})
defer func() { require.NoError(t, blobs.Close()) }()
ctx := testcontext.New(t)
f1, err := blobs.Create(ctx, blobstore.BlobRef{
Namespace: []byte("test"),
Key: []byte("key"),
}, 10)
require.NoError(t, err)
statName := "test_monitored_blob_writer"
m1 := MonitorBlobWriter(statName, f1)
_, err = m1.Write([]byte("01234"))
require.NoError(t, err)
_, err = m1.Write([]byte("56789"))
require.NoError(t, err)
err = m1.Commit(ctx)
require.NoError(t, err)
found := false
mon.Stats(func(key monkit.SeriesKey, field string, val float64) {
if key.Measurement == statName && field == "count" {
found = true
require.Equal(t, float64(1), val)
require.Equal(t, "small", key.Tags.Get("size"))
}
})
require.True(t, found)
}
// TestMonitoredHash verifies that hashing through the monitored wrapper
// yields the correct digest and exactly one observation with the
// expected size category tag.
func TestMonitoredHash(t *testing.T) {
	statName := "test_monitored_hash"
	m1 := MonitorHash(statName, sha256.New())

	_, err := m1.Write([]byte("01234"))
	require.NoError(t, err)
	_, err = m1.Write([]byte("56789"))
	require.NoError(t, err)

	// hash.Hash.Sum does not return an error, so only the digest is checked
	// (a stale require.NoError on the previous err was removed here).
	hash := m1.Sum([]byte{})
	require.Equal(t, "84d89877f0d4041efb6bf91a16f0248f2fd573e6af05c19f96bedb9f882f7882", hex.EncodeToString(hash))

	found := false
	mon.Stats(func(key monkit.SeriesKey, field string, val float64) {
		if key.Measurement == statName && field == "count" {
			found = true
			require.Equal(t, float64(1), val)
			require.Equal(t, "small", key.Tags.Get("size"))
		}
	})
	require.True(t, found)
}