16878a22ea
Metainfo method validateAuth checks things like the API key, user permissions, and the rate limit, but all of its errors were returned as rpcstatus.Unauthenticated; now each failure is reported with a more specific status code. The old Metainfo endpoint is not touched to avoid backward compatibility issues.

Change-Id: I78eb276210fc50151da58a5c84e13ecd0961da29
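For illustration only, not part of the change: a minimal caller-side sketch of how the distinct status codes now produced by validateAuth could be interpreted. The helper name describeAuthFailure is hypothetical, and the sketch assumes rpcstatus.Code from storj.io/common/rpc/rpcstatus is available to extract the code from the returned error.

package example

import (
	"fmt"

	"storj.io/common/rpc/rpcstatus"
)

// describeAuthFailure maps the status codes produced by validateAuth to
// human-readable explanations. Hypothetical helper, shown only to make the
// new error-code mapping explicit.
func describeAuthFailure(err error) string {
	switch rpcstatus.Code(err) {
	case rpcstatus.InvalidArgument:
		return "API key could not be parsed"
	case rpcstatus.PermissionDenied:
		return "API key unknown or action not permitted"
	case rpcstatus.ResourceExhausted:
		return "project rate limit exceeded"
	case rpcstatus.Unavailable:
		return "rate limiter state could not be loaded"
	default:
		return fmt.Sprintf("unexpected failure: %v", err)
	}
}
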
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package metainfo

import (
	"bytes"
	"context"
	"regexp"
	"sync"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/skyrings/skyring-common/tools/uuid"
	"github.com/zeebo/errs"
	"go.uber.org/zap"
	"golang.org/x/time/rate"

	"storj.io/common/encryption"
	"storj.io/common/macaroon"
	"storj.io/common/pb"
	"storj.io/common/rpc/rpcstatus"
	"storj.io/common/signing"
	"storj.io/common/storj"
	"storj.io/storj/pkg/auth"
	"storj.io/storj/satellite/console"
)

const (
	requestTTL = time.Hour * 4
)

var (
	ipRegexp = regexp.MustCompile(`^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$`)
)

// TTLItem keeps the association between a serial number and its TTL.
type TTLItem struct {
	serialNumber storj.SerialNumber
	ttl          time.Time
}

type createRequest struct {
	Expiration time.Time
	Redundancy *pb.RedundancyScheme

	ttl time.Time
}

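// createRequests tracks in-flight segment create requests, keyed by order
// limit serial number, until they are removed or their TTL expires.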
type createRequests struct {
	mu sync.RWMutex
	// entries are keyed by order limit serial number, because with CreateSegment we don't have the path yet
	entries map[storj.SerialNumber]*createRequest

	muTTL      sync.Mutex
	entriesTTL []*TTLItem
}

func newCreateRequests() *createRequests {
	return &createRequests{
		entries:    make(map[storj.SerialNumber]*createRequest),
		entriesTTL: make([]*TTLItem, 0),
	}
}

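// Put stores the create request under the given serial number and schedules
// it to expire after requestTTL.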
func (requests *createRequests) Put(serialNumber storj.SerialNumber, createRequest *createRequest) {
	ttl := time.Now().Add(requestTTL)

	go func() {
		requests.muTTL.Lock()
		requests.entriesTTL = append(requests.entriesTTL, &TTLItem{
			serialNumber: serialNumber,
			ttl:          ttl,
		})
		requests.muTTL.Unlock()
	}()

	createRequest.ttl = ttl
	requests.mu.Lock()
	requests.entries[serialNumber] = createRequest
	requests.mu.Unlock()

	go requests.cleanup()
}

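// Load returns the create request stored under the given serial number, if
// it exists and has not yet expired.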
func (requests *createRequests) Load(serialNumber storj.SerialNumber) (*createRequest, bool) {
	requests.mu.RLock()
	request, found := requests.entries[serialNumber]
	if request != nil && request.ttl.Before(time.Now()) {
		request = nil
		found = false
	}
	requests.mu.RUnlock()

	return request, found
}

func (requests *createRequests) Remove(serialNumber storj.SerialNumber) {
	requests.mu.Lock()
	delete(requests.entries, serialNumber)
	requests.mu.Unlock()
}

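// cleanup removes create requests whose TTL has passed, scanning the TTL list
// in insertion order until it finds an entry that has not expired yet.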
func (requests *createRequests) cleanup() {
	requests.muTTL.Lock()
	now := time.Now()
	remove := make([]storj.SerialNumber, 0)
	newStart := 0
	for i, item := range requests.entriesTTL {
		if item.ttl.Before(now) {
			remove = append(remove, item.serialNumber)
			newStart = i + 1
		} else {
			break
		}
	}
	requests.entriesTTL = requests.entriesTTL[newStart:]
	requests.muTTL.Unlock()

	for _, serialNumber := range remove {
		requests.Remove(serialNumber)
	}
}

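// getAPIKey returns the API key from the request header when present,
// otherwise it falls back to the key attached to the request context.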
func getAPIKey(ctx context.Context, header *pb.RequestHeader) (key *macaroon.APIKey, err error) {
	defer mon.Task()(&ctx)(&err)
	if header != nil {
		return macaroon.ParseRawAPIKey(header.ApiKey)
	}

	keyData, ok := auth.GetAPIKey(ctx)
	if !ok {
		return nil, errs.New("missing credentials")
	}

	return macaroon.ParseAPIKey(string(keyData))
}

// validateAuth validates the API key, user permissions, and rate limit, and always returns a valid rpc error.
func (endpoint *Endpoint) validateAuth(ctx context.Context, header *pb.RequestHeader, action macaroon.Action) (_ *console.APIKeyInfo, err error) {
	defer mon.Task()(&ctx)(&err)

	key, err := getAPIKey(ctx, header)
	if err != nil {
		endpoint.log.Debug("invalid request", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "Invalid API credentials")
	}

	keyInfo, err := endpoint.apiKeys.GetByHead(ctx, key.Head())
	if err != nil {
		endpoint.log.Debug("unauthorized request", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "Unauthorized API credentials")
	}

	if err = endpoint.checkRate(ctx, keyInfo.ProjectID); err != nil {
		endpoint.log.Debug("rate check failed", zap.Error(err))
		return nil, err
	}

	// Revocations are currently handled by just deleting the key.
	err = key.Check(ctx, keyInfo.Secret, action, nil)
	if err != nil {
		endpoint.log.Debug("unauthorized request", zap.Error(err))
		return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "Unauthorized API credentials")
	}

	return keyInfo, nil
}

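// checkRate returns a ResourceExhausted error when the project exceeds its
// request rate limit. The limit defaults to the endpoint's limiter config and
// can be overridden per project.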
func (endpoint *Endpoint) checkRate(ctx context.Context, projectID uuid.UUID) (err error) {
	defer mon.Task()(&ctx)(&err)
	if !endpoint.limiterConfig.Enabled {
		return nil
	}
	limiter, err := endpoint.limiterCache.Get(projectID.String(), func() (interface{}, error) {
		limit := rate.Limit(endpoint.limiterConfig.Rate)

		project, err := endpoint.projects.Get(ctx, projectID)
		if err != nil {
			return false, err
		}
		if project.RateLimit != nil && *project.RateLimit > 0 {
			limit = rate.Limit(*project.RateLimit)
		}

		// initialize the limiter with limit and burst the same so that we don't limit how quickly calls
		// are made within the second.
		return rate.NewLimiter(limit, int(limit)), nil
	})
	if err != nil {
		return rpcstatus.Error(rpcstatus.Unavailable, err.Error())
	}

	if !limiter.(*rate.Limiter).Allow() {
		endpoint.log.Warn("too many requests for project",
			zap.Stringer("projectID", projectID),
			zap.Float64("limit", float64(limiter.(*rate.Limiter).Limit())))

		mon.Event("metainfo_rate_limit_exceeded") //locked

		return rpcstatus.Error(rpcstatus.ResourceExhausted, "Too Many Requests")
	}

	return nil
}

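// validateCommitSegment checks the commit request's bucket and pointer, and
// verifies that the pointer matches the create request stored for the first
// order limit's serial number.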
func (endpoint *Endpoint) validateCommitSegment(ctx context.Context, req *pb.SegmentCommitRequestOld) (err error) {
	defer mon.Task()(&ctx)(&err)

	err = endpoint.validateBucket(ctx, req.Bucket)
	if err != nil {
		return err
	}

	err = endpoint.validatePointer(ctx, req.Pointer, req.OriginalLimits)
	if err != nil {
		return err
	}

	if len(req.OriginalLimits) > 0 {
		createRequest, found := endpoint.createRequests.Load(req.OriginalLimits[0].SerialNumber)

		switch {
		case !found:
			return Error.New("missing create request or request expired")
		case !createRequest.Expiration.Equal(req.Pointer.ExpirationDate):
			return Error.New("pointer expiration date does not match requested one")
		case !proto.Equal(createRequest.Redundancy, req.Pointer.Remote.Redundancy):
			return Error.New("pointer redundancy scheme does not match requested one")
		}
	}

	return nil
}

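// validateBucket checks that the bucket name is 3 to 63 characters long,
// consists of valid labels, and is not formatted as an IP address.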
func (endpoint *Endpoint) validateBucket(ctx context.Context, bucket []byte) (err error) {
	defer mon.Task()(&ctx)(&err)

	if len(bucket) == 0 {
		return Error.New("bucket not specified")
	}

	if len(bucket) < 3 || len(bucket) > 63 {
		return Error.New("bucket name must be at least 3 and no more than 63 characters long")
	}

	// A regexp is not used here because benchmarks show it is slower for valid bucket names:
	// https://gist.github.com/mniewrzal/49de3af95f36e63e88fac24f565e444c
	labels := bytes.Split(bucket, []byte("."))
	for _, label := range labels {
		err = validateBucketLabel(label)
		if err != nil {
			return err
		}
	}

	if ipRegexp.MatchString(string(bucket)) {
		return Error.New("bucket name cannot be formatted as an IP address")
	}

	return nil
}

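// validateBucketLabel checks a single dot-separated label of a bucket name.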
func validateBucketLabel(label []byte) error {
	if len(label) == 0 {
		return Error.New("bucket label cannot be empty")
	}

	if !isLowerLetter(label[0]) && !isDigit(label[0]) {
		return Error.New("bucket label must start with a lowercase letter or number")
	}

	if label[0] == '-' || label[len(label)-1] == '-' {
		return Error.New("bucket label cannot start or end with a hyphen")
	}

	for i := 1; i < len(label)-1; i++ {
		if !isLowerLetter(label[i]) && !isDigit(label[i]) && (label[i] != '-') && (label[i] != '.') {
			return Error.New("bucket name must contain only lowercase letters, numbers or hyphens")
		}
	}

	return nil
}

func isLowerLetter(r byte) bool {
	return r >= 'a' && r <= 'z'
}

func isDigit(r byte) bool {
	return r >= '0' && r <= '9'
}

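// validatePointer checks the pointer against its original order limits:
// segment size bounds, order limit signatures, the max commit interval,
// derived piece IDs, and duplicate piece numbers or node IDs.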
func (endpoint *Endpoint) validatePointer(ctx context.Context, pointer *pb.Pointer, originalLimits []*pb.OrderLimit) (err error) {
	defer mon.Task()(&ctx)(&err)

	if pointer == nil {
		return Error.New("no pointer specified")
	}

	if pointer.Type == pb.Pointer_INLINE && pointer.Remote != nil {
		return Error.New("pointer type is INLINE but remote segment is set")
	}

	if pointer.Type == pb.Pointer_REMOTE {
		switch {
		case pointer.Remote == nil:
			return Error.New("no remote segment specified")
		case pointer.Remote.RemotePieces == nil:
			return Error.New("no remote segment pieces specified")
		case pointer.Remote.Redundancy == nil:
			return Error.New("no redundancy scheme specified")
		}

		remote := pointer.Remote

		if len(originalLimits) == 0 {
			return Error.New("no order limits")
		}
		if int32(len(originalLimits)) != remote.Redundancy.Total {
			return Error.New("invalid number of order limits for pieces")
		}

		maxAllowed, err := encryption.CalcEncryptedSize(endpoint.requiredRSConfig.MaxSegmentSize.Int64(), storj.EncryptionParameters{
			CipherSuite: storj.EncAESGCM,
			BlockSize:   128, // intentionally low block size to allow maximum possible encryption overhead
		})
		if err != nil {
			return err
		}

		if pointer.SegmentSize > maxAllowed || pointer.SegmentSize < 0 {
			return Error.New("segment size %v is out of range, maximum allowed is %v", pointer.SegmentSize, maxAllowed)
		}

		pieceNums := make(map[int32]struct{})
		nodeIds := make(map[storj.NodeID]struct{})
		for _, piece := range remote.RemotePieces {
			if piece.PieceNum >= int32(len(originalLimits)) {
				return Error.New("invalid piece number")
			}

			limit := originalLimits[piece.PieceNum]

			if limit == nil {
				return Error.New("empty order limit for piece")
			}

			err := endpoint.orders.VerifyOrderLimitSignature(ctx, limit)
			if err != nil {
				return err
			}

			// expect that too much time has not passed between order limit creation and now
			if time.Since(limit.OrderCreation) > endpoint.maxCommitInterval {
				return Error.New("Segment not committed before max commit interval of %f minutes.", endpoint.maxCommitInterval.Minutes())
			}

			derivedPieceID := remote.RootPieceId.Derive(piece.NodeId, piece.PieceNum)
			if limit.PieceId.IsZero() || limit.PieceId != derivedPieceID {
				return Error.New("invalid order limit piece id")
			}
			if piece.NodeId != limit.StorageNodeId {
				return Error.New("piece NodeID != order limit NodeID")
			}

			if _, ok := pieceNums[piece.PieceNum]; ok {
				return Error.New("piece num %d is duplicated", piece.PieceNum)
			}

			if _, ok := nodeIds[piece.NodeId]; ok {
				return Error.New("node id %s for piece num %d is duplicated", piece.NodeId.String(), piece.PieceNum)
			}

			pieceNums[piece.PieceNum] = struct{}{}
			nodeIds[piece.NodeId] = struct{}{}
		}
	}

	return nil
}

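// validateRedundancy checks the provided redundancy scheme against the
// satellite's required RS configuration, when validation is enabled.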
func (endpoint *Endpoint) validateRedundancy(ctx context.Context, redundancy *pb.RedundancyScheme) (err error) {
	defer mon.Task()(&ctx)(&err)

	if endpoint.requiredRSConfig.Validate {
		if endpoint.requiredRSConfig.ErasureShareSize.Int32() != redundancy.ErasureShareSize ||
			endpoint.requiredRSConfig.MinTotalThreshold > int(redundancy.Total) ||
			endpoint.requiredRSConfig.MaxTotalThreshold < int(redundancy.Total) ||
			endpoint.requiredRSConfig.MinThreshold != int(redundancy.MinReq) ||
			endpoint.requiredRSConfig.RepairThreshold != int(redundancy.RepairThreshold) ||
			endpoint.requiredRSConfig.SuccessThreshold != int(redundancy.SuccessThreshold) {
			return Error.New("provided redundancy scheme parameters not allowed: want [%d, %d, %d, %d-%d, %d] got [%d, %d, %d, %d, %d]",
				endpoint.requiredRSConfig.MinThreshold,
				endpoint.requiredRSConfig.RepairThreshold,
				endpoint.requiredRSConfig.SuccessThreshold,
				endpoint.requiredRSConfig.MinTotalThreshold,
				endpoint.requiredRSConfig.MaxTotalThreshold,
				endpoint.requiredRSConfig.ErasureShareSize.Int32(),

				redundancy.MinReq,
				redundancy.RepairThreshold,
				redundancy.SuccessThreshold,
				redundancy.Total,
				redundancy.ErasureShareSize,
			)
		}
	}

	return nil
}

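// validatePieceHash verifies the piece hash signature, checks that the hash
// is recent enough, and checks that it matches the order limit's piece ID and
// does not exceed its size limit.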
func (endpoint *Endpoint) validatePieceHash(ctx context.Context, piece *pb.RemotePiece, originalLimit *pb.OrderLimit, signee signing.Signee) (err error) {
	defer mon.Task()(&ctx)(&err)

	if piece.Hash == nil {
		return errs.New("no piece hash. NodeID: %v, PieceNum: %d", piece.NodeId, piece.PieceNum)
	}

	err = signing.VerifyPieceHashSignature(ctx, signee, piece.Hash)
	if err != nil {
		return errs.New("piece hash signature could not be verified for node (NodeID: %v, PieceNum: %d): %+v",
			piece.NodeId, piece.PieceNum, err,
		)
	}

	timestamp := piece.Hash.Timestamp
	if timestamp.Before(time.Now().Add(-pieceHashExpiration)) {
		return errs.New("piece hash timestamp is too old (%v). NodeID: %v, PieceNum: %d",
			timestamp, piece.NodeId, piece.PieceNum,
		)
	}

	switch {
	case originalLimit.PieceId != piece.Hash.PieceId:
		return errs.New("piece hash pieceID (%v) doesn't match limit pieceID (%v). NodeID: %v, PieceNum: %d",
			piece.Hash.PieceId, originalLimit.PieceId, piece.NodeId, piece.PieceNum,
		)
	case originalLimit.Limit < piece.Hash.PieceSize:
		return errs.New("piece hash PieceSize (%d) is larger than order limit (%d). NodeID: %v, PieceNum: %d",
			piece.Hash.PieceSize, originalLimit.Limit, piece.NodeId, piece.PieceNum,
		)
	}

	return nil
}