From 9b5904cd4911d49ec325a02e17b1422a508b0960 Mon Sep 17 00:00:00 2001 From: Vitalii Shpital Date: Thu, 17 Feb 2022 15:48:39 +0200 Subject: [PATCH] satellite/{projectaccounting, console}:query to get single bucket rollup Added new projectaccounting query to get project's single bucket usage rollup. Added new service method to call new query. Added implementation for IsAuthenticated method which is used by new generated API. Change-Id: I7cde5656d489953b6c7d109f236362eb465fa64a --- private/api/authentication.go | 7 +- private/apigen/api.go | 2 +- satellite/accounting/db.go | 4 +- .../console/consoleweb/consoleapi/api.gen.go | 2 +- satellite/console/service.go | 73 ++++++++ satellite/satellitedb/projectaccounting.go | 167 ++++++++++-------- .../satellitedb/projectaccounting_test.go | 82 +++++++++ 7 files changed, 257 insertions(+), 80 deletions(-) diff --git a/private/api/authentication.go b/private/api/authentication.go index c7ebb8e56..aedceff5c 100644 --- a/private/api/authentication.go +++ b/private/api/authentication.go @@ -3,10 +3,13 @@ package api -import "net/http" +import ( + "context" + "net/http" +) // Auth exposes methods to control authentication process for each endpoint. type Auth interface { // IsAuthenticated checks if request is performed with all needed authorization credentials. - IsAuthenticated(r *http.Request) error + IsAuthenticated(ctx context.Context, r *http.Request) (context.Context, error) } diff --git a/private/apigen/api.go b/private/apigen/api.go index 8a63941dc..97c1d388a 100644 --- a/private/apigen/api.go +++ b/private/apigen/api.go @@ -158,7 +158,7 @@ func (a *API) generateGo() ([]byte, error) { p("") if !endpoint.NoCookieAuth { - p("err = h.auth.IsAuthenticated(r)") + p("ctx, err = h.auth.IsAuthenticated(ctx, r)") p("if err != nil {") p("api.ServeError(h.log, w, http.StatusUnauthorized, err)") p("return") diff --git a/satellite/accounting/db.go b/satellite/accounting/db.go index ab4154004..d43b0e6ca 100644 --- a/satellite/accounting/db.go +++ b/satellite/accounting/db.go @@ -143,7 +143,7 @@ type BucketUsagePage struct { // for certain period. type BucketUsageRollup struct { ProjectID uuid.UUID - BucketName []byte + BucketName string TotalStoredData float64 @@ -232,6 +232,8 @@ type ProjectAccounting interface { GetProjectObjectsSegments(ctx context.Context, projectID uuid.UUID) (*ProjectObjectsSegments, error) // GetBucketUsageRollups returns usage rollup per each bucket for specified period of time. GetBucketUsageRollups(ctx context.Context, projectID uuid.UUID, since, before time.Time) ([]BucketUsageRollup, error) + // GetSingleBucketUsageRollup returns usage rollup per single bucket for specified period of time. + GetSingleBucketUsageRollup(ctx context.Context, projectID uuid.UUID, bucket string, since, before time.Time) (*BucketUsageRollup, error) // GetBucketTotals returns per bucket usage summary for specified period of time. GetBucketTotals(ctx context.Context, projectID uuid.UUID, cursor BucketUsageCursor, since, before time.Time) (*BucketUsagePage, error) // ArchiveRollupsBefore archives rollups older than a given time and returns number of bucket bandwidth rollups archived. diff --git a/satellite/console/consoleweb/consoleapi/api.gen.go b/satellite/console/consoleweb/consoleapi/api.gen.go index 4ee8b31d7..a5f1c8493 100644 --- a/satellite/console/consoleweb/consoleapi/api.gen.go +++ b/satellite/console/consoleweb/consoleapi/api.gen.go @@ -49,7 +49,7 @@ func (h *Handler) handleGetUserProjects(w http.ResponseWriter, r *http.Request) w.Header().Set("Content-Type", "application/json") - err = h.auth.IsAuthenticated(r) + ctx, err = h.auth.IsAuthenticated(ctx, r) if err != nil { api.ServeError(h.log, w, http.StatusUnauthorized, err) return diff --git a/satellite/console/service.go b/satellite/console/service.go index 3a1c18a84..085a74dfa 100644 --- a/satellite/console/service.go +++ b/satellite/console/service.go @@ -7,6 +7,7 @@ import ( "context" "crypto/subtle" "fmt" + "net/http" "net/mail" "sort" "time" @@ -23,6 +24,7 @@ import ( "storj.io/common/storj" "storj.io/common/uuid" "storj.io/private/cfgstruct" + "storj.io/storj/private/api" "storj.io/storj/satellite/accounting" "storj.io/storj/satellite/analytics" "storj.io/storj/satellite/console/consoleauth" @@ -1098,6 +1100,30 @@ func (s *Service) GetUsersProjects(ctx context.Context) (ps []Project, err error return } +// GenGetUsersProjects is a method for querying all projects for generated api. +func (s *Service) GenGetUsersProjects(ctx context.Context) (ps []Project, httpErr api.HTTPError) { + var err error + defer mon.Task()(&ctx)(&err) + + auth, err := s.getAuthAndAuditLog(ctx, "get users projects") + if err != nil { + return nil, api.HTTPError{ + Status: http.StatusUnauthorized, + Err: Error.Wrap(err), + } + } + + ps, err = s.store.Projects().GetByUserID(ctx, auth.User.ID) + if err != nil { + return nil, api.HTTPError{ + Status: http.StatusInternalServerError, + Err: Error.Wrap(err), + } + } + + return +} + // GetUsersOwnedProjectsPage is a method for querying paged projects. func (s *Service) GetUsersOwnedProjectsPage(ctx context.Context, cursor ProjectsCursor) (_ ProjectsPage, err error) { defer mon.Task()(&ctx)(&err) @@ -1658,6 +1684,38 @@ func (s *Service) GetBucketUsageRollups(ctx context.Context, projectID uuid.UUID return result, nil } +// GenGetSingleBucketUsageRollup retrieves usage rollup for single bucket of particular project for a given period for generated api. +func (s *Service) GenGetSingleBucketUsageRollup(ctx context.Context, projectID uuid.UUID, bucket string, since, before time.Time) (rollup *accounting.BucketUsageRollup, httpError api.HTTPError) { + var err error + defer mon.Task()(&ctx)(&err) + + auth, err := s.getAuthAndAuditLog(ctx, "get single bucket usage rollup", zap.String("projectID", projectID.String())) + if err != nil { + return nil, api.HTTPError{ + Status: http.StatusUnauthorized, + Err: Error.Wrap(err), + } + } + + _, err = s.isProjectMember(ctx, auth.User.ID, projectID) + if err != nil { + return nil, api.HTTPError{ + Status: http.StatusUnauthorized, + Err: Error.Wrap(err), + } + } + + rollup, err = s.projectAccounting.GetSingleBucketUsageRollup(ctx, projectID, bucket, since, before) + if err != nil { + return nil, api.HTTPError{ + Status: http.StatusInternalServerError, + Err: Error.Wrap(err), + } + } + + return +} + // GetDailyProjectUsage returns daily usage by project ID. func (s *Service) GetDailyProjectUsage(ctx context.Context, projectID uuid.UUID, from, to time.Time) (_ *accounting.ProjectDailyUsage, err error) { defer mon.Task()(&ctx)(&err) @@ -1814,6 +1872,21 @@ func (s *Service) Authorize(ctx context.Context) (a Authorization, err error) { }, nil } +// IsAuthenticated checks if request has an authorization cookie. +func (s *Service) IsAuthenticated(ctx context.Context, r *http.Request) (context.Context, error) { + cookie, err := r.Cookie("_tokenKey") + if err != nil { + return nil, err + } + + auth, err := s.Authorize(consoleauth.WithAPIKey(ctx, []byte(cookie.Value))) + if err != nil { + return nil, err + } + + return WithAuth(ctx, auth), nil +} + // checkProjectCanBeDeleted ensures that all data, api-keys and buckets are deleted and usage has been accounted. // no error means the project status is clean. func (s *Service) checkProjectCanBeDeleted(ctx context.Context, project uuid.UUID) (err error) { diff --git a/satellite/satellitedb/projectaccounting.go b/satellite/satellitedb/projectaccounting.go index 7f628ac27..1fe7bc481 100644 --- a/satellite/satellitedb/projectaccounting.go +++ b/satellite/satellitedb/projectaccounting.go @@ -584,6 +584,34 @@ func (db *ProjectAccounting) GetBucketUsageRollups(ctx context.Context, projectI return nil, err } + var bucketUsageRollups []accounting.BucketUsageRollup + for _, bucket := range buckets { + bucketRollup, err := db.getSingleBucketRollup(ctx, projectID, bucket, since, before) + if err != nil { + return nil, err + } + + bucketUsageRollups = append(bucketUsageRollups, *bucketRollup) + } + + return bucketUsageRollups, nil +} + +// GetSingleBucketUsageRollup retrieves usage rollup for a single bucket of particular project for a given period. +func (db *ProjectAccounting) GetSingleBucketUsageRollup(ctx context.Context, projectID uuid.UUID, bucket string, since, before time.Time) (_ *accounting.BucketUsageRollup, err error) { + defer mon.Task()(&ctx)(&err) + since = timeTruncateDown(since.UTC()) + before = before.UTC() + + bucketRollup, err := db.getSingleBucketRollup(ctx, projectID, bucket, since, before) + if err != nil { + return nil, err + } + + return bucketRollup, nil +} + +func (db *ProjectAccounting) getSingleBucketRollup(ctx context.Context, projectID uuid.UUID, bucket string, since, before time.Time) (*accounting.BucketUsageRollup, error) { roullupsQuery := db.db.Rebind(`SELECT SUM(settled), SUM(inline), action FROM bucket_bandwidth_rollups WHERE project_id = ? AND bucket_name = ? AND interval_start >= ? AND interval_start <= ? @@ -592,89 +620,78 @@ func (db *ProjectAccounting) GetBucketUsageRollups(ctx context.Context, projectI // TODO: should be optimized storageQuery := db.db.All_BucketStorageTally_By_ProjectId_And_BucketName_And_IntervalStart_GreaterOrEqual_And_IntervalStart_LessOrEqual_OrderBy_Desc_IntervalStart - var bucketUsageRollups []accounting.BucketUsageRollup - for _, bucket := range buckets { - err := func() error { - bucketRollup := accounting.BucketUsageRollup{ - ProjectID: projectID, - BucketName: []byte(bucket), - Since: since, - Before: before, - } + bucketRollup := &accounting.BucketUsageRollup{ + ProjectID: projectID, + BucketName: bucket, + Since: since, + Before: before, + } - // get bucket_bandwidth_rollups - rollupsRows, err := db.db.QueryContext(ctx, roullupsQuery, projectID[:], []byte(bucket), since, before) - if err != nil { - return err - } - defer func() { err = errs.Combine(err, rollupsRows.Close()) }() + // get bucket_bandwidth_rollup + rollupRows, err := db.db.QueryContext(ctx, roullupsQuery, projectID[:], []byte(bucket), since, before) + if err != nil { + return nil, err + } + defer func() { err = errs.Combine(err, rollupRows.Close()) }() - // fill egress - for rollupsRows.Next() { - var action pb.PieceAction - var settled, inline int64 + // fill egress + for rollupRows.Next() { + var action pb.PieceAction + var settled, inline int64 - err = rollupsRows.Scan(&settled, &inline, &action) - if err != nil { - return err - } - - switch action { - case pb.PieceAction_GET: - bucketRollup.GetEgress += memory.Size(settled + inline).GB() - case pb.PieceAction_GET_AUDIT: - bucketRollup.AuditEgress += memory.Size(settled + inline).GB() - case pb.PieceAction_GET_REPAIR: - bucketRollup.RepairEgress += memory.Size(settled + inline).GB() - default: - continue - } - } - if err := rollupsRows.Err(); err != nil { - return err - } - - bucketStorageTallies, err := storageQuery(ctx, - dbx.BucketStorageTally_ProjectId(projectID[:]), - dbx.BucketStorageTally_BucketName([]byte(bucket)), - dbx.BucketStorageTally_IntervalStart(since), - dbx.BucketStorageTally_IntervalStart(before)) - - if err != nil { - return err - } - - // fill metadata, objects and stored data - // hours calculated from previous tallies, - // so we skip the most recent one - for i := len(bucketStorageTallies) - 1; i > 0; i-- { - current := bucketStorageTallies[i] - - hours := bucketStorageTallies[i-1].IntervalStart.Sub(current.IntervalStart).Hours() - - if current.TotalBytes > 0 { - bucketRollup.TotalStoredData += memory.Size(current.TotalBytes).GB() * hours - } else { - bucketRollup.TotalStoredData += memory.Size(current.Remote+current.Inline).GB() * hours - } - bucketRollup.MetadataSize += memory.Size(current.MetadataSize).GB() * hours - if current.TotalSegmentsCount > 0 { - bucketRollup.TotalSegments += float64(current.TotalSegmentsCount) * hours - } else { - bucketRollup.TotalSegments += float64(current.RemoteSegmentsCount+current.InlineSegmentsCount) * hours - } - bucketRollup.ObjectCount += float64(current.ObjectCount) * hours - } - - bucketUsageRollups = append(bucketUsageRollups, bucketRollup) - return nil - }() + err = rollupRows.Scan(&settled, &inline, &action) if err != nil { return nil, err } + + switch action { + case pb.PieceAction_GET: + bucketRollup.GetEgress += memory.Size(settled + inline).GB() + case pb.PieceAction_GET_AUDIT: + bucketRollup.AuditEgress += memory.Size(settled + inline).GB() + case pb.PieceAction_GET_REPAIR: + bucketRollup.RepairEgress += memory.Size(settled + inline).GB() + default: + continue + } + } + if err := rollupRows.Err(); err != nil { + return nil, err } - return bucketUsageRollups, nil + bucketStorageTallies, err := storageQuery(ctx, + dbx.BucketStorageTally_ProjectId(projectID[:]), + dbx.BucketStorageTally_BucketName([]byte(bucket)), + dbx.BucketStorageTally_IntervalStart(since), + dbx.BucketStorageTally_IntervalStart(before)) + + if err != nil { + return nil, err + } + + // fill metadata, objects and stored data + // hours calculated from previous tallies, + // so we skip the most recent one + for i := len(bucketStorageTallies) - 1; i > 0; i-- { + current := bucketStorageTallies[i] + + hours := bucketStorageTallies[i-1].IntervalStart.Sub(current.IntervalStart).Hours() + + if current.TotalBytes > 0 { + bucketRollup.TotalStoredData += memory.Size(current.TotalBytes).GB() * hours + } else { + bucketRollup.TotalStoredData += memory.Size(current.Remote+current.Inline).GB() * hours + } + bucketRollup.MetadataSize += memory.Size(current.MetadataSize).GB() * hours + if current.TotalSegmentsCount > 0 { + bucketRollup.TotalSegments += float64(current.TotalSegmentsCount) * hours + } else { + bucketRollup.TotalSegments += float64(current.RemoteSegmentsCount+current.InlineSegmentsCount) * hours + } + bucketRollup.ObjectCount += float64(current.ObjectCount) * hours + } + + return bucketRollup, nil } // prefixIncrement returns the lexicographically lowest byte string which is diff --git a/satellite/satellitedb/projectaccounting_test.go b/satellite/satellitedb/projectaccounting_test.go index 7a558b41d..aaa0c2c2c 100644 --- a/satellite/satellitedb/projectaccounting_test.go +++ b/satellite/satellitedb/projectaccounting_test.go @@ -81,3 +81,85 @@ func Test_DailyUsage(t *testing.T) { }, ) } + +func Test_GetSingleBucketRollup(t *testing.T) { + testplanet.Run(t, testplanet.Config{SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1}, + func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + const ( + bucketName = "testbucket" + firstPath = "path" + secondPath = "another_path" + ) + + now := time.Now().UTC() + inFiveMinutes := time.Now().Add(5 * time.Minute).UTC() + + var ( + satelliteSys = planet.Satellites[0] + uplink = planet.Uplinks[0] + projectID = uplink.Projects[0].ID + ) + + newUser := console.CreateUser{ + FullName: "Project Single Bucket Rollup", + ShortName: "", + Email: "sbur@test.test", + } + + user, err := satelliteSys.AddUser(ctx, newUser, 3) + require.NoError(t, err) + + _, err = satelliteSys.DB.Console().ProjectMembers().Insert(ctx, user.ID, projectID) + require.NoError(t, err) + + planet.Satellites[0].Orders.Chore.Loop.Pause() + satelliteSys.Accounting.Tally.Loop.Pause() + + timeTruncateDown := func(t time.Time) time.Time { + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location()) + } + + usage0, err := satelliteSys.DB.ProjectAccounting().GetSingleBucketUsageRollup(ctx, projectID, bucketName, now, inFiveMinutes) + require.NoError(t, err) + require.Equal(t, bucketName, usage0.BucketName) + require.Equal(t, projectID, usage0.ProjectID) + require.Equal(t, timeTruncateDown(now), usage0.Since) + require.Equal(t, inFiveMinutes, usage0.Before) + require.Zero(t, usage0.GetEgress) + require.Zero(t, usage0.ObjectCount) + require.Zero(t, usage0.AuditEgress) + require.Zero(t, usage0.RepairEgress) + require.Zero(t, usage0.MetadataSize) + require.Zero(t, usage0.TotalSegments) + require.Zero(t, usage0.TotalStoredData) + + firstSegment := testrand.Bytes(100 * memory.KiB) + secondSegment := testrand.Bytes(200 * memory.KiB) + + err = uplink.Upload(ctx, satelliteSys, bucketName, firstPath, firstSegment) + require.NoError(t, err) + err = uplink.Upload(ctx, satelliteSys, bucketName, secondPath, secondSegment) + require.NoError(t, err) + + _, err = uplink.Download(ctx, satelliteSys, bucketName, firstPath) + require.NoError(t, err) + + require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx)) + tomorrow := time.Now().Add(24 * time.Hour) + planet.StorageNodes[0].Storage2.Orders.SendOrders(ctx, tomorrow) + + planet.Satellites[0].Orders.Chore.Loop.TriggerWait() + satelliteSys.Accounting.Tally.Loop.TriggerWait() + // We trigger tally one more time because the most recent tally is skipped in service method. + satelliteSys.Accounting.Tally.Loop.TriggerWait() + + usage1, err := satelliteSys.DB.ProjectAccounting().GetSingleBucketUsageRollup(ctx, projectID, bucketName, now, inFiveMinutes) + require.NoError(t, err) + require.Greater(t, usage1.GetEgress, 0.0) + require.Greater(t, usage1.ObjectCount, 0.0) + require.Greater(t, usage1.MetadataSize, 0.0) + require.Greater(t, usage1.TotalSegments, 0.0) + require.Greater(t, usage1.TotalStoredData, 0.0) + }, + ) +}