From 1df81b1460831ddd83151720580d86c569c446bd Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Tue, 29 Jan 2019 17:41:01 +0200 Subject: [PATCH] Separate garbage collect logic from psdb (#1167) --- cmd/storagenode/main.go | 34 +++--- internal/testplanet/planet.go | 10 +- pkg/piecestore/psserver/collector.go | 78 +++++++++++++ pkg/piecestore/psserver/config.go | 14 ++- pkg/piecestore/psserver/psdb/psdb.go | 114 ++++++------------- pkg/piecestore/psserver/psdb/psdb_test.go | 15 +-- pkg/piecestore/psserver/retrieve.go | 19 ++-- pkg/piecestore/psserver/server_test.go | 129 +++++++++++----------- pkg/piecestore/psserver/store.go | 4 +- storagenode/peer.go | 9 +- storagenode/storagenodedb/database.go | 8 +- 11 files changed, 230 insertions(+), 204 deletions(-) create mode 100644 pkg/piecestore/psserver/collector.go diff --git a/cmd/storagenode/main.go b/cmd/storagenode/main.go index e91eae210..197e002cf 100644 --- a/cmd/storagenode/main.go +++ b/cmd/storagenode/main.go @@ -25,7 +25,6 @@ import ( "storj.io/storj/pkg/identity" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/piecestore/psclient" - "storj.io/storj/pkg/piecestore/psserver/psdb" "storj.io/storj/pkg/process" "storj.io/storj/pkg/storj" "storj.io/storj/pkg/transport" @@ -87,9 +86,6 @@ var ( BootstrapAddr string `default:"bootstrap.storj.io:8888" help:"address of server the storage node was bootstrapped against"` } - diagCfg struct { - } - defaultConfDir = fpath.ApplicationDir("storj", "storagenode") // TODO: this path should be defined somewhere else defaultIdentityDir = fpath.ApplicationDir("storj", "identity", "storagenode") @@ -132,10 +128,18 @@ func init() { cfgstruct.Bind(runCmd.Flags(), &runCfg, cfgstruct.ConfDir(defaultConfDir), cfgstruct.IdentityDir(defaultIdentityDir)) cfgstruct.BindSetup(setupCmd.Flags(), &setupCfg, cfgstruct.ConfDir(defaultConfDir), cfgstruct.IdentityDir(defaultIdentityDir)) cfgstruct.BindSetup(configCmd.Flags(), &setupCfg, cfgstruct.ConfDir(defaultConfDir), cfgstruct.IdentityDir(defaultIdentityDir)) - cfgstruct.Bind(diagCmd.Flags(), &diagCfg, cfgstruct.ConfDir(defaultDiagDir), cfgstruct.IdentityDir(defaultIdentityDir)) + cfgstruct.Bind(diagCmd.Flags(), &runCfg, cfgstruct.ConfDir(defaultDiagDir), cfgstruct.IdentityDir(defaultIdentityDir)) cfgstruct.Bind(dashboardCmd.Flags(), &dashboardCfg, cfgstruct.ConfDir(defaultDiagDir)) } +func databaseConfig(config storagenode.Config) storagenodedb.Config { + return storagenodedb.Config{ + Storage: config.Storage.Path, + Info: filepath.Join(config.Storage.Path, "piecestore.db"), + Kademlia: config.Kademlia.DBPath, + } +} + func cmdRun(cmd *cobra.Command, args []string) (err error) { log := zap.L() @@ -154,11 +158,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { zap.S().Error("Failed to initialize telemetry batcher: ", err) } - db, err := storagenodedb.New(storagenodedb.Config{ - Storage: runCfg.Storage.Path, - Info: filepath.Join(runCfg.Storage.Path, "piecestore.db"), - Kademlia: runCfg.Kademlia.DBPath, - }) + db, err := storagenodedb.New(databaseConfig(runCfg.Config)) if err != nil { return errs.New("Error starting master database on storagenode: %+v", err) } @@ -247,18 +247,18 @@ func cmdDiag(cmd *cobra.Command, args []string) (err error) { return err } - // open the sql db - dbpath := filepath.Join(diagDir, "storage", "piecestore.db") - db, err := psdb.Open(context.Background(), nil, dbpath) + db, err := storagenodedb.New(databaseConfig(runCfg.Config)) if err != nil { - fmt.Println("Storagenode database couldnt open:", dbpath) - return err + return errs.New("Error starting master database on storagenode: %v", err) } + defer func() { + err = errs.Combine(err, db.Close()) + }() //get all bandwidth aggrements entries already ordered - bwAgreements, err := db.GetBandwidthAllocations() + bwAgreements, err := db.PSDB().GetBandwidthAllocations() if err != nil { - fmt.Println("storage node 'bandwidth_agreements' table read error:", dbpath) + fmt.Printf("storage node 'bandwidth_agreements' table read error: %v\n", err) return err } diff --git a/internal/testplanet/planet.go b/internal/testplanet/planet.go index a3492b142..b5a786c69 100644 --- a/internal/testplanet/planet.go +++ b/internal/testplanet/planet.go @@ -374,11 +374,13 @@ func (planet *Planet) newStorageNodes(count int) ([]*storagenode.Peer, error) { }, }, Storage: psserver.Config{ - Path: "", // TODO: this argument won't be needed with master storagenodedb - AllocatedDiskSpace: memory.TB, - AllocatedBandwidth: memory.TB, - KBucketRefreshInterval: time.Hour, + Path: "", // TODO: this argument won't be needed with master storagenodedb + AllocatedDiskSpace: memory.TB, + AllocatedBandwidth: memory.TB, + KBucketRefreshInterval: time.Hour, + AgreementSenderCheckInterval: time.Hour, + CollectorInterval: time.Hour, }, } diff --git a/pkg/piecestore/psserver/collector.go b/pkg/piecestore/psserver/collector.go new file mode 100644 index 000000000..f556b2148 --- /dev/null +++ b/pkg/piecestore/psserver/collector.go @@ -0,0 +1,78 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package psserver + +import ( + "context" + "time" + + "github.com/zeebo/errs" + "go.uber.org/zap" + + pstore "storj.io/storj/pkg/piecestore" + "storj.io/storj/pkg/piecestore/psserver/psdb" +) + +// ErrorCollector is error class for piece collector +var ErrorCollector = errs.Class("piecestore collector") + +// Collector collects expired pieces from database and disk. +type Collector struct { + log *zap.Logger + db *psdb.DB + storage *pstore.Storage + + interval time.Duration +} + +// NewCollector returns a new piece collector +func NewCollector(log *zap.Logger, db *psdb.DB, storage *pstore.Storage, interval time.Duration) *Collector { + return &Collector{ + log: log, + db: db, + storage: storage, + interval: interval, + } +} + +// Run runs the collector at regular intervals +func (service *Collector) Run(ctx context.Context) error { + ticker := time.NewTicker(service.interval) + defer ticker.Stop() + + for { + err := service.Collect(ctx) + if err != nil { + service.log.Error("collect", zap.Error(err)) + } + + select { + case <-ticker.C: // wait for the next interval to happen + case <-ctx.Done(): // or the bucket refresher service is canceled via context + return ctx.Err() + } + } +} + +// Collect collects expired pieces att this moment. +func (service *Collector) Collect(ctx context.Context) error { + for { + expired, err := service.db.DeleteExpired(ctx) + if err != nil { + return ErrorCollector.Wrap(err) + } + if len(expired) == 0 { + return nil + } + + var errlist errs.Group + for _, id := range expired { + errlist.Add(service.storage.Delete(id)) + } + + if err := errlist.Err(); err != nil { + return ErrorCollector.Wrap(err) + } + } +} diff --git a/pkg/piecestore/psserver/config.go b/pkg/piecestore/psserver/config.go index 74f30a680..f11113182 100644 --- a/pkg/piecestore/psserver/config.go +++ b/pkg/piecestore/psserver/config.go @@ -17,11 +17,13 @@ var ( // Config contains everything necessary for a server type Config struct { - Path string `help:"path to store data in" default:"$CONFDIR/storage"` - WhitelistedSatelliteIDs string `help:"a comma-separated list of approved satellite node ids" default:""` - SatelliteIDRestriction bool `help:"if true, only allow data from approved satellites" default:"false"` - AllocatedDiskSpace memory.Size `user:"true" help:"total allocated disk space in bytes" default:"1TB"` - AllocatedBandwidth memory.Size `user:"true" help:"total allocated bandwidth in bytes" default:"500GiB"` - KBucketRefreshInterval time.Duration `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"` + Path string `help:"path to store data in" default:"$CONFDIR/storage"` + WhitelistedSatelliteIDs string `help:"a comma-separated list of approved satellite node ids" default:""` + SatelliteIDRestriction bool `help:"if true, only allow data from approved satellites" default:"false"` + AllocatedDiskSpace memory.Size `user:"true" help:"total allocated disk space in bytes" default:"1TB"` + AllocatedBandwidth memory.Size `user:"true" help:"total allocated bandwidth in bytes" default:"500GiB"` + KBucketRefreshInterval time.Duration `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"` + AgreementSenderCheckInterval time.Duration `help:"duration between agreement checks" default:"1h0m0s"` + CollectorInterval time.Duration `help:"interval to check for expired pieces" default:"1h0m0s"` } diff --git a/pkg/piecestore/psserver/psdb/psdb.go b/pkg/piecestore/psserver/psdb/psdb.go index c3329a6a4..7180e5899 100644 --- a/pkg/piecestore/psserver/psdb/psdb.go +++ b/pkg/piecestore/psserver/psdb/psdb.go @@ -7,7 +7,6 @@ import ( "context" "database/sql" "errors" - "flag" "fmt" "os" "path/filepath" @@ -21,7 +20,6 @@ import ( monkit "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/pkg/pb" - pstore "storj.io/storj/pkg/piecestore" "storj.io/storj/pkg/storj" "storj.io/storj/pkg/utils" ) @@ -30,16 +28,12 @@ var ( mon = monkit.Package() // Error is the default psdb errs class Error = errs.Class("psdb") - - defaultCheckInterval = flag.Duration("piecestore.ttl.check-interval", time.Hour, "number of seconds to sleep between ttl checks") ) // DB is a piece store database type DB struct { - storage *pstore.Storage - mu sync.Mutex - DB *sql.DB // TODO: hide - check *time.Ticker + mu sync.Mutex + DB *sql.DB // TODO: hide } // Agreement is a struct that contains a bandwidth agreement and the associated signature @@ -49,9 +43,7 @@ type Agreement struct { } // Open opens DB at DBPath -func Open(ctx context.Context, storage *pstore.Storage, DBPath string) (db *DB, err error) { - defer mon.Task()(&ctx)(&err) - +func Open(DBPath string) (db *DB, err error) { if err = os.MkdirAll(filepath.Dir(DBPath), 0700); err != nil { return nil, err } @@ -61,40 +53,29 @@ func Open(ctx context.Context, storage *pstore.Storage, DBPath string) (db *DB, return nil, Error.Wrap(err) } db = &DB{ - DB: sqlite, - storage: storage, - check: time.NewTicker(*defaultCheckInterval), + DB: sqlite, } if err := db.init(); err != nil { return nil, utils.CombineErrors(err, db.DB.Close()) } - go db.garbageCollect(ctx) - return db, nil } // OpenInMemory opens sqlite DB inmemory -func OpenInMemory(ctx context.Context, storage *pstore.Storage) (db *DB, err error) { - defer mon.Task()(&ctx)(&err) - +func OpenInMemory() (db *DB, err error) { sqlite, err := sql.Open("sqlite3", ":memory:") if err != nil { return nil, err } db = &DB{ - DB: sqlite, - storage: storage, - check: time.NewTicker(*defaultCheckInterval), + DB: sqlite, } if err := db.init(); err != nil { return nil, utils.CombineErrors(err, db.DB.Close()) } - // TODO: make garbage collect calling piecestore service responsibility - go db.garbageCollect(ctx) - return db, nil } @@ -147,71 +128,46 @@ func (db *DB) locked() func() { return db.mu.Unlock } -// DeleteExpired checks for expired TTLs in the DB and removes data from both the DB and the FS -func (db *DB) DeleteExpired(ctx context.Context) (err error) { +// DeleteExpired deletes expired pieces +func (db *DB) DeleteExpired(ctx context.Context) (expired []string, err error) { defer mon.Task()(&ctx)(&err) + defer db.locked()() - var expired []string - err = func() error { - defer db.locked()() + // TODO: add limit - tx, err := db.DB.BeginTx(ctx, nil) - if err != nil { - return err - } - defer func() { _ = tx.Rollback() }() + tx, err := db.DB.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer func() { _ = tx.Rollback() }() - now := time.Now().Unix() + now := time.Now().Unix() - rows, err := tx.Query("SELECT id FROM ttl WHERE 0 < expires AND ? < expires", now) - if err != nil { - return err - } - - for rows.Next() { - var id string - if err := rows.Scan(&id); err != nil { - return err - } - expired = append(expired, id) - } - if err := rows.Close(); err != nil { - return err - } - - _, err = tx.Exec(`DELETE FROM ttl WHERE 0 < expires AND ? < expires`, now) - if err != nil { - return err - } - - return tx.Commit() - }() - - if db.storage != nil { - var errlist errs.Group - for _, id := range expired { - errlist.Add(db.storage.Delete(id)) - } - - if len(errlist) > 0 { - return errlist.Err() - } + rows, err := tx.Query("SELECT id FROM ttl WHERE 0 < expires AND ? < expires", now) + if err != nil { + return nil, err } - return nil -} - -// garbageCollect will periodically run DeleteExpired -func (db *DB) garbageCollect(ctx context.Context) { - for range db.check.C { - err := db.DeleteExpired(ctx) - if err != nil { - zap.S().Errorf("failed checking entries: %+v", err) + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err } + expired = append(expired, id) } + if err := rows.Close(); err != nil { + return nil, err + } + + _, err = tx.Exec(`DELETE FROM ttl WHERE 0 < expires AND ? < expires`, now) + if err != nil { + return nil, err + } + + return expired, tx.Commit() } -// WriteBandwidthAllocToDB -- Insert bandwidth agreement into DB +// WriteBandwidthAllocToDB inserts bandwidth agreement into DB func (db *DB) WriteBandwidthAllocToDB(rba *pb.RenterBandwidthAllocation) error { rbaBytes, err := proto.Marshal(rba) if err != nil { diff --git a/pkg/piecestore/psserver/psdb/psdb_test.go b/pkg/piecestore/psserver/psdb/psdb_test.go index e00c39fce..9ebb4c522 100644 --- a/pkg/piecestore/psserver/psdb/psdb_test.go +++ b/pkg/piecestore/psserver/psdb/psdb_test.go @@ -4,7 +4,6 @@ package psdb import ( - "context" "io/ioutil" "os" "path/filepath" @@ -16,12 +15,9 @@ import ( "storj.io/storj/internal/teststorj" "storj.io/storj/pkg/pb" - pstore "storj.io/storj/pkg/piecestore" "storj.io/storj/pkg/storj" ) -var ctx = context.Background() - const concurrency = 10 func newDB(t testing.TB, id string) (*DB, func()) { @@ -31,9 +27,7 @@ func newDB(t testing.TB, id string) (*DB, func()) { } dbpath := filepath.Join(tmpdir, "psdb.db") - storage := pstore.NewStorage(tmpdir) - - db, err := Open(ctx, storage, dbpath) + db, err := Open(dbpath) if err != nil { t.Fatal(err) } @@ -43,11 +37,6 @@ func newDB(t testing.TB, id string) (*DB, func()) { if err != nil { t.Fatal(err) } - err = storage.Close() - if err != nil { - t.Fatal(err) - } - err = os.RemoveAll(tmpdir) if err != nil { t.Fatal(err) @@ -56,7 +45,7 @@ func newDB(t testing.TB, id string) (*DB, func()) { } func TestNewInmemory(t *testing.T) { - db, err := OpenInMemory(context.Background(), nil) + db, err := OpenInMemory() if err != nil { t.Fatal(err) } diff --git a/pkg/piecestore/psserver/retrieve.go b/pkg/piecestore/psserver/retrieve.go index 91b7da2a1..e086d3a8f 100644 --- a/pkg/piecestore/psserver/retrieve.go +++ b/pkg/piecestore/psserver/retrieve.go @@ -15,7 +15,6 @@ import ( "storj.io/storj/internal/sync2" "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/utils" ) // RetrieveError is a type of error for failures in Server.Retrieve() @@ -95,10 +94,12 @@ func (s *Server) retrieveData(ctx context.Context, stream pb.PieceStoreRoutes_Re storeFile, err := s.storage.Reader(ctx, id, offset, length) if err != nil { - return 0, 0, err + return 0, 0, RetrieveError.Wrap(err) } - defer utils.LogClose(storeFile) + defer func() { + err = errs.Combine(err, storeFile.Close()) + }() writer := NewStreamWriter(s, stream) allocationTracking := sync2.NewThrottle() @@ -122,17 +123,17 @@ func (s *Server) retrieveData(ctx context.Context, stream pb.PieceStoreRoutes_Re for { recv, err := stream.Recv() if err != nil { - allocationTracking.Fail(err) + allocationTracking.Fail(RetrieveError.Wrap(err)) return } rba := recv.BandwidthAllocation if err = s.verifySignature(stream.Context(), rba); err != nil { - allocationTracking.Fail(err) + allocationTracking.Fail(RetrieveError.Wrap(err)) return } pba := rba.PayerAllocation if err = s.verifyPayerAllocation(&pba, "GET"); err != nil { - allocationTracking.Fail(err) + allocationTracking.Fail(RetrieveError.Wrap(err)) return } //todo: figure out why this fails tests @@ -161,7 +162,7 @@ func (s *Server) retrieveData(ctx context.Context, stream pb.PieceStoreRoutes_Re for used < length { nextMessageSize, err := allocationTracking.ConsumeOrWait(messageSize) if err != nil { - allocationTracking.Fail(err) + allocationTracking.Fail(RetrieveError.Wrap(err)) break } @@ -176,14 +177,14 @@ func (s *Server) retrieveData(ctx context.Context, stream pb.PieceStoreRoutes_Re } // break on error if err != nil { - allocationTracking.Fail(err) + allocationTracking.Fail(RetrieveError.Wrap(err)) break } } // write to bandwidth usage table if err = s.DB.AddBandwidthUsed(used); err != nil { - return retrieved, allocated, StoreError.New("failed to write bandwidth info to database: %v", err) + return retrieved, allocated, RetrieveError.New("failed to write bandwidth info to database: %v", err) } // TODO: handle errors diff --git a/pkg/piecestore/psserver/server_test.go b/pkg/piecestore/psserver/server_test.go index 229c3c884..738ee491e 100644 --- a/pkg/piecestore/psserver/server_test.go +++ b/pkg/piecestore/psserver/server_test.go @@ -18,6 +18,7 @@ import ( "github.com/gogo/protobuf/proto" _ "github.com/mattn/go-sqlite3" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/zeebo/errs" "go.uber.org/zap/zaptest" @@ -87,36 +88,35 @@ func TestPiece(t *testing.T) { } for _, tt := range tests { - t.Run("should return expected PieceSummary values", func(t *testing.T) { - require := require.New(t) - + t.Run("", func(t *testing.T) { // simulate piece TTL entry _, err := s.DB.DB.Exec(fmt.Sprintf(`INSERT INTO ttl (id, created, expires) VALUES ("%s", "%d", "%d")`, tt.id, 1234567890, tt.expiration)) - require.NoError(err) + require.NoError(t, err) defer func() { _, err := s.DB.DB.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id)) - require.NoError(err) + require.NoError(t, err) }() req := &pb.PieceId{Id: tt.id} resp, err := c.Piece(ctx, req) if tt.err != "" { - require.NotNil(err) + require.NotNil(t, err) if runtime.GOOS == "windows" && strings.Contains(tt.err, "no such file or directory") { //TODO (windows): ignoring for windows due to different underlying error return } - require.Equal(tt.err, err.Error()) + require.Equal(t, tt.err, err.Error()) return } - require.NoError(err) + assert.NoError(t, err) + require.NotNil(t, resp) - require.Equal(tt.id, resp.GetId()) - require.Equal(tt.size, resp.GetPieceSize()) - require.Equal(tt.expiration, resp.GetExpirationUnixSec()) + assert.Equal(t, tt.id, resp.GetId()) + assert.Equal(t, tt.size, resp.GetPieceSize()) + assert.Equal(t, tt.expiration, resp.GetExpirationUnixSec()) }) } } @@ -224,17 +224,16 @@ func TestRetrieve(t *testing.T) { } for _, tt := range tests { - t.Run("should return expected PieceRetrievalStream values", func(t *testing.T) { - require := require.New(t) + t.Run("", func(t *testing.T) { stream, err := c.Retrieve(ctx) - require.NoError(err) + require.NoError(t, err) // send piece database err = stream.Send(&pb.PieceRetrieval{PieceData: &pb.PieceRetrieval_PieceData{Id: tt.id, PieceSize: tt.reqSize, Offset: tt.offset}}) - require.NoError(err) + require.NoError(t, err) pba, err := testbwagreement.GeneratePayerBandwidthAllocation(pb.BandwidthAction_GET, snID, upID, time.Hour) - require.NoError(err) + require.NoError(t, err) totalAllocated := int64(0) var data string @@ -245,32 +244,32 @@ func TestRetrieve(t *testing.T) { totalAllocated += tt.allocSize rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, snID.ID, upID, totalAllocated) - require.NoError(err) + require.NoError(t, err) err = stream.Send(&pb.PieceRetrieval{BandwidthAllocation: rba}) - require.NoError(err) + require.NoError(t, err) resp, err = stream.Recv() if tt.err != "" { - require.NotNil(err) + require.NotNil(t, err) if runtime.GOOS == "windows" && strings.Contains(tt.err, "no such file or directory") { //TODO (windows): ignoring for windows due to different underlying error return } - require.Equal(tt.err, err.Error()) + require.Equal(t, tt.err, err.Error()) return } + assert.NoError(t, err) data = fmt.Sprintf("%s%s", data, string(resp.GetContent())) totalRetrieved += resp.GetPieceSize() } - require.NoError(err) - require.NotNil(resp) - if resp != nil { - require.Equal(tt.respSize, totalRetrieved) - require.Equal(string(tt.content), data) - } + assert.NoError(t, err) + require.NotNil(t, resp) + + assert.Equal(t, tt.respSize, totalRetrieved) + assert.Equal(t, string(tt.content), data) }) } } @@ -324,24 +323,23 @@ func TestStore(t *testing.T) { } for _, tt := range tests { - t.Run("should return expected PieceStoreSummary values", func(t *testing.T) { + t.Run("", func(t *testing.T) { snID, upID := newTestID(ctx, t), newTestID(ctx, t) s, c, cleanup := NewTest(ctx, t, snID, upID, []storj.NodeID{}) defer cleanup() db := s.DB.DB - require := require.New(t) stream, err := c.Store(ctx) - require.NoError(err) + require.NoError(t, err) // Write the buffer to the stream we opened earlier err = stream.Send(&pb.PieceStore{PieceData: &pb.PieceStore_PieceData{Id: tt.id, ExpirationUnixSec: tt.ttl}}) - require.NoError(err) + require.NoError(t, err) // Send Bandwidth Allocation Data pba, err := testbwagreement.GeneratePayerBandwidthAllocation(pb.BandwidthAction_PUT, snID, upID, time.Hour) - require.NoError(err) + require.NoError(t, err) rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, snID.ID, upID, tt.totalReceived) - require.NoError(err) + require.NoError(t, err) msg := &pb.PieceStore{ PieceData: &pb.PieceStore_PieceData{Content: tt.content}, BandwidthAllocation: rba, @@ -349,42 +347,42 @@ func TestStore(t *testing.T) { // Write the buffer to the stream we opened earlier err = stream.Send(msg) if err != io.EOF && err != nil { - require.NoError(err) + require.NoError(t, err) } resp, err := stream.CloseAndRecv() if tt.err != "" { - require.NotNil(err) - require.True(strings.HasPrefix(err.Error(), tt.err), "expected") + require.NotNil(t, err) + require.True(t, strings.HasPrefix(err.Error(), tt.err)) return } defer func() { _, err := db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id)) - require.NoError(err) + require.NoError(t, err) }() // check db to make sure agreement and signature were stored correctly rows, err := db.Query(`SELECT agreement, signature FROM bandwidth_agreements`) - require.NoError(err) + require.NoError(t, err) - defer func() { require.NoError(rows.Close()) }() + defer func() { require.NoError(t, rows.Close()) }() for rows.Next() { var agreement, signature []byte err = rows.Scan(&agreement, &signature) - require.NoError(err) + require.NoError(t, err) rba := &pb.RenterBandwidthAllocation{} - require.NoError(proto.Unmarshal(agreement, rba)) - require.Equal(msg.BandwidthAllocation.GetSignature(), signature) - require.True(pb.Equal(pba, &rba.PayerAllocation)) - require.Equal(int64(len(tt.content)), rba.Total) + require.NoError(t, proto.Unmarshal(agreement, rba)) + require.Equal(t, msg.BandwidthAllocation.GetSignature(), signature) + require.True(t, pb.Equal(pba, &rba.PayerAllocation)) + require.Equal(t, int64(len(tt.content)), rba.Total) } err = rows.Err() - require.NoError(err) - require.NotNil(resp) - require.Equal(tt.message, resp.Message) - require.Equal(tt.totalReceived, resp.TotalReceived) + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, tt.message, resp.Message) + require.Equal(t, tt.totalReceived, resp.TotalReceived) }) } } @@ -433,25 +431,24 @@ func TestPbaValidation(t *testing.T) { } for _, tt := range tests { - t.Run("should validate payer bandwidth allocation struct", func(t *testing.T) { + t.Run("", func(t *testing.T) { s, c, cleanup := NewTest(ctx, t, snID, upID, tt.whitelist) defer cleanup() - require := require.New(t) stream, err := c.Store(ctx) - require.NoError(err) + require.NoError(t, err) //cleanup incase tests previously paniced _ = s.storage.Delete("99999999999999999999") // Write the buffer to the stream we opened earlier err = stream.Send(&pb.PieceStore{PieceData: &pb.PieceStore_PieceData{Id: "99999999999999999999", ExpirationUnixSec: 9999999999}}) - require.NoError(err) + require.NoError(t, err) // Send Bandwidth Allocation Data content := []byte("content") pba, err := testbwagreement.GeneratePayerBandwidthAllocation(tt.action, satID1, upID, time.Hour) - require.NoError(err) + require.NoError(t, err) rba, err := testbwagreement.GenerateRenterBandwidthAllocation(pba, snID.ID, upID, int64(len(content))) - require.NoError(err) + require.NoError(t, err) msg := &pb.PieceStore{ PieceData: &pb.PieceStore_PieceData{Content: content}, BandwidthAllocation: rba, @@ -460,15 +457,15 @@ func TestPbaValidation(t *testing.T) { // Write the buffer to the stream we opened earlier err = stream.Send(msg) if err != io.EOF && err != nil { - require.NoError(err) + require.NoError(t, err) } _, err = stream.CloseAndRecv() if err != nil { - //require.NotNil(err) + //require.NotNil(t, err) t.Log("Expected err string", tt.err) t.Log("Actual err.Error:", err.Error()) - require.Equal(tt.err, err.Error()) + require.Equal(t, tt.err, err.Error()) return } }) @@ -509,9 +506,7 @@ func TestDelete(t *testing.T) { } for _, tt := range tests { - t.Run("should return expected PieceDeleteSummary values", func(t *testing.T) { - require := require.New(t) - + t.Run("", func(t *testing.T) { // simulate piece stored with storagenode if err := writeFile(s, "11111111111111111111"); err != nil { t.Errorf("Error: %v\nCould not create test piece", err) @@ -520,31 +515,31 @@ func TestDelete(t *testing.T) { // simulate piece TTL entry _, err := db.Exec(fmt.Sprintf(`INSERT INTO ttl (id, created, expires) VALUES ("%s", "%d", "%d")`, tt.id, 1234567890, 1234567890)) - require.NoError(err) + require.NoError(t, err) defer func() { _, err := db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id)) - require.NoError(err) + require.NoError(t, err) }() defer func() { - require.NoError(s.storage.Delete("11111111111111111111")) + require.NoError(t, s.storage.Delete("11111111111111111111")) }() req := &pb.PieceDelete{Id: tt.id} resp, err := c.Delete(ctx, req) if tt.err != "" { - require.Equal(tt.err, err.Error()) + require.Equal(t, tt.err, err.Error()) return } - require.NoError(err) - require.Equal(tt.message, resp.GetMessage()) + require.NoError(t, err) + require.Equal(t, tt.message, resp.GetMessage()) // if test passes, check if file was indeed deleted filePath, err := s.storage.PiecePath(tt.id) - require.NoError(err) + require.NoError(t, err) if _, err = os.Stat(filePath); os.IsExist(err) { t.Errorf("File not deleted") return @@ -561,7 +556,7 @@ func NewTest(ctx context.Context, t *testing.T, snID, upID *identity.FullIdentit tempDBPath := filepath.Join(tmp, "test.db") tempDir := filepath.Join(tmp, "test-data", "3000") storage := pstore.NewStorage(tempDir) - psDB, err := psdb.Open(ctx, storage, tempDBPath) + psDB, err := psdb.Open(tempDBPath) require.NoError(t, err) verifier := func(authorization *pb.SignedMessage) error { return nil diff --git a/pkg/piecestore/psserver/store.go b/pkg/piecestore/psserver/store.go index da1125a74..6f2cfd3b5 100644 --- a/pkg/piecestore/psserver/store.go +++ b/pkg/piecestore/psserver/store.go @@ -91,7 +91,9 @@ func (s *Server) storeData(ctx context.Context, stream pb.PieceStoreRoutes_Store return 0, err } - defer utils.LogClose(storeFile) + defer func() { + err = errs.Combine(err, storeFile.Close()) + }() bwUsed, err := s.DB.GetTotalBandwidthBetween(getBeginningOfMonth(), time.Now()) if err != nil { diff --git a/storagenode/peer.go b/storagenode/peer.go index 623ccb61f..074e11254 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -77,8 +77,9 @@ type Peer struct { } Storage struct { - Endpoint *psserver.Server // TODO: separate into endpoint and service - Monitor *psserver.Monitor + Endpoint *psserver.Server // TODO: separate into endpoint and service + Monitor *psserver.Monitor + Collector *psserver.Collector } Agreements struct { @@ -172,6 +173,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P // TODO: organize better peer.Storage.Monitor = psserver.NewMonitor(peer.Log.Named("piecestore:monitor"), config.KBucketRefreshInterval, peer.Kademlia.RoutingTable, peer.Storage.Endpoint) + peer.Storage.Collector = psserver.NewCollector(peer.Log.Named("piecestore:collector"), peer.DB.PSDB(), peer.DB.Storage(), config.CollectorInterval) } { // agreements @@ -204,6 +206,9 @@ func (peer *Peer) Run(ctx context.Context) error { group.Go(func() error { return ignoreCancel(peer.Storage.Monitor.Run(ctx)) }) + group.Go(func() error { + return ignoreCancel(peer.Storage.Collector.Run(ctx)) + }) group.Go(func() error { // TODO: move the message into Server instead peer.Log.Sugar().Infof("Node %s started on %s", peer.Identity.ID, peer.Public.Server.Addr().String()) diff --git a/storagenode/storagenodedb/database.go b/storagenode/storagenodedb/database.go index b0f2f345e..8c737f546 100644 --- a/storagenode/storagenodedb/database.go +++ b/storagenode/storagenodedb/database.go @@ -4,8 +4,6 @@ package storagenodedb import ( - "context" - "github.com/zeebo/errs" "storj.io/storj/pkg/kademlia" @@ -38,8 +36,7 @@ type DB struct { func New(config Config) (*DB, error) { storage := pstore.NewStorage(config.Storage) - // TODO: Open shouldn't need context argument - psdb, err := psdb.Open(context.TODO(), storage, config.Info) + psdb, err := psdb.Open(config.Info) if err != nil { return nil, err } @@ -62,8 +59,7 @@ func New(config Config) (*DB, error) { func NewInMemory(storageDir string) (*DB, error) { storage := pstore.NewStorage(storageDir) - // TODO: OpenInMemory shouldn't need context argument - psdb, err := psdb.OpenInMemory(context.TODO(), storage) + psdb, err := psdb.OpenInMemory() if err != nil { return nil, err }