storj/satellite/metainfo/validation.go

411 lines
12 KiB
Go
Raw Normal View History

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"bytes"
"context"
2019-06-24 10:52:25 +01:00
"regexp"
"sync"
"time"
"github.com/gogo/protobuf/proto"
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/time/rate"
"storj.io/common/encryption"
"storj.io/common/macaroon"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/storj"
"storj.io/storj/pkg/auth"
"storj.io/storj/satellite/console"
)
2019-06-24 10:52:25 +01:00
const (
requestTTL = time.Hour * 4
)
var (
ipRegexp = regexp.MustCompile(`^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$`)
)
// TTLItem keeps association between serial number and ttl
type TTLItem struct {
serialNumber storj.SerialNumber
ttl time.Time
}
type createRequest struct {
Expiration time.Time
Redundancy *pb.RedundancyScheme
ttl time.Time
}
type createRequests struct {
mu sync.RWMutex
// orders limit serial number used because with CreateSegment we don't have path yet
entries map[storj.SerialNumber]*createRequest
muTTL sync.Mutex
entriesTTL []*TTLItem
}
func newCreateRequests() *createRequests {
return &createRequests{
entries: make(map[storj.SerialNumber]*createRequest),
entriesTTL: make([]*TTLItem, 0),
}
}
func (requests *createRequests) Put(serialNumber storj.SerialNumber, createRequest *createRequest) {
ttl := time.Now().Add(requestTTL)
go func() {
requests.muTTL.Lock()
requests.entriesTTL = append(requests.entriesTTL, &TTLItem{
serialNumber: serialNumber,
ttl: ttl,
})
requests.muTTL.Unlock()
}()
createRequest.ttl = ttl
requests.mu.Lock()
requests.entries[serialNumber] = createRequest
requests.mu.Unlock()
go requests.cleanup()
}
func (requests *createRequests) Load(serialNumber storj.SerialNumber) (*createRequest, bool) {
requests.mu.RLock()
request, found := requests.entries[serialNumber]
if request != nil && request.ttl.Before(time.Now()) {
request = nil
found = false
}
requests.mu.RUnlock()
return request, found
}
func (requests *createRequests) Remove(serialNumber storj.SerialNumber) {
requests.mu.Lock()
delete(requests.entries, serialNumber)
requests.mu.Unlock()
}
func (requests *createRequests) cleanup() {
requests.muTTL.Lock()
now := time.Now()
remove := make([]storj.SerialNumber, 0)
newStart := 0
for i, item := range requests.entriesTTL {
if item.ttl.Before(now) {
remove = append(remove, item.serialNumber)
newStart = i + 1
} else {
break
}
}
requests.entriesTTL = requests.entriesTTL[newStart:]
requests.muTTL.Unlock()
for _, serialNumber := range remove {
requests.Remove(serialNumber)
}
}
func getAPIKey(ctx context.Context, header *pb.RequestHeader) (key *macaroon.APIKey, err error) {
defer mon.Task()(&ctx)(&err)
if header != nil {
return macaroon.ParseRawAPIKey(header.ApiKey)
}
keyData, ok := auth.GetAPIKey(ctx)
if !ok {
return nil, errs.New("missing credentials")
}
return macaroon.ParseAPIKey(string(keyData))
}
// validateAuth validates things like API key, user permissions and rate limit and always returns valid rpc error.
func (endpoint *Endpoint) validateAuth(ctx context.Context, header *pb.RequestHeader, action macaroon.Action) (_ *console.APIKeyInfo, err error) {
defer mon.Task()(&ctx)(&err)
key, err := getAPIKey(ctx, header)
if err != nil {
endpoint.log.Debug("invalid request", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "Invalid API credentials")
}
keyInfo, err := endpoint.apiKeys.GetByHead(ctx, key.Head())
if err != nil {
endpoint.log.Debug("unauthorized request", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "Unauthorized API credentials")
}
if err = endpoint.checkRate(ctx, keyInfo.ProjectID); err != nil {
endpoint.log.Debug("rate check failed", zap.Error(err))
return nil, err
}
// Revocations are currently handled by just deleting the key.
err = key.Check(ctx, keyInfo.Secret, action, nil)
if err != nil {
endpoint.log.Debug("unauthorized request", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "Unauthorized API credentials")
}
return keyInfo, nil
}
func (endpoint *Endpoint) checkRate(ctx context.Context, projectID uuid.UUID) (err error) {
defer mon.Task()(&ctx)(&err)
if !endpoint.limiterConfig.Enabled {
return nil
}
limiter, err := endpoint.limiterCache.Get(projectID.String(), func() (interface{}, error) {
limit := rate.Limit(endpoint.limiterConfig.Rate)
project, err := endpoint.projects.Get(ctx, projectID)
if err != nil {
return false, err
}
if project.RateLimit != nil && *project.RateLimit > 0 {
limit = rate.Limit(*project.RateLimit)
}
// initialize the limiter with limit and burst the same so that we don't limit how quickly calls
// are made within the second.
return rate.NewLimiter(limit, int(limit)), nil
})
if err != nil {
return rpcstatus.Error(rpcstatus.Unavailable, err.Error())
}
if !limiter.(*rate.Limiter).Allow() {
endpoint.log.Warn("too many requests for project",
zap.Stringer("projectID", projectID),
zap.Float64("limit", float64(limiter.(*rate.Limiter).Limit())))
mon.Event("metainfo_rate_limit_exceeded") //locked
return rpcstatus.Error(rpcstatus.ResourceExhausted, "Too Many Requests")
}
return nil
}
func (endpoint *Endpoint) validateCommitSegment(ctx context.Context, req *pb.SegmentCommitRequestOld) (err error) {
defer mon.Task()(&ctx)(&err)
err = endpoint.validateBucket(ctx, req.Bucket)
if err != nil {
return err
}
err = endpoint.validatePointer(ctx, req.Pointer, req.OriginalLimits)
if err != nil {
return err
}
if len(req.OriginalLimits) > 0 {
createRequest, found := endpoint.createRequests.Load(req.OriginalLimits[0].SerialNumber)
switch {
case !found:
return Error.New("missing create request or request expired")
case !createRequest.Expiration.Equal(req.Pointer.ExpirationDate):
return Error.New("pointer expiration date does not match requested one")
case !proto.Equal(createRequest.Redundancy, req.Pointer.Remote.Redundancy):
return Error.New("pointer redundancy scheme date does not match requested one")
}
}
return nil
}
func (endpoint *Endpoint) validateBucket(ctx context.Context, bucket []byte) (err error) {
defer mon.Task()(&ctx)(&err)
if len(bucket) == 0 {
return Error.Wrap(storj.ErrNoBucket.New(""))
}
2019-06-24 10:52:25 +01:00
if len(bucket) < 3 || len(bucket) > 63 {
return Error.New("bucket name must be at least 3 and no more than 63 characters long")
}
2019-06-24 10:52:25 +01:00
// Regexp not used because benchmark shows it will be slower for valid bucket names
// https://gist.github.com/mniewrzal/49de3af95f36e63e88fac24f565e444c
labels := bytes.Split(bucket, []byte("."))
for _, label := range labels {
err = validateBucketLabel(label)
if err != nil {
return err
}
}
if ipRegexp.MatchString(string(bucket)) {
return Error.New("bucket name cannot be formatted as an IP address")
}
return nil
}
2019-06-24 10:52:25 +01:00
func validateBucketLabel(label []byte) error {
if len(label) == 0 {
return Error.New("bucket label cannot be empty")
}
if !isLowerLetter(label[0]) && !isDigit(label[0]) {
return Error.New("bucket label must start with a lowercase letter or number")
}
if label[0] == '-' || label[len(label)-1] == '-' {
return Error.New("bucket label cannot start or end with a hyphen")
}
for i := 1; i < len(label)-1; i++ {
if !isLowerLetter(label[i]) && !isDigit(label[i]) && (label[i] != '-') && (label[i] != '.') {
return Error.New("bucket name must contain only lowercase letters, numbers or hyphens")
}
}
return nil
}
func isLowerLetter(r byte) bool {
return r >= 'a' && r <= 'z'
}
func isDigit(r byte) bool {
return r >= '0' && r <= '9'
}
func (endpoint *Endpoint) validatePointer(ctx context.Context, pointer *pb.Pointer, originalLimits []*pb.OrderLimit) (err error) {
defer mon.Task()(&ctx)(&err)
if pointer == nil {
return Error.New("no pointer specified")
}
if pointer.Type == pb.Pointer_INLINE && pointer.Remote != nil {
return Error.New("pointer type is INLINE but remote segment is set")
}
if pointer.Type == pb.Pointer_REMOTE {
switch {
case pointer.Remote == nil:
return Error.New("no remote segment specified")
case pointer.Remote.RemotePieces == nil:
return Error.New("no remote segment pieces specified")
case pointer.Remote.Redundancy == nil:
return Error.New("no redundancy scheme specified")
}
remote := pointer.Remote
if len(originalLimits) == 0 {
return Error.New("no order limits")
}
if int32(len(originalLimits)) != remote.Redundancy.Total {
return Error.New("invalid no order limit for piece")
}
maxAllowed, err := encryption.CalcEncryptedSize(endpoint.requiredRSConfig.MaxSegmentSize.Int64(), storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 128, // intentionally low block size to allow maximum possible encryption overhead
})
if err != nil {
return err
}
if pointer.SegmentSize > maxAllowed || pointer.SegmentSize < 0 {
return Error.New("segment size %v is out of range, maximum allowed is %v", pointer.SegmentSize, maxAllowed)
}
pieceNums := make(map[int32]struct{})
nodeIds := make(map[storj.NodeID]struct{})
for _, piece := range remote.RemotePieces {
if piece.PieceNum >= int32(len(originalLimits)) {
return Error.New("invalid piece number")
}
limit := originalLimits[piece.PieceNum]
if limit == nil {
return Error.New("empty order limit for piece")
}
err := endpoint.orders.VerifyOrderLimitSignature(ctx, limit)
if err != nil {
return err
}
// expect that too much time has not passed between order limit creation and now
if time.Since(limit.OrderCreation) > endpoint.maxCommitInterval {
return Error.New("Segment not committed before max commit interval of %f minutes.", endpoint.maxCommitInterval.Minutes())
}
derivedPieceID := remote.RootPieceId.Derive(piece.NodeId, piece.PieceNum)
if limit.PieceId.IsZero() || limit.PieceId != derivedPieceID {
return Error.New("invalid order limit piece id")
}
if piece.NodeId != limit.StorageNodeId {
return Error.New("piece NodeID != order limit NodeID")
}
if _, ok := pieceNums[piece.PieceNum]; ok {
return Error.New("piece num %d is duplicated", piece.PieceNum)
}
if _, ok := nodeIds[piece.NodeId]; ok {
return Error.New("node id %s for piece num %d is duplicated", piece.NodeId.String(), piece.PieceNum)
}
pieceNums[piece.PieceNum] = struct{}{}
nodeIds[piece.NodeId] = struct{}{}
}
}
return nil
}
func (endpoint *Endpoint) validateRedundancy(ctx context.Context, redundancy *pb.RedundancyScheme) (err error) {
defer mon.Task()(&ctx)(&err)
if endpoint.requiredRSConfig.Validate {
if endpoint.requiredRSConfig.ErasureShareSize.Int32() != redundancy.ErasureShareSize ||
endpoint.requiredRSConfig.MinTotalThreshold > int(redundancy.Total) ||
endpoint.requiredRSConfig.MaxTotalThreshold < int(redundancy.Total) ||
endpoint.requiredRSConfig.MinThreshold != int(redundancy.MinReq) ||
endpoint.requiredRSConfig.RepairThreshold != int(redundancy.RepairThreshold) ||
endpoint.requiredRSConfig.SuccessThreshold != int(redundancy.SuccessThreshold) {
return Error.New("provided redundancy scheme parameters not allowed: want [%d, %d, %d, %d-%d, %d] got [%d, %d, %d, %d, %d]",
endpoint.requiredRSConfig.MinThreshold,
endpoint.requiredRSConfig.RepairThreshold,
endpoint.requiredRSConfig.SuccessThreshold,
endpoint.requiredRSConfig.MinTotalThreshold,
endpoint.requiredRSConfig.MaxTotalThreshold,
endpoint.requiredRSConfig.ErasureShareSize.Int32(),
2019-06-29 16:05:46 +01:00
redundancy.MinReq,
redundancy.RepairThreshold,
redundancy.SuccessThreshold,
redundancy.Total,
redundancy.ErasureShareSize,
)
}
}
return nil
}