satellite/metabase: add monitoring for objects and segments
Currently it's difficult to gather how many objects and segments are being inserted. Adding separate monitoring counters make this easier. Change-Id: I986cd82f03e99d2aa6fc76028255ee1090d1b294
This commit is contained in:
parent
b17d684f40
commit
5044337440
@ -83,6 +83,8 @@ func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVe
|
||||
return -1, Error.New("unable to insert object: %w", err)
|
||||
}
|
||||
|
||||
mon.Meter("object_begin").Mark(1)
|
||||
|
||||
return Version(v), nil
|
||||
}
|
||||
|
||||
@ -150,6 +152,8 @@ func (db *DB) BeginObjectExactVersion(ctx context.Context, opts BeginObjectExact
|
||||
return Object{}, Error.New("unable to insert object: %w", err)
|
||||
}
|
||||
|
||||
mon.Meter("object_begin").Mark(1)
|
||||
|
||||
return object, nil
|
||||
}
|
||||
|
||||
@ -183,7 +187,7 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) {
|
||||
|
||||
// NOTE: these queries could be combined into one.
|
||||
|
||||
return txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
|
||||
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
|
||||
// Verify that object exists and is partial.
|
||||
var value int
|
||||
err = tx.QueryRow(ctx, `
|
||||
@ -217,6 +221,13 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) {
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mon.Meter("segment_begin").Mark(1)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CommitSegment contains all necessary information about the segment.
|
||||
@ -316,6 +327,10 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error)
|
||||
}
|
||||
return Error.New("unable to insert segment: %w", err)
|
||||
}
|
||||
|
||||
mon.Meter("segment_commit").Mark(1)
|
||||
mon.IntVal("segment_commit_encrypted_size").Observe(int64(opts.EncryptedSize))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -392,6 +407,10 @@ func (db *DB) CommitInlineSegment(ctx context.Context, opts CommitInlineSegment)
|
||||
}
|
||||
return Error.New("unable to insert segment: %w", err)
|
||||
}
|
||||
|
||||
mon.Meter("segment_commit").Mark(1)
|
||||
mon.IntVal("segment_commit_encrypted_size").Observe(int64(len(opts.InlineData)))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -522,6 +541,11 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
|
||||
if err != nil {
|
||||
return Object{}, err
|
||||
}
|
||||
|
||||
mon.Meter("object_commit").Mark(1)
|
||||
mon.IntVal("object_commit_segments").Observe(int64(object.SegmentCount))
|
||||
mon.IntVal("object_commit_encrypted_size").Observe(object.TotalEncryptedSize)
|
||||
|
||||
return object, nil
|
||||
}
|
||||
|
||||
@ -580,5 +604,7 @@ func (db *DB) UpdateObjectMetadata(ctx context.Context, opts UpdateObjectMetadat
|
||||
)
|
||||
}
|
||||
|
||||
mon.Meter("object_update_metadata").Mark(1)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -146,6 +146,12 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
|
||||
if err != nil {
|
||||
return Object{}, nil, err
|
||||
}
|
||||
|
||||
mon.Meter("object_commit").Mark(1)
|
||||
mon.IntVal("object_commit_segments").Observe(int64(object.SegmentCount))
|
||||
mon.IntVal("object_commit_encrypted_size").Observe(object.TotalEncryptedSize)
|
||||
mon.Meter("segment_delete").Mark(len(deletedSegments))
|
||||
|
||||
return object, deletedSegments, nil
|
||||
}
|
||||
|
||||
|
@ -141,6 +141,9 @@ func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa
|
||||
return DeleteObjectResult{}, storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted"))
|
||||
}
|
||||
|
||||
mon.Meter("object_delete").Mark(len(result.Objects))
|
||||
mon.Meter("segment_delete").Mark(len(result.Segments))
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@ -210,6 +213,9 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject)
|
||||
return DeleteObjectResult{}, storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted"))
|
||||
}
|
||||
|
||||
mon.Meter("object_delete").Mark(len(result.Objects))
|
||||
mon.Meter("segment_delete").Mark(len(result.Segments))
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@ -314,6 +320,9 @@ func (db *DB) DeleteObjectLatestVersion(ctx context.Context, opts DeleteObjectLa
|
||||
return DeleteObjectResult{}, storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted"))
|
||||
}
|
||||
|
||||
mon.Meter("object_delete").Mark(len(result.Objects))
|
||||
mon.Meter("segment_delete").Mark(len(result.Segments))
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@ -367,6 +376,9 @@ func (db *DB) DeleteObjectAnyStatusAllVersions(ctx context.Context, opts DeleteO
|
||||
return DeleteObjectResult{}, storj.ErrObjectNotFound.Wrap(Error.New("no rows deleted"))
|
||||
}
|
||||
|
||||
mon.Meter("object_delete").Mark(len(result.Objects))
|
||||
mon.Meter("segment_delete").Mark(len(result.Segments))
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@ -437,6 +449,10 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl
|
||||
if err != nil {
|
||||
return DeleteObjectResult{}, err
|
||||
}
|
||||
|
||||
mon.Meter("object_delete").Mark(len(result.Objects))
|
||||
mon.Meter("segment_delete").Mark(len(result.Segments))
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
|
@ -67,14 +67,14 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
|
||||
RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces
|
||||
`
|
||||
default:
|
||||
return deletedObjectCount, Error.New("unhandled database: %v", db.impl)
|
||||
return 0, Error.New("unhandled database: %v", db.impl)
|
||||
}
|
||||
|
||||
// TODO: fix the count for objects without segments
|
||||
|
||||
var deleteSegments []DeletedSegmentInfo
|
||||
for {
|
||||
deleteSegments = deleteSegments[:0]
|
||||
batchDeletedObjects := 0
|
||||
err = withRows(db.db.Query(ctx, query,
|
||||
opts.Bucket.ProjectID, []byte(opts.Bucket.BucketName), batchSize))(func(rows tagsql.Rows) error {
|
||||
ids := map[uuid.UUID]struct{}{} // TODO: avoid map here
|
||||
@ -94,9 +94,14 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
|
||||
ids[streamID] = struct{}{}
|
||||
deleteSegments = append(deleteSegments, segment)
|
||||
}
|
||||
batchDeletedObjects = len(ids)
|
||||
deletedObjectCount += int64(len(ids))
|
||||
return nil
|
||||
})
|
||||
|
||||
mon.Meter("object_delete").Mark(batchDeletedObjects)
|
||||
mon.Meter("segment_delete").Mark(len(deleteSegments))
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return deletedObjectCount, nil
|
||||
|
@ -196,12 +196,30 @@ func (db *DB) deleteObjectsAndSegments(ctx context.Context, objects []ObjectStre
|
||||
results := conn.SendBatch(ctx, &batch)
|
||||
defer func() { err = errs.Combine(err, results.Close()) }()
|
||||
|
||||
var objectsDeleted, segmentsDeleted int64
|
||||
|
||||
var errlist errs.Group
|
||||
for i := 0; i < batch.Len(); i++ {
|
||||
_, err := results.Exec()
|
||||
result, err := results.Exec()
|
||||
errlist.Add(err)
|
||||
|
||||
switch i % 3 {
|
||||
case 0: // start transcation
|
||||
case 1: // delete objects
|
||||
if err == nil {
|
||||
objectsDeleted += result.RowsAffected()
|
||||
}
|
||||
case 2: // delete segments
|
||||
if err == nil {
|
||||
segmentsDeleted += result.RowsAffected()
|
||||
}
|
||||
case 3: // commit transaction
|
||||
}
|
||||
}
|
||||
|
||||
mon.Meter("object_delete").Mark64(objectsDeleted)
|
||||
mon.Meter("segment_delete").Mark64(segmentsDeleted)
|
||||
|
||||
return errlist.Err()
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -71,6 +71,8 @@ func (db *DB) DeletePart(ctx context.Context, opts DeletePart) (err error) {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
mon.Meter("segment_delete").Mark(len(deleted))
|
||||
|
||||
for _, item := range deleted {
|
||||
deleteInfo := DeletedSegmentInfo{
|
||||
RootPieceID: item.RootPieceID,
|
||||
|
@ -106,5 +106,7 @@ func (db *DB) UpdateSegmentPieces(ctx context.Context, opts UpdateSegmentPieces)
|
||||
return storage.ErrValueChanged.New("segment remote_alias_pieces field was changed")
|
||||
}
|
||||
|
||||
mon.Meter("segment_update").Mark(1)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user