diff --git a/storagenode/inspector/inspector.go b/storagenode/inspector/inspector.go index 8841dd91d..f2c6b6f45 100644 --- a/storagenode/inspector/inspector.go +++ b/storagenode/inspector/inspector.go @@ -51,7 +51,8 @@ func NewEndpoint(log *zap.Logger, pieceInfo pieces.DB, kademlia *kademlia.Kademl } } -func (inspector *Endpoint) retrieveStats(ctx context.Context) (*pb.StatSummaryResponse, error) { +func (inspector *Endpoint) retrieveStats(ctx context.Context) (_ *pb.StatSummaryResponse, err error) { + defer mon.Task()(&ctx)(&err) // Space Usage totalUsedSpace, err := inspector.pieceInfo.SpaceUsed(ctx) @@ -93,7 +94,9 @@ func (inspector *Endpoint) Stats(ctx context.Context, in *pb.StatsRequest) (out return statsSummary, nil } -func (inspector *Endpoint) getDashboardData(ctx context.Context) (*pb.DashboardResponse, error) { +func (inspector *Endpoint) getDashboardData(ctx context.Context) (_ *pb.DashboardResponse, err error) { + defer mon.Task()(&ctx)(&err) + statsSummary, err := inspector.retrieveStats(ctx) if err != nil { return &pb.DashboardResponse{}, Error.Wrap(err) diff --git a/storagenode/monitor/monitor.go b/storagenode/monitor/monitor.go index 7bfb8bd07..a7e225f29 100644 --- a/storagenode/monitor/monitor.go +++ b/storagenode/monitor/monitor.go @@ -64,7 +64,7 @@ func (service *Service) Run(ctx context.Context) (err error) { // get the disk space details // The returned path ends in a slash only if it represents a root directory, such as "/" on Unix or `C:\` on Windows. - storageStatus, err := service.store.StorageStatus() + storageStatus, err := service.store.StorageStatus(ctx) if err != nil { return Error.Wrap(err) } @@ -122,7 +122,9 @@ func (service *Service) Close() (err error) { return nil } -func (service *Service) updateNodeInformation(ctx context.Context) error { +func (service *Service) updateNodeInformation(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + usedSpace, err := service.usedSpace(ctx) if err != nil { return Error.Wrap(err) @@ -141,7 +143,8 @@ func (service *Service) updateNodeInformation(ctx context.Context) error { return nil } -func (service *Service) usedSpace(ctx context.Context) (int64, error) { +func (service *Service) usedSpace(ctx context.Context) (_ int64, err error) { + defer mon.Task()(&ctx)(&err) usedSpace, err := service.pieceInfo.SpaceUsed(ctx) if err != nil { return 0, err @@ -149,7 +152,8 @@ func (service *Service) usedSpace(ctx context.Context) (int64, error) { return usedSpace, nil } -func (service *Service) usedBandwidth(ctx context.Context) (int64, error) { +func (service *Service) usedBandwidth(ctx context.Context) (_ int64, err error) { + defer mon.Task()(&ctx)(&err) usage, err := bandwidth.TotalMonthlySummary(ctx, service.usageDB) if err != nil { return 0, err @@ -158,7 +162,8 @@ func (service *Service) usedBandwidth(ctx context.Context) (int64, error) { } // AvailableSpace returns available disk space for upload -func (service *Service) AvailableSpace(ctx context.Context) (int64, error) { +func (service *Service) AvailableSpace(ctx context.Context) (_ int64, err error) { + defer mon.Task()(&ctx)(&err) usedSpace, err := service.pieceInfo.SpaceUsed(ctx) if err != nil { return 0, Error.Wrap(err) @@ -168,7 +173,8 @@ func (service *Service) AvailableSpace(ctx context.Context) (int64, error) { } // AvailableBandwidth returns available bandwidth for upload/download -func (service *Service) AvailableBandwidth(ctx context.Context) (int64, error) { +func (service *Service) AvailableBandwidth(ctx context.Context) (_ int64, err error) { + defer mon.Task()(&ctx)(&err) usage, err := bandwidth.TotalMonthlySummary(ctx, service.usageDB) if err != nil { return 0, Error.Wrap(err) diff --git a/storagenode/orders/sender.go b/storagenode/orders/sender.go index 4bf2356c3..930bf6019 100644 --- a/storagenode/orders/sender.go +++ b/storagenode/orders/sender.go @@ -8,8 +8,10 @@ import ( "io" "time" + "github.com/zeebo/errs" "go.uber.org/zap" "golang.org/x/sync/errgroup" + monkit "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/internal/sync2" "storj.io/storj/pkg/identity" @@ -19,6 +21,13 @@ import ( "storj.io/storj/pkg/transport" ) +var ( + // OrderError represents errors with orders + OrderError = errs.Class("order") + + mon = monkit.Package() +) + // Info contains full information about an order. type Info struct { Limit *pb.OrderLimit2 @@ -94,66 +103,75 @@ func NewSender(log *zap.Logger, transport transport.Client, kademlia *kademlia.K } // Run sends orders on every interval to the appropriate satellites. -func (sender *Sender) Run(ctx context.Context) error { - return sender.Loop.Run(ctx, func(ctx context.Context) error { - sender.log.Debug("sending") +func (sender *Sender) Run(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + return sender.Loop.Run(ctx, sender.runOnce) +} - ordersBySatellite, err := sender.orders.ListUnsentBySatellite(ctx) - if err != nil { - sender.log.Error("listing orders", zap.Error(err)) - return nil - } - - if len(ordersBySatellite) > 0 { - var group errgroup.Group - ctx, cancel := context.WithTimeout(ctx, sender.config.Timeout) - defer cancel() - - for satelliteID, orders := range ordersBySatellite { - satelliteID, orders := satelliteID, orders - group.Go(func() error { - - sender.Settle(ctx, satelliteID, orders) - return nil - }) - } - _ = group.Wait() // doesn't return errors - } else { - sender.log.Debug("no orders to send") - } +func (sender *Sender) runOnce(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + sender.log.Debug("sending") + ordersBySatellite, err := sender.orders.ListUnsentBySatellite(ctx) + if err != nil { + sender.log.Error("listing orders", zap.Error(err)) return nil - }) + } + + if len(ordersBySatellite) > 0 { + var group errgroup.Group + ctx, cancel := context.WithTimeout(ctx, sender.config.Timeout) + defer cancel() + + for satelliteID, orders := range ordersBySatellite { + satelliteID, orders := satelliteID, orders + group.Go(func() error { + + sender.Settle(ctx, satelliteID, orders) + return nil + }) + } + _ = group.Wait() // doesn't return errors + } else { + sender.log.Debug("no orders to send") + } + + return nil } // Settle uploads orders to the satellite. func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orders []*Info) { log := sender.log.Named(satelliteID.String()) + err := sender.settle(ctx, log, satelliteID, orders) + if err != nil { + log.Error("failed to settle orders", zap.Error(err)) + } +} + +func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*Info) (err error) { + defer mon.Task()(&ctx)(&err) log.Info("sending", zap.Int("count", len(orders))) defer log.Info("finished") satellite, err := sender.kademlia.FindNode(ctx, satelliteID) if err != nil { - log.Error("unable to find satellite on the network", zap.Error(err)) - return + return OrderError.New("unable to find satellite on the network: %v", err) } conn, err := sender.transport.DialNode(ctx, &satellite) if err != nil { - log.Error("unable to connect to the satellite", zap.Error(err)) - return + return OrderError.New("unable to connect to the satellite: %v", err) } defer func() { - if err := conn.Close(); err != nil { - log.Warn("failed to close connection", zap.Error(err)) + if cerr := conn.Close(); cerr != nil { + err = errs.Combine(err, OrderError.New("failed to close connection: %v", err)) } }() client, err := pb.NewOrdersClient(conn).Settlement(ctx) if err != nil { - log.Error("failed to start settlement", zap.Error(err)) - return + return OrderError.New("failed to start settlement: %v", err) } var group errgroup.Group @@ -170,6 +188,11 @@ func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orde return client.CloseSend() }) + var errList errs.Group + errHandle := func(cls errs.Class, format string, args ...interface{}) { + log.Sugar().Errorf(format, args...) + errList.Add(cls.New(format, args...)) + } for { response, err := client.Recv() if err != nil { @@ -177,7 +200,7 @@ func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orde break } - log.Error("failed to receive response", zap.Error(err)) + errHandle(OrderError, "failed to receive response: %v", err) break } @@ -185,21 +208,23 @@ func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orde case pb.SettlementResponse_ACCEPTED: err = sender.orders.Archive(ctx, satelliteID, response.SerialNumber, StatusAccepted) if err != nil { - log.Error("failed to archive order as accepted", zap.Stringer("serial", response.SerialNumber), zap.Error(err)) + errHandle(OrderError, "failed to archive order as accepted: serial: %v, %v", response.SerialNumber, err) } case pb.SettlementResponse_REJECTED: err = sender.orders.Archive(ctx, satelliteID, response.SerialNumber, StatusRejected) if err != nil { - log.Error("failed to archive order as rejected", zap.Stringer("serial", response.SerialNumber), zap.Error(err)) + errHandle(OrderError, "failed to archive order as rejected: serial: %v, %v", response.SerialNumber, err) } default: - log.Error("unexpected response", zap.Error(err)) + errHandle(OrderError, "unexpected response: %v", response.Status) } } if err := group.Wait(); err != nil { - log.Error("sending agreements returned an error", zap.Error(err)) + errHandle(OrderError, "sending aggreements returned an error: %v", err) } + + return errList.Err() } // Close stops the sending service. diff --git a/storagenode/peer.go b/storagenode/peer.go index 0f3979b88..0f1cda044 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -9,6 +9,7 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" "golang.org/x/sync/errgroup" + monkit "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/internal/errs2" "storj.io/storj/internal/version" @@ -32,6 +33,10 @@ import ( "storj.io/storj/storagenode/trust" ) +var ( + mon = monkit.Package() +) + // DB is the master database for Storage Node type DB interface { // CreateTables initializes the database @@ -249,7 +254,9 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver } // Run runs storage node until it's either closed or it errors. -func (peer *Peer) Run(ctx context.Context) error { +func (peer *Peer) Run(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + group, ctx := errgroup.WithContext(ctx) group.Go(func() error { diff --git a/storagenode/pieces/readwrite.go b/storagenode/pieces/readwrite.go index a25a5e98a..8cbc5f14a 100644 --- a/storagenode/pieces/readwrite.go +++ b/storagenode/pieces/readwrite.go @@ -5,6 +5,7 @@ package pieces import ( "bufio" + "context" "hash" "io" @@ -48,7 +49,8 @@ func (w *Writer) Size() int64 { return w.size } func (w *Writer) Hash() []byte { return w.hash.Sum(nil) } // Commit commits piece to permanent storage. -func (w *Writer) Commit() error { +func (w *Writer) Commit(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) if w.closed { return Error.New("already closed") } @@ -60,7 +62,8 @@ func (w *Writer) Commit() error { } // Cancel deletes any temporarily written data. -func (w *Writer) Cancel() error { +func (w *Writer) Cancel(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) if w.closed { return nil } diff --git a/storagenode/pieces/store.go b/storagenode/pieces/store.go index 6444c7479..42b605f7a 100644 --- a/storagenode/pieces/store.go +++ b/storagenode/pieces/store.go @@ -10,6 +10,7 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" + monkit "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/internal/memory" "storj.io/storj/pkg/identity" @@ -24,8 +25,12 @@ const ( preallocSize = 4 * memory.MiB ) -// Error is the default error class. -var Error = errs.Class("pieces error") +var ( + // Error is the default error class. + Error = errs.Class("pieces error") + + mon = monkit.Package() +) // Info contains all the information we need to know about a Piece to manage them. type Info struct { @@ -77,7 +82,8 @@ func NewStore(log *zap.Logger, blobs storage.Blobs) *Store { } // Writer returns a new piece writer. -func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (*Writer, error) { +func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Writer, err error) { + defer mon.Task()(&ctx)(&err) blob, err := store.blobs.Create(ctx, storage.BlobRef{ Namespace: satellite.Bytes(), Key: pieceID.Bytes(), @@ -91,7 +97,8 @@ func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID } // Reader returns a new piece reader. -func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (*Reader, error) { +func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Reader, err error) { + defer mon.Task()(&ctx)(&err) blob, err := store.blobs.Open(ctx, storage.BlobRef{ Namespace: satellite.Bytes(), Key: pieceID.Bytes(), @@ -108,8 +115,9 @@ func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID } // Delete deletes the specified piece. -func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) error { - err := store.blobs.Delete(ctx, storage.BlobRef{ +func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) { + defer mon.Task()(&ctx)(&err) + err = store.blobs.Delete(ctx, storage.BlobRef{ Namespace: satellite.Bytes(), Key: pieceID.Bytes(), }) @@ -123,7 +131,8 @@ type StorageStatus struct { } // StorageStatus returns information about the disk. -func (store *Store) StorageStatus() (StorageStatus, error) { +func (store *Store) StorageStatus(ctx context.Context) (_ StorageStatus, err error) { + defer mon.Task()(&ctx)(&err) diskFree, err := store.blobs.FreeSpace() if err != nil { return StorageStatus{}, err diff --git a/storagenode/pieces/store_test.go b/storagenode/pieces/store_test.go index 367c99b97..4850e8873 100644 --- a/storagenode/pieces/store_test.go +++ b/storagenode/pieces/store_test.go @@ -54,9 +54,9 @@ func TestPieces(t *testing.T) { assert.Equal(t, hash.Sum(nil), writer.Hash()) // commit - require.NoError(t, writer.Commit()) + require.NoError(t, writer.Commit(ctx)) // after commit we should be able to call cancel without an error - require.NoError(t, writer.Cancel()) + require.NoError(t, writer.Cancel(ctx)) } { // valid reads @@ -101,9 +101,9 @@ func TestPieces(t *testing.T) { assert.Equal(t, len(source), int(writer.Size())) // cancel writing - require.NoError(t, writer.Cancel()) + require.NoError(t, writer.Cancel(ctx)) // commit should not fail - require.Error(t, writer.Commit()) + require.Error(t, writer.Commit(ctx)) // read should fail _, err = store.Reader(ctx, satelliteID, cancelledPieceID) diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index 13819ad6d..c8a5cf530 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -194,7 +194,7 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error) } defer func() { // cancel error if it hasn't been committed - if cancelErr := pieceWriter.Cancel(); cancelErr != nil { + if cancelErr := pieceWriter.Cancel(ctx); cancelErr != nil { endpoint.log.Error("error during canceling a piece write", zap.Error(cancelErr)) } }() @@ -263,7 +263,7 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error) return err // TODO: report grpc status internal server error } - if err := pieceWriter.Commit(); err != nil { + if err := pieceWriter.Commit(ctx); err != nil { return ErrInternal.Wrap(err) // TODO: report grpc status internal server error } @@ -494,11 +494,14 @@ func (endpoint *Endpoint) Download(stream pb.Piecestore_DownloadServer) (err err // SaveOrder saves the order with all necessary information. It assumes it has been already verified. func (endpoint *Endpoint) SaveOrder(ctx context.Context, limit *pb.OrderLimit2, order *pb.Order2, uplink *identity.PeerIdentity) { + var err error + defer mon.Task()(&ctx)(&err) + // TODO: do this in a goroutine if order == nil || order.Amount <= 0 { return } - err := endpoint.orders.Enqueue(ctx, &orders.Info{ + err = endpoint.orders.Enqueue(ctx, &orders.Info{ Limit: limit, Order: order, Uplink: uplink, @@ -506,7 +509,7 @@ func (endpoint *Endpoint) SaveOrder(ctx context.Context, limit *pb.OrderLimit2, if err != nil { endpoint.log.Error("failed to add order", zap.Error(err)) } else { - err := endpoint.usage.Add(ctx, limit.SatelliteId, limit.Action, order.Amount, time.Now()) + err = endpoint.usage.Add(ctx, limit.SatelliteId, limit.Action, order.Amount, time.Now()) if err != nil { endpoint.log.Error("failed to add bandwidth usage", zap.Error(err)) } diff --git a/storagenode/piecestore/verification.go b/storagenode/piecestore/verification.go index 138499574..8e043af46 100644 --- a/storagenode/piecestore/verification.go +++ b/storagenode/piecestore/verification.go @@ -28,7 +28,9 @@ var ( // VerifyOrderLimit verifies that the order limit is properly signed and has sane values. // It also verifies that the serial number has not been used. -func (endpoint *Endpoint) VerifyOrderLimit(ctx context.Context, limit *pb.OrderLimit2) error { +func (endpoint *Endpoint) VerifyOrderLimit(ctx context.Context, limit *pb.OrderLimit2) (err error) { + defer mon.Task()(&ctx)(&err) + // sanity checks switch { case limit.Limit < 0: @@ -85,7 +87,9 @@ func (endpoint *Endpoint) VerifyOrderLimit(ctx context.Context, limit *pb.OrderL } // VerifyOrder verifies that the order corresponds to the order limit and has all the necessary fields. -func (endpoint *Endpoint) VerifyOrder(ctx context.Context, peer *identity.PeerIdentity, limit *pb.OrderLimit2, order *pb.Order2, largestOrderAmount int64) error { +func (endpoint *Endpoint) VerifyOrder(ctx context.Context, peer *identity.PeerIdentity, limit *pb.OrderLimit2, order *pb.Order2, largestOrderAmount int64) (err error) { + defer mon.Task()(&ctx)(&err) + if order.SerialNumber != limit.SerialNumber { return ErrProtocol.New("order serial number changed during upload") // TODO: report grpc status bad message } @@ -105,7 +109,9 @@ func (endpoint *Endpoint) VerifyOrder(ctx context.Context, peer *identity.PeerId } // VerifyPieceHash verifies whether the piece hash is properly signed and matches the locally computed hash. -func (endpoint *Endpoint) VerifyPieceHash(ctx context.Context, peer *identity.PeerIdentity, limit *pb.OrderLimit2, hash *pb.PieceHash, expectedHash []byte) error { +func (endpoint *Endpoint) VerifyPieceHash(ctx context.Context, peer *identity.PeerIdentity, limit *pb.OrderLimit2, hash *pb.PieceHash, expectedHash []byte) (err error) { + defer mon.Task()(&ctx)(&err) + if peer == nil || limit == nil || hash == nil || len(expectedHash) == 0 { return ErrProtocol.New("invalid arguments") } @@ -124,7 +130,9 @@ func (endpoint *Endpoint) VerifyPieceHash(ctx context.Context, peer *identity.Pe } // VerifyOrderLimitSignature verifies that the order limit signature is valid. -func (endpoint *Endpoint) VerifyOrderLimitSignature(ctx context.Context, limit *pb.OrderLimit2) error { +func (endpoint *Endpoint) VerifyOrderLimitSignature(ctx context.Context, limit *pb.OrderLimit2) (err error) { + defer mon.Task()(&ctx)(&err) + signee, err := endpoint.trust.GetSignee(ctx, limit.SatelliteId) if err != nil { if err == context.Canceled { diff --git a/storagenode/storagenodedb/bandwidthdb.go b/storagenode/storagenodedb/bandwidthdb.go index 233c751c0..acf4227ca 100644 --- a/storagenode/storagenodedb/bandwidthdb.go +++ b/storagenode/storagenodedb/bandwidthdb.go @@ -24,11 +24,12 @@ func (db *DB) Bandwidth() bandwidth.DB { return db.info.Bandwidth() } func (db *InfoDB) Bandwidth() bandwidth.DB { return &bandwidthdb{db} } // Add adds bandwidth usage to the table -func (db *bandwidthdb) Add(ctx context.Context, satelliteID storj.NodeID, action pb.PieceAction, amount int64, created time.Time) error { +func (db *bandwidthdb) Add(ctx context.Context, satelliteID storj.NodeID, action pb.PieceAction, amount int64, created time.Time) (err error) { + defer mon.Task()(&ctx)(&err) defer db.locked()() - _, err := db.db.Exec(` - INSERT INTO + _, err = db.db.Exec(` + INSERT INTO bandwidth_usage(satellite_id, action, amount, created_at) VALUES(?, ?, ?, ?)`, satelliteID, action, amount, created) @@ -37,12 +38,13 @@ func (db *bandwidthdb) Add(ctx context.Context, satelliteID storj.NodeID, action // Summary returns summary of bandwidth usages func (db *bandwidthdb) Summary(ctx context.Context, from, to time.Time) (_ *bandwidth.Usage, err error) { + defer mon.Task()(&ctx)(&err) defer db.locked()() usage := &bandwidth.Usage{} rows, err := db.db.Query(` - SELECT action, sum(amount) + SELECT action, sum(amount) FROM bandwidth_usage WHERE ? <= created_at AND created_at <= ? GROUP BY action`, from, to) @@ -69,12 +71,13 @@ func (db *bandwidthdb) Summary(ctx context.Context, from, to time.Time) (_ *band // SummaryBySatellite returns summary of bandwidth usage grouping by satellite. func (db *bandwidthdb) SummaryBySatellite(ctx context.Context, from, to time.Time) (_ map[storj.NodeID]*bandwidth.Usage, err error) { + defer mon.Task()(&ctx)(&err) defer db.locked()() entries := map[storj.NodeID]*bandwidth.Usage{} rows, err := db.db.Query(` - SELECT satellite_id, action, sum(amount) + SELECT satellite_id, action, sum(amount) FROM bandwidth_usage WHERE ? <= created_at AND created_at <= ? GROUP BY satellite_id, action`, from, to) diff --git a/storagenode/storagenodedb/certdb.go b/storagenode/storagenodedb/certdb.go index 69d89f243..55b5397ac 100644 --- a/storagenode/storagenodedb/certdb.go +++ b/storagenode/storagenodedb/certdb.go @@ -8,12 +8,18 @@ import ( "crypto/x509" "encoding/asn1" + monkit "gopkg.in/spacemonkeygo/monkit.v2" + "storj.io/storj/internal/dbutil/sqliteutil" "storj.io/storj/pkg/identity" "storj.io/storj/pkg/pkcrypto" "storj.io/storj/storagenode/trust" ) +var ( + mon = monkit.Package() +) + type certdb struct { *InfoDB } @@ -26,6 +32,8 @@ func (db *InfoDB) CertDB() trust.CertDB { return &certdb{db} } // Include includes the certificate in the table and returns an unique id. func (db *certdb) Include(ctx context.Context, pi *identity.PeerIdentity) (certid int64, err error) { + defer mon.Task()(&ctx)(&err) + chain := encodePeerIdentity(pi) defer db.locked()() @@ -43,11 +51,13 @@ func (db *certdb) Include(ctx context.Context, pi *identity.PeerIdentity) (certi } // LookupByCertID finds certificate by the certid returned by Include. -func (db *certdb) LookupByCertID(ctx context.Context, id int64) (*identity.PeerIdentity, error) { +func (db *certdb) LookupByCertID(ctx context.Context, id int64) (_ *identity.PeerIdentity, err error) { + defer mon.Task()(&ctx)(&err) + var pem *[]byte db.mu.Lock() - err := db.db.QueryRow(`SELECT peer_identity FROM certificate WHERE cert_id = ?`, id).Scan(&pem) + err = db.db.QueryRow(`SELECT peer_identity FROM certificate WHERE cert_id = ?`, id).Scan(&pem) db.mu.Unlock() if err != nil { @@ -57,7 +67,7 @@ func (db *certdb) LookupByCertID(ctx context.Context, id int64) (*identity.PeerI return nil, ErrInfo.New("did not find certificate") } - peer, err := decodePeerIdentity(*pem) + peer, err := decodePeerIdentity(ctx, *pem) return peer, ErrInfo.Wrap(err) } @@ -72,7 +82,9 @@ func encodePeerIdentity(pi *identity.PeerIdentity) []byte { return chain } -func decodePeerIdentity(chain []byte) (*identity.PeerIdentity, error) { +func decodePeerIdentity(ctx context.Context, chain []byte) (_ *identity.PeerIdentity, err error) { + defer mon.Task()(&ctx)(&err) + var certs []*x509.Certificate for len(chain) > 0 { var raw asn1.RawValue diff --git a/storagenode/storagenodedb/orders.go b/storagenode/storagenodedb/orders.go index fefb11daa..71eaba3c0 100644 --- a/storagenode/storagenodedb/orders.go +++ b/storagenode/storagenodedb/orders.go @@ -26,7 +26,9 @@ func (db *DB) Orders() orders.DB { return db.info.Orders() } func (db *InfoDB) Orders() orders.DB { return &ordersdb{db} } // Enqueue inserts order to the unsent list -func (db *ordersdb) Enqueue(ctx context.Context, info *orders.Info) error { +func (db *ordersdb) Enqueue(ctx context.Context, info *orders.Info) (err error) { + defer mon.Task()(&ctx)(&err) + certdb := db.CertDB() uplinkCertID, err := certdb.Include(ctx, info.Uplink) @@ -64,6 +66,7 @@ func (db *ordersdb) Enqueue(ctx context.Context, info *orders.Info) error { // ListUnsent returns orders that haven't been sent yet. func (db *ordersdb) ListUnsent(ctx context.Context, limit int) (_ []*orders.Info, err error) { + defer mon.Task()(&ctx)(&err) defer db.locked()() rows, err := db.db.Query(` @@ -105,7 +108,7 @@ func (db *ordersdb) ListUnsent(ctx context.Context, limit int) (_ []*orders.Info return nil, ErrInfo.Wrap(err) } - info.Uplink, err = decodePeerIdentity(uplinkIdentity) + info.Uplink, err = decodePeerIdentity(ctx, uplinkIdentity) if err != nil { return nil, ErrInfo.Wrap(err) } @@ -118,7 +121,8 @@ func (db *ordersdb) ListUnsent(ctx context.Context, limit int) (_ []*orders.Info // ListUnsentBySatellite returns orders that haven't been sent yet grouped by satellite. // Does not return uplink identity. -func (db *ordersdb) ListUnsentBySatellite(ctx context.Context) (map[storj.NodeID][]*orders.Info, error) { +func (db *ordersdb) ListUnsentBySatellite(ctx context.Context) (_ map[storj.NodeID][]*orders.Info, err error) { + defer mon.Task()(&ctx)(&err) defer db.locked()() // TODO: add some limiting @@ -165,7 +169,8 @@ func (db *ordersdb) ListUnsentBySatellite(ctx context.Context) (map[storj.NodeID } // Archive marks order as being handled. -func (db *ordersdb) Archive(ctx context.Context, satellite storj.NodeID, serial storj.SerialNumber, status orders.Status) error { +func (db *ordersdb) Archive(ctx context.Context, satellite storj.NodeID, serial storj.SerialNumber, status orders.Status) (err error) { + defer mon.Task()(&ctx)(&err) defer db.locked()() result, err := db.db.Exec(` @@ -174,15 +179,15 @@ func (db *ordersdb) Archive(ctx context.Context, satellite storj.NodeID, serial order_limit_serialized, order_serialized, uplink_cert_id, status, archived_at - ) SELECT + ) SELECT satellite_id, serial_number, - order_limit_serialized, order_serialized, + order_limit_serialized, order_serialized, uplink_cert_id, ?, ? FROM unsent_order WHERE satellite_id = ? AND serial_number = ?; - DELETE FROM unsent_order + DELETE FROM unsent_order WHERE satellite_id = ? AND serial_number = ?; `, int(status), time.Now(), satellite, serial, satellite, serial) if err != nil { @@ -201,11 +206,12 @@ func (db *ordersdb) Archive(ctx context.Context, satellite storj.NodeID, serial } // ListArchived returns orders that have been sent. -func (db *ordersdb) ListArchived(ctx context.Context, limit int) ([]*orders.ArchivedInfo, error) { +func (db *ordersdb) ListArchived(ctx context.Context, limit int) (_ []*orders.ArchivedInfo, err error) { + defer mon.Task()(&ctx)(&err) defer db.locked()() rows, err := db.db.Query(` - SELECT order_limit_serialized, order_serialized, certificate.peer_identity, + SELECT order_limit_serialized, order_serialized, certificate.peer_identity, status, archived_at FROM order_archive INNER JOIN certificate on order_archive.uplink_cert_id = certificate.cert_id @@ -250,7 +256,7 @@ func (db *ordersdb) ListArchived(ctx context.Context, limit int) ([]*orders.Arch return nil, ErrInfo.Wrap(err) } - info.Uplink, err = decodePeerIdentity(uplinkIdentity) + info.Uplink, err = decodePeerIdentity(ctx, uplinkIdentity) if err != nil { return nil, ErrInfo.Wrap(err) } diff --git a/storagenode/storagenodedb/pieceinfo.go b/storagenode/storagenodedb/pieceinfo.go index ea28ee0d0..8177cfd29 100644 --- a/storagenode/storagenodedb/pieceinfo.go +++ b/storagenode/storagenodedb/pieceinfo.go @@ -25,7 +25,8 @@ func (db *DB) PieceInfo() pieces.DB { return db.info.PieceInfo() } func (db *InfoDB) PieceInfo() pieces.DB { return &pieceinfo{db} } // Add inserts piece information into the database. -func (db *pieceinfo) Add(ctx context.Context, info *pieces.Info) error { +func (db *pieceinfo) Add(ctx context.Context, info *pieces.Info) (err error) { + defer mon.Task()(&ctx)(&err) certdb := db.CertDB() certid, err := certdb.Include(ctx, info.Uplink) if err != nil { @@ -49,7 +50,8 @@ func (db *pieceinfo) Add(ctx context.Context, info *pieces.Info) error { } // Get gets piece information by satellite id and piece id. -func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (*pieces.Info, error) { +func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (_ *pieces.Info, err error) { + defer mon.Task()(&ctx)(&err) info := &pieces.Info{} info.SatelliteID = satelliteID info.PieceID = pieceID @@ -58,7 +60,7 @@ func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID var uplinkIdentity []byte db.mu.Lock() - err := db.db.QueryRowContext(ctx, db.Rebind(` + err = db.db.QueryRowContext(ctx, db.Rebind(` SELECT piece_size, piece_expiration, uplink_piece_hash, certificate.peer_identity FROM pieceinfo INNER JOIN certificate ON pieceinfo.uplink_cert_id = certificate.cert_id @@ -76,7 +78,7 @@ func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID return nil, ErrInfo.Wrap(err) } - info.Uplink, err = decodePeerIdentity(uplinkIdentity) + info.Uplink, err = decodePeerIdentity(ctx, uplinkIdentity) if err != nil { return nil, ErrInfo.Wrap(err) } @@ -85,12 +87,13 @@ func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID } // Delete deletes piece information. -func (db *pieceinfo) Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error { +func (db *pieceinfo) Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) { + defer mon.Task()(&ctx)(&err) defer db.locked()() - _, err := db.db.ExecContext(ctx, db.Rebind(` - DELETE FROM pieceinfo - WHERE satellite_id = ? + _, err = db.db.ExecContext(ctx, db.Rebind(` + DELETE FROM pieceinfo + WHERE satellite_id = ? AND piece_id = ? `), satelliteID, pieceID) @@ -98,13 +101,14 @@ func (db *pieceinfo) Delete(ctx context.Context, satelliteID storj.NodeID, piece } // DeleteFailed marks piece as a failed deletion. -func (db *pieceinfo) DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, now time.Time) error { +func (db *pieceinfo) DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, now time.Time) (err error) { + defer mon.Task()(&ctx)(&err) defer db.locked()() - _, err := db.db.ExecContext(ctx, db.Rebind(` - UPDATE pieceinfo + _, err = db.db.ExecContext(ctx, db.Rebind(` + UPDATE pieceinfo SET deletion_failed_at = ? - WHERE satellite_id = ? + WHERE satellite_id = ? AND piece_id = ? `), now, satelliteID, pieceID) @@ -113,6 +117,7 @@ func (db *pieceinfo) DeleteFailed(ctx context.Context, satelliteID storj.NodeID, // GetExpired gets pieceinformation identites that are expired. func (db *pieceinfo) GetExpired(ctx context.Context, expiredAt time.Time, limit int64) (infos []pieces.ExpiredInfo, err error) { + defer mon.Task()(&ctx)(&err) defer db.locked()() rows, err := db.db.QueryContext(ctx, db.Rebind(` @@ -138,17 +143,18 @@ func (db *pieceinfo) GetExpired(ctx context.Context, expiredAt time.Time, limit } // SpaceUsed calculates disk space used by all pieces -func (db *pieceinfo) SpaceUsed(ctx context.Context) (int64, error) { +func (db *pieceinfo) SpaceUsed(ctx context.Context) (_ int64, err error) { + defer mon.Task()(&ctx)(&err) defer db.locked()() - var sum *int64 - err := db.db.QueryRowContext(ctx, db.Rebind(` + var sum sql.NullInt64 + err = db.db.QueryRowContext(ctx, db.Rebind(` SELECT SUM(piece_size) FROM pieceinfo `)).Scan(&sum) - if err == sql.ErrNoRows || sum == nil { + if err == sql.ErrNoRows || !sum.Valid { return 0, nil } - return *sum, err + return sum.Int64, err } diff --git a/storagenode/storagenodedb/usedserials.go b/storagenode/storagenodedb/usedserials.go index b1c545cc6..73f40c55d 100644 --- a/storagenode/storagenodedb/usedserials.go +++ b/storagenode/storagenodedb/usedserials.go @@ -24,22 +24,24 @@ func (db *DB) UsedSerials() piecestore.UsedSerials { return db.info.UsedSerials( func (db *InfoDB) UsedSerials() piecestore.UsedSerials { return &usedSerials{db} } // Add adds a serial to the database. -func (db *usedSerials) Add(ctx context.Context, satelliteID storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) error { +func (db *usedSerials) Add(ctx context.Context, satelliteID storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) (err error) { + defer mon.Task()(&ctx)(&err) defer db.locked()() - _, err := db.db.Exec(` - INSERT INTO - used_serial(satellite_id, serial_number, expiration) + _, err = db.db.Exec(` + INSERT INTO + used_serial(satellite_id, serial_number, expiration) VALUES(?, ?, ?)`, satelliteID, serialNumber, expiration) return ErrInfo.Wrap(err) } // DeleteExpired deletes expired serial numbers -func (db *usedSerials) DeleteExpired(ctx context.Context, now time.Time) error { +func (db *usedSerials) DeleteExpired(ctx context.Context, now time.Time) (err error) { + defer mon.Task()(&ctx)(&err) defer db.locked()() - _, err := db.db.Exec(`DELETE FROM used_serial WHERE expiration < ?`, now) + _, err = db.db.Exec(`DELETE FROM used_serial WHERE expiration < ?`, now) return ErrInfo.Wrap(err) } @@ -47,6 +49,7 @@ func (db *usedSerials) DeleteExpired(ctx context.Context, now time.Time) error { // IterateAll iterates all serials. // Note, this will lock the database and should only be used during startup. func (db *usedSerials) IterateAll(ctx context.Context, fn piecestore.SerialNumberFn) (err error) { + defer mon.Task()(&ctx)(&err) defer db.locked()() rows, err := db.db.Query(`SELECT satellite_id, serial_number, expiration FROM used_serial`) diff --git a/storagenode/trust/service.go b/storagenode/trust/service.go index 5d9ebb281..dc98cc5ef 100644 --- a/storagenode/trust/service.go +++ b/storagenode/trust/service.go @@ -10,6 +10,7 @@ import ( "sync" "github.com/zeebo/errs" + monkit "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/pkg/auth/signing" "storj.io/storj/pkg/identity" @@ -19,6 +20,7 @@ import ( // Error is the default error class var Error = errs.Class("trust:") +var mon = monkit.Package() // Pool implements different peer verifications. type Pool struct { @@ -74,7 +76,8 @@ func NewPool(kademlia *kademlia.Kademlia, trustAll bool, trustedSatelliteIDs str } // VerifySatelliteID checks whether id corresponds to a trusted satellite. -func (pool *Pool) VerifySatelliteID(ctx context.Context, id storj.NodeID) error { +func (pool *Pool) VerifySatelliteID(ctx context.Context, id storj.NodeID) (err error) { + defer mon.Task()(&ctx)(&err) if pool.trustAllSatellites { return nil } @@ -90,14 +93,17 @@ func (pool *Pool) VerifySatelliteID(ctx context.Context, id storj.NodeID) error } // VerifyUplinkID verifides whether id corresponds to a trusted uplink. -func (pool *Pool) VerifyUplinkID(ctx context.Context, id storj.NodeID) error { +func (pool *Pool) VerifyUplinkID(ctx context.Context, id storj.NodeID) (err error) { + defer mon.Task()(&ctx)(&err) // trusting all the uplinks for now return nil } // GetSignee gets the corresponding signee for verifying signatures. // It ignores passed in ctx cancellation to avoid miscaching between concurrent requests. -func (pool *Pool) GetSignee(ctx context.Context, id storj.NodeID) (signing.Signee, error) { +func (pool *Pool) GetSignee(ctx context.Context, id storj.NodeID) (_ signing.Signee, err error) { + defer mon.Task()(&ctx)(&err) + // lookup peer identity with id pool.mu.RLock() info, ok := pool.trustedSatellites[id]