satellite/accounting: modify tally.go to use returned segments values
modify tally to calculate how we need to update segments in the live accounting cache with UpdateProjectSegmentUsage method. adjust accounting.ExceedsUploadLimits to use only cache for segment validation, if cache returns 0 or key not found then we shouldn't reject such project as its possible that we won't have this value before first object iteration https://github.com/storj/storj/issues/4744 Change-Id: I32c22d7fb71236e354653ba8719e029fc71f04c7
This commit is contained in:
parent
aa728bd6ea
commit
a447b27ffc
@ -132,20 +132,9 @@ func (usage *Service) ExceedsUploadLimits(ctx context.Context, projectID uuid.UU
|
||||
})
|
||||
group.Go(func() error {
|
||||
var err error
|
||||
segmentUsage, err = usage.GetProjectSegmentUsage(ctx, projectID)
|
||||
segmentUsage, err = usage.liveAccounting.GetProjectSegmentUsage(ctx, projectID)
|
||||
// Verify If the cache key was not found
|
||||
if err != nil && ErrKeyNotFound.Has(err) {
|
||||
segmentGetTotal, err := usage.GetProjectSegments(ctx, projectID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create cache key with database value.
|
||||
if err := usage.liveAccounting.UpdateProjectSegmentUsage(ctx, projectID, segmentGetTotal); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
segmentUsage = segmentGetTotal
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
|
@ -234,36 +234,6 @@ func TestProjectSegmentLimitInline(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestProjectSegmentLimitWithoutCache(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||
config.Metainfo.ProjectLimits.ValidateSegmentLimit = true
|
||||
config.Console.UsageLimits.Segment.Free = 5
|
||||
config.Console.UsageLimits.Segment.Paid = 5
|
||||
// this effectively disable live accounting cache
|
||||
config.LiveAccounting.BandwidthCacheTTL = -1
|
||||
config.LiveAccounting.AsOfSystemInterval = 0
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
data := testrand.Bytes(1 * memory.KiB)
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
// successful upload
|
||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path/"+strconv.Itoa(i), data)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// upload fails due to segment limit
|
||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path/5", data)
|
||||
require.Error(t, err)
|
||||
// TODO should compare to uplink API error when exposed
|
||||
require.Contains(t, strings.ToLower(err.Error()), "segments limit")
|
||||
})
|
||||
}
|
||||
|
||||
func TestProjectBandwidthLimitWithoutCache(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
|
@ -112,7 +112,7 @@ func (service *Service) Tally(ctx context.Context) (err error) {
|
||||
|
||||
// No-op unless that there isn't an error getting the
|
||||
// liveAccounting.GetAllProjectTotals
|
||||
updateLiveAccountingTotals := func(_ map[uuid.UUID]int64) {}
|
||||
updateLiveAccountingTotals := func(_ map[uuid.UUID]accounting.Usage) {}
|
||||
|
||||
initialLiveTotals, err := service.liveAccounting.GetAllProjectTotals(ctx)
|
||||
if err != nil {
|
||||
@ -121,7 +121,7 @@ func (service *Service) Tally(ctx context.Context) (err error) {
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
updateLiveAccountingTotals = func(tallyProjectTotals map[uuid.UUID]int64) {
|
||||
updateLiveAccountingTotals = func(tallyProjectTotals map[uuid.UUID]accounting.Usage) {
|
||||
latestLiveTotals, err := service.liveAccounting.GetAllProjectTotals(ctx)
|
||||
if err != nil {
|
||||
service.log.Error(
|
||||
@ -136,7 +136,7 @@ func (service *Service) Tally(ctx context.Context) (err error) {
|
||||
// Thus, we add them and set the total to 0.
|
||||
for projectID := range latestLiveTotals {
|
||||
if _, ok := tallyProjectTotals[projectID]; !ok {
|
||||
tallyProjectTotals[projectID] = 0
|
||||
tallyProjectTotals[projectID] = accounting.Usage{}
|
||||
}
|
||||
}
|
||||
|
||||
@ -148,7 +148,7 @@ func (service *Service) Tally(ctx context.Context) (err error) {
|
||||
|
||||
// read the method documentation why the increase passed to this method
|
||||
// is calculated in this way
|
||||
err = service.liveAccounting.AddProjectStorageUsage(ctx, projectID, -latestLiveTotals[projectID].Storage+tallyTotal+(delta/2))
|
||||
err = service.liveAccounting.AddProjectStorageUsage(ctx, projectID, -latestLiveTotals[projectID].Storage+tallyTotal.Storage+(delta/2))
|
||||
if err != nil {
|
||||
if accounting.ErrSystemOrNetError.Has(err) {
|
||||
service.log.Error(
|
||||
@ -160,8 +160,28 @@ func (service *Service) Tally(ctx context.Context) (err error) {
|
||||
|
||||
service.log.Error(
|
||||
"tally isn't updating the live accounting storage usage of the project in this cycle",
|
||||
zap.Error(err),
|
||||
zap.String("projectID", projectID.String()),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
|
||||
// difference between cached project totals and latest tally collector
|
||||
increment := tallyTotal.Segments - latestLiveTotals[projectID].Segments
|
||||
|
||||
err = service.liveAccounting.UpdateProjectSegmentUsage(ctx, projectID, increment)
|
||||
if err != nil {
|
||||
if accounting.ErrSystemOrNetError.Has(err) {
|
||||
service.log.Error(
|
||||
"tally isn't updating the live accounting segment usages of the projects in this cycle",
|
||||
zap.Error(err),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
service.log.Error(
|
||||
"tally isn't updating the live accounting segment usage of the project in this cycle",
|
||||
zap.String("projectID", projectID.String()),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -263,7 +283,7 @@ func (observer *BucketTallyCollector) Run(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
// ensureBucket returns bucket corresponding to the passed in path.
|
||||
func (observer *BucketTallyCollector) ensureBucket(ctx context.Context, location metabase.ObjectLocation) *accounting.BucketTally {
|
||||
func (observer *BucketTallyCollector) ensureBucket(location metabase.ObjectLocation) *accounting.BucketTally {
|
||||
bucketLocation := location.Bucket()
|
||||
bucket, exists := observer.Bucket[bucketLocation]
|
||||
if !exists {
|
||||
@ -283,7 +303,7 @@ func (observer *BucketTallyCollector) object(ctx context.Context, object metabas
|
||||
return nil
|
||||
}
|
||||
|
||||
bucket := observer.ensureBucket(ctx, object.ObjectStream.Location())
|
||||
bucket := observer.ensureBucket(object.ObjectStream.Location())
|
||||
bucket.TotalSegments += int64(object.SegmentCount)
|
||||
bucket.TotalBytes += object.TotalEncryptedSize
|
||||
bucket.MetadataSize += int64(object.EncryptedMetadataSize)
|
||||
@ -292,10 +312,13 @@ func (observer *BucketTallyCollector) object(ctx context.Context, object metabas
|
||||
return nil
|
||||
}
|
||||
|
||||
func projectTotalsFromBuckets(buckets map[metabase.BucketLocation]*accounting.BucketTally) map[uuid.UUID]int64 {
|
||||
projectTallyTotals := make(map[uuid.UUID]int64)
|
||||
func projectTotalsFromBuckets(buckets map[metabase.BucketLocation]*accounting.BucketTally) map[uuid.UUID]accounting.Usage {
|
||||
projectTallyTotals := make(map[uuid.UUID]accounting.Usage)
|
||||
for _, bucket := range buckets {
|
||||
projectTallyTotals[bucket.ProjectID] += bucket.TotalBytes
|
||||
projectUsage := projectTallyTotals[bucket.ProjectID]
|
||||
projectUsage.Storage += bucket.TotalBytes
|
||||
projectUsage.Segments += bucket.TotalSegments
|
||||
projectTallyTotals[bucket.ProjectID] = projectUsage
|
||||
}
|
||||
return projectTallyTotals
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user