storj/satellite/metainfo/endpoint.go
Michał Niewrzał 2e31ef3f29 satellite/metabase: better error message while move
Before this change we were returning full DB error message.
That can be very confusing for end user. This change is translating
error message into more user frindly version and fixes also DRPC
error status code.

Fixes https://github.com/storj/team-metainfo/issues/76

Change-Id: I29b06ab4ba50a0d14db7a822a2906d95d65ab524
2022-02-02 15:03:24 +00:00

288 lines
8.9 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"context"
"crypto/sha256"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/encryption"
"storj.io/common/lrucache"
"storj.io/common/macaroon"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/buckets"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo/piecedeletion"
"storj.io/storj/satellite/metainfo/pointerverification"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/revocation"
"storj.io/storj/satellite/rewards"
)
const (
satIDExpiration = 48 * time.Hour
deleteObjectPiecesSuccessThreshold = 0.75
)
var (
mon = monkit.Package()
// Error general metainfo error.
Error = errs.Class("metainfo")
// ErrNodeAlreadyExists pointer already has a piece for a node err.
ErrNodeAlreadyExists = errs.Class("metainfo: node already exists")
// ErrBucketNotEmpty is returned when bucket is required to be empty for an operation.
ErrBucketNotEmpty = errs.Class("bucket not empty")
)
// APIKeys is api keys store methods used by endpoint.
//
// architecture: Database
type APIKeys interface {
GetByHead(ctx context.Context, head []byte) (*console.APIKeyInfo, error)
}
// Endpoint metainfo endpoint.
//
// architecture: Endpoint
type Endpoint struct {
pb.DRPCMetainfoUnimplementedServer
log *zap.Logger
buckets *buckets.Service
metabase *metabase.DB
deletePieces *piecedeletion.Service
orders *orders.Service
overlay *overlay.Service
attributions attribution.DB
partners *rewards.PartnersService
pointerVerification *pointerverification.Service
projectUsage *accounting.Service
projects console.Projects
apiKeys APIKeys
satellite signing.Signer
limiterCache *lrucache.ExpiringLRU
encInlineSegmentSize int64 // max inline segment size + encryption overhead
revocations revocation.DB
defaultRS *pb.RedundancyScheme
config Config
versionCollector *versionCollector
}
// NewEndpoint creates new metainfo endpoint instance.
func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase.DB,
deletePieces *piecedeletion.Service, orders *orders.Service, cache *overlay.Service,
attributions attribution.DB, partners *rewards.PartnersService, peerIdentities overlay.PeerIdentities,
apiKeys APIKeys, projectUsage *accounting.Service, projects console.Projects,
satellite signing.Signer, revocations revocation.DB, config Config) (*Endpoint, error) {
// TODO do something with too many params
encInlineSegmentSize, err := encryption.CalcEncryptedSize(config.MaxInlineSegmentSize.Int64(), storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 128, // intentionally low block size to allow maximum possible encryption overhead
})
if err != nil {
return nil, err
}
defaultRSScheme := &pb.RedundancyScheme{
Type: pb.RedundancyScheme_RS,
MinReq: int32(config.RS.Min),
RepairThreshold: int32(config.RS.Repair),
SuccessThreshold: int32(config.RS.Success),
Total: int32(config.RS.Total),
ErasureShareSize: config.RS.ErasureShareSize.Int32(),
}
return &Endpoint{
log: log,
buckets: buckets,
metabase: metabaseDB,
deletePieces: deletePieces,
orders: orders,
overlay: cache,
attributions: attributions,
partners: partners,
pointerVerification: pointerverification.NewService(peerIdentities),
apiKeys: apiKeys,
projectUsage: projectUsage,
projects: projects,
satellite: satellite,
limiterCache: lrucache.New(lrucache.Options{
Capacity: config.RateLimiter.CacheCapacity,
Expiration: config.RateLimiter.CacheExpiration,
}),
encInlineSegmentSize: encInlineSegmentSize,
revocations: revocations,
defaultRS: defaultRSScheme,
config: config,
versionCollector: newVersionCollector(log),
}, nil
}
// Close closes resources.
func (endpoint *Endpoint) Close() error { return nil }
// ProjectInfo returns allowed ProjectInfo for the provided API key.
func (endpoint *Endpoint) ProjectInfo(ctx context.Context, req *pb.ProjectInfoRequest) (_ *pb.ProjectInfoResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionProjectInfo,
Time: time.Now(),
})
if err != nil {
return nil, err
}
salt := sha256.Sum256(keyInfo.ProjectID[:])
return &pb.ProjectInfoResponse{
ProjectSalt: salt[:],
}, nil
}
// RevokeAPIKey handles requests to revoke an api key.
func (endpoint *Endpoint) RevokeAPIKey(ctx context.Context, req *pb.RevokeAPIKeyRequest) (resp *pb.RevokeAPIKeyResponse, err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}
macToRevoke, err := macaroon.ParseMacaroon(req.GetApiKey())
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "API key to revoke is not a macaroon")
}
keyInfo, err := endpoint.validateRevoke(ctx, req.Header, macToRevoke)
if err != nil {
return nil, err
}
err = endpoint.revocations.Revoke(ctx, macToRevoke.Tail(), keyInfo.ID[:])
if err != nil {
endpoint.log.Error("Failed to revoke API key", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, "Failed to revoke API key")
}
return &pb.RevokeAPIKeyResponse{}, nil
}
func (endpoint *Endpoint) packStreamID(ctx context.Context, satStreamID *internalpb.StreamID) (streamID storj.StreamID, err error) {
defer mon.Task()(&ctx)(&err)
signedStreamID, err := SignStreamID(ctx, endpoint.satellite, satStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
encodedStreamID, err := pb.Marshal(signedStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
streamID, err = storj.StreamIDFromBytes(encodedStreamID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return streamID, nil
}
func (endpoint *Endpoint) packSegmentID(ctx context.Context, satSegmentID *internalpb.SegmentID) (segmentID storj.SegmentID, err error) {
defer mon.Task()(&ctx)(&err)
signedSegmentID, err := SignSegmentID(ctx, endpoint.satellite, satSegmentID)
if err != nil {
return nil, err
}
encodedSegmentID, err := pb.Marshal(signedSegmentID)
if err != nil {
return nil, err
}
segmentID, err = storj.SegmentIDFromBytes(encodedSegmentID)
if err != nil {
return nil, err
}
return segmentID, nil
}
func (endpoint *Endpoint) unmarshalSatStreamID(ctx context.Context, streamID storj.StreamID) (_ *internalpb.StreamID, err error) {
defer mon.Task()(&ctx)(&err)
satStreamID := &internalpb.StreamID{}
err = pb.Unmarshal(streamID, satStreamID)
if err != nil {
return nil, err
}
err = VerifyStreamID(ctx, endpoint.satellite, satStreamID)
if err != nil {
return nil, err
}
return satStreamID, nil
}
func (endpoint *Endpoint) unmarshalSatSegmentID(ctx context.Context, segmentID storj.SegmentID) (_ *internalpb.SegmentID, err error) {
defer mon.Task()(&ctx)(&err)
satSegmentID := &internalpb.SegmentID{}
err = pb.Unmarshal(segmentID, satSegmentID)
if err != nil {
return nil, err
}
if satSegmentID.StreamId == nil {
return nil, errs.New("stream ID missing")
}
err = VerifySegmentID(ctx, endpoint.satellite, satSegmentID)
if err != nil {
return nil, err
}
if satSegmentID.CreationDate.Before(time.Now().Add(-satIDExpiration)) {
return nil, errs.New("segment ID expired")
}
return satSegmentID, nil
}
// convertMetabaseErr converts domain errors from metabase to appropriate rpc statuses errors.
func (endpoint *Endpoint) convertMetabaseErr(err error) error {
switch {
case storj.ErrObjectNotFound.Has(err):
return rpcstatus.Error(rpcstatus.NotFound, err.Error())
case metabase.ErrSegmentNotFound.Has(err):
return rpcstatus.Error(rpcstatus.NotFound, err.Error())
case metabase.ErrInvalidRequest.Has(err):
return rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
case metabase.ErrObjectAlreadyExists.Has(err):
return rpcstatus.Error(rpcstatus.AlreadyExists, err.Error())
default:
endpoint.log.Error("internal", zap.Error(err))
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}