storj/satellite/metainfo/endpoint.go
Cameron 41c2bca096 satellite/{console,metainfo,satellitedb}: add projectdb method GetSalt
Add new project db method, GetSalt, to get project salt. If salt
column is empty, return the sha-256 hash of the project ID. This
new method is used in metainfo endpoint ProjectInfo to return the
project salt to the client. This is backwards compatible because
the salt column is not populated yet. The updated endpoint will
do the same thing as the current endpoint.

Change-Id: I7eba376c865e10995a5a916302feca7cd7c7efa2
2022-09-26 13:12:21 +00:00

307 lines
9.5 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"context"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/encryption"
"storj.io/common/eventstat"
"storj.io/common/lrucache"
"storj.io/common/macaroon"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/private/debug"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/buckets"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo/piecedeletion"
"storj.io/storj/satellite/metainfo/pointerverification"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/revocation"
"storj.io/storj/satellite/rewards"
)
const (
satIDExpiration = 48 * time.Hour
deleteObjectPiecesSuccessThreshold = 0.75
)
var (
mon = monkit.Package()
// Error general metainfo error.
Error = errs.Class("metainfo")
// ErrNodeAlreadyExists pointer already has a piece for a node err.
ErrNodeAlreadyExists = errs.Class("metainfo: node already exists")
// ErrBucketNotEmpty is returned when bucket is required to be empty for an operation.
ErrBucketNotEmpty = errs.Class("bucket not empty")
)
// APIKeys is api keys store methods used by endpoint.
//
// architecture: Database
type APIKeys interface {
GetByHead(ctx context.Context, head []byte) (*console.APIKeyInfo, error)
}
// Endpoint metainfo endpoint.
//
// architecture: Endpoint
type Endpoint struct {
pb.DRPCMetainfoUnimplementedServer
log *zap.Logger
buckets *buckets.Service
metabase *metabase.DB
deletePieces *piecedeletion.Service
orders *orders.Service
overlay *overlay.Service
attributions attribution.DB
partners *rewards.PartnersService
pointerVerification *pointerverification.Service
projectUsage *accounting.Service
projects console.Projects
apiKeys APIKeys
satellite signing.Signer
limiterCache *lrucache.ExpiringLRU
encInlineSegmentSize int64 // max inline segment size + encryption overhead
revocations revocation.DB
defaultRS *pb.RedundancyScheme
config Config
versionCollector *versionCollector
top endpointTop
}
// endpointTop represents in-memory counter stats.
// Cached info can be retrieved from the /mon monitoring endpoint.
type endpointTop struct {
Project eventstat.Sink
Partner eventstat.Sink
UserAgent eventstat.Sink
}
// NewEndpoint creates new metainfo endpoint instance.
func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase.DB,
deletePieces *piecedeletion.Service, orders *orders.Service, cache *overlay.Service,
attributions attribution.DB, partners *rewards.PartnersService, peerIdentities overlay.PeerIdentities,
apiKeys APIKeys, projectUsage *accounting.Service, projects console.Projects,
satellite signing.Signer, revocations revocation.DB, config Config) (*Endpoint, error) {
// TODO do something with too many params
encInlineSegmentSize, err := encryption.CalcEncryptedSize(config.MaxInlineSegmentSize.Int64(), storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 128, // intentionally low block size to allow maximum possible encryption overhead
})
if err != nil {
return nil, err
}
defaultRSScheme := &pb.RedundancyScheme{
Type: pb.RedundancyScheme_RS,
MinReq: int32(config.RS.Min),
RepairThreshold: int32(config.RS.Repair),
SuccessThreshold: int32(config.RS.Success),
Total: int32(config.RS.Total),
ErasureShareSize: config.RS.ErasureShareSize.Int32(),
}
return &Endpoint{
log: log,
buckets: buckets,
metabase: metabaseDB,
deletePieces: deletePieces,
orders: orders,
overlay: cache,
attributions: attributions,
partners: partners,
pointerVerification: pointerverification.NewService(peerIdentities),
apiKeys: apiKeys,
projectUsage: projectUsage,
projects: projects,
satellite: satellite,
limiterCache: lrucache.New(lrucache.Options{
Capacity: config.RateLimiter.CacheCapacity,
Expiration: config.RateLimiter.CacheExpiration,
}),
encInlineSegmentSize: encInlineSegmentSize,
revocations: revocations,
defaultRS: defaultRSScheme,
config: config,
versionCollector: newVersionCollector(log),
top: endpointTop{
Project: debug.Top.NewTagCounter("auth_request_project", "project"),
Partner: debug.Top.NewTagCounter("auth_request_partner", "partner"),
UserAgent: debug.Top.NewTagCounter("auth_request_user_agent", "agent"),
},
}, nil
}
// Close closes resources.
func (endpoint *Endpoint) Close() error { return nil }
// ProjectInfo returns allowed ProjectInfo for the provided API key.
func (endpoint *Endpoint) ProjectInfo(ctx context.Context, req *pb.ProjectInfoRequest) (_ *pb.ProjectInfoResponse, err error) {
defer mon.Task()(&ctx)(&err)
endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionProjectInfo,
Time: time.Now(),
})
if err != nil {
return nil, err
}
salt, err := endpoint.projects.GetSalt(ctx, keyInfo.ProjectID)
if err != nil {
return nil, err
}
return &pb.ProjectInfoResponse{
ProjectSalt: salt,
}, nil
}
// RevokeAPIKey handles requests to revoke an api key.
func (endpoint *Endpoint) RevokeAPIKey(ctx context.Context, req *pb.RevokeAPIKeyRequest) (resp *pb.RevokeAPIKeyResponse, err error) {
defer mon.Task()(&ctx)(&err)
endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
macToRevoke, err := macaroon.ParseMacaroon(req.GetApiKey())
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "API key to revoke is not a macaroon")
}
keyInfo, err := endpoint.validateRevoke(ctx, req.Header, macToRevoke)
if err != nil {
return nil, err
}
err = endpoint.revocations.Revoke(ctx, macToRevoke.Tail(), keyInfo.ID[:])
if err != nil {
endpoint.log.Error("Failed to revoke API key", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "Failed to revoke API key")
}
return &pb.RevokeAPIKeyResponse{}, nil
}
func (endpoint *Endpoint) packStreamID(ctx context.Context, satStreamID *internalpb.StreamID) (streamID storj.StreamID, err error) {
defer mon.Task()(&ctx)(&err)
signedStreamID, err := SignStreamID(ctx, endpoint.satellite, satStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
encodedStreamID, err := pb.Marshal(signedStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
streamID, err = storj.StreamIDFromBytes(encodedStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return streamID, nil
}
func (endpoint *Endpoint) packSegmentID(ctx context.Context, satSegmentID *internalpb.SegmentID) (segmentID storj.SegmentID, err error) {
defer mon.Task()(&ctx)(&err)
signedSegmentID, err := SignSegmentID(ctx, endpoint.satellite, satSegmentID)
if err != nil {
return nil, err
}
encodedSegmentID, err := pb.Marshal(signedSegmentID)
if err != nil {
return nil, err
}
segmentID, err = storj.SegmentIDFromBytes(encodedSegmentID)
if err != nil {
return nil, err
}
return segmentID, nil
}
func (endpoint *Endpoint) unmarshalSatStreamID(ctx context.Context, streamID storj.StreamID) (_ *internalpb.StreamID, err error) {
defer mon.Task()(&ctx)(&err)
satStreamID := &internalpb.StreamID{}
err = pb.Unmarshal(streamID, satStreamID)
if err != nil {
return nil, err
}
err = VerifyStreamID(ctx, endpoint.satellite, satStreamID)
if err != nil {
return nil, err
}
return satStreamID, nil
}
func (endpoint *Endpoint) unmarshalSatSegmentID(ctx context.Context, segmentID storj.SegmentID) (_ *internalpb.SegmentID, err error) {
defer mon.Task()(&ctx)(&err)
satSegmentID := &internalpb.SegmentID{}
err = pb.Unmarshal(segmentID, satSegmentID)
if err != nil {
return nil, err
}
if satSegmentID.StreamId == nil {
return nil, errs.New("stream ID missing")
}
err = VerifySegmentID(ctx, endpoint.satellite, satSegmentID)
if err != nil {
return nil, err
}
if satSegmentID.CreationDate.Before(time.Now().Add(-satIDExpiration)) {
return nil, errs.New("segment ID expired")
}
return satSegmentID, nil
}
// convertMetabaseErr converts domain errors from metabase to appropriate rpc statuses errors.
func (endpoint *Endpoint) convertMetabaseErr(err error) error {
if rpcstatus.Code(err) != rpcstatus.Unknown {
// it's already RPC error
return err
}
switch {
case storj.ErrObjectNotFound.Has(err):
return rpcstatus.Error(rpcstatus.NotFound, err.Error())
case metabase.ErrSegmentNotFound.Has(err):
return rpcstatus.Error(rpcstatus.NotFound, err.Error())
case metabase.ErrInvalidRequest.Has(err):
return rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
case metabase.ErrObjectAlreadyExists.Has(err):
return rpcstatus.Error(rpcstatus.AlreadyExists, err.Error())
case metabase.ErrPendingObjectMissing.Has(err):
return rpcstatus.Error(rpcstatus.NotFound, err.Error())
default:
endpoint.log.Error("internal", zap.Error(err))
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}