satellite/satellitedb: use noreturn (#3022)

This commit is contained in:
Egon Elbre 2019-09-12 20:31:50 +03:00 committed by GitHub
parent a085b05ec5
commit 8ef57a2af3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 991 additions and 655 deletions

View File

@ -82,7 +82,9 @@ type StoragenodeAccounting interface {
// architecture: Database
type ProjectAccounting interface {
// SaveTallies saves the latest project info
SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*BucketTally) ([]BucketTally, error)
SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*BucketTally) error
// GetTallies retrieves all tallies
GetTallies(ctx context.Context) ([]BucketTally, error)
// CreateStorageTally creates a record for BucketStorageTally in the accounting DB table
CreateStorageTally(ctx context.Context, tally BucketStorageTally) error
// GetAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID in the past time frame

View File

@ -35,9 +35,13 @@ func TestSaveBucketTallies(t *testing.T) {
// Execute test: retrieve the save tallies and confirm they contains the expected data
intervalStart := time.Now()
pdb := db.ProjectAccounting()
actualTallies, err := pdb.SaveTallies(ctx, intervalStart, bucketTallies)
err = pdb.SaveTallies(ctx, intervalStart, bucketTallies)
require.NoError(t, err)
for _, tally := range actualTallies {
tallies, err := pdb.GetTallies(ctx)
require.NoError(t, err)
for _, tally := range tallies {
require.Contains(t, expectedTallies, tally)
}
})

View File

@ -98,7 +98,7 @@ func (t *Service) Tally(ctx context.Context) (err error) {
}
if len(bucketData) > 0 {
_, err = t.projectAccountingDB.SaveTallies(ctx, latestTally, bucketData)
err = t.projectAccountingDB.SaveTallies(ctx, latestTally, bucketData)
if err != nil {
errBucketInfo = errs.New("Saving bucket storage data failed")
}

View File

@ -110,7 +110,7 @@ func TestOnlyInline(t *testing.T) {
require.NoError(t, err)
assert.Len(t, actualNodeData, 0)
_, err = planet.Satellites[0].DB.ProjectAccounting().SaveTallies(ctx, latestTally, actualBucketData)
err = planet.Satellites[0].DB.ProjectAccounting().SaveTallies(ctx, latestTally, actualBucketData)
require.NoError(t, err)
// Confirm the correct bucket storage tally was created

View File

@ -132,9 +132,8 @@ func TestUsageRollups(t *testing.T) {
bucketTallies[bucketID2] = tally2
}
tallies, err := db.ProjectAccounting().SaveTallies(ctx, interval, bucketTallies)
err := db.ProjectAccounting().SaveTallies(ctx, interval, bucketTallies)
require.NoError(t, err)
require.Equal(t, len(tallies), len(buckets)*2)
}
usageRollups := db.Console().UsageRollups()

View File

@ -184,15 +184,13 @@ func (keys *apikeys) Create(ctx context.Context, head []byte, info console.APIKe
// Update implements satellite.APIKeys
func (keys *apikeys) Update(ctx context.Context, key console.APIKeyInfo) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = keys.methods.Update_ApiKey_By_Id(
return keys.methods.UpdateNoReturn_ApiKey_By_Id(
ctx,
dbx.ApiKey_Id(key.ID[:]),
dbx.ApiKey_Update_Fields{
Name: dbx.ApiKey_Name(key.Name),
},
)
return err
}
// Delete implements satellite.APIKeys

View File

@ -78,7 +78,7 @@ func (containment *containment) IncrementPending(ctx context.Context, pendingAud
Contained: dbx.Node_Contained(true),
}
_, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(pendingAudit.NodeID.Bytes()), updateContained)
err = tx.UpdateNoReturn_Node_By_Id(ctx, dbx.Node_Id(pendingAudit.NodeID.Bytes()), updateContained)
if err != nil {
return audit.ContainError.Wrap(errs.Combine(err, tx.Rollback()))
}
@ -106,7 +106,7 @@ func (containment *containment) Delete(ctx context.Context, id pb.NodeID) (_ boo
Contained: dbx.Node_Contained(false),
}
_, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(id.Bytes()), updateContained)
err = tx.UpdateNoReturn_Node_By_Id(ctx, dbx.Node_Id(id.Bytes()), updateContained)
if err != nil {
return isDeleted, audit.ContainError.Wrap(errs.Combine(err, tx.Rollback()))
}

View File

@ -55,8 +55,11 @@ model irreparabledb (
field repair_attempt_count int64 ( updatable )
)
create irreparabledb ( )
update irreparabledb ( where irreparabledb.segmentpath = ? )
create irreparabledb ( noreturn )
update irreparabledb (
where irreparabledb.segmentpath = ?
noreturn
)
delete irreparabledb ( where irreparabledb.segmentpath = ? )
read one (
@ -80,8 +83,11 @@ model accounting_timestamps (
field value timestamp ( updatable )
)
create accounting_timestamps ( )
update accounting_timestamps ( where accounting_timestamps.name = ? )
create accounting_timestamps ( noreturn )
update accounting_timestamps (
where accounting_timestamps.name = ?
noreturn
)
read scalar (
select accounting_timestamps.value
@ -102,7 +108,7 @@ model accounting_rollup (
field at_rest_total float64
)
create accounting_rollup ( )
create accounting_rollup ( noreturn )
delete accounting_rollup ( where accounting_rollup.id = ? )
read one (
@ -163,8 +169,12 @@ model node (
field uptime_reputation_beta float64 ( updatable )
)
create node ( )
create node ( noreturn )
update node ( where node.id = ? )
update node (
where node.id = ?
noreturn
)
delete node ( where node.id = ? )
// "Get" query; fails if node not found
@ -394,8 +404,11 @@ model api_key (
field created_at timestamp (autoinsert)
)
create api_key ()
update api_key ( where api_key.id = ? )
create api_key ( )
update api_key (
where api_key.id = ?
noreturn
)
delete api_key ( where api_key.id = ? )
read one (
@ -438,7 +451,7 @@ model bucket_usage (
field audit_egress uint64
)
create bucket_usage ()
create bucket_usage ( )
delete bucket_usage ( where bucket_usage.id = ? )
read one (
@ -489,7 +502,7 @@ model used_serial (
)
// inserting a new serial number
create serial_number ()
create serial_number ( noreturn )
// finding out information about the serial number
read scalar (
@ -503,7 +516,7 @@ delete serial_number (
)
// for preventing duplicate serial numbers
create used_serial ()
create used_serial ( noreturn )
// --- bucket accounting tables --- //
@ -552,7 +565,7 @@ model bucket_storage_tally (
field metadata_size uint64
)
create bucket_storage_tally ()
create bucket_storage_tally ( noreturn )
read first (
select bucket_storage_tally
@ -560,6 +573,10 @@ read first (
orderby desc bucket_storage_tally.interval_start
)
read all (
select bucket_storage_tally
)
read all (
select bucket_storage_tally
where bucket_storage_tally.project_id = ?
@ -608,7 +625,7 @@ model storagenode_storage_tally (
field data_total float64
)
create storagenode_storage_tally ()
create storagenode_storage_tally ( noreturn )
delete storagenode_storage_tally ( where storagenode_storage_tally.id = ? )
read one (
select storagenode_storage_tally
@ -633,9 +650,10 @@ model peer_identity (
field updated_at timestamp ( autoinsert, autoupdate )
)
create peer_identity ( )
create peer_identity ( noreturn )
update peer_identity (
where peer_identity.node_id = ?
noreturn
)
read one (
@ -734,6 +752,7 @@ read all (
update offer (
where offer.id = ?
noreturn
)
create offer ( )

File diff suppressed because it is too large Load Diff

View File

@ -33,7 +33,7 @@ func (db *irreparableDB) IncrementRepairAttempts(ctx context.Context, segmentInf
dbxInfo, err := tx.Get_Irreparabledb_By_Segmentpath(ctx, dbx.Irreparabledb_Segmentpath(segmentInfo.Path))
if err != nil {
// no rows err, so create/insert an entry
_, err = tx.Create_Irreparabledb(
err = tx.CreateNoReturn_Irreparabledb(
ctx,
dbx.Irreparabledb_Segmentpath(segmentInfo.Path),
dbx.Irreparabledb_Segmentdetail(bytes),
@ -50,7 +50,7 @@ func (db *irreparableDB) IncrementRepairAttempts(ctx context.Context, segmentInf
updateFields := dbx.Irreparabledb_Update_Fields{}
updateFields.RepairAttemptCount = dbx.Irreparabledb_RepairAttemptCount(dbxInfo.RepairAttemptCount)
updateFields.SegDamagedUnixSec = dbx.Irreparabledb_SegDamagedUnixSec(segmentInfo.LastRepairAttempt)
_, err = tx.Update_Irreparabledb_By_Segmentpath(
err = tx.UpdateNoReturn_Irreparabledb_By_Segmentpath(
ctx,
dbx.Irreparabledb_Segmentpath(dbxInfo.Segmentpath),
updateFields,

View File

@ -1037,8 +1037,15 @@ func (m *lockedProjectAccounting) GetStorageTotals(ctx context.Context, projectI
return m.db.GetStorageTotals(ctx, projectID)
}
// GetTallies retrieves all tallies
func (m *lockedProjectAccounting) GetTallies(ctx context.Context) ([]accounting.BucketTally, error) {
m.Lock()
defer m.Unlock()
return m.db.GetTallies(ctx)
}
// SaveTallies saves the latest project info
func (m *lockedProjectAccounting) SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*accounting.BucketTally) ([]accounting.BucketTally, error) {
func (m *lockedProjectAccounting) SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*accounting.BucketTally) error {
m.Lock()
defer m.Unlock()
return m.db.SaveTallies(ctx, intervalStart, bucketTallies)

View File

@ -153,19 +153,12 @@ func (db *offersDB) Create(ctx context.Context, o *rewards.NewOffer) (*rewards.O
// Finish changes the offer status to be Done and its expiration date to be now based on offer id
func (db *offersDB) Finish(ctx context.Context, oID int) error {
updateFields := dbx.Offer_Update_Fields{
Status: dbx.Offer_Status(int(rewards.Done)),
ExpiresAt: dbx.Offer_ExpiresAt(time.Now().UTC()),
}
offerID := dbx.Offer_Id(oID)
_, err := db.db.Update_Offer_By_Id(ctx, offerID, updateFields)
if err != nil {
return offerErr.Wrap(err)
}
return nil
return offerErr.Wrap(
db.db.UpdateNoReturn_Offer_By_Id(ctx,
dbx.Offer_Id(oID), dbx.Offer_Update_Fields{
Status: dbx.Offer_Status(int(rewards.Done)),
ExpiresAt: dbx.Offer_ExpiresAt(time.Now().UTC()),
}))
}
func offersFromDBX(offersDbx []*dbx.Offer) (rewards.Offers, error) {

View File

@ -37,13 +37,12 @@ type ordersDB struct {
// CreateSerialInfo creates serial number entry in database
func (db *ordersDB) CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.db.Create_SerialNumber(
return db.db.CreateNoReturn_SerialNumber(
ctx,
dbx.SerialNumber_SerialNumber(serialNumber.Bytes()),
dbx.SerialNumber_BucketId(bucketID),
dbx.SerialNumber_ExpiresAt(limitExpiration),
)
return err
}
// DeleteExpiredSerials deletes all expired serials in serial_number and used_serials table.

View File

@ -615,7 +615,7 @@ func (cache *overlaycache) UpdateAddress(ctx context.Context, info *pb.Node, def
if err != nil {
// add the node to DB for first time
_, err = tx.Create_Node(
err = tx.CreateNoReturn_Node(
ctx,
dbx.Node_Id(info.Id.Bytes()),
dbx.Node_Address(address.Address),
@ -652,13 +652,12 @@ func (cache *overlaycache) UpdateAddress(ctx context.Context, info *pb.Node, def
return Error.Wrap(errs.Combine(err, tx.Rollback()))
}
} else {
update := dbx.Node_Update_Fields{
Address: dbx.Node_Address(address.Address),
LastNet: dbx.Node_LastNet(info.LastIp),
Protocol: dbx.Node_Protocol(int(address.Transport)),
}
_, err := tx.Update_Node_By_Id(ctx, dbx.Node_Id(info.Id.Bytes()), update)
err = tx.UpdateNoReturn_Node_By_Id(ctx, dbx.Node_Id(info.Id.Bytes()),
dbx.Node_Update_Fields{
Address: dbx.Node_Address(address.Address),
LastNet: dbx.Node_LastNet(info.LastIp),
Protocol: dbx.Node_Protocol(int(address.Transport)),
})
if err != nil {
return Error.Wrap(errs.Combine(err, tx.Rollback()))
}

View File

@ -44,23 +44,22 @@ func (idents *peerIdentities) Set(ctx context.Context, nodeID storj.NodeID, iden
serial, err := tx.Get_PeerIdentity_LeafSerialNumber_By_NodeId(ctx, dbx.PeerIdentity_NodeId(nodeID.Bytes()))
if serial == nil || err != nil {
if serial == nil || err == sql.ErrNoRows {
_, err = tx.Create_PeerIdentity(ctx,
return Error.Wrap(tx.CreateNoReturn_PeerIdentity(ctx,
dbx.PeerIdentity_NodeId(nodeID.Bytes()),
dbx.PeerIdentity_LeafSerialNumber(ident.Leaf.SerialNumber.Bytes()),
dbx.PeerIdentity_Chain(identity.EncodePeerIdentity(ident)),
)
return Error.Wrap(err)
))
}
return Error.Wrap(err)
}
if !bytes.Equal(serial.LeafSerialNumber, ident.Leaf.SerialNumber.Bytes()) {
_, err = tx.Update_PeerIdentity_By_NodeId(ctx,
return Error.Wrap(tx.UpdateNoReturn_PeerIdentity_By_NodeId(ctx,
dbx.PeerIdentity_NodeId(nodeID.Bytes()),
dbx.PeerIdentity_Update_Fields{
LeafSerialNumber: dbx.PeerIdentity_LeafSerialNumber(ident.Leaf.SerialNumber.Bytes()),
Chain: dbx.PeerIdentity_Chain(identity.EncodePeerIdentity(ident)),
},
)
))
}
return Error.Wrap(err)

View File

@ -22,54 +22,65 @@ type ProjectAccounting struct {
}
// SaveTallies saves the latest bucket info
func (db *ProjectAccounting) SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*accounting.BucketTally) (_ []accounting.BucketTally, err error) {
func (db *ProjectAccounting) SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*accounting.BucketTally) (err error) {
defer mon.Task()(&ctx)(&err)
if len(bucketTallies) == 0 {
return nil, Error.New("In SaveTallies with empty bucketTallies")
return nil
}
var result []accounting.BucketTally
// TODO: see if we can send all bucket storage tallies to the db in one operation
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
return Error.Wrap(db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
for _, info := range bucketTallies {
bucketName := dbx.BucketStorageTally_BucketName(info.BucketName)
projectID := dbx.BucketStorageTally_ProjectId(info.ProjectID)
interval := dbx.BucketStorageTally_IntervalStart(intervalStart)
inlineBytes := dbx.BucketStorageTally_Inline(uint64(info.InlineBytes))
remoteBytes := dbx.BucketStorageTally_Remote(uint64(info.RemoteBytes))
rSegments := dbx.BucketStorageTally_RemoteSegmentsCount(uint(info.RemoteSegments))
iSegments := dbx.BucketStorageTally_InlineSegmentsCount(uint(info.InlineSegments))
objectCount := dbx.BucketStorageTally_ObjectCount(uint(info.Files))
meta := dbx.BucketStorageTally_MetadataSize(uint64(info.MetadataSize))
dbxTally, err := tx.Create_BucketStorageTally(ctx, bucketName, projectID, interval, inlineBytes, remoteBytes, rSegments, iSegments, objectCount, meta)
err := tx.CreateNoReturn_BucketStorageTally(ctx,
dbx.BucketStorageTally_BucketName(info.BucketName),
dbx.BucketStorageTally_ProjectId(info.ProjectID),
dbx.BucketStorageTally_IntervalStart(intervalStart),
dbx.BucketStorageTally_Inline(uint64(info.InlineBytes)),
dbx.BucketStorageTally_Remote(uint64(info.RemoteBytes)),
dbx.BucketStorageTally_RemoteSegmentsCount(uint(info.RemoteSegments)),
dbx.BucketStorageTally_InlineSegmentsCount(uint(info.InlineSegments)),
dbx.BucketStorageTally_ObjectCount(uint(info.Files)),
dbx.BucketStorageTally_MetadataSize(uint64(info.MetadataSize)),
)
if err != nil {
return Error.Wrap(err)
}
tally := accounting.BucketTally{
BucketName: dbxTally.BucketName,
ProjectID: dbxTally.ProjectId,
InlineSegments: int64(dbxTally.InlineSegmentsCount),
RemoteSegments: int64(dbxTally.RemoteSegmentsCount),
Files: int64(dbxTally.ObjectCount),
InlineBytes: int64(dbxTally.Inline),
RemoteBytes: int64(dbxTally.Remote),
MetadataSize: int64(dbxTally.MetadataSize),
}
result = append(result, tally)
}
return nil
})
}))
}
// GetTallies saves the latest bucket info
func (db *ProjectAccounting) GetTallies(ctx context.Context) (tallies []accounting.BucketTally, err error) {
defer mon.Task()(&ctx)(&err)
dbxTallies, err := db.db.All_BucketStorageTally(ctx)
if err != nil {
return nil, Error.Wrap(err)
}
return result, nil
for _, dbxTally := range dbxTallies {
tallies = append(tallies, accounting.BucketTally{
BucketName: dbxTally.BucketName,
ProjectID: dbxTally.ProjectId,
InlineSegments: int64(dbxTally.InlineSegmentsCount),
RemoteSegments: int64(dbxTally.RemoteSegmentsCount),
Files: int64(dbxTally.ObjectCount),
InlineBytes: int64(dbxTally.Inline),
RemoteBytes: int64(dbxTally.Remote),
MetadataSize: int64(dbxTally.MetadataSize),
})
}
return tallies, nil
}
// CreateStorageTally creates a record in the bucket_storage_tallies accounting table
func (db *ProjectAccounting) CreateStorageTally(ctx context.Context, tally accounting.BucketStorageTally) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.db.Create_BucketStorageTally(
return Error.Wrap(db.db.CreateNoReturn_BucketStorageTally(
ctx,
dbx.BucketStorageTally_BucketName([]byte(tally.BucketName)),
dbx.BucketStorageTally_ProjectId(tally.ProjectID[:]),
@ -80,11 +91,7 @@ func (db *ProjectAccounting) CreateStorageTally(ctx context.Context, tally accou
dbx.BucketStorageTally_InlineSegmentsCount(uint(tally.InlineSegmentCount)),
dbx.BucketStorageTally_ObjectCount(uint(tally.ObjectCount)),
dbx.BucketStorageTally_MetadataSize(uint64(tally.MetadataSize)),
)
if err != nil {
return err
}
return nil
))
}
// GetAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID for a time frame

View File

@ -32,14 +32,17 @@ func (db *StoragenodeAccounting) SaveTallies(ctx context.Context, latestTally ti
nID := dbx.StoragenodeStorageTally_NodeId(k.Bytes())
end := dbx.StoragenodeStorageTally_IntervalEndTime(latestTally)
total := dbx.StoragenodeStorageTally_DataTotal(v)
_, err := tx.Create_StoragenodeStorageTally(ctx, nID, end, total)
err := tx.CreateNoReturn_StoragenodeStorageTally(ctx, nID, end, total)
if err != nil {
return err
}
}
update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestTally)}
_, err := tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastAtRestTally), update)
return err
return tx.UpdateNoReturn_AccountingTimestamps_By_Name(ctx,
dbx.AccountingTimestamps_Name(accounting.LastAtRestTally),
dbx.AccountingTimestamps_Update_Fields{
Value: dbx.AccountingTimestamps_Value(latestTally),
},
)
})
return Error.Wrap(err)
}
@ -121,15 +124,20 @@ func (db *StoragenodeAccounting) SaveRollup(ctx context.Context, latestRollup ti
getRepair := dbx.AccountingRollup_GetRepairTotal(ar.GetRepairTotal)
putRepair := dbx.AccountingRollup_PutRepairTotal(ar.PutRepairTotal)
atRest := dbx.AccountingRollup_AtRestTotal(ar.AtRestTotal)
_, err := tx.Create_AccountingRollup(ctx, nID, start, put, get, audit, getRepair, putRepair, atRest)
err := tx.CreateNoReturn_AccountingRollup(ctx, nID, start, put, get, audit, getRepair, putRepair, atRest)
if err != nil {
return err
}
}
}
update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestRollup)}
_, err := tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup), update)
return err
return tx.UpdateNoReturn_AccountingTimestamps_By_Name(ctx,
dbx.AccountingTimestamps_Name(accounting.LastRollup),
dbx.AccountingTimestamps_Update_Fields{
Value: dbx.AccountingTimestamps_Value(latestRollup),
},
)
})
return Error.Wrap(err)
}
@ -141,9 +149,10 @@ func (db *StoragenodeAccounting) LastTimestamp(ctx context.Context, timestampTyp
err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
lt, err := tx.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(timestampType))
if lt == nil {
update := dbx.AccountingTimestamps_Value(lastTally)
_, err = tx.Create_AccountingTimestamps(ctx, dbx.AccountingTimestamps_Name(timestampType), update)
return err
return tx.CreateNoReturn_AccountingTimestamps(ctx,
dbx.AccountingTimestamps_Name(timestampType),
dbx.AccountingTimestamps_Value(lastTally),
)
}
lastTally = lt.Value
return err