satellite/metainfo: combine checks for storage and segment limit
We need to combine the accounting.Service methods ExceedsStorageUsage and ExceedsSegmentUsage so that both checks run concurrently.

Resolves https://github.com/storj/team-metainfo/issues/73

Change-Id: I47831bca92457f16cfda789da89dbd460738ac97
parent 270c6e24f2
commit 228f465d45
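For context, the combined API introduced by this change is consumed roughly as in the sketch below. This is a minimal illustration only: the helper name and error handling are hypothetical, and the import paths are assumed to match the storj repository layout at the time; ExceedsUploadLimits and the UploadLimit fields come from the diff that follows.

import (
	"context"
	"fmt"

	"storj.io/common/uuid"
	"storj.io/storj/satellite/accounting"
)

// checkLimitsSketch is a hypothetical caller of the combined check.
// projectUsage is the satellite's *accounting.Service; passing true asks
// for the segment-limit lookup in addition to the storage-limit lookup.
func checkLimitsSketch(ctx context.Context, projectUsage *accounting.Service, projectID uuid.UUID) error {
	limit, err := projectUsage.ExceedsUploadLimits(ctx, projectID, true)
	if err != nil {
		return fmt.Errorf("retrieving upload limits: %w", err)
	}
	if limit.ExceedsSegments {
		return fmt.Errorf("exceeded segment limit of %d", limit.SegmentsLimit)
	}
	if limit.ExceedsStorage {
		return fmt.Errorf("exceeded storage limit of %v", limit.StorageLimit)
	}
	return nil
}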
@@ -109,81 +109,76 @@ func (usage *Service) ExceedsBandwidthUsage(ctx context.Context, projectID uuid.
 	return false, limit, nil
 }
 
-// ExceedsStorageUsage returns true if the storage usage for a project is currently over that project's limit.
-func (usage *Service) ExceedsStorageUsage(ctx context.Context, projectID uuid.UUID) (_ bool, limit memory.Size, err error) {
-	defer mon.Task()(&ctx)(&err)
-
-	var group errgroup.Group
-	var totalUsed int64
-
-	group.Go(func() error {
-		var err error
-		limit, err = usage.projectLimitCache.GetProjectStorageLimit(ctx, projectID)
-		return err
-	})
-	group.Go(func() error {
-		var err error
-		totalUsed, err = usage.GetProjectStorageTotals(ctx, projectID)
-		return err
-	})
-
-	err = group.Wait()
-	if err != nil {
-		return false, 0, ErrProjectUsage.Wrap(err)
-	}
-
-	if totalUsed >= limit.Int64() {
-		return true, limit, nil
-	}
-
-	return false, limit, nil
+// UploadLimit contains upload limit characteristics.
+type UploadLimit struct {
+	ExceedsStorage  bool
+	StorageLimit    memory.Size
+	ExceedsSegments bool
+	SegmentsLimit   int64
 }
 
-// ExceedsSegmentUsage returns true if the segment usage for a project is currently over that project's limit.
-func (usage *Service) ExceedsSegmentUsage(ctx context.Context, projectID uuid.UUID) (_ bool, limit int64, err error) {
+// ExceedsUploadLimits returns combined checks for storage and segment limits.
+func (usage *Service) ExceedsUploadLimits(ctx context.Context, projectID uuid.UUID, checkSegmentsLimit bool) (limit UploadLimit, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	var group errgroup.Group
-	var segmentUsage int64
+	var segmentUsage, storageUsage int64
 
+	if checkSegmentsLimit {
+		group.Go(func() error {
+			var err error
+			limit.SegmentsLimit, err = usage.projectLimitCache.GetProjectSegmentLimit(ctx, projectID)
+			return err
+		})
+		group.Go(func() error {
+			var err error
+			segmentUsage, err = usage.GetProjectSegmentUsage(ctx, projectID)
+			if err != nil {
+				// Verify If the cache key was not found
+				if ErrKeyNotFound.Has(err) {
+					segmentGetTotal, err := usage.GetProjectSegments(ctx, projectID)
+					if err != nil {
+						return err
+					}
+
+					// Create cache key with database value.
+					err = usage.liveAccounting.UpdateProjectSegmentUsage(ctx, projectID, segmentUsage, usage.bandwidthCacheTTL)
+					if err != nil {
+						return err
+					}
+
+					segmentUsage = segmentGetTotal
+				}
+			}
+			return err
+		})
+	}
+
 	group.Go(func() error {
 		var err error
-		limit, err = usage.projectLimitCache.GetProjectSegmentLimit(ctx, projectID)
+		limit.StorageLimit, err = usage.projectLimitCache.GetProjectStorageLimit(ctx, projectID)
 		return err
 	})
 	group.Go(func() error {
 		var err error
-		segmentUsage, err = usage.GetProjectSegmentUsage(ctx, projectID)
-		if err != nil {
-			// Verify If the cache key was not found
-			if ErrKeyNotFound.Has(err) {
-				segmentGetTotal, err := usage.GetProjectSegments(ctx, projectID)
-				if err != nil {
-					return err
-				}
-
-				// Create cache key with database value.
-				err = usage.liveAccounting.UpdateProjectSegmentUsage(ctx, projectID, segmentUsage, usage.bandwidthCacheTTL)
-				if err != nil {
-					return err
-				}
-
-				segmentUsage = segmentGetTotal
-			}
-		}
+		storageUsage, err = usage.GetProjectStorageTotals(ctx, projectID)
 		return err
 	})
 
 	err = group.Wait()
 	if err != nil {
-		return false, 0, ErrProjectUsage.Wrap(err)
+		return UploadLimit{}, ErrProjectUsage.Wrap(err)
	}
 
-	if segmentUsage >= limit {
-		return true, limit, nil
+	if segmentUsage >= limit.SegmentsLimit {
+		limit.ExceedsSegments = true
 	}
 
-	return false, limit, nil
+	if storageUsage >= limit.StorageLimit.Int64() {
+		limit.ExceedsStorage = true
+	}
+
+	return limit, nil
 }
 
 // GetProjectStorageTotals returns total amount of storage used by project.
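The concurrency above is plain errgroup fan-out: each limit and usage lookup runs in its own goroutine and group.Wait returns the first error. The following is a self-contained sketch of the same pattern with placeholder lookups; it is illustrative only and not code from this change.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// fetchLimitsConcurrently mirrors the structure of ExceedsUploadLimits:
// independent lookups run in parallel, and group.Wait surfaces the first error.
func fetchLimitsConcurrently(ctx context.Context) (storageLimit, segmentLimit int64, err error) {
	var group errgroup.Group

	group.Go(func() error {
		// Placeholder for the storage-limit lookup (e.g. a cache or DB call).
		storageLimit = 25 << 30
		return nil
	})
	group.Go(func() error {
		// Placeholder for the segment-limit lookup.
		segmentLimit = 150_000
		return nil
	})

	if err := group.Wait(); err != nil {
		return 0, 0, err
	}
	return storageLimit, segmentLimit, nil
}

func main() {
	storage, segments, err := fetchLimitsConcurrently(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Println(storage, segments)
}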
@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"io"
 	"strconv"
+	"strings"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -93,7 +94,7 @@ func TestProjectUsageStorage(t *testing.T) {
 		// upload fails due to storage limit
 		err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path/1", data)
 		require.Error(t, err)
-		if !errors.Is(err, uplink.ErrBandwidthLimitExceeded) {
+		if !strings.Contains(err.Error(), "Exceeded Storage Limit") {
 			t.Fatal("Expected resource exhausted error. Got", err.Error())
 		}
 
@@ -475,10 +476,10 @@ func TestProjectUsageCustomLimit(t *testing.T) {
 		err = projectUsage.AddProjectStorageUsage(ctx, project.ID, expectedLimit.Int64())
 		require.NoError(t, err)
 
-		actualExceeded, limit, err := projectUsage.ExceedsStorageUsage(ctx, project.ID)
+		limit, err := projectUsage.ExceedsUploadLimits(ctx, project.ID, false)
 		require.NoError(t, err)
-		require.True(t, actualExceeded)
-		require.Equal(t, expectedLimit.Int64(), limit.Int64())
+		require.True(t, limit.ExceedsStorage)
+		require.Equal(t, expectedLimit.Int64(), limit.StorageLimit.Int64())
 
 		// Setup: create some bytes for the uplink to upload
 		expectedData := testrand.Bytes(50 * memory.KiB)
@@ -805,7 +806,7 @@ func TestProjectUsage_FreeUsedStorageSpace(t *testing.T) {
 		// we used limit so we should get error
 		err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "3", data)
 		require.Error(t, err)
-		require.True(t, errors.Is(err, uplink.ErrBandwidthLimitExceeded))
+		require.True(t, strings.Contains(err.Error(), "Exceeded Storage Limit"))
 
 		// delete object to free some storage space
 		err = planet.Uplinks[0].DeleteObject(ctx, planet.Satellites[0], "bucket", "2")
@@ -821,7 +822,7 @@ func TestProjectUsage_FreeUsedStorageSpace(t *testing.T) {
 		// should fail because we once again used space up to limit
 		err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "2", data)
 		require.Error(t, err)
-		require.True(t, errors.Is(err, uplink.ErrBandwidthLimitExceeded))
+		require.True(t, strings.Contains(err.Error(), "Exceeded Storage Limit"))
 	})
 }
 
@@ -870,7 +871,6 @@ func TestProjectUsageBandwidthResetAfter3days(t *testing.T) {
 			actualExceeded, _, err := projectUsage.ExceedsBandwidthUsage(ctx, bucket.ProjectID)
 			require.NoError(t, err)
 			require.Equal(t, tt.expectedExceeds, actualExceeded, tt.description)
-
 		}
 	})
 
@@ -899,7 +899,7 @@ func TestProjectUsage_ResetLimitsFirstDayOfNextMonth(t *testing.T) {
 		// verify that storage limit is all used
 		err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path2", data)
 		require.Error(t, err)
-		require.True(t, errors.Is(err, uplink.ErrBandwidthLimitExceeded))
+		require.True(t, strings.Contains(err.Error(), "Exceeded Storage Limit"))
 
 		_, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path1")
 		require.NoError(t, err)
@@ -915,7 +915,9 @@ func TestProjectUsage_ResetLimitsFirstDayOfNextMonth(t *testing.T) {
 		// verify that bandwidth limit is all used
 		_, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path1")
 		require.Error(t, err)
-		require.True(t, errors.Is(err, uplink.ErrBandwidthLimitExceeded))
+		if !errors.Is(err, uplink.ErrBandwidthLimitExceeded) {
+			t.Fatal("Expected resource exhausted error. Got", err.Error())
+		}
 
 		now := time.Now()
 		planet.Satellites[0].API.Accounting.ProjectUsage.SetNow(func() time.Time {
@@ -925,7 +927,7 @@ func TestProjectUsage_ResetLimitsFirstDayOfNextMonth(t *testing.T) {
 		// verify that storage limit is all used even at the new billing cycle
 		err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path3", data)
 		require.Error(t, err)
-		require.True(t, errors.Is(err, uplink.ErrBandwidthLimitExceeded))
+		require.True(t, strings.Contains(err.Error(), "Exceeded Storage Limit"))
 
 		// verify that new billing cycle reset bandwidth limit
 		_, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path1")
@@ -1905,7 +1905,7 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm
 		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
 	}
 
-	if err := endpoint.checkExceedsStorageUsage(ctx, keyInfo.ProjectID); err != nil {
+	if err := endpoint.checkUploadLimits(ctx, keyInfo.ProjectID); err != nil {
 		return nil, err
 	}
 
@@ -2709,31 +2709,6 @@ func (endpoint *Endpoint) RevokeAPIKey(ctx context.Context, req *pb.RevokeAPIKey
 	return &pb.RevokeAPIKeyResponse{}, nil
 }
 
-func (endpoint *Endpoint) checkExceedsStorageUsage(ctx context.Context, projectID uuid.UUID) (err error) {
-	defer mon.Task()(&ctx)(&err)
-
-	exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, projectID)
-	if err != nil {
-		if errs2.IsCanceled(err) {
-			return rpcstatus.Wrap(rpcstatus.Canceled, err)
-		}
-
-		endpoint.log.Error(
-			"Retrieving project storage totals failed; storage usage limit won't be enforced",
-			zap.Stringer("Project ID", projectID),
-			zap.Error(err),
-		)
-	} else if exceeded {
-		endpoint.log.Warn("Monthly storage limit exceeded",
-			zap.Stringer("Limit", limit),
-			zap.Stringer("Project ID", projectID),
-		)
-		return rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
-	}
-
-	return nil
-}
-
 // Server side move.
 
 // BeginMoveObject begins moving object to different key.
@@ -331,28 +331,34 @@ func (endpoint *Endpoint) validateRemoteSegment(ctx context.Context, commitReque
 }
 
 func (endpoint *Endpoint) checkUploadLimits(ctx context.Context, projectID uuid.UUID) error {
-	if err := endpoint.checkExceedsStorageUsage(ctx, projectID); err != nil {
-		return err
-	}
+	validateSegments := endpoint.config.ProjectLimits.ValidateSegmentLimit
 
-	if endpoint.config.ProjectLimits.ValidateSegmentLimit {
-		if exceeded, limit, err := endpoint.projectUsage.ExceedsSegmentUsage(ctx, projectID); err != nil {
-			if errs2.IsCanceled(err) {
-				return rpcstatus.Wrap(rpcstatus.Canceled, err)
-			}
+	if limit, err := endpoint.projectUsage.ExceedsUploadLimits(ctx, projectID, validateSegments); err != nil {
+		if errs2.IsCanceled(err) {
+			return rpcstatus.Wrap(rpcstatus.Canceled, err)
+		}
 
-			endpoint.log.Error(
-				"Retrieving project segment total failed; segment limit won't be enforced",
-				zap.Stringer("Project ID", projectID),
-				zap.Error(err),
-			)
-		} else if exceeded {
+		endpoint.log.Error(
+			"Retrieving project upload limit failed; limit won't be enforced",
+			zap.Stringer("Project ID", projectID),
+			zap.Error(err),
+		)
+	} else {
+		if validateSegments && limit.ExceedsSegments {
 			endpoint.log.Warn("Segment limit exceeded",
-				zap.String("Limit", strconv.Itoa(int(limit))),
+				zap.String("Limit", strconv.Itoa(int(limit.SegmentsLimit))),
 				zap.Stringer("Project ID", projectID),
 			)
 			return rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Segments Limit")
 		}
+
+		if limit.ExceedsStorage {
+			endpoint.log.Warn("Storage limit exceeded",
+				zap.String("Limit", strconv.Itoa(limit.StorageLimit.Int())),
+				zap.Stringer("Project ID", projectID),
+			)
+			return rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Storage Limit")
+		}
 	}
 
 	return nil