satellite/metainfo: cleanup validation

Refactoring to do few things:
* move simple validation before validations with DB calls
* combine validation check/update for storage and segment
limits together

Change-Id: I6c2431ba236d4e388791d2e2d01ca7e0dd4439fc
This commit is contained in:
Michał Niewrzał 2022-01-13 10:57:31 +01:00
parent 18ab464b51
commit 5e9643e1b8
2 changed files with 81 additions and 124 deletions

View File

@ -8,7 +8,6 @@ import (
"context" "context"
"crypto/sha256" "crypto/sha256"
"fmt" "fmt"
"strconv"
"time" "time"
"github.com/spacemonkeygo/monkit/v3" "github.com/spacemonkeygo/monkit/v3"
@ -619,24 +618,14 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
} }
if endpoint.config.ProjectLimits.ValidateSegmentLimit { objectKeyLength := len(req.EncryptedPath)
if exceeded, limit, err := endpoint.projectUsage.ExceedsSegmentUsage(ctx, keyInfo.ProjectID); err != nil { if objectKeyLength > endpoint.config.MaxEncryptedObjectKeyLength {
if errs2.IsCanceled(err) { return nil, rpcstatus.Error(rpcstatus.InvalidArgument, fmt.Sprintf("key length is too big, got %v, maximum allowed is %v", objectKeyLength, endpoint.config.MaxEncryptedObjectKeyLength))
return nil, rpcstatus.Wrap(rpcstatus.Canceled, err)
} }
endpoint.log.Error( err = endpoint.checkUploadLimits(ctx, keyInfo.ProjectID)
"Retrieving project segment total failed; segment limit won't be enforced", if err != nil {
zap.Stringer("Project ID", keyInfo.ProjectID), return nil, err
zap.Error(err),
)
} else if exceeded {
endpoint.log.Warn("Segment limit exceeded",
zap.String("Limit", strconv.Itoa(int(limit))),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Segments Limit")
}
} }
// TODO this needs to be optimized to avoid DB call on each request // TODO this needs to be optimized to avoid DB call on each request
@ -649,11 +638,6 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
} }
objectKeyLength := len(req.EncryptedPath)
if objectKeyLength > endpoint.config.MaxEncryptedObjectKeyLength {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, fmt.Sprintf("key length is too big, got %v, maximum allowed is %v", objectKeyLength, endpoint.config.MaxEncryptedObjectKeyLength))
}
if canDelete { if canDelete {
_, err = endpoint.DeleteObjectAnyStatus(ctx, metabase.ObjectLocation{ _, err = endpoint.DeleteObjectAnyStatus(ctx, metabase.ObjectLocation{
ProjectID: keyInfo.ProjectID, ProjectID: keyInfo.ProjectID,
@ -1693,33 +1677,13 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin
return nil, err return nil, err
} }
if endpoint.config.ProjectLimits.ValidateSegmentLimit {
if exceeded, limit, err := endpoint.projectUsage.ExceedsSegmentUsage(ctx, keyInfo.ProjectID); err != nil {
if errs2.IsCanceled(err) {
return nil, rpcstatus.Wrap(rpcstatus.Canceled, err)
}
endpoint.log.Error(
"Retrieving project segment total failed; segment limit won't be enforced",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Error(err),
)
} else if exceeded {
endpoint.log.Warn("Segment limit exceeded",
zap.String("Limit", strconv.Itoa(int(limit))),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Segments Limit")
}
}
// no need to validate streamID fields because it was validated during BeginObject // no need to validate streamID fields because it was validated during BeginObject
if req.Position.Index < 0 { if req.Position.Index < 0 {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0") return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0")
} }
if err := endpoint.checkExceedsStorageUsage(ctx, keyInfo.ProjectID); err != nil { if err := endpoint.checkUploadLimits(ctx, keyInfo.ProjectID); err != nil {
return nil, err return nil, err
} }
@ -1962,38 +1926,13 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage") return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "mismatched segment size and piece usage")
} }
if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, segmentSize); err != nil {
// log it and continue. it's most likely our own fault that we couldn't
// track it, and the only thing that will be affected is our per-project
// storage limits.
endpoint.log.Error("Could not track new project's storage usage",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Error(err),
)
}
err = endpoint.metabase.CommitSegment(ctx, mbCommitSegment) err = endpoint.metabase.CommitSegment(ctx, mbCommitSegment)
if err != nil { if err != nil {
return nil, endpoint.convertMetabaseErr(err) return nil, endpoint.convertMetabaseErr(err)
} }
if endpoint.config.ProjectLimits.ValidateSegmentLimit { if err := endpoint.updateUploadLimits(ctx, keyInfo.ProjectID, segmentSize); err != nil {
// Update the current segment cache value incrementing by 1 as we commit single segment. return nil, err
err = endpoint.projectUsage.UpdateProjectSegmentUsage(ctx, keyInfo.ProjectID, 1)
if err != nil {
if errs2.IsCanceled(err) {
return nil, rpcstatus.Wrap(rpcstatus.Canceled, err)
}
// log it and continue. it's most likely our own fault that we couldn't
// track it, and the only thing that will be affected is our per-project
// segment limits.
endpoint.log.Error(
"Could not track the new project's segment usage when committing segment",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Error(err),
)
}
} }
return &pb.SegmentCommitResponse{ return &pb.SegmentCommitResponse{
@ -2034,46 +1973,16 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "inline segment size cannot be larger than %s", endpoint.config.MaxInlineSegmentSize) return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "inline segment size cannot be larger than %s", endpoint.config.MaxInlineSegmentSize)
} }
if err := endpoint.checkExceedsStorageUsage(ctx, keyInfo.ProjectID); err != nil {
return nil, err
}
if endpoint.config.ProjectLimits.ValidateSegmentLimit {
if exceeded, limit, err := endpoint.projectUsage.ExceedsSegmentUsage(ctx, keyInfo.ProjectID); err != nil {
if errs2.IsCanceled(err) {
return nil, rpcstatus.Wrap(rpcstatus.Canceled, err)
}
endpoint.log.Error(
"Retrieving project segment total failed; segment limit won't be enforced",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Error(err),
)
} else if exceeded {
endpoint.log.Warn("Segment limit exceeded",
zap.String("Limit", strconv.Itoa(int(limit))),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Segments Limit")
}
}
if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, inlineUsed); err != nil {
// log it and continue. it's most likely our own fault that we couldn't
// track it, and the only thing that will be affected is our per-project
// bandwidth and storage limits.
endpoint.log.Error("Could not track new project's storage usage",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Error(err),
)
}
id, err := uuid.FromBytes(streamID.StreamId) id, err := uuid.FromBytes(streamID.StreamId)
if err != nil { if err != nil {
endpoint.log.Error("internal", zap.Error(err)) endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
} }
if err := endpoint.checkUploadLimits(ctx, keyInfo.ProjectID); err != nil {
return nil, err
}
var expiresAt *time.Time var expiresAt *time.Time
if !streamID.ExpirationDate.IsZero() { if !streamID.ExpirationDate.IsZero() {
expiresAt = &streamID.ExpirationDate expiresAt = &streamID.ExpirationDate
@ -2112,28 +2021,13 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
} }
if err := endpoint.updateUploadLimits(ctx, keyInfo.ProjectID, inlineUsed); err != nil {
return nil, err
}
endpoint.log.Info("Inline Segment Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "inline")) endpoint.log.Info("Inline Segment Upload", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "put"), zap.String("type", "inline"))
mon.Meter("req_put_inline").Mark(1) mon.Meter("req_put_inline").Mark(1)
if endpoint.config.ProjectLimits.ValidateSegmentLimit {
// Update the current segment cache value incrementing by 1 as we commit single segment.
err = endpoint.projectUsage.UpdateProjectSegmentUsage(ctx, keyInfo.ProjectID, 1)
if err != nil {
if errs2.IsCanceled(err) {
return nil, rpcstatus.Wrap(rpcstatus.Canceled, err)
}
// log it and continue. it's most likely our own fault that we couldn't
// track it, and the only thing that will be affected is our per-project
// segment limits.
endpoint.log.Error(
"Could not track the new project's segment usage when committing segment",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Error(err),
)
}
}
return &pb.SegmentMakeInlineResponse{}, nil return &pb.SegmentMakeInlineResponse{}, nil
} }

