Tracking bandwidth v3-446 (#430)
* Added a new table 'mib' with 'data', 'size' and 'method' columns * added AddMIB() function and test case TestMIBHappyPath() * added function and a test case to add entries into bandwidth usage table * added functionality to create an entry, update the entry and readback the entry based on a given date into/from bandwidth tbl * added initial SumBandwidthSizes() * added the functionality to retrieve the total bw usage based on start and end date * Added the unit test case for AddBwUsageTbl * changed the arguments to take time format as arg than Unix format * changed the arguments to take time format as arg than Unix format * changes per code review comments * adding back go.sum * changes per code review comments * changes per code review comments * changes per code review comments
This commit is contained in:
parent
b546ed9510
commit
4a51db2344
@ -6,6 +6,7 @@ package psdb
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
@ -79,6 +80,11 @@ func Open(ctx context.Context, DataPath, DBPath string) (db *DB, err error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec("CREATE TABLE IF NOT EXISTS `bwusagetbl` (`size` INT(10), `daystartdate` INT(10), `dayenddate` INT(10));")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
err = tx.Commit()
|
err = tx.Commit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -247,3 +253,54 @@ func (db *DB) DeleteTTLByID(id string) error {
|
|||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddBandwidthUsed adds bandwidth usage into database by date
|
||||||
|
func (db *DB) AddBandwidthUsed(size int64) (err error) {
|
||||||
|
defer db.locked()()
|
||||||
|
|
||||||
|
t := time.Now()
|
||||||
|
daystartunixtime := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()).Unix()
|
||||||
|
dayendunixtime := time.Date(t.Year(), t.Month(), t.Day(), 24, 0, 0, 0, t.Location()).Unix()
|
||||||
|
|
||||||
|
var getSize int64
|
||||||
|
if (t.Unix() >= daystartunixtime) && (t.Unix() <= dayendunixtime) {
|
||||||
|
err = db.DB.QueryRow(`SELECT size FROM bwusagetbl WHERE daystartdate <= ? AND ? <= dayenddate`, t.Unix(), t.Unix()).Scan(&getSize)
|
||||||
|
switch {
|
||||||
|
case err == sql.ErrNoRows:
|
||||||
|
_, err = db.DB.Exec("INSERT INTO bwusagetbl (size, daystartdate, dayenddate) VALUES (?, ?, ?)", size, daystartunixtime, dayendunixtime)
|
||||||
|
return err
|
||||||
|
case err != nil:
|
||||||
|
return err
|
||||||
|
default:
|
||||||
|
getSize = size + getSize
|
||||||
|
_, err = db.DB.Exec("UPDATE bwusagetbl SET size = ? WHERE daystartdate = ?", getSize, daystartunixtime)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetBandwidthUsedByDay finds the so far bw used by day and return it
|
||||||
|
func (db *DB) GetBandwidthUsedByDay(t time.Time) (size int64, err error) {
|
||||||
|
defer db.locked()()
|
||||||
|
|
||||||
|
daystarttime := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()).Unix()
|
||||||
|
err = db.DB.QueryRow(`SELECT size FROM bwusagetbl WHERE daystartdate=?`, daystarttime).Scan(&size)
|
||||||
|
return size, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetTotalBandwidthBetween each row in the bwusagetbl contains the total bw used per day
|
||||||
|
func (db *DB) GetTotalBandwidthBetween(startdate time.Time, enddate time.Time) (totalbwusage int64, err error) {
|
||||||
|
defer db.locked()()
|
||||||
|
|
||||||
|
startTimeUnix := time.Date(startdate.Year(), startdate.Month(), startdate.Day(), 0, 0, 0, 0, startdate.Location()).Unix()
|
||||||
|
endTimeUnix := time.Date(enddate.Year(), enddate.Month(), enddate.Day(), 0, 0, 0, 0, enddate.Location()).Unix()
|
||||||
|
defaultunixtime := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Now().Location()).Unix()
|
||||||
|
|
||||||
|
if (endTimeUnix < startTimeUnix) && (startTimeUnix > defaultunixtime || endTimeUnix > defaultunixtime) {
|
||||||
|
return totalbwusage, errors.New("Invalid date range")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.DB.QueryRow(`SELECT SUM(size) FROM bwusagetbl WHERE daystartdate BETWEEN ? AND ?`, startTimeUnix, endTimeUnix).Scan(&totalbwusage)
|
||||||
|
return totalbwusage, err
|
||||||
|
}
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
@ -184,6 +185,70 @@ func TestHappyPath(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBandwidthUsage(t *testing.T) {
|
||||||
|
db, cleanup := openTest(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
type BWUSAGE struct {
|
||||||
|
size int64
|
||||||
|
timenow time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
bwtests := []BWUSAGE{
|
||||||
|
{size: 1000, timenow: time.Now()},
|
||||||
|
}
|
||||||
|
|
||||||
|
var bwTotal int64
|
||||||
|
t.Run("AddBandwidthUsed", func(t *testing.T) {
|
||||||
|
for P := 0; P < concurrency; P++ {
|
||||||
|
bwTotal = bwTotal + bwtests[0].size
|
||||||
|
t.Run("#"+strconv.Itoa(P), func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
for _, bw := range bwtests {
|
||||||
|
err := db.AddBandwidthUsed(bw.size)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("GetTotalBandwidthBetween", func(t *testing.T) {
|
||||||
|
for P := 0; P < concurrency; P++ {
|
||||||
|
t.Run("#"+strconv.Itoa(P), func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
for _, bw := range bwtests {
|
||||||
|
size, err := db.GetTotalBandwidthBetween(bw.timenow, bw.timenow)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if bwTotal != size {
|
||||||
|
t.Fatalf("expected %d got %d", bw.size, size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("GetBandwidthUsedByDay", func(t *testing.T) {
|
||||||
|
for P := 0; P < concurrency; P++ {
|
||||||
|
t.Run("#"+strconv.Itoa(P), func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
for _, bw := range bwtests {
|
||||||
|
size, err := db.GetBandwidthUsedByDay(bw.timenow)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if bwTotal != size {
|
||||||
|
t.Fatalf("expected %d got %d", bw.size, size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkWriteBandwidthAllocation(b *testing.B) {
|
func BenchmarkWriteBandwidthAllocation(b *testing.B) {
|
||||||
db, cleanup := openTest(b)
|
db, cleanup := openTest(b)
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
@ -167,6 +167,11 @@ func (s *Server) retrieveData(ctx context.Context, stream pb.PieceStoreRoutes_Re
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// write to bandwidth usage table
|
||||||
|
if err = s.DB.AddBandwidthUsed(used); err != nil {
|
||||||
|
return retrieved, allocated, StoreError.New("failed to write bandwidth info to database: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: handle errors
|
// TODO: handle errors
|
||||||
// _ = stream.Close()
|
// _ = stream.Close()
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
|
|
||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
"storj.io/storj/pkg/pb"
|
"storj.io/storj/pkg/pb"
|
||||||
"storj.io/storj/pkg/piecestore"
|
pstore "storj.io/storj/pkg/piecestore"
|
||||||
"storj.io/storj/pkg/utils"
|
"storj.io/storj/pkg/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -54,6 +54,9 @@ func (s *Server) Store(reqStream pb.PieceStoreRoutes_StoreServer) (err error) {
|
|||||||
return StoreError.New("failed to write piece meta data to database: %v", utils.CombineErrors(err, deleteErr))
|
return StoreError.New("failed to write piece meta data to database: %v", utils.CombineErrors(err, deleteErr))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = s.DB.AddBandwidthUsed(total); err != nil {
|
||||||
|
return StoreError.New("failed to write bandwidth info to database: %v", err)
|
||||||
|
}
|
||||||
log.Printf("Successfully stored %s.", pd.GetId())
|
log.Printf("Successfully stored %s.", pd.GetId())
|
||||||
|
|
||||||
return reqStream.SendAndClose(&pb.PieceStoreSummary{Message: OK, TotalReceived: total})
|
return reqStream.SendAndClose(&pb.PieceStoreSummary{Message: OK, TotalReceived: total})
|
||||||
|
Loading…
Reference in New Issue
Block a user