satellite/metainfo: collect uplink versions with monkit

This change adds metrics for uplink version and method which was used by this version.

Change-Id: I05fc425a024805cfcd6d8add810a2a0749405f4f
This commit is contained in:
Michał Niewrzał 2021-02-09 23:40:23 +01:00
parent 9cfaba2c5d
commit dc2bec9f89
5 changed files with 159 additions and 9 deletions

4
go.mod
View File

@ -44,9 +44,9 @@ require (
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
google.golang.org/api v0.20.0 // indirect
google.golang.org/protobuf v1.25.0 // indirect
storj.io/common v0.0.0-20210204114159-106c992bbc05
storj.io/common v0.0.0-20210208122718-577b1f8a0a0f
storj.io/drpc v0.0.16
storj.io/monkit-jaeger v0.0.0-20210205021559-85f08034688c
storj.io/private v0.0.0-20210203200143-9d2ec06f0d3c
storj.io/uplink v1.4.6-0.20210204154407-ab245b221ef3
storj.io/uplink v1.4.6-0.20210209192332-b38c7a40bcd4
)

10
go.sum
View File

@ -936,10 +936,8 @@ sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0=
storj.io/common v0.0.0-20201026135900-1aaeec90670b/go.mod h1:GqdmNf3fLm2UZX/7Zr0BLFCJ4gFjgm6eHrk/fnmr5jQ=
storj.io/common v0.0.0-20210204114159-106c992bbc05 h1:2FMeP4zBLcRueApRPEhkrPkNI9FifmTPln3orfX/CBo=
storj.io/common v0.0.0-20210204114159-106c992bbc05 h1:2FMeP4zBLcRueApRPEhkrPkNI9FifmTPln3orfX/CBo=
storj.io/common v0.0.0-20210204114159-106c992bbc05/go.mod h1:b8XP/TdW8OyTZ/J2BDFOIE9KojSUNZgImBFZI99zS04=
storj.io/common v0.0.0-20210204114159-106c992bbc05/go.mod h1:b8XP/TdW8OyTZ/J2BDFOIE9KojSUNZgImBFZI99zS04=
storj.io/common v0.0.0-20210208122718-577b1f8a0a0f h1:O2/ia55Q/xhMBJ/WgeTQBEST7h8IWXZE4FEQyiM+RYc=
storj.io/common v0.0.0-20210208122718-577b1f8a0a0f/go.mod h1:b8XP/TdW8OyTZ/J2BDFOIE9KojSUNZgImBFZI99zS04=
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
storj.io/drpc v0.0.14/go.mod h1:82nfl+6YwRwF6UG31cEWWUqv/FaKvP5SGqUvoqTxCMA=
storj.io/drpc v0.0.16 h1:9sxypc5lKi/0D69cR21BR0S21+IvXfON8L5nXMVNTwQ=
@ -950,5 +948,5 @@ storj.io/monkit-jaeger v0.0.0-20210205021559-85f08034688c h1:6B1nHL8pGEjxzAHoADZ
storj.io/monkit-jaeger v0.0.0-20210205021559-85f08034688c/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80=
storj.io/private v0.0.0-20210203200143-9d2ec06f0d3c h1:9sLvfSIZgUhw98J8/3FBOVVJ+huhgYedhYpbrLbE+uk=
storj.io/private v0.0.0-20210203200143-9d2ec06f0d3c/go.mod h1:VHaDkpBka3Pp5rXqFSDHbEmzMaFFW4BYrXJfGIN1Udo=
storj.io/uplink v1.4.6-0.20210204154407-ab245b221ef3 h1:F8MQKVqS7wmQFjfhtUk6410Jvi4ryWDKBSPNnWlXnNc=
storj.io/uplink v1.4.6-0.20210204154407-ab245b221ef3/go.mod h1:tCrrBIAhIkBTSVGOaRExkE9T1b4hRHw0ANIP0ORu1rY=
storj.io/uplink v1.4.6-0.20210209192332-b38c7a40bcd4 h1:6sNuRj9xZO3iDlFDd42pIECXSG3IIfn4+vVkN+Qtjpo=
storj.io/uplink v1.4.6-0.20210209192332-b38c7a40bcd4/go.mod h1:IXHjzdYHnFChyTE7YSD7UlSz2jyAMlFFFeFoQEgsCmg=

View File

