storj/satellite/metainfo/endpoint.go
Michał Niewrzał db5d781735 satellite/metainfo: simplify version collector logging
Version collector previously returned errors and logged them in the
calling code.  It is cleaner to log inside version collector.

Change-Id: I52cb49a1ef53f3f1f51692ddb26ec095cfd0f100
2022-04-26 10:16:20 +00:00

282 lines
8.7 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"context"
"crypto/sha256"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/encryption"
"storj.io/common/lrucache"
"storj.io/common/macaroon"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/buckets"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo/piecedeletion"
"storj.io/storj/satellite/metainfo/pointerverification"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/revocation"
"storj.io/storj/satellite/rewards"
)
const (
satIDExpiration = 48 * time.Hour
deleteObjectPiecesSuccessThreshold = 0.75
)
var (
mon = monkit.Package()
// Error general metainfo error.
Error = errs.Class("metainfo")
// ErrNodeAlreadyExists pointer already has a piece for a node err.
ErrNodeAlreadyExists = errs.Class("metainfo: node already exists")
// ErrBucketNotEmpty is returned when bucket is required to be empty for an operation.
ErrBucketNotEmpty = errs.Class("bucket not empty")
)
// APIKeys is api keys store methods used by endpoint.
//
// architecture: Database
type APIKeys interface {
GetByHead(ctx context.Context, head []byte) (*console.APIKeyInfo, error)
}
// Endpoint metainfo endpoint.
//
// architecture: Endpoint
type Endpoint struct {
pb.DRPCMetainfoUnimplementedServer
log *zap.Logger
buckets *buckets.Service
metabase *metabase.DB
deletePieces *piecedeletion.Service
orders *orders.Service
overlay *overlay.Service
attributions attribution.DB
partners *rewards.PartnersService
pointerVerification *pointerverification.Service
projectUsage *accounting.Service
projects console.Projects
apiKeys APIKeys
satellite signing.Signer
limiterCache *lrucache.ExpiringLRU
encInlineSegmentSize int64 // max inline segment size + encryption overhead
revocations revocation.DB
defaultRS *pb.RedundancyScheme
config Config
versionCollector *versionCollector
}
// NewEndpoint creates new metainfo endpoint instance.
func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase.DB,
deletePieces *piecedeletion.Service, orders *orders.Service, cache *overlay.Service,
attributions attribution.DB, partners *rewards.PartnersService, peerIdentities overlay.PeerIdentities,
apiKeys APIKeys, projectUsage *accounting.Service, projects console.Projects,
satellite signing.Signer, revocations revocation.DB, config Config) (*Endpoint, error) {
// TODO do something with too many params
encInlineSegmentSize, err := encryption.CalcEncryptedSize(config.MaxInlineSegmentSize.Int64(), storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 128, // intentionally low block size to allow maximum possible encryption overhead
})
if err != nil {
return nil, err
}
defaultRSScheme := &pb.RedundancyScheme{
Type: pb.RedundancyScheme_RS,
MinReq: int32(config.RS.Min),
RepairThreshold: int32(config.RS.Repair),
SuccessThreshold: int32(config.RS.Success),
Total: int32(config.RS.Total),
ErasureShareSize: config.RS.ErasureShareSize.Int32(),
}
return &Endpoint{
log: log,
buckets: buckets,
metabase: metabaseDB,
deletePieces: deletePieces,
orders: orders,
overlay: cache,
attributions: attributions,
partners: partners,
pointerVerification: pointerverification.NewService(peerIdentities),
apiKeys: apiKeys,
projectUsage: projectUsage,
projects: projects,
satellite: satellite,
limiterCache: lrucache.New(lrucache.Options{
Capacity: config.RateLimiter.CacheCapacity,
Expiration: config.RateLimiter.CacheExpiration,
}),
encInlineSegmentSize: encInlineSegmentSize,
revocations: revocations,
defaultRS: defaultRSScheme,
config: config,
versionCollector: newVersionCollector(log),
}, nil
}
// Close closes resources.
func (endpoint *Endpoint) Close() error { return nil }
// ProjectInfo returns allowed ProjectInfo for the provided API key.
func (endpoint *Endpoint) ProjectInfo(ctx context.Context, req *pb.ProjectInfoRequest) (_ *pb.ProjectInfoResponse, err error) {
defer mon.Task()(&ctx)(&err)
endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionProjectInfo,
Time: time.Now(),
})
if err != nil {
return nil, err
}
salt := sha256.Sum256(keyInfo.ProjectID[:])
return &pb.ProjectInfoResponse{
ProjectSalt: salt[:],
}, nil
}
// RevokeAPIKey handles requests to revoke an api key.
func (endpoint *Endpoint) RevokeAPIKey(ctx context.Context, req *pb.RevokeAPIKeyRequest) (resp *pb.RevokeAPIKeyResponse, err error) {
defer mon.Task()(&ctx)(&err)
endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
macToRevoke, err := macaroon.ParseMacaroon(req.GetApiKey())
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "API key to revoke is not a macaroon")
}
keyInfo, err := endpoint.validateRevoke(ctx, req.Header, macToRevoke)
if err != nil {
return nil, err
}
err = endpoint.revocations.Revoke(ctx, macToRevoke.Tail(), keyInfo.ID[:])
if err != nil {
endpoint.log.Error("Failed to revoke API key", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "Failed to revoke API key")
}
return &pb.RevokeAPIKeyResponse{}, nil
}
func (endpoint *Endpoint) packStreamID(ctx context.Context, satStreamID *internalpb.StreamID) (streamID storj.StreamID, err error) {
defer mon.Task()(&ctx)(&err)
signedStreamID, err := SignStreamID(ctx, endpoint.satellite, satStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
encodedStreamID, err := pb.Marshal(signedStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
streamID, err = storj.StreamIDFromBytes(encodedStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return streamID, nil
}
func (endpoint *Endpoint) packSegmentID(ctx context.Context, satSegmentID *internalpb.SegmentID) (segmentID storj.SegmentID, err error) {
defer mon.Task()(&ctx)(&err)
signedSegmentID, err := SignSegmentID(ctx, endpoint.satellite, satSegmentID)
if err != nil {
return nil, err
}
encodedSegmentID, err := pb.Marshal(signedSegmentID)
if err != nil {
return nil, err
}
segmentID, err = storj.SegmentIDFromBytes(encodedSegmentID)
if err != nil {
return nil, err
}
return segmentID, nil
}
func (endpoint *Endpoint) unmarshalSatStreamID(ctx context.Context, streamID storj.StreamID) (_ *internalpb.StreamID, err error) {
defer mon.Task()(&ctx)(&err)
satStreamID := &internalpb.StreamID{}
err = pb.Unmarshal(streamID, satStreamID)
if err != nil {
return nil, err
}
err = VerifyStreamID(ctx, endpoint.satellite, satStreamID)
if err != nil {
return nil, err
}
return satStreamID, nil
}
func (endpoint *Endpoint) unmarshalSatSegmentID(ctx context.Context, segmentID storj.SegmentID) (_ *internalpb.SegmentID, err error) {
defer mon.Task()(&ctx)(&err)
satSegmentID := &internalpb.SegmentID{}
err = pb.Unmarshal(segmentID, satSegmentID)
if err != nil {
return nil, err
}
if satSegmentID.StreamId == nil {
return nil, errs.New("stream ID missing")
}
err = VerifySegmentID(ctx, endpoint.satellite, satSegmentID)
if err != nil {
return nil, err
}
if satSegmentID.CreationDate.Before(time.Now().Add(-satIDExpiration)) {
return nil, errs.New("segment ID expired")
}
return satSegmentID, nil
}
// convertMetabaseErr converts domain errors from metabase to appropriate rpc statuses errors.
func (endpoint *Endpoint) convertMetabaseErr(err error) error {
switch {
case storj.ErrObjectNotFound.Has(err):
return rpcstatus.Error(rpcstatus.NotFound, err.Error())
case metabase.ErrSegmentNotFound.Has(err):
return rpcstatus.Error(rpcstatus.NotFound, err.Error())
case metabase.ErrInvalidRequest.Has(err):
return rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
case metabase.ErrObjectAlreadyExists.Has(err):
return rpcstatus.Error(rpcstatus.AlreadyExists, err.Error())
default:
endpoint.log.Error("internal", zap.Error(err))
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}