satellite/{projectaccounting, console}:query to get single bucket rollup
Added new projectaccounting query to get project's single bucket usage rollup. Added new service method to call new query. Added implementation for IsAuthenticated method which is used by new generated API. Change-Id: I7cde5656d489953b6c7d109f236362eb465fa64a
This commit is contained in:
parent
d4a6524673
commit
9b5904cd49
@ -3,10 +3,13 @@
|
||||
|
||||
package api
|
||||
|
||||
import "net/http"
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// Auth exposes methods to control authentication process for each endpoint.
|
||||
type Auth interface {
|
||||
// IsAuthenticated checks if request is performed with all needed authorization credentials.
|
||||
IsAuthenticated(r *http.Request) error
|
||||
IsAuthenticated(ctx context.Context, r *http.Request) (context.Context, error)
|
||||
}
|
||||
|
@ -158,7 +158,7 @@ func (a *API) generateGo() ([]byte, error) {
|
||||
p("")
|
||||
|
||||
if !endpoint.NoCookieAuth {
|
||||
p("err = h.auth.IsAuthenticated(r)")
|
||||
p("ctx, err = h.auth.IsAuthenticated(ctx, r)")
|
||||
p("if err != nil {")
|
||||
p("api.ServeError(h.log, w, http.StatusUnauthorized, err)")
|
||||
p("return")
|
||||
|
@ -143,7 +143,7 @@ type BucketUsagePage struct {
|
||||
// for certain period.
|
||||
type BucketUsageRollup struct {
|
||||
ProjectID uuid.UUID
|
||||
BucketName []byte
|
||||
BucketName string
|
||||
|
||||
TotalStoredData float64
|
||||
|
||||
@ -232,6 +232,8 @@ type ProjectAccounting interface {
|
||||
GetProjectObjectsSegments(ctx context.Context, projectID uuid.UUID) (*ProjectObjectsSegments, error)
|
||||
// GetBucketUsageRollups returns usage rollup per each bucket for specified period of time.
|
||||
GetBucketUsageRollups(ctx context.Context, projectID uuid.UUID, since, before time.Time) ([]BucketUsageRollup, error)
|
||||
// GetSingleBucketUsageRollup returns usage rollup per single bucket for specified period of time.
|
||||
GetSingleBucketUsageRollup(ctx context.Context, projectID uuid.UUID, bucket string, since, before time.Time) (*BucketUsageRollup, error)
|
||||
// GetBucketTotals returns per bucket usage summary for specified period of time.
|
||||
GetBucketTotals(ctx context.Context, projectID uuid.UUID, cursor BucketUsageCursor, since, before time.Time) (*BucketUsagePage, error)
|
||||
// ArchiveRollupsBefore archives rollups older than a given time and returns number of bucket bandwidth rollups archived.
|
||||
|
@ -49,7 +49,7 @@ func (h *Handler) handleGetUserProjects(w http.ResponseWriter, r *http.Request)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
err = h.auth.IsAuthenticated(r)
|
||||
ctx, err = h.auth.IsAuthenticated(ctx, r)
|
||||
if err != nil {
|
||||
api.ServeError(h.log, w, http.StatusUnauthorized, err)
|
||||
return
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"context"
|
||||
"crypto/subtle"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/mail"
|
||||
"sort"
|
||||
"time"
|
||||
@ -23,6 +24,7 @@ import (
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/private/cfgstruct"
|
||||
"storj.io/storj/private/api"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/analytics"
|
||||
"storj.io/storj/satellite/console/consoleauth"
|
||||
@ -1098,6 +1100,30 @@ func (s *Service) GetUsersProjects(ctx context.Context) (ps []Project, err error
|
||||
return
|
||||
}
|
||||
|
||||
// GenGetUsersProjects is a method for querying all projects for generated api.
|
||||
func (s *Service) GenGetUsersProjects(ctx context.Context) (ps []Project, httpErr api.HTTPError) {
|
||||
var err error
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
auth, err := s.getAuthAndAuditLog(ctx, "get users projects")
|
||||
if err != nil {
|
||||
return nil, api.HTTPError{
|
||||
Status: http.StatusUnauthorized,
|
||||
Err: Error.Wrap(err),
|
||||
}
|
||||
}
|
||||
|
||||
ps, err = s.store.Projects().GetByUserID(ctx, auth.User.ID)
|
||||
if err != nil {
|
||||
return nil, api.HTTPError{
|
||||
Status: http.StatusInternalServerError,
|
||||
Err: Error.Wrap(err),
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// GetUsersOwnedProjectsPage is a method for querying paged projects.
|
||||
func (s *Service) GetUsersOwnedProjectsPage(ctx context.Context, cursor ProjectsCursor) (_ ProjectsPage, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -1658,6 +1684,38 @@ func (s *Service) GetBucketUsageRollups(ctx context.Context, projectID uuid.UUID
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// GenGetSingleBucketUsageRollup retrieves usage rollup for single bucket of particular project for a given period for generated api.
|
||||
func (s *Service) GenGetSingleBucketUsageRollup(ctx context.Context, projectID uuid.UUID, bucket string, since, before time.Time) (rollup *accounting.BucketUsageRollup, httpError api.HTTPError) {
|
||||
var err error
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
auth, err := s.getAuthAndAuditLog(ctx, "get single bucket usage rollup", zap.String("projectID", projectID.String()))
|
||||
if err != nil {
|
||||
return nil, api.HTTPError{
|
||||
Status: http.StatusUnauthorized,
|
||||
Err: Error.Wrap(err),
|
||||
}
|
||||
}
|
||||
|
||||
_, err = s.isProjectMember(ctx, auth.User.ID, projectID)
|
||||
if err != nil {
|
||||
return nil, api.HTTPError{
|
||||
Status: http.StatusUnauthorized,
|
||||
Err: Error.Wrap(err),
|
||||
}
|
||||
}
|
||||
|
||||
rollup, err = s.projectAccounting.GetSingleBucketUsageRollup(ctx, projectID, bucket, since, before)
|
||||
if err != nil {
|
||||
return nil, api.HTTPError{
|
||||
Status: http.StatusInternalServerError,
|
||||
Err: Error.Wrap(err),
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// GetDailyProjectUsage returns daily usage by project ID.
|
||||
func (s *Service) GetDailyProjectUsage(ctx context.Context, projectID uuid.UUID, from, to time.Time) (_ *accounting.ProjectDailyUsage, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -1814,6 +1872,21 @@ func (s *Service) Authorize(ctx context.Context) (a Authorization, err error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// IsAuthenticated checks if request has an authorization cookie.
|
||||
func (s *Service) IsAuthenticated(ctx context.Context, r *http.Request) (context.Context, error) {
|
||||
cookie, err := r.Cookie("_tokenKey")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
auth, err := s.Authorize(consoleauth.WithAPIKey(ctx, []byte(cookie.Value)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return WithAuth(ctx, auth), nil
|
||||
}
|
||||
|
||||
// checkProjectCanBeDeleted ensures that all data, api-keys and buckets are deleted and usage has been accounted.
|
||||
// no error means the project status is clean.
|
||||
func (s *Service) checkProjectCanBeDeleted(ctx context.Context, project uuid.UUID) (err error) {
|
||||
|
@ -584,6 +584,34 @@ func (db *ProjectAccounting) GetBucketUsageRollups(ctx context.Context, projectI
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var bucketUsageRollups []accounting.BucketUsageRollup
|
||||
for _, bucket := range buckets {
|
||||
bucketRollup, err := db.getSingleBucketRollup(ctx, projectID, bucket, since, before)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bucketUsageRollups = append(bucketUsageRollups, *bucketRollup)
|
||||
}
|
||||
|
||||
return bucketUsageRollups, nil
|
||||
}
|
||||
|
||||
// GetSingleBucketUsageRollup retrieves usage rollup for a single bucket of particular project for a given period.
|
||||
func (db *ProjectAccounting) GetSingleBucketUsageRollup(ctx context.Context, projectID uuid.UUID, bucket string, since, before time.Time) (_ *accounting.BucketUsageRollup, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
since = timeTruncateDown(since.UTC())
|
||||
before = before.UTC()
|
||||
|
||||
bucketRollup, err := db.getSingleBucketRollup(ctx, projectID, bucket, since, before)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return bucketRollup, nil
|
||||
}
|
||||
|
||||
func (db *ProjectAccounting) getSingleBucketRollup(ctx context.Context, projectID uuid.UUID, bucket string, since, before time.Time) (*accounting.BucketUsageRollup, error) {
|
||||
roullupsQuery := db.db.Rebind(`SELECT SUM(settled), SUM(inline), action
|
||||
FROM bucket_bandwidth_rollups
|
||||
WHERE project_id = ? AND bucket_name = ? AND interval_start >= ? AND interval_start <= ?
|
||||
@ -592,89 +620,78 @@ func (db *ProjectAccounting) GetBucketUsageRollups(ctx context.Context, projectI
|
||||
// TODO: should be optimized
|
||||
storageQuery := db.db.All_BucketStorageTally_By_ProjectId_And_BucketName_And_IntervalStart_GreaterOrEqual_And_IntervalStart_LessOrEqual_OrderBy_Desc_IntervalStart
|
||||
|
||||
var bucketUsageRollups []accounting.BucketUsageRollup
|
||||
for _, bucket := range buckets {
|
||||
err := func() error {
|
||||
bucketRollup := accounting.BucketUsageRollup{
|
||||
ProjectID: projectID,
|
||||
BucketName: []byte(bucket),
|
||||
Since: since,
|
||||
Before: before,
|
||||
}
|
||||
bucketRollup := &accounting.BucketUsageRollup{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucket,
|
||||
Since: since,
|
||||
Before: before,
|
||||
}
|
||||
|
||||
// get bucket_bandwidth_rollups
|
||||
rollupsRows, err := db.db.QueryContext(ctx, roullupsQuery, projectID[:], []byte(bucket), since, before)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { err = errs.Combine(err, rollupsRows.Close()) }()
|
||||
// get bucket_bandwidth_rollup
|
||||
rollupRows, err := db.db.QueryContext(ctx, roullupsQuery, projectID[:], []byte(bucket), since, before)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() { err = errs.Combine(err, rollupRows.Close()) }()
|
||||
|
||||
// fill egress
|
||||
for rollupsRows.Next() {
|
||||
var action pb.PieceAction
|
||||
var settled, inline int64
|
||||
// fill egress
|
||||
for rollupRows.Next() {
|
||||
var action pb.PieceAction
|
||||
var settled, inline int64
|
||||
|
||||
err = rollupsRows.Scan(&settled, &inline, &action)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch action {
|
||||
case pb.PieceAction_GET:
|
||||
bucketRollup.GetEgress += memory.Size(settled + inline).GB()
|
||||
case pb.PieceAction_GET_AUDIT:
|
||||
bucketRollup.AuditEgress += memory.Size(settled + inline).GB()
|
||||
case pb.PieceAction_GET_REPAIR:
|
||||
bucketRollup.RepairEgress += memory.Size(settled + inline).GB()
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
if err := rollupsRows.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bucketStorageTallies, err := storageQuery(ctx,
|
||||
dbx.BucketStorageTally_ProjectId(projectID[:]),
|
||||
dbx.BucketStorageTally_BucketName([]byte(bucket)),
|
||||
dbx.BucketStorageTally_IntervalStart(since),
|
||||
dbx.BucketStorageTally_IntervalStart(before))
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// fill metadata, objects and stored data
|
||||
// hours calculated from previous tallies,
|
||||
// so we skip the most recent one
|
||||
for i := len(bucketStorageTallies) - 1; i > 0; i-- {
|
||||
current := bucketStorageTallies[i]
|
||||
|
||||
hours := bucketStorageTallies[i-1].IntervalStart.Sub(current.IntervalStart).Hours()
|
||||
|
||||
if current.TotalBytes > 0 {
|
||||
bucketRollup.TotalStoredData += memory.Size(current.TotalBytes).GB() * hours
|
||||
} else {
|
||||
bucketRollup.TotalStoredData += memory.Size(current.Remote+current.Inline).GB() * hours
|
||||
}
|
||||
bucketRollup.MetadataSize += memory.Size(current.MetadataSize).GB() * hours
|
||||
if current.TotalSegmentsCount > 0 {
|
||||
bucketRollup.TotalSegments += float64(current.TotalSegmentsCount) * hours
|
||||
} else {
|
||||
bucketRollup.TotalSegments += float64(current.RemoteSegmentsCount+current.InlineSegmentsCount) * hours
|
||||
}
|
||||
bucketRollup.ObjectCount += float64(current.ObjectCount) * hours
|
||||
}
|
||||
|
||||
bucketUsageRollups = append(bucketUsageRollups, bucketRollup)
|
||||
return nil
|
||||
}()
|
||||
err = rollupRows.Scan(&settled, &inline, &action)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch action {
|
||||
case pb.PieceAction_GET:
|
||||
bucketRollup.GetEgress += memory.Size(settled + inline).GB()
|
||||
case pb.PieceAction_GET_AUDIT:
|
||||
bucketRollup.AuditEgress += memory.Size(settled + inline).GB()
|
||||
case pb.PieceAction_GET_REPAIR:
|
||||
bucketRollup.RepairEgress += memory.Size(settled + inline).GB()
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
if err := rollupRows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return bucketUsageRollups, nil
|
||||
bucketStorageTallies, err := storageQuery(ctx,
|
||||
dbx.BucketStorageTally_ProjectId(projectID[:]),
|
||||
dbx.BucketStorageTally_BucketName([]byte(bucket)),
|
||||
dbx.BucketStorageTally_IntervalStart(since),
|
||||
dbx.BucketStorageTally_IntervalStart(before))
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// fill metadata, objects and stored data
|
||||
// hours calculated from previous tallies,
|
||||
// so we skip the most recent one
|
||||
for i := len(bucketStorageTallies) - 1; i > 0; i-- {
|
||||
current := bucketStorageTallies[i]
|
||||
|
||||
hours := bucketStorageTallies[i-1].IntervalStart.Sub(current.IntervalStart).Hours()
|
||||
|
||||
if current.TotalBytes > 0 {
|
||||
bucketRollup.TotalStoredData += memory.Size(current.TotalBytes).GB() * hours
|
||||
} else {
|
||||
bucketRollup.TotalStoredData += memory.Size(current.Remote+current.Inline).GB() * hours
|
||||
}
|
||||
bucketRollup.MetadataSize += memory.Size(current.MetadataSize).GB() * hours
|
||||
if current.TotalSegmentsCount > 0 {
|
||||
bucketRollup.TotalSegments += float64(current.TotalSegmentsCount) * hours
|
||||
} else {
|
||||
bucketRollup.TotalSegments += float64(current.RemoteSegmentsCount+current.InlineSegmentsCount) * hours
|
||||
}
|
||||
bucketRollup.ObjectCount += float64(current.ObjectCount) * hours
|
||||
}
|
||||
|
||||
return bucketRollup, nil
|
||||
}
|
||||
|
||||
// prefixIncrement returns the lexicographically lowest byte string which is
|
||||
|
@ -81,3 +81,85 @@ func Test_DailyUsage(t *testing.T) {
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func Test_GetSingleBucketRollup(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1},
|
||||
func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
const (
|
||||
bucketName = "testbucket"
|
||||
firstPath = "path"
|
||||
secondPath = "another_path"
|
||||
)
|
||||
|
||||
now := time.Now().UTC()
|
||||
inFiveMinutes := time.Now().Add(5 * time.Minute).UTC()
|
||||
|
||||
var (
|
||||
satelliteSys = planet.Satellites[0]
|
||||
uplink = planet.Uplinks[0]
|
||||
projectID = uplink.Projects[0].ID
|
||||
)
|
||||
|
||||
newUser := console.CreateUser{
|
||||
FullName: "Project Single Bucket Rollup",
|
||||
ShortName: "",
|
||||
Email: "sbur@test.test",
|
||||
}
|
||||
|
||||
user, err := satelliteSys.AddUser(ctx, newUser, 3)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = satelliteSys.DB.Console().ProjectMembers().Insert(ctx, user.ID, projectID)
|
||||
require.NoError(t, err)
|
||||
|
||||
planet.Satellites[0].Orders.Chore.Loop.Pause()
|
||||
satelliteSys.Accounting.Tally.Loop.Pause()
|
||||
|
||||
timeTruncateDown := func(t time.Time) time.Time {
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
|
||||
}
|
||||
|
||||
usage0, err := satelliteSys.DB.ProjectAccounting().GetSingleBucketUsageRollup(ctx, projectID, bucketName, now, inFiveMinutes)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, bucketName, usage0.BucketName)
|
||||
require.Equal(t, projectID, usage0.ProjectID)
|
||||
require.Equal(t, timeTruncateDown(now), usage0.Since)
|
||||
require.Equal(t, inFiveMinutes, usage0.Before)
|
||||
require.Zero(t, usage0.GetEgress)
|
||||
require.Zero(t, usage0.ObjectCount)
|
||||
require.Zero(t, usage0.AuditEgress)
|
||||
require.Zero(t, usage0.RepairEgress)
|
||||
require.Zero(t, usage0.MetadataSize)
|
||||
require.Zero(t, usage0.TotalSegments)
|
||||
require.Zero(t, usage0.TotalStoredData)
|
||||
|
||||
firstSegment := testrand.Bytes(100 * memory.KiB)
|
||||
secondSegment := testrand.Bytes(200 * memory.KiB)
|
||||
|
||||
err = uplink.Upload(ctx, satelliteSys, bucketName, firstPath, firstSegment)
|
||||
require.NoError(t, err)
|
||||
err = uplink.Upload(ctx, satelliteSys, bucketName, secondPath, secondSegment)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = uplink.Download(ctx, satelliteSys, bucketName, firstPath)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))
|
||||
tomorrow := time.Now().Add(24 * time.Hour)
|
||||
planet.StorageNodes[0].Storage2.Orders.SendOrders(ctx, tomorrow)
|
||||
|
||||
planet.Satellites[0].Orders.Chore.Loop.TriggerWait()
|
||||
satelliteSys.Accounting.Tally.Loop.TriggerWait()
|
||||
// We trigger tally one more time because the most recent tally is skipped in service method.
|
||||
satelliteSys.Accounting.Tally.Loop.TriggerWait()
|
||||
|
||||
usage1, err := satelliteSys.DB.ProjectAccounting().GetSingleBucketUsageRollup(ctx, projectID, bucketName, now, inFiveMinutes)
|
||||
require.NoError(t, err)
|
||||
require.Greater(t, usage1.GetEgress, 0.0)
|
||||
require.Greater(t, usage1.ObjectCount, 0.0)
|
||||
require.Greater(t, usage1.MetadataSize, 0.0)
|
||||
require.Greater(t, usage1.TotalSegments, 0.0)
|
||||
require.Greater(t, usage1.TotalStoredData, 0.0)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user