storj/satellite/metainfo/endpoint.go
Michal Niewrzal 67371c43bd satellite/metainfo: enable UsePendingObjectsTable by project
This small feature will give us ability to test pending_objects table
without enabling it globally.

Change-Id: I802f45987ad329f94adfc0f02957c802b21d8251
2023-08-30 15:44:29 +02:00

338 lines
11 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"context"
"fmt"
"strings"
"time"
"github.com/jtolio/eventkit"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/time/rate"
"storj.io/common/encryption"
"storj.io/common/lrucache"
"storj.io/common/macaroon"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/buckets"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo/pointerverification"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/revocation"
)
// satIDExpiration is the maximum age of a segment ID, measured from its
// creation date, before it is rejected as expired.
const satIDExpiration = 2 * 24 * time.Hour
var (
// mon is the monkit scope of this package; endpoint methods use it to
// instrument calls via mon.Task().
mon = monkit.Package()
// evs is the eventkit scope of this package; it is used to emit "usage" events.
evs = eventkit.Package()
// Error is the general metainfo error class.
Error = errs.Class("metainfo")
// ErrNodeAlreadyExists is the error class used when a pointer already has a piece for a node.
ErrNodeAlreadyExists = errs.Class("metainfo: node already exists")
// ErrBucketNotEmpty is returned when bucket is required to be empty for an operation.
ErrBucketNotEmpty = errs.Class("bucket not empty")
)
// APIKeys is the subset of API key store methods used by the endpoint.
//
// architecture: Database
type APIKeys interface {
// GetByHead returns the API key info associated with the given head bytes.
// NOTE(review): how a missing key is reported is defined by the implementation — confirm there.
GetByHead(ctx context.Context, head []byte) (*console.APIKeyInfo, error)
}
// Endpoint implements the metainfo endpoint.
//
// It is constructed with NewEndpoint, which wires in all dependencies below.
//
// architecture: Endpoint
type Endpoint struct {
// Embedded generated server stub.
// NOTE(review): presumably supplies default implementations for RPCs not defined on Endpoint — confirm against pb.
pb.DRPCMetainfoUnimplementedServer
log *zap.Logger
buckets *buckets.Service
metabase *metabase.DB
orders *orders.Service
overlay *overlay.Service
attributions attribution.DB
pointerVerification *pointerverification.Service
projectUsage *accounting.Service
projectLimits *accounting.ProjectLimitCache
// projects is used to look up the project salt in ProjectInfo.
projects console.Projects
apiKeys APIKeys
// satellite is the signer used to sign and verify stream IDs and segment IDs.
satellite signing.Signer
// limiterCache holds rate limiters; it is created with the name "metainfo-ratelimit".
limiterCache *lrucache.ExpiringLRUOf[*rate.Limiter]
// singleObjectLimitCache entries expire after config.UploadLimiter.SingleObjectLimit.
singleObjectLimitCache *lrucache.ExpiringLRUOf[struct{}]
encInlineSegmentSize int64 // max inline segment size + encryption overhead
// revocations stores API key revocations performed by RevokeAPIKey.
revocations revocation.DB
// defaultRS is the redundancy scheme built from config.RS in NewEndpoint.
defaultRS *pb.RedundancyScheme
// config is the extended configuration derived from Config by NewExtendedConfig.
config ExtendedConfig
// versionCollector receives the request user agent and method name on each call.
versionCollector *versionCollector
}
// NewEndpoint creates new metainfo endpoint instance.
//
// It fails when the extended configuration cannot be derived from config or
// when the encrypted inline segment size cannot be calculated; in both cases
// the underlying error is returned unchanged.
func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase.DB,
	orders *orders.Service, cache *overlay.Service, attributions attribution.DB, peerIdentities overlay.PeerIdentities,
	apiKeys APIKeys, projectUsage *accounting.Service, projectLimits *accounting.ProjectLimitCache, projects console.Projects,
	satellite signing.Signer, revocations revocation.DB, config Config) (*Endpoint, error) {
	// TODO do something with too many params

	extended, err := NewExtendedConfig(config)
	if err != nil {
		return nil, err
	}

	// intentionally low block size to allow maximum possible encryption overhead
	overheadParams := storj.EncryptionParameters{
		CipherSuite: storj.EncAESGCM,
		BlockSize:   128,
	}
	inlineSize, err := encryption.CalcEncryptedSize(config.MaxInlineSegmentSize.Int64(), overheadParams)
	if err != nil {
		return nil, err
	}

	// Default redundancy scheme taken straight from the RS section of the config.
	redundancy := &pb.RedundancyScheme{
		Type:             pb.RedundancyScheme_RS,
		MinReq:           int32(config.RS.Min),
		RepairThreshold:  int32(config.RS.Repair),
		SuccessThreshold: int32(config.RS.Success),
		Total:            int32(config.RS.Total),
		ErasureShareSize: config.RS.ErasureShareSize.Int32(),
	}

	verification := pointerverification.NewService(peerIdentities)

	rateLimiters := lrucache.NewOf[*rate.Limiter](lrucache.Options{
		Capacity:   config.RateLimiter.CacheCapacity,
		Expiration: config.RateLimiter.CacheExpiration,
		Name:       "metainfo-ratelimit",
	})

	objectLimits := lrucache.NewOf[struct{}](lrucache.Options{
		Expiration: config.UploadLimiter.SingleObjectLimit,
		Capacity:   config.UploadLimiter.CacheCapacity,
	})

	collector := newVersionCollector(log)

	endpoint := &Endpoint{
		log:                    log,
		buckets:                buckets,
		metabase:               metabaseDB,
		orders:                 orders,
		overlay:                cache,
		attributions:           attributions,
		pointerVerification:    verification,
		apiKeys:                apiKeys,
		projectUsage:           projectUsage,
		projectLimits:          projectLimits,
		projects:               projects,
		satellite:              satellite,
		limiterCache:           rateLimiters,
		singleObjectLimitCache: objectLimits,
		encInlineSegmentSize:   inlineSize,
		revocations:            revocations,
		defaultRS:              redundancy,
		config:                 extended,
		versionCollector:       collector,
	}
	return endpoint, nil
}
// Close closes resources. The endpoint currently has nothing to release,
// so it always returns nil.
func (endpoint *Endpoint) Close() error {
	return nil
}
// ProjectInfo returns allowed ProjectInfo for the provided API key.
//
// The request is authorized for macaroon.ActionProjectInfo at the current
// time. On success the response carries the project's public ID and salt.
// Errors from authorization and from the salt lookup are returned unchanged.
func (endpoint *Endpoint) ProjectInfo(ctx context.Context, req *pb.ProjectInfoRequest) (_ *pb.ProjectInfoResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	// Read the user agent through the generated getter: it is safe on a nil
	// header, whereas req.Header.UserAgent would panic for a request that was
	// sent without a header, before authorization had a chance to reject it.
	endpoint.versionCollector.collect(req.Header.GetUserAgent(), mon.Func().ShortName())

	keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
		Op:   macaroon.ActionProjectInfo,
		Time: time.Now(),
	})
	if err != nil {
		return nil, err
	}
	endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req))

	salt, err := endpoint.projects.GetSalt(ctx, keyInfo.ProjectID)
	if err != nil {
		return nil, err
	}

	return &pb.ProjectInfoResponse{
		ProjectPublicId: keyInfo.ProjectPublicID.Bytes(),
		ProjectSalt:     salt,
	}, nil
}
// RevokeAPIKey handles requests to revoke an api key.
//
// The key to revoke must parse as a macaroon, otherwise an InvalidArgument
// status is returned. Errors from validateRevoke are returned unchanged. A
// failure to store the revocation is logged and reported as an Internal
// status with a generic message.
func (endpoint *Endpoint) RevokeAPIKey(ctx context.Context, req *pb.RevokeAPIKeyRequest) (resp *pb.RevokeAPIKeyResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	// Read the user agent through the generated getter: it is safe on a nil
	// header, whereas req.Header.UserAgent would panic for a request that was
	// sent without a header, before validation had a chance to reject it.
	endpoint.versionCollector.collect(req.Header.GetUserAgent(), mon.Func().ShortName())

	macToRevoke, err := macaroon.ParseMacaroon(req.GetApiKey())
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "API key to revoke is not a macaroon")
	}

	keyInfo, err := endpoint.validateRevoke(ctx, req.Header, macToRevoke)
	if err != nil {
		return nil, err
	}
	endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req))

	// The revocation is recorded for the tail of the macaroon being revoked,
	// together with the ID of the key that authorized the request.
	err = endpoint.revocations.Revoke(ctx, macToRevoke.Tail(), keyInfo.ID[:])
	if err != nil {
		endpoint.log.Error("Failed to revoke API key", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.Internal, "Failed to revoke API key")
	}

	return &pb.RevokeAPIKeyResponse{}, nil
}
// packStreamID signs satStreamID with the satellite signer, marshals the
// signed message and converts the resulting bytes into a storj.StreamID.
//
// A non-zero ExpirationDate on satStreamID is truncated in place to
// microsecond precision before signing. Every failure, including a nil
// satStreamID, is reported as an rpcstatus.Internal error.
func (endpoint *Endpoint) packStreamID(ctx context.Context, satStreamID *internalpb.StreamID) (streamID storj.StreamID, err error) {
	defer mon.Task()(&ctx)(&err)

	// internal reports a failed step as an Internal status carrying the cause's message.
	internal := func(cause error) (storj.StreamID, error) {
		return nil, rpcstatus.Error(rpcstatus.Internal, cause.Error())
	}

	if satStreamID == nil {
		return nil, rpcstatus.Error(rpcstatus.Internal, "unable to create stream id")
	}

	if expiration := satStreamID.ExpirationDate; !expiration.IsZero() {
		// DB can only preserve microseconds precision and nano seconds will be cut.
		// To have stable StreamID/UploadID we need to always truncate it.
		satStreamID.ExpirationDate = expiration.Truncate(time.Microsecond)
	}

	signed, signErr := SignStreamID(ctx, endpoint.satellite, satStreamID)
	if signErr != nil {
		return internal(signErr)
	}

	encoded, marshalErr := pb.Marshal(signed)
	if marshalErr != nil {
		return internal(marshalErr)
	}

	packed, convertErr := storj.StreamIDFromBytes(encoded)
	if convertErr != nil {
		return internal(convertErr)
	}
	return packed, nil
}
// packSegmentID signs satSegmentID with the satellite signer, marshals the
// signed message and converts the resulting bytes into a storj.SegmentID.
//
// Every failure is reported as an rpcstatus.Internal error. Previously only
// the nil-input case carried an RPC status while signing, marshaling and
// conversion failures were returned as raw errors; they are now wrapped the
// same way packStreamID wraps its failures, so callers always receive an
// error with an explicit status code.
func (endpoint *Endpoint) packSegmentID(ctx context.Context, satSegmentID *internalpb.SegmentID) (segmentID storj.SegmentID, err error) {
	defer mon.Task()(&ctx)(&err)

	if satSegmentID == nil {
		return nil, rpcstatus.Error(rpcstatus.Internal, "unable to create segment id")
	}

	signedSegmentID, err := SignSegmentID(ctx, endpoint.satellite, satSegmentID)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	encodedSegmentID, err := pb.Marshal(signedSegmentID)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	segmentID, err = storj.SegmentIDFromBytes(encodedSegmentID)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
	}
	return segmentID, nil
}
// unmarshalSatStreamID decodes streamID into an internalpb.StreamID and
// verifies it with VerifyStreamID using the satellite signer.
//
// Decoding and verification errors are returned unchanged.
func (endpoint *Endpoint) unmarshalSatStreamID(ctx context.Context, streamID storj.StreamID) (_ *internalpb.StreamID, err error) {
	defer mon.Task()(&ctx)(&err)

	decoded := new(internalpb.StreamID)
	if unmarshalErr := pb.Unmarshal(streamID, decoded); unmarshalErr != nil {
		return nil, unmarshalErr
	}

	if verifyErr := VerifyStreamID(ctx, endpoint.satellite, decoded); verifyErr != nil {
		return nil, verifyErr
	}

	return decoded, nil
}
// unmarshalSatSegmentID decodes segmentID into an internalpb.SegmentID and
// validates it.
//
// Checks run in this order: the ID must be non-empty, must decode, must carry
// a stream ID, must pass VerifySegmentID with the satellite signer, and its
// creation date must not be older than satIDExpiration. Decoding and
// verification errors are returned unchanged; the other failures produce a
// descriptive error.
func (endpoint *Endpoint) unmarshalSatSegmentID(ctx context.Context, segmentID storj.SegmentID) (_ *internalpb.SegmentID, err error) {
	defer mon.Task()(&ctx)(&err)

	if len(segmentID) == 0 {
		return nil, errs.New("segment ID missing")
	}

	decoded := new(internalpb.SegmentID)
	if unmarshalErr := pb.Unmarshal(segmentID, decoded); unmarshalErr != nil {
		return nil, unmarshalErr
	}
	if decoded.StreamId == nil {
		return nil, errs.New("stream ID missing")
	}

	if verifyErr := VerifySegmentID(ctx, endpoint.satellite, decoded); verifyErr != nil {
		return nil, verifyErr
	}

	// Anything created before this instant is considered expired.
	oldestAllowed := time.Now().Add(-satIDExpiration)
	if decoded.CreationDate.Before(oldestAllowed) {
		return nil, errs.New("segment ID expired")
	}

	return decoded, nil
}
// convertMetabaseErr converts domain errors from metabase to appropriate rpc statuses errors.
//
// Errors that already carry an RPC status code are returned untouched.
// Recognized metabase error classes are mapped to NotFound, InvalidArgument,
// AlreadyExists or PermissionDenied; anything else is logged and returned as
// Internal.
func (endpoint *Endpoint) convertMetabaseErr(err error) error {
	if rpcstatus.Code(err) != rpcstatus.Unknown {
		// it's already RPC error
		return err
	}

	// withoutPrefix drops the given class name and the following ": "
	// separator from the front of the error message, when present.
	withoutPrefix := func(classPrefix string) string {
		rest := strings.TrimPrefix(err.Error(), classPrefix)
		return strings.TrimPrefix(rest, ": ")
	}

	if metabase.ErrObjectNotFound.Has(err) {
		// uplink expects a message that starts with the specified prefix
		return rpcstatus.Error(rpcstatus.NotFound, "object not found: "+withoutPrefix(string(metabase.ErrObjectNotFound)))
	}
	if metabase.ErrSegmentNotFound.Has(err) {
		// uplink expects a message that starts with the specified prefix
		return rpcstatus.Error(rpcstatus.NotFound, "segment not found: "+withoutPrefix(string(metabase.ErrSegmentNotFound)))
	}
	if metabase.ErrInvalidRequest.Has(err) {
		return rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
	}
	if metabase.ErrObjectAlreadyExists.Has(err) {
		return rpcstatus.Error(rpcstatus.AlreadyExists, err.Error())
	}
	if metabase.ErrPendingObjectMissing.Has(err) {
		return rpcstatus.Error(rpcstatus.NotFound, err.Error())
	}
	if metabase.ErrPermissionDenied.Has(err) {
		return rpcstatus.Error(rpcstatus.PermissionDenied, err.Error())
	}

	endpoint.log.Error("internal", zap.Error(err))
	return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
// usageTracking emits a "usage" event tagged with the project's public ID,
// the request's user agent and the request name, followed by any extra tags
// supplied by the caller.
//
// keyInfo must be non-nil. header may be nil, in which case the user agent
// is recorded as an empty string.
func (endpoint *Endpoint) usageTracking(keyInfo *console.APIKeyInfo, header *pb.RequestHeader, name string, tags ...eventkit.Tag) {
	// Build a fresh slice so the caller's tags are never modified, sized up
	// front so that appending the extra tags does not reallocate.
	allTags := make([]eventkit.Tag, 0, 3+len(tags))
	allTags = append(allTags,
		eventkit.Bytes("project-public-id", keyInfo.ProjectPublicID[:]),
		// The generated getter is safe on a nil header; a direct field
		// access would panic there.
		eventkit.String("user-agent", string(header.GetUserAgent())),
		eventkit.String("request", name),
	)
	allTags = append(allTags, tags...)

	evs.Event("usage", allTags...)
}