Create Tally and Rollup pkgs in accounting (#642)

* creates separate tally and rollup packages and writes skeleton for rollup

* TODO add rollupDB and rawDB to rollup struct

* TODO add rawDB to tally struct
This commit is contained in:
Jennifer Li Johnson 2018-11-13 20:22:18 -05:00 committed by GitHub
parent 50d657af11
commit 377832c705
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 132 additions and 11 deletions

View File

@ -0,0 +1,15 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package rollup
import (
"github.com/zeebo/errs"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
)
// Error is a standard error class for this package.
var (
Error = errs.Class("rollup error")
mon = monkit.Package()
)

View File

@ -0,0 +1,41 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package rollup
import (
"context"
"time"
"go.uber.org/zap"
"storj.io/storj/pkg/provider"
)
// Config contains configurable values for rollup
type Config struct {
Interval time.Duration `help:"how frequently rollup should run" default:"30s"`
}
// Initialize a rollup struct
func (c Config) initialize(ctx context.Context) (Rollup, error) {
return newRollup(zap.L(), c.Interval), nil
}
// Run runs the rollup with configured values
func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) {
rollup, err := c.initialize(ctx)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(ctx)
go func() {
if err := rollup.Run(ctx); err != nil {
defer cancel()
zap.L().Error("Error running rollup", zap.Error(err))
}
}()
return server.Run(ctx)
}

View File

@ -0,0 +1,56 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package rollup
import (
"context"
"time"
"go.uber.org/zap"
)
// Rollup is the service for totalling data on storage nodes for 1, 7, 30 day intervals
type Rollup interface {
Run(ctx context.Context) error
}
type rollup struct {
logger *zap.Logger
ticker *time.Ticker
//TODO:
//rollupDB
//rawDB
}
func newRollup(logger *zap.Logger, interval time.Duration) *rollup {
return &rollup{
logger: logger,
ticker: time.NewTicker(interval),
//TODO:
//rollupDB
//rawDB
}
}
// Run the rollup loop
func (r *rollup) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
for {
err = r.Query(ctx)
if err != nil {
zap.L().Error("Rollup Query failed", zap.Error(err))
}
select {
case <-r.ticker.C: // wait for the next interval to happen
case <-ctx.Done(): // or the rollup is canceled via context
return ctx.Err()
}
}
}
func (r *rollup) Query(ctx context.Context) error {
return nil
}

View File

@ -0,0 +1,4 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package rollup

View File

@ -1,7 +1,7 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package accounting
package tally
import (
"github.com/zeebo/errs"

View File

@ -1,7 +1,7 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package accounting
package tally
import (
"context"
@ -15,12 +15,12 @@ import (
"storj.io/storj/pkg/provider"
)
// Config contains configurable values accounting
// Config contains configurable values for tally
type Config struct {
Interval time.Duration `help:"how frequently checker should audit segments" default:"30s"`
Interval time.Duration `help:"how frequently tally should run" default:"30s"`
}
// Initialize a Accounting tally struct
// Initialize a tally struct
func (c Config) initialize(ctx context.Context) (Tally, error) {
pointerdb := pointerdb.LoadFromContext(ctx)
overlay := overlay.LoadServerFromContext(ctx)

View File

@ -1,7 +1,7 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package accounting
package tally
import (
"context"
@ -20,7 +20,7 @@ import (
"storj.io/storj/storage"
)
// Tally is the service for adding up storage node data usage
// Tally is the service for accounting for data stored on each storage node
type Tally interface {
Run(ctx context.Context) error
}
@ -32,6 +32,8 @@ type tally struct {
limit int
logger *zap.Logger
ticker *time.Ticker
//TODO:
//rawDB
}
func newTally(pointerdb *pointerdb.Server, overlay pb.OverlayServer, kademlia *kademlia.Kademlia, limit int, logger *zap.Logger, interval time.Duration) *tally {
@ -41,22 +43,25 @@ func newTally(pointerdb *pointerdb.Server, overlay pb.OverlayServer, kademlia *k
kademlia: kademlia,
limit: limit,
logger: logger,
ticker: time.NewTicker(interval),
//TODO:
//rawDB
}
}
// Run the collector loop
// Run the tally loop
func (t *tally) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
for {
err = t.identifyActiveNodes(ctx)
if err != nil {
zap.L().Error("Collector failed", zap.Error(err))
zap.L().Error("Tally failed", zap.Error(err))
}
select {
case <-t.ticker.C: // wait for the next interval to happen
case <-ctx.Done(): // or the collector is canceled via context
case <-ctx.Done(): // or the tally is canceled via context
return ctx.Err()
}
}

View File

@ -1,7 +1,7 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package accounting
package tally
import (
"context"