Separate garbage collect logic from psdb (#1167)
This commit is contained in:
parent
f49436268a
commit
1df81b1460
@ -25,7 +25,6 @@ import (
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/piecestore/psclient"
|
||||
"storj.io/storj/pkg/piecestore/psserver/psdb"
|
||||
"storj.io/storj/pkg/process"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/transport"
|
||||
@ -87,9 +86,6 @@ var (
|
||||
BootstrapAddr string `default:"bootstrap.storj.io:8888" help:"address of server the storage node was bootstrapped against"`
|
||||
}
|
||||
|
||||
diagCfg struct {
|
||||
}
|
||||
|
||||
defaultConfDir = fpath.ApplicationDir("storj", "storagenode")
|
||||
// TODO: this path should be defined somewhere else
|
||||
defaultIdentityDir = fpath.ApplicationDir("storj", "identity", "storagenode")
|
||||
@ -132,10 +128,18 @@ func init() {
|
||||
cfgstruct.Bind(runCmd.Flags(), &runCfg, cfgstruct.ConfDir(defaultConfDir), cfgstruct.IdentityDir(defaultIdentityDir))
|
||||
cfgstruct.BindSetup(setupCmd.Flags(), &setupCfg, cfgstruct.ConfDir(defaultConfDir), cfgstruct.IdentityDir(defaultIdentityDir))
|
||||
cfgstruct.BindSetup(configCmd.Flags(), &setupCfg, cfgstruct.ConfDir(defaultConfDir), cfgstruct.IdentityDir(defaultIdentityDir))
|
||||
cfgstruct.Bind(diagCmd.Flags(), &diagCfg, cfgstruct.ConfDir(defaultDiagDir), cfgstruct.IdentityDir(defaultIdentityDir))
|
||||
cfgstruct.Bind(diagCmd.Flags(), &runCfg, cfgstruct.ConfDir(defaultDiagDir), cfgstruct.IdentityDir(defaultIdentityDir))
|
||||
cfgstruct.Bind(dashboardCmd.Flags(), &dashboardCfg, cfgstruct.ConfDir(defaultDiagDir))
|
||||
}
|
||||
|
||||
func databaseConfig(config storagenode.Config) storagenodedb.Config {
|
||||
return storagenodedb.Config{
|
||||
Storage: config.Storage.Path,
|
||||
Info: filepath.Join(config.Storage.Path, "piecestore.db"),
|
||||
Kademlia: config.Kademlia.DBPath,
|
||||
}
|
||||
}
|
||||
|
||||
func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
log := zap.L()
|
||||
|
||||
@ -154,11 +158,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
zap.S().Error("Failed to initialize telemetry batcher: ", err)
|
||||
}
|
||||
|
||||
db, err := storagenodedb.New(storagenodedb.Config{
|
||||
Storage: runCfg.Storage.Path,
|
||||
Info: filepath.Join(runCfg.Storage.Path, "piecestore.db"),
|
||||
Kademlia: runCfg.Kademlia.DBPath,
|
||||
})
|
||||
db, err := storagenodedb.New(databaseConfig(runCfg.Config))
|
||||
if err != nil {
|
||||
return errs.New("Error starting master database on storagenode: %+v", err)
|
||||
}
|
||||
@ -247,18 +247,18 @@ func cmdDiag(cmd *cobra.Command, args []string) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
// open the sql db
|
||||
dbpath := filepath.Join(diagDir, "storage", "piecestore.db")
|
||||
db, err := psdb.Open(context.Background(), nil, dbpath)
|
||||
db, err := storagenodedb.New(databaseConfig(runCfg.Config))
|
||||
if err != nil {
|
||||
fmt.Println("Storagenode database couldnt open:", dbpath)
|
||||
return err
|
||||
return errs.New("Error starting master database on storagenode: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, db.Close())
|
||||
}()
|
||||
|
||||
//get all bandwidth aggrements entries already ordered
|
||||
bwAgreements, err := db.GetBandwidthAllocations()
|
||||
bwAgreements, err := db.PSDB().GetBandwidthAllocations()
|
||||
if err != nil {
|
||||
fmt.Println("storage node 'bandwidth_agreements' table read error:", dbpath)
|
||||
fmt.Printf("storage node 'bandwidth_agreements' table read error: %v\n", err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -374,11 +374,13 @@ func (planet *Planet) newStorageNodes(count int) ([]*storagenode.Peer, error) {
|
||||
},
|
||||
},
|
||||
Storage: psserver.Config{
|
||||
Path: "", // TODO: this argument won't be needed with master storagenodedb
|
||||
AllocatedDiskSpace: memory.TB,
|
||||
AllocatedBandwidth: memory.TB,
|
||||
KBucketRefreshInterval: time.Hour,
|
||||
Path: "", // TODO: this argument won't be needed with master storagenodedb
|
||||
AllocatedDiskSpace: memory.TB,
|
||||
AllocatedBandwidth: memory.TB,
|
||||
KBucketRefreshInterval: time.Hour,
|
||||
|
||||
AgreementSenderCheckInterval: time.Hour,
|
||||
CollectorInterval: time.Hour,
|
||||
},
|
||||
}
|
||||
|
||||
|
78
pkg/piecestore/psserver/collector.go
Normal file
78
pkg/piecestore/psserver/collector.go
Normal file
@ -0,0 +1,78 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package psserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
pstore "storj.io/storj/pkg/piecestore"
|
||||
"storj.io/storj/pkg/piecestore/psserver/psdb"
|
||||
)
|
||||
|
||||
// ErrorCollector is error class for piece collector
|
||||
var ErrorCollector = errs.Class("piecestore collector")
|
||||
|
||||
// Collector collects expired pieces from database and disk.
|
||||
type Collector struct {
|
||||
log *zap.Logger
|
||||
db *psdb.DB
|
||||
storage *pstore.Storage
|
||||
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
// NewCollector returns a new piece collector
|
||||
func NewCollector(log *zap.Logger, db *psdb.DB, storage *pstore.Storage, interval time.Duration) *Collector {
|
||||
return &Collector{
|
||||
log: log,
|
||||
db: db,
|
||||
storage: storage,
|
||||
interval: interval,
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs the collector at regular intervals
|
||||
func (service *Collector) Run(ctx context.Context) error {
|
||||
ticker := time.NewTicker(service.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
err := service.Collect(ctx)
|
||||
if err != nil {
|
||||
service.log.Error("collect", zap.Error(err))
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ticker.C: // wait for the next interval to happen
|
||||
case <-ctx.Done(): // or the bucket refresher service is canceled via context
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Collect collects expired pieces att this moment.
|
||||
func (service *Collector) Collect(ctx context.Context) error {
|
||||
for {
|
||||
expired, err := service.db.DeleteExpired(ctx)
|
||||
if err != nil {
|
||||
return ErrorCollector.Wrap(err)
|
||||
}
|
||||
if len(expired) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var errlist errs.Group
|
||||
for _, id := range expired {
|
||||
errlist.Add(service.storage.Delete(id))
|
||||
}
|
||||
|
||||
if err := errlist.Err(); err != nil {
|
||||
return ErrorCollector.Wrap(err)
|
||||
}
|
||||
}
|
||||
}
|
@ -17,11 +17,13 @@ var (
|
||||
|
||||
// Config contains everything necessary for a server
|
||||
type Config struct {
|
||||
Path string `help:"path to store data in" default:"$CONFDIR/storage"`
|
||||
WhitelistedSatelliteIDs string `help:"a comma-separated list of approved satellite node ids" default:""`
|
||||
SatelliteIDRestriction bool `help:"if true, only allow data from approved satellites" default:"false"`
|
||||
AllocatedDiskSpace memory.Size `user:"true" help:"total allocated disk space in bytes" default:"1TB"`
|
||||
AllocatedBandwidth memory.Size `user:"true" help:"total allocated bandwidth in bytes" default:"500GiB"`
|
||||
KBucketRefreshInterval time.Duration `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"`
|
||||
Path string `help:"path to store data in" default:"$CONFDIR/storage"`
|
||||
WhitelistedSatelliteIDs string `help:"a comma-separated list of approved satellite node ids" default:""`
|
||||
SatelliteIDRestriction bool `help:"if true, only allow data from approved satellites" default:"false"`
|
||||
AllocatedDiskSpace memory.Size `user:"true" help:"total allocated disk space in bytes" default:"1TB"`
|
||||
AllocatedBandwidth memory.Size `user:"true" help:"total allocated bandwidth in bytes" default:"500GiB"`
|
||||
KBucketRefreshInterval time.Duration `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"`
|
||||
|
||||
AgreementSenderCheckInterval time.Duration `help:"duration between agreement checks" default:"1h0m0s"`
|
||||
CollectorInterval time.Duration `help:"interval to check for expired pieces" default:"1h0m0s"`
|
||||
}
|
||||
|
@ -7,7 +7,6 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -21,7 +20,6 @@ import (
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
pstore "storj.io/storj/pkg/piecestore"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
@ -30,16 +28,12 @@ var (
|
||||
mon = monkit.Package()
|
||||
// Error is the default psdb errs class
|
||||
Error = errs.Class("psdb")
|
||||
|
||||
defaultCheckInterval = flag.Duration("piecestore.ttl.check-interval", time.Hour, "number of seconds to sleep between ttl checks")
|
||||
)
|
||||
|
||||
// DB is a piece store database
|
||||
type DB struct {
|
||||
storage *pstore.Storage
|
||||
mu sync.Mutex
|
||||
DB *sql.DB // TODO: hide
|
||||
check *time.Ticker
|
||||
mu sync.Mutex
|
||||
DB *sql.DB // TODO: hide
|
||||
}
|
||||
|
||||
// Agreement is a struct that contains a bandwidth agreement and the associated signature
|
||||
@ -49,9 +43,7 @@ type Agreement struct {
|
||||
}
|
||||
|
||||
// Open opens DB at DBPath
|
||||
func Open(ctx context.Context, storage *pstore.Storage, DBPath string) (db *DB, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
func Open(DBPath string) (db *DB, err error) {
|
||||
if err = os.MkdirAll(filepath.Dir(DBPath), 0700); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -61,40 +53,29 @@ func Open(ctx context.Context, storage *pstore.Storage, DBPath string) (db *DB,
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
db = &DB{
|
||||
DB: sqlite,
|
||||
storage: storage,
|
||||
check: time.NewTicker(*defaultCheckInterval),
|
||||
DB: sqlite,
|
||||
}
|
||||
if err := db.init(); err != nil {
|
||||
return nil, utils.CombineErrors(err, db.DB.Close())
|
||||
}
|
||||
|
||||
go db.garbageCollect(ctx)
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// OpenInMemory opens sqlite DB inmemory
|
||||
func OpenInMemory(ctx context.Context, storage *pstore.Storage) (db *DB, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
func OpenInMemory() (db *DB, err error) {
|
||||
sqlite, err := sql.Open("sqlite3", ":memory:")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db = &DB{
|
||||
DB: sqlite,
|
||||
storage: storage,
|
||||
check: time.NewTicker(*defaultCheckInterval),
|
||||
DB: sqlite,
|
||||
}
|
||||
if err := db.init(); err != nil {
|
||||
return nil, utils.CombineErrors(err, db.DB.Close())
|
||||
}
|
||||
|
||||
// TODO: make garbage collect calling piecestore service responsibility
|
||||
go db.garbageCollect(ctx)
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
@ -147,71 +128,46 @@ func (db *DB) locked() func() {
|
||||
return db.mu.Unlock
|
||||
}
|
||||
|
||||
// DeleteExpired checks for expired TTLs in the DB and removes data from both the DB and the FS
|
||||
func (db *DB) DeleteExpired(ctx context.Context) (err error) {
|
||||
// DeleteExpired deletes expired pieces
|
||||
func (db *DB) DeleteExpired(ctx context.Context) (expired []string, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
defer db.locked()()
|
||||
|
||||
var expired []string
|
||||
err = func() error {
|
||||
defer db.locked()()
|
||||
// TODO: add limit
|
||||
|
||||
tx, err := db.DB.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
tx, err := db.DB.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
|
||||
now := time.Now().Unix()
|
||||
now := time.Now().Unix()
|
||||
|
||||
rows, err := tx.Query("SELECT id FROM ttl WHERE 0 < expires AND ? < expires", now)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if err := rows.Scan(&id); err != nil {
|
||||
return err
|
||||
}
|
||||
expired = append(expired, id)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.Exec(`DELETE FROM ttl WHERE 0 < expires AND ? < expires`, now)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}()
|
||||
|
||||
if db.storage != nil {
|
||||
var errlist errs.Group
|
||||
for _, id := range expired {
|
||||
errlist.Add(db.storage.Delete(id))
|
||||
}
|
||||
|
||||
if len(errlist) > 0 {
|
||||
return errlist.Err()
|
||||
}
|
||||
rows, err := tx.Query("SELECT id FROM ttl WHERE 0 < expires AND ? < expires", now)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// garbageCollect will periodically run DeleteExpired
|
||||
func (db *DB) garbageCollect(ctx context.Context) {
|
||||
for range db.check.C {
|
||||
err := db.DeleteExpired(ctx)
|
||||
if err != nil {
|
||||
zap.S().Errorf("failed checking entries: %+v", err)
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if err := rows.Scan(&id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
expired = append(expired, id)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = tx.Exec(`DELETE FROM ttl WHERE 0 < expires AND ? < expires`, now)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return expired, tx.Commit()
|
||||
}
|
||||
|
||||
// WriteBandwidthAllocToDB -- Insert bandwidth agreement into DB
|
||||
// WriteBandwidthAllocToDB inserts bandwidth agreement into DB
|
||||
func (db *DB) WriteBandwidthAllocToDB(rba *pb.RenterBandwidthAllocation) error {
|
||||
rbaBytes, err := proto.Marshal(rba)
|
||||
if err != nil {
|
||||
|
@ -4,7 +4,6 @@
|
||||
package psdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -16,12 +15,9 @@ import (
|
||||
|
||||
"storj.io/storj/internal/teststorj"
|
||||
"storj.io/storj/pkg/pb"
|
||||
pstore "storj.io/storj/pkg/piecestore"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
var ctx = context.Background()
|
||||
|
||||
const concurrency = 10
|
||||
|
||||
func newDB(t testing.TB, id string) (*DB, func()) {
|
||||
@ -31,9 +27,7 @@ func newDB(t testing.TB, id string) (*DB, func()) {
|
||||
}
|
||||
dbpath := filepath.Join(tmpdir, "psdb.db")
|
||||
|
||||
storage := pstore.NewStorage(tmpdir)
|
||||
|
||||
db, err := Open(ctx, storage, dbpath)
|
||||
db, err := Open(dbpath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -43,11 +37,6 @@ func newDB(t testing.TB, id string) (*DB, func()) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = storage.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = os.RemoveAll(tmpdir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -56,7 +45,7 @@ func newDB(t testing.TB, id string) (*DB, func()) {
|
||||
}
|
||||
|
||||
func TestNewInmemory(t *testing.T) {
|
||||
db, err := OpenInMemory(context.Background(), nil)
|
||||
db, err := OpenInMemory()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -15,7 +15,6 @@ import (
|
||||
|
||||
"storj.io/storj/internal/sync2"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
// RetrieveError is a type of error for failures in Server.Retrieve()
|
||||
@ -95,10 +94,12 @@ func (s *Server) retrieveData(ctx context.Context, stream pb.PieceStoreRoutes_Re
|
||||
|
||||
storeFile, err := s.storage.Reader(ctx, id, offset, length)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
return 0, 0, RetrieveError.Wrap(err)
|
||||
}
|
||||
|
||||
defer utils.LogClose(storeFile)
|
||||
defer func() {
|
||||
err = errs.Combine(err, storeFile.Close())
|
||||
}()
|
||||
|
||||
writer := NewStreamWriter(s, stream)
|
||||
allocationTracking := sync2.NewThrottle()
|
||||
@ -122,17 +123,17 @@ func (s *Server) retrieveData(ctx context.Context, stream pb.PieceStoreRoutes_Re
|
||||
for {
|
||||
recv, err := stream.Recv()
|
||||
if err != nil {
|
||||
allocationTracking.Fail(err)
|
||||
allocationTracking.Fail(RetrieveError.Wrap(err))
|
||||
return
|
||||
}
|
||||
rba := recv.BandwidthAllocation
|
||||
if err = s.verifySignature(stream.Context(), rba); err != nil {
|
||||
allocationTracking.Fail(err)
|
||||
allocationTracking.Fail(RetrieveError.Wrap(err))
|
||||
return
|
||||
}
|
||||
pba := rba.PayerAllocation
|
||||
if err = s.verifyPayerAllocation(&pba, "GET"); err != nil {
|
||||
allocationTracking.Fail(err)
|
||||
allocationTracking.Fail(RetrieveError.Wrap(err))
|
||||
return
|
||||
}
|
||||
//todo: figure out why this fails tests
|
||||
@ -161,7 +162,7 @@ func (s *Server) retrieveData(ctx context.Context, stream pb.PieceStoreRoutes_Re
|
||||
for used < length {
|
||||
nextMessageSize, err := allocationTracking.ConsumeOrWait(messageSize)
|
||||
if err != nil {
|
||||
allocationTracking.Fail(err)
|
||||
allocationTracking.Fail(RetrieveError.Wrap(err))
|
||||
break
|
||||
}
|
||||
|
||||
@ -176,14 +177,14 @@ func (s *Server) retrieveData(ctx context.Context, stream pb.PieceStoreRoutes_Re
|
||||
}
|
||||
// break on error
|
||||
if err != nil {
|
||||
allocationTracking.Fail(err)
|
||||
allocationTracking.Fail(RetrieveError.Wrap(err))
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// write to bandwidth usage table
|
||||
if err = s.DB.AddBandwidthUsed(used); err != nil {
|
||||
return retrieved, allocated, StoreError.New("failed to write bandwidth info to database: %v", err)
|
||||
return retrieved, allocated, RetrieveError.New("failed to write bandwidth info to database: %v", err)
|
||||
}
|
||||
|
||||
// TODO: handle errors
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap/zaptest"
|
||||
@ -87,36 +88,35 @@ func TestPiece(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run("should return expected PieceSummary values", func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
t.Run("", func(t *testing.T) {
|
||||
// simulate piece TTL entry
|
||||
_, err := s.DB.DB.Exec(fmt.Sprintf(`INSERT INTO ttl (id, created, expires) VALUES ("%s", "%d", "%d")`, tt.id, 1234567890, tt.expiration))
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
_, err := s.DB.DB.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id))
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
req := &pb.PieceId{Id: tt.id}
|
||||
resp, err := c.Piece(ctx, req)
|
||||
|
||||
if tt.err != "" {
|
||||
require.NotNil(err)
|
||||
require.NotNil(t, err)
|
||||
if runtime.GOOS == "windows" && strings.Contains(tt.err, "no such file or directory") {
|
||||
//TODO (windows): ignoring for windows due to different underlying error
|
||||
return
|
||||
}
|
||||
require.Equal(tt.err, err.Error())
|
||||
require.Equal(t, tt.err, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(err)
|
||||
assert.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
|
||||
require.Equal(tt.id, resp.GetId())
|
||||
require.Equal(tt.size, resp.GetPieceSize())
|
||||
require.Equal(tt.expiration, resp.GetExpirationUnixSec())
|
||||
assert.Equal(t, tt.id, resp.GetId())
|
||||
assert.Equal(t, tt.size, resp.GetPieceSize())
|
||||
assert.Equal(t, tt.expiration, resp.GetExpirationUnixSec())
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -224,17 +224,16 @@ func TestRetrieve(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run("should return expected PieceRetrievalStream values", func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
t.Run("", func(t *testing.T) {
|
||||
stream, err := c.Retrieve(ctx)
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
|
||||
// send piece database
|
||||
err = stream.Send(&pb.PieceRetrieval{PieceData: &pb.PieceRetrieval_PieceData{Id: tt.id, PieceSize: tt.reqSize, Offset: tt.offset}})
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
|
||||
pba, err := testbwagreement.GeneratePayerBandwidthAllocation(pb.BandwidthAction_GET, snID, upID, time.Hour)
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
|
||||
totalAllocated := int64(0)
|
||||
var data string
|
||||
@ -245,32 +244,32 @@ func TestRetrieve(t *testing.T) {
|
||||
totalAllocated += tt.allocSize
|
||||
|
||||
rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, snID.ID, upID, totalAllocated)
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = stream.Send(&pb.PieceRetrieval{BandwidthAllocation: rba})
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
|
||||
resp, err = stream.Recv()
|
||||
if tt.err != "" {
|
||||
require.NotNil(err)
|
||||
require.NotNil(t, err)
|
||||
if runtime.GOOS == "windows" && strings.Contains(tt.err, "no such file or directory") {
|
||||
//TODO (windows): ignoring for windows due to different underlying error
|
||||
return
|
||||
}
|
||||
require.Equal(tt.err, err.Error())
|
||||
require.Equal(t, tt.err, err.Error())
|
||||
return
|
||||
}
|
||||
assert.NoError(t, err)
|
||||
|
||||
data = fmt.Sprintf("%s%s", data, string(resp.GetContent()))
|
||||
totalRetrieved += resp.GetPieceSize()
|
||||
}
|
||||
|
||||
require.NoError(err)
|
||||
require.NotNil(resp)
|
||||
if resp != nil {
|
||||
require.Equal(tt.respSize, totalRetrieved)
|
||||
require.Equal(string(tt.content), data)
|
||||
}
|
||||
assert.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
|
||||
assert.Equal(t, tt.respSize, totalRetrieved)
|
||||
assert.Equal(t, string(tt.content), data)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -324,24 +323,23 @@ func TestStore(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run("should return expected PieceStoreSummary values", func(t *testing.T) {
|
||||
t.Run("", func(t *testing.T) {
|
||||
snID, upID := newTestID(ctx, t), newTestID(ctx, t)
|
||||
s, c, cleanup := NewTest(ctx, t, snID, upID, []storj.NodeID{})
|
||||
defer cleanup()
|
||||
db := s.DB.DB
|
||||
|
||||
require := require.New(t)
|
||||
stream, err := c.Store(ctx)
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Write the buffer to the stream we opened earlier
|
||||
err = stream.Send(&pb.PieceStore{PieceData: &pb.PieceStore_PieceData{Id: tt.id, ExpirationUnixSec: tt.ttl}})
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
// Send Bandwidth Allocation Data
|
||||
pba, err := testbwagreement.GeneratePayerBandwidthAllocation(pb.BandwidthAction_PUT, snID, upID, time.Hour)
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, snID.ID, upID, tt.totalReceived)
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
msg := &pb.PieceStore{
|
||||
PieceData: &pb.PieceStore_PieceData{Content: tt.content},
|
||||
BandwidthAllocation: rba,
|
||||
@ -349,42 +347,42 @@ func TestStore(t *testing.T) {
|
||||
// Write the buffer to the stream we opened earlier
|
||||
err = stream.Send(msg)
|
||||
if err != io.EOF && err != nil {
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
resp, err := stream.CloseAndRecv()
|
||||
if tt.err != "" {
|
||||
require.NotNil(err)
|
||||
require.True(strings.HasPrefix(err.Error(), tt.err), "expected")
|
||||
require.NotNil(t, err)
|
||||
require.True(t, strings.HasPrefix(err.Error(), tt.err))
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_, err := db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id))
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
// check db to make sure agreement and signature were stored correctly
|
||||
rows, err := db.Query(`SELECT agreement, signature FROM bandwidth_agreements`)
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() { require.NoError(rows.Close()) }()
|
||||
defer func() { require.NoError(t, rows.Close()) }()
|
||||
for rows.Next() {
|
||||
var agreement, signature []byte
|
||||
err = rows.Scan(&agreement, &signature)
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
rba := &pb.RenterBandwidthAllocation{}
|
||||
require.NoError(proto.Unmarshal(agreement, rba))
|
||||
require.Equal(msg.BandwidthAllocation.GetSignature(), signature)
|
||||
require.True(pb.Equal(pba, &rba.PayerAllocation))
|
||||
require.Equal(int64(len(tt.content)), rba.Total)
|
||||
require.NoError(t, proto.Unmarshal(agreement, rba))
|
||||
require.Equal(t, msg.BandwidthAllocation.GetSignature(), signature)
|
||||
require.True(t, pb.Equal(pba, &rba.PayerAllocation))
|
||||
require.Equal(t, int64(len(tt.content)), rba.Total)
|
||||
|
||||
}
|
||||
err = rows.Err()
|
||||
require.NoError(err)
|
||||
require.NotNil(resp)
|
||||
require.Equal(tt.message, resp.Message)
|
||||
require.Equal(tt.totalReceived, resp.TotalReceived)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
require.Equal(t, tt.message, resp.Message)
|
||||
require.Equal(t, tt.totalReceived, resp.TotalReceived)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -433,25 +431,24 @@ func TestPbaValidation(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run("should validate payer bandwidth allocation struct", func(t *testing.T) {
|
||||
t.Run("", func(t *testing.T) {
|
||||
s, c, cleanup := NewTest(ctx, t, snID, upID, tt.whitelist)
|
||||
defer cleanup()
|
||||
|
||||
require := require.New(t)
|
||||
stream, err := c.Store(ctx)
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
|
||||
//cleanup incase tests previously paniced
|
||||
_ = s.storage.Delete("99999999999999999999")
|
||||
// Write the buffer to the stream we opened earlier
|
||||
err = stream.Send(&pb.PieceStore{PieceData: &pb.PieceStore_PieceData{Id: "99999999999999999999", ExpirationUnixSec: 9999999999}})
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
// Send Bandwidth Allocation Data
|
||||
content := []byte("content")
|
||||
pba, err := testbwagreement.GeneratePayerBandwidthAllocation(tt.action, satID1, upID, time.Hour)
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, snID.ID, upID, int64(len(content)))
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
msg := &pb.PieceStore{
|
||||
PieceData: &pb.PieceStore_PieceData{Content: content},
|
||||
BandwidthAllocation: rba,
|
||||
@ -460,15 +457,15 @@ func TestPbaValidation(t *testing.T) {
|
||||
// Write the buffer to the stream we opened earlier
|
||||
err = stream.Send(msg)
|
||||
if err != io.EOF && err != nil {
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
_, err = stream.CloseAndRecv()
|
||||
if err != nil {
|
||||
//require.NotNil(err)
|
||||
//require.NotNil(t, err)
|
||||
t.Log("Expected err string", tt.err)
|
||||
t.Log("Actual err.Error:", err.Error())
|
||||
require.Equal(tt.err, err.Error())
|
||||
require.Equal(t, tt.err, err.Error())
|
||||
return
|
||||
}
|
||||
})
|
||||
@ -509,9 +506,7 @@ func TestDelete(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run("should return expected PieceDeleteSummary values", func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
t.Run("", func(t *testing.T) {
|
||||
// simulate piece stored with storagenode
|
||||
if err := writeFile(s, "11111111111111111111"); err != nil {
|
||||
t.Errorf("Error: %v\nCould not create test piece", err)
|
||||
@ -520,31 +515,31 @@ func TestDelete(t *testing.T) {
|
||||
|
||||
// simulate piece TTL entry
|
||||
_, err := db.Exec(fmt.Sprintf(`INSERT INTO ttl (id, created, expires) VALUES ("%s", "%d", "%d")`, tt.id, 1234567890, 1234567890))
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
_, err := db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id))
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
require.NoError(s.storage.Delete("11111111111111111111"))
|
||||
require.NoError(t, s.storage.Delete("11111111111111111111"))
|
||||
}()
|
||||
|
||||
req := &pb.PieceDelete{Id: tt.id}
|
||||
resp, err := c.Delete(ctx, req)
|
||||
|
||||
if tt.err != "" {
|
||||
require.Equal(tt.err, err.Error())
|
||||
require.Equal(t, tt.err, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(err)
|
||||
require.Equal(tt.message, resp.GetMessage())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.message, resp.GetMessage())
|
||||
|
||||
// if test passes, check if file was indeed deleted
|
||||
filePath, err := s.storage.PiecePath(tt.id)
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
if _, err = os.Stat(filePath); os.IsExist(err) {
|
||||
t.Errorf("File not deleted")
|
||||
return
|
||||
@ -561,7 +556,7 @@ func NewTest(ctx context.Context, t *testing.T, snID, upID *identity.FullIdentit
|
||||
tempDBPath := filepath.Join(tmp, "test.db")
|
||||
tempDir := filepath.Join(tmp, "test-data", "3000")
|
||||
storage := pstore.NewStorage(tempDir)
|
||||
psDB, err := psdb.Open(ctx, storage, tempDBPath)
|
||||
psDB, err := psdb.Open(tempDBPath)
|
||||
require.NoError(t, err)
|
||||
verifier := func(authorization *pb.SignedMessage) error {
|
||||
return nil
|
||||
|
@ -91,7 +91,9 @@ func (s *Server) storeData(ctx context.Context, stream pb.PieceStoreRoutes_Store
|
||||
return 0, err
|
||||
}
|
||||
|
||||
defer utils.LogClose(storeFile)
|
||||
defer func() {
|
||||
err = errs.Combine(err, storeFile.Close())
|
||||
}()
|
||||
|
||||
bwUsed, err := s.DB.GetTotalBandwidthBetween(getBeginningOfMonth(), time.Now())
|
||||
if err != nil {
|
||||
|
@ -77,8 +77,9 @@ type Peer struct {
|
||||
}
|
||||
|
||||
Storage struct {
|
||||
Endpoint *psserver.Server // TODO: separate into endpoint and service
|
||||
Monitor *psserver.Monitor
|
||||
Endpoint *psserver.Server // TODO: separate into endpoint and service
|
||||
Monitor *psserver.Monitor
|
||||
Collector *psserver.Collector
|
||||
}
|
||||
|
||||
Agreements struct {
|
||||
@ -172,6 +173,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P
|
||||
|
||||
// TODO: organize better
|
||||
peer.Storage.Monitor = psserver.NewMonitor(peer.Log.Named("piecestore:monitor"), config.KBucketRefreshInterval, peer.Kademlia.RoutingTable, peer.Storage.Endpoint)
|
||||
peer.Storage.Collector = psserver.NewCollector(peer.Log.Named("piecestore:collector"), peer.DB.PSDB(), peer.DB.Storage(), config.CollectorInterval)
|
||||
}
|
||||
|
||||
{ // agreements
|
||||
@ -204,6 +206,9 @@ func (peer *Peer) Run(ctx context.Context) error {
|
||||
group.Go(func() error {
|
||||
return ignoreCancel(peer.Storage.Monitor.Run(ctx))
|
||||
})
|
||||
group.Go(func() error {
|
||||
return ignoreCancel(peer.Storage.Collector.Run(ctx))
|
||||
})
|
||||
group.Go(func() error {
|
||||
// TODO: move the message into Server instead
|
||||
peer.Log.Sugar().Infof("Node %s started on %s", peer.Identity.ID, peer.Public.Server.Addr().String())
|
||||
|
@ -4,8 +4,6 @@
|
||||
package storagenodedb
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
@ -38,8 +36,7 @@ type DB struct {
|
||||
func New(config Config) (*DB, error) {
|
||||
storage := pstore.NewStorage(config.Storage)
|
||||
|
||||
// TODO: Open shouldn't need context argument
|
||||
psdb, err := psdb.Open(context.TODO(), storage, config.Info)
|
||||
psdb, err := psdb.Open(config.Info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -62,8 +59,7 @@ func New(config Config) (*DB, error) {
|
||||
func NewInMemory(storageDir string) (*DB, error) {
|
||||
storage := pstore.NewStorage(storageDir)
|
||||
|
||||
// TODO: OpenInMemory shouldn't need context argument
|
||||
psdb, err := psdb.OpenInMemory(context.TODO(), storage)
|
||||
psdb, err := psdb.OpenInMemory()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user