@ -103,7 +103,7 @@ func removeUplinkUserAgent(entries []useragent.Entry) []useragent.Entry {
var xs []useragent.Entry
for i := 0; i < len(entries); i++ {
// If it's "uplink" then skip it.
if strings.EqualFold(entries[i].Product, "uplink") {
if strings.EqualFold(entries[i].Product, uplinkProduct) {
// also skip any associated comments
for i+1 < len(entries) && entries[i+1].Comment != "" {
i++

View File

@ -83,6 +83,7 @@ type Endpoint struct {
revocations revocation.DB
defaultRS *pb.RedundancyScheme
config Config
versionCollector *versionCollector
}
// NewEndpoint creates new metainfo endpoint instance.
@ -136,6 +137,7 @@ func NewEndpoint(log *zap.Logger, metainfo *Service, deletePieces *piecedeletion
revocations: revocations,
defaultRS: defaultRSScheme,
config: config,
versionCollector: newVersionCollector(),
}, nil
}
@ -223,6 +225,11 @@ func (endpoint *Endpoint) filterValidPieces(ctx context.Context, pointer *pb.Poi
func (endpoint *Endpoint) ProjectInfo(ctx context.Context, req *pb.ProjectInfoRequest) (_ *pb.ProjectInfoResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionProjectInfo,
Time: time.Now(),
@ -242,6 +249,11 @@ func (endpoint *Endpoint) ProjectInfo(ctx context.Context, req *pb.ProjectInfoRe
func (endpoint *Endpoint) GetBucket(ctx context.Context, req *pb.BucketGetRequest) (resp *pb.BucketGetResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionRead,
Bucket: req.Name,
@ -274,6 +286,11 @@ func (endpoint *Endpoint) GetBucket(ctx context.Context, req *pb.BucketGetReques
func (endpoint *Endpoint) CreateBucket(ctx context.Context, req *pb.BucketCreateRequest) (resp *pb.BucketCreateResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: req.Name,
@ -350,6 +367,11 @@ func (endpoint *Endpoint) CreateBucket(ctx context.Context, req *pb.BucketCreate
func (endpoint *Endpoint) DeleteBucket(ctx context.Context, req *pb.BucketDeleteRequest) (resp *pb.BucketDeleteResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
now := time.Now()
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
@ -498,6 +520,12 @@ func (endpoint *Endpoint) deleteByPrefix(ctx context.Context, projectID uuid.UUI
// ListBuckets returns buckets in a project where the bucket name matches the request cursor.
func (endpoint *Endpoint) ListBuckets(ctx context.Context, req *pb.BucketListRequest) (resp *pb.BucketListResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
action := macaroon.Action{
// TODO: This has to be ActionList, but it seems to be set to
// ActionRead as a hacky workaround to make bucket listing possible.
@ -641,6 +669,11 @@ func convertBucketToProto(bucket storj.Bucket, rs *pb.RedundancyScheme) (pbBucke
func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRequest) (resp *pb.ObjectBeginResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: req.Bucket,
@ -733,6 +766,11 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommitRequest) (resp *pb.ObjectCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
return endpoint.commitObject(ctx, req, nil)
}
@ -816,6 +854,11 @@ func (endpoint *Endpoint) commitObject(ctx context.Context, req *pb.ObjectCommit
func (endpoint *Endpoint) GetObject(ctx context.Context, req *pb.ObjectGetRequest) (resp *pb.ObjectGetResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionRead,
Bucket: req.Bucket,
@ -925,6 +968,11 @@ func (endpoint *Endpoint) getObject(ctx context.Context, projectID uuid.UUID, bu
func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListRequest) (resp *pb.ObjectListResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionList,
Bucket: req.Bucket,
@ -987,6 +1035,11 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectBeginDeleteRequest) (resp *pb.ObjectBeginDeleteResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
now := time.Now()
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
@ -1065,6 +1118,11 @@ func (endpoint *Endpoint) FinishDeleteObject(ctx context.Context, req *pb.Object
func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPsRequest) (resp *pb.ObjectGetIPsResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionRead,
Bucket: req.Bucket,
@ -1165,6 +1223,11 @@ func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPs
func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBeginRequest) (resp *pb.SegmentBeginResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
@ -1233,6 +1296,11 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin
func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentCommitRequest) (resp *pb.SegmentCommitResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
_, resp, err = endpoint.commitSegment(ctx, req, true)
return resp, err
}
@ -1376,6 +1444,11 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.SegmentMakeInlineRequest) (resp *pb.SegmentMakeInlineResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
_, resp, err = endpoint.makeInlineSegment(ctx, req, true)
return resp, err
}
@ -1493,6 +1566,11 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb.
func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDownloadRequest) (resp *pb.SegmentDownloadResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
@ -1830,6 +1908,12 @@ func (endpoint *Endpoint) deleteObjectsPieces(ctx context.Context, reqs ...*meta
// RevokeAPIKey handles requests to revoke an api key.
func (endpoint *Endpoint) RevokeAPIKey(ctx context.Context, req *pb.RevokeAPIKeyRequest) (resp *pb.RevokeAPIKeyResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
macToRevoke, err := macaroon.ParseMacaroon(req.GetApiKey())
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "API key to revoke is not a macaroon")

View File

@ -0,0 +1,68 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"strings"
"sync"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"storj.io/common/useragent"
)
const uplinkProduct = "uplink"
type versionOccurrence struct {
Version string
Method string
}
type versionCollector struct {
mu sync.Mutex
versions map[versionOccurrence]*monkit.Meter
}
func newVersionCollector() *versionCollector {
return &versionCollector{
versions: make(map[versionOccurrence]*monkit.Meter),
}
}
func (vc *versionCollector) collect(useragentRaw []byte, method string) error {
var meter *monkit.Meter
version := "unknown"
if len(useragentRaw) != 0 {
entries, err := useragent.ParseEntries(useragentRaw)
if err != nil {
return errs.New("invalid user agent %q: %v", string(useragentRaw), err)
}
for _, entry := range entries {
if strings.EqualFold(entry.Product, uplinkProduct) {
version = entry.Version
break
}
}
}
vo := versionOccurrence{
Version: version,
Method: method,
}
vc.mu.Lock()
meter, ok := vc.versions[vo]
if !ok {
meter = monkit.NewMeter(monkit.NewSeriesKey("uplink_versions").WithTag("version", version).WithTag("method", method))
mon.Chain(meter)
vc.versions[vo] = meter
}
vc.mu.Unlock()
meter.Mark(1)
return nil
}