View File

@ -8,6 +8,7 @@ import (
"context" "context"
"crypto/subtle" "crypto/subtle"
"regexp" "regexp"
"strconv"
"time" "time"
"github.com/zeebo/errs" "github.com/zeebo/errs"
@ -15,6 +16,7 @@ import (
"golang.org/x/time/rate" "golang.org/x/time/rate"
"storj.io/common/encryption" "storj.io/common/encryption"
"storj.io/common/errs2"
"storj.io/common/macaroon" "storj.io/common/macaroon"
"storj.io/common/pb" "storj.io/common/pb"
"storj.io/common/rpc/rpcstatus" "storj.io/common/rpc/rpcstatus"
@ -327,3 +329,64 @@ func (endpoint *Endpoint) validateRemoteSegment(ctx context.Context, commitReque
return nil return nil
} }
func (endpoint *Endpoint) checkUploadLimits(ctx context.Context, projectID uuid.UUID) error {
if err := endpoint.checkExceedsStorageUsage(ctx, projectID); err != nil {
return err
}
if endpoint.config.ProjectLimits.ValidateSegmentLimit {
if exceeded, limit, err := endpoint.projectUsage.ExceedsSegmentUsage(ctx, projectID); err != nil {
if errs2.IsCanceled(err) {
return rpcstatus.Wrap(rpcstatus.Canceled, err)
}
endpoint.log.Error(
"Retrieving project segment total failed; segment limit won't be enforced",
zap.Stringer("Project ID", projectID),
zap.Error(err),
)
} else if exceeded {
endpoint.log.Warn("Segment limit exceeded",
zap.String("Limit", strconv.Itoa(int(limit))),
zap.Stringer("Project ID", projectID),
)
return rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Segments Limit")
}
}
return nil
}
func (endpoint *Endpoint) updateUploadLimits(ctx context.Context, projectID uuid.UUID, segmentSize int64) error {
if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, projectID, segmentSize); err != nil {
// log it and continue. it's most likely our own fault that we couldn't
// track it, and the only thing that will be affected is our per-project
// bandwidth and storage limits.
endpoint.log.Error("Could not track new project's storage usage",
zap.Stringer("Project ID", projectID),
zap.Error(err),
)
}
if endpoint.config.ProjectLimits.ValidateSegmentLimit {
// Update the current segment cache value incrementing by 1 as we commit single segment.
err := endpoint.projectUsage.UpdateProjectSegmentUsage(ctx, projectID, 1)
if err != nil {
if errs2.IsCanceled(err) {
return rpcstatus.Wrap(rpcstatus.Canceled, err)
}
// log it and continue. it's most likely our own fault that we couldn't
// track it, and the only thing that will be affected is our per-project
// segment limits.
endpoint.log.Error(
"Could not track the new project's segment usage when committing segment",
zap.Stringer("Project ID", projectID),
zap.Error(err),
)
}
}
return nil
}