diff --git a/pkg/piecestore/rpc/server/psdb/psdb.go b/pkg/piecestore/rpc/server/psdb/psdb.go index 256db27d6..7b4443c6b 100644 --- a/pkg/piecestore/rpc/server/psdb/psdb.go +++ b/pkg/piecestore/rpc/server/psdb/psdb.go @@ -6,6 +6,7 @@ package psdb import ( "context" "database/sql" + "errors" "flag" "fmt" "os" @@ -79,6 +80,11 @@ func Open(ctx context.Context, DataPath, DBPath string) (db *DB, err error) { return nil, err } + _, err = tx.Exec("CREATE TABLE IF NOT EXISTS `bwusagetbl` (`size` INT(10), `daystartdate` INT(10), `dayenddate` INT(10));") + if err != nil { + return nil, err + } + err = tx.Commit() if err != nil { return nil, err @@ -247,3 +253,54 @@ func (db *DB) DeleteTTLByID(id string) error { } return err } + +// AddBandwidthUsed adds bandwidth usage into database by date +func (db *DB) AddBandwidthUsed(size int64) (err error) { + defer db.locked()() + + t := time.Now() + daystartunixtime := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()).Unix() + dayendunixtime := time.Date(t.Year(), t.Month(), t.Day(), 24, 0, 0, 0, t.Location()).Unix() + + var getSize int64 + if (t.Unix() >= daystartunixtime) && (t.Unix() <= dayendunixtime) { + err = db.DB.QueryRow(`SELECT size FROM bwusagetbl WHERE daystartdate <= ? AND ? <= dayenddate`, t.Unix(), t.Unix()).Scan(&getSize) + switch { + case err == sql.ErrNoRows: + _, err = db.DB.Exec("INSERT INTO bwusagetbl (size, daystartdate, dayenddate) VALUES (?, ?, ?)", size, daystartunixtime, dayendunixtime) + return err + case err != nil: + return err + default: + getSize = size + getSize + _, err = db.DB.Exec("UPDATE bwusagetbl SET size = ? WHERE daystartdate = ?", getSize, daystartunixtime) + return err + } + } + return err +} + +// GetBandwidthUsedByDay finds the so far bw used by day and return it +func (db *DB) GetBandwidthUsedByDay(t time.Time) (size int64, err error) { + defer db.locked()() + + daystarttime := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()).Unix() + err = db.DB.QueryRow(`SELECT size FROM bwusagetbl WHERE daystartdate=?`, daystarttime).Scan(&size) + return size, err +} + +// GetTotalBandwidthBetween each row in the bwusagetbl contains the total bw used per day +func (db *DB) GetTotalBandwidthBetween(startdate time.Time, enddate time.Time) (totalbwusage int64, err error) { + defer db.locked()() + + startTimeUnix := time.Date(startdate.Year(), startdate.Month(), startdate.Day(), 0, 0, 0, 0, startdate.Location()).Unix() + endTimeUnix := time.Date(enddate.Year(), enddate.Month(), enddate.Day(), 0, 0, 0, 0, enddate.Location()).Unix() + defaultunixtime := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Now().Location()).Unix() + + if (endTimeUnix < startTimeUnix) && (startTimeUnix > defaultunixtime || endTimeUnix > defaultunixtime) { + return totalbwusage, errors.New("Invalid date range") + } + + err = db.DB.QueryRow(`SELECT SUM(size) FROM bwusagetbl WHERE daystartdate BETWEEN ? AND ?`, startTimeUnix, endTimeUnix).Scan(&totalbwusage) + return totalbwusage, err +} diff --git a/pkg/piecestore/rpc/server/psdb/psdb_test.go b/pkg/piecestore/rpc/server/psdb/psdb_test.go index 19dbf4272..c45a0c68c 100644 --- a/pkg/piecestore/rpc/server/psdb/psdb_test.go +++ b/pkg/piecestore/rpc/server/psdb/psdb_test.go @@ -10,6 +10,7 @@ import ( "path/filepath" "strconv" "testing" + "time" "github.com/gogo/protobuf/proto" _ "github.com/mattn/go-sqlite3" @@ -184,6 +185,70 @@ func TestHappyPath(t *testing.T) { }) } +func TestBandwidthUsage(t *testing.T) { + db, cleanup := openTest(t) + defer cleanup() + + type BWUSAGE struct { + size int64 + timenow time.Time + } + + bwtests := []BWUSAGE{ + {size: 1000, timenow: time.Now()}, + } + + var bwTotal int64 + t.Run("AddBandwidthUsed", func(t *testing.T) { + for P := 0; P < concurrency; P++ { + bwTotal = bwTotal + bwtests[0].size + t.Run("#"+strconv.Itoa(P), func(t *testing.T) { + t.Parallel() + for _, bw := range bwtests { + err := db.AddBandwidthUsed(bw.size) + if err != nil { + t.Fatal(err) + } + } + }) + } + }) + + t.Run("GetTotalBandwidthBetween", func(t *testing.T) { + for P := 0; P < concurrency; P++ { + t.Run("#"+strconv.Itoa(P), func(t *testing.T) { + t.Parallel() + for _, bw := range bwtests { + size, err := db.GetTotalBandwidthBetween(bw.timenow, bw.timenow) + if err != nil { + t.Fatal(err) + } + if bwTotal != size { + t.Fatalf("expected %d got %d", bw.size, size) + } + } + }) + } + }) + + t.Run("GetBandwidthUsedByDay", func(t *testing.T) { + for P := 0; P < concurrency; P++ { + t.Run("#"+strconv.Itoa(P), func(t *testing.T) { + t.Parallel() + for _, bw := range bwtests { + size, err := db.GetBandwidthUsedByDay(bw.timenow) + if err != nil { + t.Fatal(err) + } + if bwTotal != size { + t.Fatalf("expected %d got %d", bw.size, size) + } + } + }) + } + }) +} + func BenchmarkWriteBandwidthAllocation(b *testing.B) { db, cleanup := openTest(b) defer cleanup() diff --git a/pkg/piecestore/rpc/server/retrieve.go b/pkg/piecestore/rpc/server/retrieve.go index 68246b8cb..8314566c7 100644 --- a/pkg/piecestore/rpc/server/retrieve.go +++ b/pkg/piecestore/rpc/server/retrieve.go @@ -167,6 +167,11 @@ func (s *Server) retrieveData(ctx context.Context, stream pb.PieceStoreRoutes_Re } } + // write to bandwidth usage table + if err = s.DB.AddBandwidthUsed(used); err != nil { + return retrieved, allocated, StoreError.New("failed to write bandwidth info to database: %v", err) + } + // TODO: handle errors // _ = stream.Close() diff --git a/pkg/piecestore/rpc/server/store.go b/pkg/piecestore/rpc/server/store.go index 4959835bf..aa9f0d38b 100644 --- a/pkg/piecestore/rpc/server/store.go +++ b/pkg/piecestore/rpc/server/store.go @@ -10,7 +10,7 @@ import ( "github.com/zeebo/errs" "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/piecestore" + pstore "storj.io/storj/pkg/piecestore" "storj.io/storj/pkg/utils" ) @@ -54,6 +54,9 @@ func (s *Server) Store(reqStream pb.PieceStoreRoutes_StoreServer) (err error) { return StoreError.New("failed to write piece meta data to database: %v", utils.CombineErrors(err, deleteErr)) } + if err = s.DB.AddBandwidthUsed(total); err != nil { + return StoreError.New("failed to write bandwidth info to database: %v", err) + } log.Printf("Successfully stored %s.", pd.GetId()) return reqStream.SendAndClose(&pb.PieceStoreSummary{Message: OK, TotalReceived: total})