975d953cb8
We stoped returning lots of errors as is to avoid leaking our internals but some errors were meanigful for client. Example of such error is "exceeded maximum number of parts". With this change we are wrapping some important commit object errors with new ErrFailedPrecondition error to be able to return it easily to uplink. Change-Id: Id834b78362ed1920f0c3f6f1c7d9587bfd27e36a
345 lines
11 KiB
Go
345 lines
11 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package metainfo
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jtolio/eventkit"
|
|
"github.com/spacemonkeygo/monkit/v3"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/time/rate"
|
|
|
|
"storj.io/common/encryption"
|
|
"storj.io/common/lrucache"
|
|
"storj.io/common/macaroon"
|
|
"storj.io/common/pb"
|
|
"storj.io/common/rpc/rpcstatus"
|
|
"storj.io/common/signing"
|
|
"storj.io/common/storj"
|
|
"storj.io/storj/satellite/accounting"
|
|
"storj.io/storj/satellite/attribution"
|
|
"storj.io/storj/satellite/buckets"
|
|
"storj.io/storj/satellite/console"
|
|
"storj.io/storj/satellite/internalpb"
|
|
"storj.io/storj/satellite/metabase"
|
|
"storj.io/storj/satellite/metainfo/pointerverification"
|
|
"storj.io/storj/satellite/orders"
|
|
"storj.io/storj/satellite/overlay"
|
|
"storj.io/storj/satellite/revocation"
|
|
)
|
|
|
|
const (
|
|
satIDExpiration = 48 * time.Hour
|
|
)
|
|
|
|
var (
|
|
mon = monkit.Package()
|
|
evs = eventkit.Package()
|
|
|
|
// Error general metainfo error.
|
|
Error = errs.Class("metainfo")
|
|
// ErrNodeAlreadyExists pointer already has a piece for a node err.
|
|
ErrNodeAlreadyExists = errs.Class("metainfo: node already exists")
|
|
// ErrBucketNotEmpty is returned when bucket is required to be empty for an operation.
|
|
ErrBucketNotEmpty = errs.Class("bucket not empty")
|
|
)
|
|
|
|
// APIKeys is api keys store methods used by endpoint.
|
|
//
|
|
// architecture: Database
|
|
type APIKeys interface {
|
|
GetByHead(ctx context.Context, head []byte) (*console.APIKeyInfo, error)
|
|
}
|
|
|
|
// Endpoint metainfo endpoint.
|
|
//
|
|
// architecture: Endpoint
|
|
type Endpoint struct {
|
|
pb.DRPCMetainfoUnimplementedServer
|
|
|
|
log *zap.Logger
|
|
buckets *buckets.Service
|
|
metabase *metabase.DB
|
|
orders *orders.Service
|
|
overlay *overlay.Service
|
|
attributions attribution.DB
|
|
pointerVerification *pointerverification.Service
|
|
projectUsage *accounting.Service
|
|
projectLimits *accounting.ProjectLimitCache
|
|
projects console.Projects
|
|
apiKeys APIKeys
|
|
satellite signing.Signer
|
|
limiterCache *lrucache.ExpiringLRUOf[*rate.Limiter]
|
|
singleObjectLimitCache *lrucache.ExpiringLRUOf[struct{}]
|
|
encInlineSegmentSize int64 // max inline segment size + encryption overhead
|
|
revocations revocation.DB
|
|
defaultRS *pb.RedundancyScheme
|
|
config ExtendedConfig
|
|
versionCollector *versionCollector
|
|
}
|
|
|
|
// NewEndpoint creates new metainfo endpoint instance.
|
|
func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase.DB,
|
|
orders *orders.Service, cache *overlay.Service, attributions attribution.DB, peerIdentities overlay.PeerIdentities,
|
|
apiKeys APIKeys, projectUsage *accounting.Service, projectLimits *accounting.ProjectLimitCache, projects console.Projects,
|
|
satellite signing.Signer, revocations revocation.DB, config Config) (*Endpoint, error) {
|
|
// TODO do something with too many params
|
|
|
|
extendedConfig, err := NewExtendedConfig(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
encInlineSegmentSize, err := encryption.CalcEncryptedSize(config.MaxInlineSegmentSize.Int64(), storj.EncryptionParameters{
|
|
CipherSuite: storj.EncAESGCM,
|
|
BlockSize: 128, // intentionally low block size to allow maximum possible encryption overhead
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
defaultRSScheme := &pb.RedundancyScheme{
|
|
Type: pb.RedundancyScheme_RS,
|
|
MinReq: int32(config.RS.Min),
|
|
RepairThreshold: int32(config.RS.Repair),
|
|
SuccessThreshold: int32(config.RS.Success),
|
|
Total: int32(config.RS.Total),
|
|
ErasureShareSize: config.RS.ErasureShareSize.Int32(),
|
|
}
|
|
|
|
return &Endpoint{
|
|
log: log,
|
|
buckets: buckets,
|
|
metabase: metabaseDB,
|
|
orders: orders,
|
|
overlay: cache,
|
|
attributions: attributions,
|
|
pointerVerification: pointerverification.NewService(peerIdentities),
|
|
apiKeys: apiKeys,
|
|
projectUsage: projectUsage,
|
|
projectLimits: projectLimits,
|
|
projects: projects,
|
|
satellite: satellite,
|
|
limiterCache: lrucache.NewOf[*rate.Limiter](lrucache.Options{
|
|
Capacity: config.RateLimiter.CacheCapacity,
|
|
Expiration: config.RateLimiter.CacheExpiration,
|
|
Name: "metainfo-ratelimit",
|
|
}),
|
|
singleObjectLimitCache: lrucache.NewOf[struct{}](lrucache.Options{
|
|
Expiration: config.UploadLimiter.SingleObjectLimit,
|
|
Capacity: config.UploadLimiter.CacheCapacity,
|
|
}),
|
|
encInlineSegmentSize: encInlineSegmentSize,
|
|
revocations: revocations,
|
|
defaultRS: defaultRSScheme,
|
|
config: extendedConfig,
|
|
versionCollector: newVersionCollector(log),
|
|
}, nil
|
|
}
|
|
|
|
// Close closes resources.
|
|
func (endpoint *Endpoint) Close() error { return nil }
|
|
|
|
// ProjectInfo returns allowed ProjectInfo for the provided API key.
|
|
func (endpoint *Endpoint) ProjectInfo(ctx context.Context, req *pb.ProjectInfoRequest) (_ *pb.ProjectInfoResponse, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
|
|
|
|
keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
|
|
Op: macaroon.ActionProjectInfo,
|
|
Time: time.Now(),
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req))
|
|
|
|
salt, err := endpoint.projects.GetSalt(ctx, keyInfo.ProjectID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &pb.ProjectInfoResponse{
|
|
ProjectPublicId: keyInfo.ProjectPublicID.Bytes(),
|
|
ProjectSalt: salt,
|
|
}, nil
|
|
}
|
|
|
|
// RevokeAPIKey handles requests to revoke an api key.
|
|
func (endpoint *Endpoint) RevokeAPIKey(ctx context.Context, req *pb.RevokeAPIKeyRequest) (resp *pb.RevokeAPIKeyResponse, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
|
|
|
|
macToRevoke, err := macaroon.ParseMacaroon(req.GetApiKey())
|
|
if err != nil {
|
|
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "API key to revoke is not a macaroon")
|
|
}
|
|
keyInfo, err := endpoint.validateRevoke(ctx, req.Header, macToRevoke)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
endpoint.usageTracking(keyInfo, req.Header, fmt.Sprintf("%T", req))
|
|
|
|
err = endpoint.revocations.Revoke(ctx, macToRevoke.Tail(), keyInfo.ID[:])
|
|
if err != nil {
|
|
endpoint.log.Error("Failed to revoke API key", zap.Error(err))
|
|
return nil, rpcstatus.Error(rpcstatus.Internal, "Failed to revoke API key")
|
|
}
|
|
|
|
return &pb.RevokeAPIKeyResponse{}, nil
|
|
}
|
|
|
|
func (endpoint *Endpoint) packStreamID(ctx context.Context, satStreamID *internalpb.StreamID) (streamID storj.StreamID, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if satStreamID == nil {
|
|
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to create stream id")
|
|
}
|
|
|
|
if !satStreamID.ExpirationDate.IsZero() {
|
|
// DB can only preserve microseconds precision and nano seconds will be cut.
|
|
// To have stable StreamID/UploadID we need to always truncate it.
|
|
satStreamID.ExpirationDate = satStreamID.ExpirationDate.Truncate(time.Microsecond)
|
|
}
|
|
|
|
signedStreamID, err := SignStreamID(ctx, endpoint.satellite, satStreamID)
|
|
if err != nil {
|
|
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
|
}
|
|
|
|
encodedStreamID, err := pb.Marshal(signedStreamID)
|
|
if err != nil {
|
|
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
|
}
|
|
|
|
streamID, err = storj.StreamIDFromBytes(encodedStreamID)
|
|
if err != nil {
|
|
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
|
}
|
|
return streamID, nil
|
|
}
|
|
|
|
func (endpoint *Endpoint) packSegmentID(ctx context.Context, satSegmentID *internalpb.SegmentID) (segmentID storj.SegmentID, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if satSegmentID == nil {
|
|
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to create segment id")
|
|
}
|
|
|
|
signedSegmentID, err := SignSegmentID(ctx, endpoint.satellite, satSegmentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
encodedSegmentID, err := pb.Marshal(signedSegmentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
segmentID, err = storj.SegmentIDFromBytes(encodedSegmentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return segmentID, nil
|
|
}
|
|
|
|
func (endpoint *Endpoint) unmarshalSatStreamID(ctx context.Context, streamID storj.StreamID) (_ *internalpb.StreamID, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
satStreamID := &internalpb.StreamID{}
|
|
err = pb.Unmarshal(streamID, satStreamID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = VerifyStreamID(ctx, endpoint.satellite, satStreamID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return satStreamID, nil
|
|
}
|
|
|
|
func (endpoint *Endpoint) unmarshalSatSegmentID(ctx context.Context, segmentID storj.SegmentID) (_ *internalpb.SegmentID, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if len(segmentID) == 0 {
|
|
return nil, errs.New("segment ID missing")
|
|
}
|
|
|
|
satSegmentID := &internalpb.SegmentID{}
|
|
err = pb.Unmarshal(segmentID, satSegmentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if satSegmentID.StreamId == nil {
|
|
return nil, errs.New("stream ID missing")
|
|
}
|
|
|
|
err = VerifySegmentID(ctx, endpoint.satellite, satSegmentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if satSegmentID.CreationDate.Before(time.Now().Add(-satIDExpiration)) {
|
|
return nil, errs.New("segment ID expired")
|
|
}
|
|
|
|
return satSegmentID, nil
|
|
}
|
|
|
|
// convertMetabaseErr converts domain errors from metabase to appropriate rpc statuses errors.
|
|
func (endpoint *Endpoint) convertMetabaseErr(err error) error {
|
|
switch {
|
|
case err == nil:
|
|
return nil
|
|
case errors.Is(err, context.Canceled):
|
|
return rpcstatus.Error(rpcstatus.Canceled, "context canceled")
|
|
case errors.Is(err, context.DeadlineExceeded):
|
|
return rpcstatus.Error(rpcstatus.DeadlineExceeded, "context deadline exceeded")
|
|
case rpcstatus.Code(err) != rpcstatus.Unknown:
|
|
// it's already RPC error
|
|
return err
|
|
case metabase.ErrObjectNotFound.Has(err):
|
|
message := strings.TrimPrefix(err.Error(), string(metabase.ErrObjectNotFound))
|
|
message = strings.TrimPrefix(message, ": ")
|
|
// uplink expects a message that starts with the specified prefix
|
|
return rpcstatus.Error(rpcstatus.NotFound, "object not found: "+message)
|
|
case metabase.ErrSegmentNotFound.Has(err):
|
|
message := strings.TrimPrefix(err.Error(), string(metabase.ErrSegmentNotFound))
|
|
message = strings.TrimPrefix(message, ": ")
|
|
// uplink expects a message that starts with the specified prefix
|
|
return rpcstatus.Error(rpcstatus.NotFound, "segment not found: "+message)
|
|
case metabase.ErrInvalidRequest.Has(err):
|
|
return rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
|
case metabase.ErrFailedPrecondition.Has(err):
|
|
return rpcstatus.Error(rpcstatus.FailedPrecondition, err.Error())
|
|
case metabase.ErrObjectAlreadyExists.Has(err):
|
|
return rpcstatus.Error(rpcstatus.AlreadyExists, err.Error())
|
|
case metabase.ErrPendingObjectMissing.Has(err):
|
|
return rpcstatus.Error(rpcstatus.NotFound, err.Error())
|
|
case metabase.ErrPermissionDenied.Has(err):
|
|
return rpcstatus.Error(rpcstatus.PermissionDenied, err.Error())
|
|
default:
|
|
endpoint.log.Error("internal", zap.Error(err))
|
|
return rpcstatus.Error(rpcstatus.Internal, "internal error")
|
|
}
|
|
}
|
|
|
|
func (endpoint *Endpoint) usageTracking(keyInfo *console.APIKeyInfo, header *pb.RequestHeader, name string, tags ...eventkit.Tag) {
|
|
evs.Event("usage", append([]eventkit.Tag{
|
|
eventkit.Bytes("project-public-id", keyInfo.ProjectPublicID[:]),
|
|
eventkit.String("user-agent", string(header.UserAgent)),
|
|
eventkit.String("request", name),
|
|
}, tags...)...)
|
|
}
|