satellite/accounting/tally: ignore expired pointers
Until we do a good job of cleaning up expired pointers, we should at least
not charge or pay people for them. Nodes already locally delete expired
segments. Subsumes the tests in 1112.

Change-Id: I5961185764e02f6136b3231b44ecc75a9a8832c9
parent 02613407ae
commit 83bbc7a37e
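The heart of the change is a single guard: the tally observer now captures the
loop's start time and skips any pointer whose expiration date has already
passed, so expired-but-not-yet-deleted pointers never reach the bucket or node
tallies. Below is a minimal, self-contained sketch of that rule; the `pointer`
and `observer` types here are toy stand-ins, not the real pb.Pointer or
tally.Observer from the hunks that follow.

package main

import (
	"fmt"
	"time"
)

// pointer is a toy stand-in for pb.Pointer, carrying only the field the
// expiry check needs; the real type has many more fields.
type pointer struct {
	ExpirationDate time.Time
}

// observer mirrors tally.Observer from the diff below: it captures a single
// "now" when the metainfo loop starts, so every pointer in the same pass is
// judged against the same instant.
type observer struct {
	now time.Time
}

// pointerExpired applies the same rule as the helper added in the diff:
// a zero expiration date means the pointer never expires.
func (o *observer) pointerExpired(p *pointer) bool {
	return !p.ExpirationDate.IsZero() && p.ExpirationDate.Before(o.now)
}

func main() {
	o := &observer{now: time.Now()}
	fmt.Println(o.pointerExpired(&pointer{}))                                      // false: never expires
	fmt.Println(o.pointerExpired(&pointer{ExpirationDate: o.now.Add(-time.Hour)})) // true: already expired
	fmt.Println(o.pointerExpired(&pointer{ExpirationDate: o.now.Add(time.Hour)}))  // false: still live
}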
@@ -332,6 +332,60 @@ func TestBilling_DownloadAndNoUploadTraffic(t *testing.T) {
 	})
 }
 
+func TestBilling_ExpiredFiles(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		const (
+			bucketName = "a-bucket"
+			objectKey  = "object-filename"
+		)
+
+		satelliteSys := planet.Satellites[0]
+		satelliteSys.Audit.Chore.Loop.Stop()
+		satelliteSys.Repair.Repairer.Loop.Stop()
+
+		satelliteSys.Accounting.Tally.Loop.Pause()
+
+		tallies := getTallies(ctx, t, planet, 0)
+		require.Zero(t, len(tallies), "There should be no tally at this point")
+
+		now := time.Now()
+		expirationDate := now.Add(time.Hour)
+
+		{
+			uplink := planet.Uplinks[0]
+			data := testrand.Bytes(128 * memory.KiB)
+			err := uplink.UploadWithExpiration(ctx, satelliteSys, bucketName, objectKey, data, expirationDate)
+			require.NoError(t, err)
+		}
+		require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))
+
+		tallies = getTallies(ctx, t, planet, 0)
+		require.NotZero(t, len(tallies), "There should be at least one tally")
+
+		// set the tally service to be in the future for the next get tallies call. it should
+		// not add any tallies.
+		planet.Satellites[0].Accounting.Tally.SetNow(func() time.Time {
+			return now.Add(2 * time.Hour)
+		})
+		newTallies := getTallies(ctx, t, planet, 0)
+		require.Equal(t, tallies, newTallies)
+	})
+}
+
+func getTallies(ctx context.Context, t *testing.T, planet *testplanet.Planet, satelliteIdx int) []accounting.BucketTally {
+	t.Helper()
+	sat := planet.Satellites[satelliteIdx]
+	sat.Accounting.Tally.Loop.TriggerWait()
+	sat.Accounting.Tally.Loop.Pause()
+
+	tallies, err := sat.DB.ProjectAccounting().GetTallies(ctx)
+	require.NoError(t, err)
+	return tallies
+
+}
+
 func TestBilling_ZombieSegments(t *testing.T) {
 	// failing test - see https://storjlabs.atlassian.net/browse/SM-592
 	t.Skip("Zombie segments do get billed. Wait for resolution of SM-592")
@@ -41,6 +41,7 @@ type Service struct {
 	liveAccounting          accounting.Cache
 	storagenodeAccountingDB accounting.StoragenodeAccounting
 	projectAccountingDB     accounting.ProjectAccounting
+	nowFn                   func() time.Time
 }
 
 // New creates a new tally Service
@@ -53,6 +54,7 @@ func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.P
 		liveAccounting:          liveAccounting,
 		storagenodeAccountingDB: sdb,
 		projectAccountingDB:     pdb,
+		nowFn:                   time.Now,
 	}
 }
 
@@ -75,6 +77,12 @@ func (service *Service) Close() error {
 	return nil
 }
 
+// SetNow allows tests to have the Service act as if the current time is whatever
+// they want. This avoids races and sleeping, making tests more reliable and efficient.
+func (service *Service) SetNow(now func() time.Time) {
+	service.nowFn = now
+}
+
 // Tally calculates data-at-rest usage once
 func (service *Service) Tally(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
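The nowFn/SetNow pair in the hunk above is ordinary clock injection:
production code asks the service for the time instead of calling time.Now()
directly, so a test can move the clock without sleeping. A standalone sketch
of the pattern follows; the `service` type here is hypothetical, with only the
nowFn field and SetNow method taken from the hunk above. This is the same
shape TestBilling_ExpiredFiles relies on when it jumps two hours into the
future.

package main

import (
	"fmt"
	"time"
)

// service sketches the nowFn pattern: code that needs the current time
// calls s.nowFn() rather than time.Now().
type service struct {
	nowFn func() time.Time
}

// newService defaults the clock to the real one, as tally.New does.
func newService() *service {
	return &service{nowFn: time.Now}
}

// SetNow swaps the clock, as the Service method above does.
func (s *service) SetNow(now func() time.Time) {
	s.nowFn = now
}

func main() {
	s := newService()
	fixed := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	s.SetNow(func() time.Time { return fixed.Add(2 * time.Hour) })
	fmt.Println(s.nowFn()) // 2020-01-01 02:00:00 +0000 UTC
}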
@@ -89,16 +97,16 @@ func (service *Service) Tally(ctx context.Context) (err error) {
 		return Error.Wrap(err)
 	}
 	if lastTime.IsZero() {
-		lastTime = time.Now()
+		lastTime = service.nowFn()
 	}
 
 	// add up all nodes and buckets
-	observer := NewObserver(service.log.Named("observer"))
+	observer := NewObserver(service.log.Named("observer"), service.nowFn())
 	err = service.metainfoLoop.Join(ctx, observer)
 	if err != nil {
 		return Error.Wrap(err)
 	}
-	finishTime := time.Now()
+	finishTime := service.nowFn()
 
 	// calculate byte hours, not just bytes
 	hours := time.Since(lastTime).Hours()
@@ -184,20 +192,27 @@ var _ metainfo.Observer = (*Observer)(nil)
 
 // Observer observes metainfo and adds up tallies for nodes and buckets
 type Observer struct {
+	Now    time.Time
 	Log    *zap.Logger
 	Node   map[storj.NodeID]float64
 	Bucket map[string]*accounting.BucketTally
 }
 
 // NewObserver returns an metainfo loop observer that adds up totals for buckets and nodes.
-func NewObserver(log *zap.Logger) *Observer {
+// The now argument controls when the observer considers pointers to be expired.
+func NewObserver(log *zap.Logger, now time.Time) *Observer {
 	return &Observer{
+		Now:    now,
 		Log:    log,
 		Node:   make(map[storj.NodeID]float64),
 		Bucket: make(map[string]*accounting.BucketTally),
 	}
 }
 
+func (observer *Observer) pointerExpired(pointer *pb.Pointer) bool {
+	return !pointer.ExpirationDate.IsZero() && pointer.ExpirationDate.Before(observer.Now)
+}
+
 // ensureBucket returns bucket corresponding to the passed in path
 func (observer *Observer) ensureBucket(ctx context.Context, path metainfo.ScopedPath) *accounting.BucketTally {
 	bucketID := storj.JoinPaths(path.ProjectIDString, path.BucketName)
@@ -215,6 +230,10 @@ func (observer *Observer) ensureBucket(ctx context.Context, path metainfo.Scoped
 
 // Object is called for each object once.
 func (observer *Observer) Object(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
+	if observer.pointerExpired(pointer) {
+		return nil
+	}
+
 	bucket := observer.ensureBucket(ctx, path)
 	bucket.ObjectCount++
 	return nil
@@ -222,6 +241,10 @@ func (observer *Observer) Object(ctx context.Context, path metainfo.ScopedPath,
 
 // InlineSegment is called for each inline segment.
 func (observer *Observer) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
+	if observer.pointerExpired(pointer) {
+		return nil
+	}
+
 	bucket := observer.ensureBucket(ctx, path)
 	bucket.InlineSegments++
 	bucket.InlineBytes += int64(len(pointer.InlineSegment))
@@ -232,6 +255,10 @@ func (observer *Observer) InlineSegment(ctx context.Context, path metainfo.Scope
 
 // RemoteSegment is called for each remote segment.
 func (observer *Observer) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error) {
+	if observer.pointerExpired(pointer) {
+		return nil
+	}
+
 	bucket := observer.ensureBucket(ctx, path)
 	bucket.RemoteSegments++
 	bucket.RemoteBytes += pointer.GetSegmentSize()
@@ -96,7 +96,7 @@ func TestOnlyInline(t *testing.T) {
 
 	// run multiple times to ensure we add tallies
 	for i := 0; i < 2; i++ {
-		obs := tally.NewObserver(planet.Satellites[0].Log.Named("observer"))
+		obs := tally.NewObserver(planet.Satellites[0].Log.Named("observer"), time.Now())
 		err := planet.Satellites[0].Metainfo.Loop.Join(ctx, obs)
 		require.NoError(t, err)
 
@@ -133,7 +133,7 @@ func TestCalculateNodeAtRestData(t *testing.T) {
 	err = uplink.Upload(ctx, planet.Satellites[0], expectedBucketName, "test/path", expectedData)
 	require.NoError(t, err)
 
-	obs := tally.NewObserver(planet.Satellites[0].Log.Named("observer"))
+	obs := tally.NewObserver(planet.Satellites[0].Log.Named("observer"), time.Now())
 	err = planet.Satellites[0].Metainfo.Loop.Join(ctx, obs)
 	require.NoError(t, err)
 
@@ -192,7 +192,7 @@ func TestCalculateBucketAtRestData(t *testing.T) {
 			newTally.ProjectID = projectID
 			expectedBucketTallies[bucketID] = newTally
 
-			obs := tally.NewObserver(satellitePeer.Log.Named("observer"))
+			obs := tally.NewObserver(satellitePeer.Log.Named("observer"), time.Now())
 			err = satellitePeer.Metainfo.Loop.Join(ctx, obs)
 			require.NoError(t, err)
 			require.Equal(t, expectedBucketTallies, obs.Bucket)
@@ -201,6 +201,34 @@ func TestCalculateBucketAtRestData(t *testing.T) {
 	})
 }
 
+func TestTallyIgnoresExpiredPointers(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		satellitePeer := planet.Satellites[0]
+		redundancyScheme := planet.Uplinks[0].GetConfig(satellitePeer).GetRedundancyScheme()
+
+		project := "9656af6e-2d9c-42fa-91f2-bfd516a722d7"
+		bucket := "bucket"
+
+		// setup: create an expired pointer and save it to pointerDB
+		pointer := makePointer(planet.StorageNodes, redundancyScheme, int64(2), false)
+		pointer.ExpirationDate = time.Now().Add(-24 * time.Hour)
+
+		metainfo := satellitePeer.Metainfo.Service
+		objectPath := fmt.Sprintf("%s/%s/%s/%s", project, "l", bucket, "object/name")
+		err := metainfo.Put(ctx, objectPath, pointer)
+		require.NoError(t, err)
+
+		obs := tally.NewObserver(satellitePeer.Log.Named("observer"), time.Now())
+		err = satellitePeer.Metainfo.Loop.Join(ctx, obs)
+		require.NoError(t, err)
+
+		// there should be no observed buckets because all of the pointers are expired
+		require.Equal(t, obs.Bucket, map[string]*accounting.BucketTally{})
+	})
+}
+
 func TestTallyLiveAccounting(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,