storj/satellite/metainfo/endpoint.go
Michal Niewrzal c9591e9754 satellite/metainfo: better metabase errors handling
Some errors were returned as metabase errors, not pure drpc
errors because of how rpcstatus.Code method is working. Status
code was returned for errors like metabase context canceled but
we would like to not leak our internals to the client.

Change-Id: I3f0194755f8d7359b1e3d342fa3be3d984019ecb
2023-09-01 11:32:10 +00:00

343 lines
11 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/jtolio/eventkit"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/time/rate"
"storj.io/common/encryption"
"storj.io/common/lrucache"
"storj.io/common/macaroon"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/buckets"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo/pointerverification"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/revocation"
)
const (
satIDExpiration = 48 * time.Hour
)
var (
mon = monkit.Package()
evs = eventkit.Package()
// Error general metainfo error.
Error = errs.Class("metainfo")
// ErrNodeAlreadyExists pointer already has a piece for a node err.
ErrNodeAlreadyExists = errs.Class("metainfo: node already exists")
// ErrBucketNotEmpty is returned when bucket is required to be empty for an operation.
ErrBucketNotEmpty = errs.Class("bucket not empty")
)
// APIKeys is api keys store methods used by endpoint.
//
// architecture: Database
type APIKeys interface {
GetByHead(ctx context.Context, head []byte) (*console.APIKeyInfo, error)
}
// Endpoint metainfo endpoint.
//
// architecture: Endpoint
type Endpoint struct {
pb.DRPCMetainfoUnimplementedServer
log *zap.Logger
buckets *buckets.Service
metabase *metabase.DB
orders *orders.Service
overlay *overlay.Service
attributions attribution.DB
pointerVerification *pointerverification.Service
projectUsage *accounting.Service
projectLimits *accounting.ProjectLimitCache
projects console.Projects
apiKeys APIKeys
satellite signing.Signer
limiterCache *lrucache.ExpiringLRUOf[*rate.Limiter]
singleObjectLimitCache *lrucache.ExpiringLRUOf[struct{}]
encInlineSegmentSize int64 // max inline segment size + encryption overhead
revocations revocation.DB
defaultRS *pb.RedundancyScheme
config ExtendedConfig
versionCollector *versionCollector
}
// NewEndpoint creates new metainfo endpoint instance.
func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase.DB,
orders *orders.Service, cache *overlay.Service, attributions attribution.DB, peerIdentities overlay.PeerIdentities,
apiKeys APIKeys, projectUsage *accounting.Service, projectLimits *accounting.ProjectLimitCache, projects console.Projects,
satellite signing.Signer, revocations revocation.DB, config Config) (*Endpoint, error) {
// TODO do something with too many params
extendedConfig, err := NewExtendedConfig(config)
if err != nil {
return nil, err
}
encInlineSegmentSize, err := encryption.CalcEncryptedSize(config.MaxInlineSegmentSize.Int64(), storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 128, // intentionally low block size to allow maximum possible encryption overhead
})
if err != nil {
return nil, err
}
defaultRSScheme := &pb.RedundancyScheme{
Type: pb.RedundancyScheme_RS,
MinReq: int32(config.RS.Min),
RepairThreshold: int32(config.RS.Repair),
SuccessThreshold: int32(config.RS.Success),
Total: int32(config.RS.Total),
ErasureShareSize: config.RS.ErasureShareSize.Int32(),
}
return &Endpoint{
log: log,
buckets: buckets,
metabase: metabaseDB,
orders: orders,
overlay: cache,
attributions: attributions,
pointerVerification: pointerverification.NewService(peerIdentities),
apiKeys: apiKeys,
projectUsage: projectUsage,
projectLimits: projectLimits,
projects: projects,
satellite: satellite,
limiterCache: lrucache.NewOf[*rate.Limiter](lrucache.Options{
Capacity: config.RateLimiter.CacheCapacity,
Expiration: config.RateLimiter.CacheExpiration,
Name: "metainfo-ratelimit",
}),
singleObjectLimitCache: lrucache.NewOf[struct{}](lrucache.Options{
Expiration: config.UploadLimiter.SingleObjectLimit,
Capacity: config.UploadLimiter.CacheCapacity,
}),
encInlineSegmentSize: encInlineSegmentSize,
revocations: revocations,
defaultRS: defaultRSScheme,
config: extendedConfig,
versionCollector: newVersionCollector(log),
}, nil
}
// Close closes resources.
func (endpoint *Endpoint) Close() error { return nil }
// ProjectInfo returns allowed ProjectInfo for the provided API key.
func (endpoint *Endpoint) ProjectInfo(ctx context.Context, req *pb.ProjectInfoRequest) (_ *pb.ProjectInfoResponse, err error) {
defer mon.Task()(&ctx)(&err)
endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionProjectInfo,
Time: time.Now(),
})
if err != nil {
return nil, err
}
endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req))
salt, err := endpoint.projects.GetSalt(ctx, keyInfo.ProjectID)
if err != nil {
return nil, err
}
return &pb.ProjectInfoResponse{
ProjectPublicId: keyInfo.ProjectPublicID.Bytes(),
ProjectSalt: salt,
}, nil
}
// RevokeAPIKey handles requests to revoke an api key.
func (endpoint *Endpoint) RevokeAPIKey(ctx context.Context, req *pb.RevokeAPIKeyRequest) (resp *pb.RevokeAPIKeyResponse, err error) {
defer mon.Task()(&ctx)(&err)
endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
macToRevoke, err := macaroon.ParseMacaroon(req.GetApiKey())
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "API key to revoke is not a macaroon")
}
keyInfo, err := endpoint.validateRevoke(ctx, req.Header, macToRevoke)
if err != nil {
return nil, err
}
endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req))
err = endpoint.revocations.Revoke(ctx, macToRevoke.Tail(), keyInfo.ID[:])
if err != nil {
endpoint.log.Error("Failed to revoke API key", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "Failed to revoke API key")
}
return &pb.RevokeAPIKeyResponse{}, nil
}
func (endpoint *Endpoint) packStreamID(ctx context.Context, satStreamID *internalpb.StreamID) (streamID storj.StreamID, err error) {
defer mon.Task()(&ctx)(&err)
if satStreamID == nil {
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to create stream id")
}
if !satStreamID.ExpirationDate.IsZero() {
// DB can only preserve microseconds precision and nano seconds will be cut.
// To have stable StreamID/UploadID we need to always truncate it.
satStreamID.ExpirationDate = satStreamID.ExpirationDate.Truncate(time.Microsecond)
}
signedStreamID, err := SignStreamID(ctx, endpoint.satellite, satStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
encodedStreamID, err := pb.Marshal(signedStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
streamID, err = storj.StreamIDFromBytes(encodedStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return streamID, nil
}
func (endpoint *Endpoint) packSegmentID(ctx context.Context, satSegmentID *internalpb.SegmentID) (segmentID storj.SegmentID, err error) {
defer mon.Task()(&ctx)(&err)
if satSegmentID == nil {
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to create segment id")
}
signedSegmentID, err := SignSegmentID(ctx, endpoint.satellite, satSegmentID)
if err != nil {
return nil, err
}
encodedSegmentID, err := pb.Marshal(signedSegmentID)
if err != nil {
return nil, err
}
segmentID, err = storj.SegmentIDFromBytes(encodedSegmentID)
if err != nil {
return nil, err
}
return segmentID, nil
}
func (endpoint *Endpoint) unmarshalSatStreamID(ctx context.Context, streamID storj.StreamID) (_ *internalpb.StreamID, err error) {
defer mon.Task()(&ctx)(&err)
satStreamID := &internalpb.StreamID{}
err = pb.Unmarshal(streamID, satStreamID)
if err != nil {
return nil, err
}
err = VerifyStreamID(ctx, endpoint.satellite, satStreamID)
if err != nil {
return nil, err
}
return satStreamID, nil
}
func (endpoint *Endpoint) unmarshalSatSegmentID(ctx context.Context, segmentID storj.SegmentID) (_ *internalpb.SegmentID, err error) {
defer mon.Task()(&ctx)(&err)
if len(segmentID) == 0 {
return nil, errs.New("segment ID missing")
}
satSegmentID := &internalpb.SegmentID{}
err = pb.Unmarshal(segmentID, satSegmentID)
if err != nil {
return nil, err
}
if satSegmentID.StreamId == nil {
return nil, errs.New("stream ID missing")
}
err = VerifySegmentID(ctx, endpoint.satellite, satSegmentID)
if err != nil {
return nil, err
}
if satSegmentID.CreationDate.Before(time.Now().Add(-satIDExpiration)) {
return nil, errs.New("segment ID expired")
}
return satSegmentID, nil
}
// convertMetabaseErr converts domain errors from metabase to appropriate rpc statuses errors.
func (endpoint *Endpoint) convertMetabaseErr(err error) error {
switch {
case err == nil:
return nil
case errors.Is(err, context.Canceled):
return rpcstatus.Error(rpcstatus.Canceled, "context canceled")
case errors.Is(err, context.DeadlineExceeded):
return rpcstatus.Error(rpcstatus.DeadlineExceeded, "context deadline exceeded")
case rpcstatus.Code(err) != rpcstatus.Unknown:
// it's already RPC error
return err
case metabase.ErrObjectNotFound.Has(err):
message := strings.TrimPrefix(err.Error(), string(metabase.ErrObjectNotFound))
message = strings.TrimPrefix(message, ": ")
// uplink expects a message that starts with the specified prefix
return rpcstatus.Error(rpcstatus.NotFound, "object not found: "+message)
case metabase.ErrSegmentNotFound.Has(err):
message := strings.TrimPrefix(err.Error(), string(metabase.ErrSegmentNotFound))
message = strings.TrimPrefix(message, ": ")
// uplink expects a message that starts with the specified prefix
return rpcstatus.Error(rpcstatus.NotFound, "segment not found: "+message)
case metabase.ErrInvalidRequest.Has(err):
return rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
case metabase.ErrObjectAlreadyExists.Has(err):
return rpcstatus.Error(rpcstatus.AlreadyExists, err.Error())
case metabase.ErrPendingObjectMissing.Has(err):
return rpcstatus.Error(rpcstatus.NotFound, err.Error())
case metabase.ErrPermissionDenied.Has(err):
return rpcstatus.Error(rpcstatus.PermissionDenied, err.Error())
default:
endpoint.log.Error("internal", zap.Error(err))
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}
func (endpoint *Endpoint) usageTracking(keyInfo *console.APIKeyInfo, header *pb.RequestHeader, name string, tags ...eventkit.Tag) {
evs.Event("usage", append([]eventkit.Tag{
eventkit.Bytes("project-public-id", keyInfo.ProjectPublicID[:]),
eventkit.String("user-agent", string(header.UserAgent)),
eventkit.String("request", name),
}, tags...)...)
}