storagenode: add monkit task to missing places (#2107)

This commit is contained in:
JT Olio 2019-06-04 06:31:39 -06:00 committed by Stefan Benten
parent 3fe8343b6c
commit c4bb84f209
15 changed files with 215 additions and 115 deletions

View File

@ -51,7 +51,8 @@ func NewEndpoint(log *zap.Logger, pieceInfo pieces.DB, kademlia *kademlia.Kademl
}
}
func (inspector *Endpoint) retrieveStats(ctx context.Context) (*pb.StatSummaryResponse, error) {
func (inspector *Endpoint) retrieveStats(ctx context.Context) (_ *pb.StatSummaryResponse, err error) {
defer mon.Task()(&ctx)(&err)
// Space Usage
totalUsedSpace, err := inspector.pieceInfo.SpaceUsed(ctx)
@ -93,7 +94,9 @@ func (inspector *Endpoint) Stats(ctx context.Context, in *pb.StatsRequest) (out
return statsSummary, nil
}
func (inspector *Endpoint) getDashboardData(ctx context.Context) (*pb.DashboardResponse, error) {
func (inspector *Endpoint) getDashboardData(ctx context.Context) (_ *pb.DashboardResponse, err error) {
defer mon.Task()(&ctx)(&err)
statsSummary, err := inspector.retrieveStats(ctx)
if err != nil {
return &pb.DashboardResponse{}, Error.Wrap(err)

View File

@ -64,7 +64,7 @@ func (service *Service) Run(ctx context.Context) (err error) {
// get the disk space details
// The returned path ends in a slash only if it represents a root directory, such as "/" on Unix or `C:\` on Windows.
storageStatus, err := service.store.StorageStatus()
storageStatus, err := service.store.StorageStatus(ctx)
if err != nil {
return Error.Wrap(err)
}
@ -122,7 +122,9 @@ func (service *Service) Close() (err error) {
return nil
}
func (service *Service) updateNodeInformation(ctx context.Context) error {
func (service *Service) updateNodeInformation(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
usedSpace, err := service.usedSpace(ctx)
if err != nil {
return Error.Wrap(err)
@ -141,7 +143,8 @@ func (service *Service) updateNodeInformation(ctx context.Context) error {
return nil
}
func (service *Service) usedSpace(ctx context.Context) (int64, error) {
func (service *Service) usedSpace(ctx context.Context) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
usedSpace, err := service.pieceInfo.SpaceUsed(ctx)
if err != nil {
return 0, err
@ -149,7 +152,8 @@ func (service *Service) usedSpace(ctx context.Context) (int64, error) {
return usedSpace, nil
}
func (service *Service) usedBandwidth(ctx context.Context) (int64, error) {
func (service *Service) usedBandwidth(ctx context.Context) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
usage, err := bandwidth.TotalMonthlySummary(ctx, service.usageDB)
if err != nil {
return 0, err
@ -158,7 +162,8 @@ func (service *Service) usedBandwidth(ctx context.Context) (int64, error) {
}
// AvailableSpace returns available disk space for upload
func (service *Service) AvailableSpace(ctx context.Context) (int64, error) {
func (service *Service) AvailableSpace(ctx context.Context) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
usedSpace, err := service.pieceInfo.SpaceUsed(ctx)
if err != nil {
return 0, Error.Wrap(err)
@ -168,7 +173,8 @@ func (service *Service) AvailableSpace(ctx context.Context) (int64, error) {
}
// AvailableBandwidth returns available bandwidth for upload/download
func (service *Service) AvailableBandwidth(ctx context.Context) (int64, error) {
func (service *Service) AvailableBandwidth(ctx context.Context) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
usage, err := bandwidth.TotalMonthlySummary(ctx, service.usageDB)
if err != nil {
return 0, Error.Wrap(err)

View File

@ -8,8 +8,10 @@ import (
"io"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/sync2"
"storj.io/storj/pkg/identity"
@ -19,6 +21,13 @@ import (
"storj.io/storj/pkg/transport"
)
var (
// OrderError represents errors with orders
OrderError = errs.Class("order")
mon = monkit.Package()
)
// Info contains full information about an order.
type Info struct {
Limit *pb.OrderLimit2
@ -94,66 +103,75 @@ func NewSender(log *zap.Logger, transport transport.Client, kademlia *kademlia.K
}
// Run sends orders on every interval to the appropriate satellites.
func (sender *Sender) Run(ctx context.Context) error {
return sender.Loop.Run(ctx, func(ctx context.Context) error {
sender.log.Debug("sending")
func (sender *Sender) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return sender.Loop.Run(ctx, sender.runOnce)
}
ordersBySatellite, err := sender.orders.ListUnsentBySatellite(ctx)
if err != nil {
sender.log.Error("listing orders", zap.Error(err))
return nil
}
if len(ordersBySatellite) > 0 {
var group errgroup.Group
ctx, cancel := context.WithTimeout(ctx, sender.config.Timeout)
defer cancel()
for satelliteID, orders := range ordersBySatellite {
satelliteID, orders := satelliteID, orders
group.Go(func() error {
sender.Settle(ctx, satelliteID, orders)
return nil
})
}
_ = group.Wait() // doesn't return errors
} else {
sender.log.Debug("no orders to send")
}
func (sender *Sender) runOnce(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
sender.log.Debug("sending")
ordersBySatellite, err := sender.orders.ListUnsentBySatellite(ctx)
if err != nil {
sender.log.Error("listing orders", zap.Error(err))
return nil
})
}
if len(ordersBySatellite) > 0 {
var group errgroup.Group
ctx, cancel := context.WithTimeout(ctx, sender.config.Timeout)
defer cancel()
for satelliteID, orders := range ordersBySatellite {
satelliteID, orders := satelliteID, orders
group.Go(func() error {
sender.Settle(ctx, satelliteID, orders)
return nil
})
}
_ = group.Wait() // doesn't return errors
} else {
sender.log.Debug("no orders to send")
}
return nil
}
// Settle uploads orders to the satellite.
func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orders []*Info) {
log := sender.log.Named(satelliteID.String())
err := sender.settle(ctx, log, satelliteID, orders)
if err != nil {
log.Error("failed to settle orders", zap.Error(err))
}
}
func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*Info) (err error) {
defer mon.Task()(&ctx)(&err)
log.Info("sending", zap.Int("count", len(orders)))
defer log.Info("finished")
satellite, err := sender.kademlia.FindNode(ctx, satelliteID)
if err != nil {
log.Error("unable to find satellite on the network", zap.Error(err))
return
return OrderError.New("unable to find satellite on the network: %v", err)
}
conn, err := sender.transport.DialNode(ctx, &satellite)
if err != nil {
log.Error("unable to connect to the satellite", zap.Error(err))
return
return OrderError.New("unable to connect to the satellite: %v", err)
}
defer func() {
if err := conn.Close(); err != nil {
log.Warn("failed to close connection", zap.Error(err))
if cerr := conn.Close(); cerr != nil {
err = errs.Combine(err, OrderError.New("failed to close connection: %v", err))
}
}()
client, err := pb.NewOrdersClient(conn).Settlement(ctx)
if err != nil {
log.Error("failed to start settlement", zap.Error(err))
return
return OrderError.New("failed to start settlement: %v", err)
}
var group errgroup.Group
@ -170,6 +188,11 @@ func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orde
return client.CloseSend()
})
var errList errs.Group
errHandle := func(cls errs.Class, format string, args ...interface{}) {
log.Sugar().Errorf(format, args...)
errList.Add(cls.New(format, args...))
}
for {
response, err := client.Recv()
if err != nil {
@ -177,7 +200,7 @@ func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orde
break
}
log.Error("failed to receive response", zap.Error(err))
errHandle(OrderError, "failed to receive response: %v", err)
break
}
@ -185,21 +208,23 @@ func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orde
case pb.SettlementResponse_ACCEPTED:
err = sender.orders.Archive(ctx, satelliteID, response.SerialNumber, StatusAccepted)
if err != nil {
log.Error("failed to archive order as accepted", zap.Stringer("serial", response.SerialNumber), zap.Error(err))
errHandle(OrderError, "failed to archive order as accepted: serial: %v, %v", response.SerialNumber, err)
}
case pb.SettlementResponse_REJECTED:
err = sender.orders.Archive(ctx, satelliteID, response.SerialNumber, StatusRejected)
if err != nil {
log.Error("failed to archive order as rejected", zap.Stringer("serial", response.SerialNumber), zap.Error(err))
errHandle(OrderError, "failed to archive order as rejected: serial: %v, %v", response.SerialNumber, err)
}
default:
log.Error("unexpected response", zap.Error(err))
errHandle(OrderError, "unexpected response: %v", response.Status)
}
}
if err := group.Wait(); err != nil {
log.Error("sending agreements returned an error", zap.Error(err))
errHandle(OrderError, "sending aggreements returned an error: %v", err)
}
return errList.Err()
}
// Close stops the sending service.

View File

@ -9,6 +9,7 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/version"
@ -32,6 +33,10 @@ import (
"storj.io/storj/storagenode/trust"
)
var (
mon = monkit.Package()
)
// DB is the master database for Storage Node
type DB interface {
// CreateTables initializes the database
@ -249,7 +254,9 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver
}
// Run runs storage node until it's either closed or it errors.
func (peer *Peer) Run(ctx context.Context) error {
func (peer *Peer) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {

View File

@ -5,6 +5,7 @@ package pieces
import (
"bufio"
"context"
"hash"
"io"
@ -48,7 +49,8 @@ func (w *Writer) Size() int64 { return w.size }
func (w *Writer) Hash() []byte { return w.hash.Sum(nil) }
// Commit commits piece to permanent storage.
func (w *Writer) Commit() error {
func (w *Writer) Commit(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
if w.closed {
return Error.New("already closed")
}
@ -60,7 +62,8 @@ func (w *Writer) Commit() error {
}
// Cancel deletes any temporarily written data.
func (w *Writer) Cancel() error {
func (w *Writer) Cancel(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
if w.closed {
return nil
}

View File

@ -10,6 +10,7 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/memory"
"storj.io/storj/pkg/identity"
@ -24,8 +25,12 @@ const (
preallocSize = 4 * memory.MiB
)
// Error is the default error class.
var Error = errs.Class("pieces error")
var (
// Error is the default error class.
Error = errs.Class("pieces error")
mon = monkit.Package()
)
// Info contains all the information we need to know about a Piece to manage them.
type Info struct {
@ -77,7 +82,8 @@ func NewStore(log *zap.Logger, blobs storage.Blobs) *Store {
}
// Writer returns a new piece writer.
func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (*Writer, error) {
func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Writer, err error) {
defer mon.Task()(&ctx)(&err)
blob, err := store.blobs.Create(ctx, storage.BlobRef{
Namespace: satellite.Bytes(),
Key: pieceID.Bytes(),
@ -91,7 +97,8 @@ func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID
}
// Reader returns a new piece reader.
func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (*Reader, error) {
func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Reader, err error) {
defer mon.Task()(&ctx)(&err)
blob, err := store.blobs.Open(ctx, storage.BlobRef{
Namespace: satellite.Bytes(),
Key: pieceID.Bytes(),
@ -108,8 +115,9 @@ func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID
}
// Delete deletes the specified piece.
func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) error {
err := store.blobs.Delete(ctx, storage.BlobRef{
func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) {
defer mon.Task()(&ctx)(&err)
err = store.blobs.Delete(ctx, storage.BlobRef{
Namespace: satellite.Bytes(),
Key: pieceID.Bytes(),
})
@ -123,7 +131,8 @@ type StorageStatus struct {
}
// StorageStatus returns information about the disk.
func (store *Store) StorageStatus() (StorageStatus, error) {
func (store *Store) StorageStatus(ctx context.Context) (_ StorageStatus, err error) {
defer mon.Task()(&ctx)(&err)
diskFree, err := store.blobs.FreeSpace()
if err != nil {
return StorageStatus{}, err

View File

@ -54,9 +54,9 @@ func TestPieces(t *testing.T) {
assert.Equal(t, hash.Sum(nil), writer.Hash())
// commit
require.NoError(t, writer.Commit())
require.NoError(t, writer.Commit(ctx))
// after commit we should be able to call cancel without an error
require.NoError(t, writer.Cancel())
require.NoError(t, writer.Cancel(ctx))
}
{ // valid reads
@ -101,9 +101,9 @@ func TestPieces(t *testing.T) {
assert.Equal(t, len(source), int(writer.Size()))
// cancel writing
require.NoError(t, writer.Cancel())
require.NoError(t, writer.Cancel(ctx))
// commit should not fail
require.Error(t, writer.Commit())
require.Error(t, writer.Commit(ctx))
// read should fail
_, err = store.Reader(ctx, satelliteID, cancelledPieceID)

View File

@ -194,7 +194,7 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error)
}
defer func() {
// cancel error if it hasn't been committed
if cancelErr := pieceWriter.Cancel(); cancelErr != nil {
if cancelErr := pieceWriter.Cancel(ctx); cancelErr != nil {
endpoint.log.Error("error during canceling a piece write", zap.Error(cancelErr))
}
}()
@ -263,7 +263,7 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error)
return err // TODO: report grpc status internal server error
}
if err := pieceWriter.Commit(); err != nil {
if err := pieceWriter.Commit(ctx); err != nil {
return ErrInternal.Wrap(err) // TODO: report grpc status internal server error
}
@ -494,11 +494,14 @@ func (endpoint *Endpoint) Download(stream pb.Piecestore_DownloadServer) (err err
// SaveOrder saves the order with all necessary information. It assumes it has been already verified.
func (endpoint *Endpoint) SaveOrder(ctx context.Context, limit *pb.OrderLimit2, order *pb.Order2, uplink *identity.PeerIdentity) {
var err error
defer mon.Task()(&ctx)(&err)
// TODO: do this in a goroutine
if order == nil || order.Amount <= 0 {
return
}
err := endpoint.orders.Enqueue(ctx, &orders.Info{
err = endpoint.orders.Enqueue(ctx, &orders.Info{
Limit: limit,
Order: order,
Uplink: uplink,
@ -506,7 +509,7 @@ func (endpoint *Endpoint) SaveOrder(ctx context.Context, limit *pb.OrderLimit2,
if err != nil {
endpoint.log.Error("failed to add order", zap.Error(err))
} else {
err := endpoint.usage.Add(ctx, limit.SatelliteId, limit.Action, order.Amount, time.Now())
err = endpoint.usage.Add(ctx, limit.SatelliteId, limit.Action, order.Amount, time.Now())
if err != nil {
endpoint.log.Error("failed to add bandwidth usage", zap.Error(err))
}

View File

@ -28,7 +28,9 @@ var (
// VerifyOrderLimit verifies that the order limit is properly signed and has sane values.
// It also verifies that the serial number has not been used.
func (endpoint *Endpoint) VerifyOrderLimit(ctx context.Context, limit *pb.OrderLimit2) error {
func (endpoint *Endpoint) VerifyOrderLimit(ctx context.Context, limit *pb.OrderLimit2) (err error) {
defer mon.Task()(&ctx)(&err)
// sanity checks
switch {
case limit.Limit < 0:
@ -85,7 +87,9 @@ func (endpoint *Endpoint) VerifyOrderLimit(ctx context.Context, limit *pb.OrderL
}
// VerifyOrder verifies that the order corresponds to the order limit and has all the necessary fields.
func (endpoint *Endpoint) VerifyOrder(ctx context.Context, peer *identity.PeerIdentity, limit *pb.OrderLimit2, order *pb.Order2, largestOrderAmount int64) error {
func (endpoint *Endpoint) VerifyOrder(ctx context.Context, peer *identity.PeerIdentity, limit *pb.OrderLimit2, order *pb.Order2, largestOrderAmount int64) (err error) {
defer mon.Task()(&ctx)(&err)
if order.SerialNumber != limit.SerialNumber {
return ErrProtocol.New("order serial number changed during upload") // TODO: report grpc status bad message
}
@ -105,7 +109,9 @@ func (endpoint *Endpoint) VerifyOrder(ctx context.Context, peer *identity.PeerId
}
// VerifyPieceHash verifies whether the piece hash is properly signed and matches the locally computed hash.
func (endpoint *Endpoint) VerifyPieceHash(ctx context.Context, peer *identity.PeerIdentity, limit *pb.OrderLimit2, hash *pb.PieceHash, expectedHash []byte) error {
func (endpoint *Endpoint) VerifyPieceHash(ctx context.Context, peer *identity.PeerIdentity, limit *pb.OrderLimit2, hash *pb.PieceHash, expectedHash []byte) (err error) {
defer mon.Task()(&ctx)(&err)
if peer == nil || limit == nil || hash == nil || len(expectedHash) == 0 {
return ErrProtocol.New("invalid arguments")
}
@ -124,7 +130,9 @@ func (endpoint *Endpoint) VerifyPieceHash(ctx context.Context, peer *identity.Pe
}
// VerifyOrderLimitSignature verifies that the order limit signature is valid.
func (endpoint *Endpoint) VerifyOrderLimitSignature(ctx context.Context, limit *pb.OrderLimit2) error {
func (endpoint *Endpoint) VerifyOrderLimitSignature(ctx context.Context, limit *pb.OrderLimit2) (err error) {
defer mon.Task()(&ctx)(&err)
signee, err := endpoint.trust.GetSignee(ctx, limit.SatelliteId)
if err != nil {
if err == context.Canceled {

View File

@ -24,11 +24,12 @@ func (db *DB) Bandwidth() bandwidth.DB { return db.info.Bandwidth() }
func (db *InfoDB) Bandwidth() bandwidth.DB { return &bandwidthdb{db} }
// Add adds bandwidth usage to the table
func (db *bandwidthdb) Add(ctx context.Context, satelliteID storj.NodeID, action pb.PieceAction, amount int64, created time.Time) error {
func (db *bandwidthdb) Add(ctx context.Context, satelliteID storj.NodeID, action pb.PieceAction, amount int64, created time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
_, err := db.db.Exec(`
INSERT INTO
_, err = db.db.Exec(`
INSERT INTO
bandwidth_usage(satellite_id, action, amount, created_at)
VALUES(?, ?, ?, ?)`, satelliteID, action, amount, created)
@ -37,12 +38,13 @@ func (db *bandwidthdb) Add(ctx context.Context, satelliteID storj.NodeID, action
// Summary returns summary of bandwidth usages
func (db *bandwidthdb) Summary(ctx context.Context, from, to time.Time) (_ *bandwidth.Usage, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
usage := &bandwidth.Usage{}
rows, err := db.db.Query(`
SELECT action, sum(amount)
SELECT action, sum(amount)
FROM bandwidth_usage
WHERE ? <= created_at AND created_at <= ?
GROUP BY action`, from, to)
@ -69,12 +71,13 @@ func (db *bandwidthdb) Summary(ctx context.Context, from, to time.Time) (_ *band
// SummaryBySatellite returns summary of bandwidth usage grouping by satellite.
func (db *bandwidthdb) SummaryBySatellite(ctx context.Context, from, to time.Time) (_ map[storj.NodeID]*bandwidth.Usage, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
entries := map[storj.NodeID]*bandwidth.Usage{}
rows, err := db.db.Query(`
SELECT satellite_id, action, sum(amount)
SELECT satellite_id, action, sum(amount)
FROM bandwidth_usage
WHERE ? <= created_at AND created_at <= ?
GROUP BY satellite_id, action`, from, to)

View File

@ -8,12 +8,18 @@ import (
"crypto/x509"
"encoding/asn1"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/dbutil/sqliteutil"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pkcrypto"
"storj.io/storj/storagenode/trust"
)
var (
mon = monkit.Package()
)
type certdb struct {
*InfoDB
}
@ -26,6 +32,8 @@ func (db *InfoDB) CertDB() trust.CertDB { return &certdb{db} }
// Include includes the certificate in the table and returns an unique id.
func (db *certdb) Include(ctx context.Context, pi *identity.PeerIdentity) (certid int64, err error) {
defer mon.Task()(&ctx)(&err)
chain := encodePeerIdentity(pi)
defer db.locked()()
@ -43,11 +51,13 @@ func (db *certdb) Include(ctx context.Context, pi *identity.PeerIdentity) (certi
}
// LookupByCertID finds certificate by the certid returned by Include.
func (db *certdb) LookupByCertID(ctx context.Context, id int64) (*identity.PeerIdentity, error) {
func (db *certdb) LookupByCertID(ctx context.Context, id int64) (_ *identity.PeerIdentity, err error) {
defer mon.Task()(&ctx)(&err)
var pem *[]byte
db.mu.Lock()
err := db.db.QueryRow(`SELECT peer_identity FROM certificate WHERE cert_id = ?`, id).Scan(&pem)
err = db.db.QueryRow(`SELECT peer_identity FROM certificate WHERE cert_id = ?`, id).Scan(&pem)
db.mu.Unlock()
if err != nil {
@ -57,7 +67,7 @@ func (db *certdb) LookupByCertID(ctx context.Context, id int64) (*identity.PeerI
return nil, ErrInfo.New("did not find certificate")
}
peer, err := decodePeerIdentity(*pem)
peer, err := decodePeerIdentity(ctx, *pem)
return peer, ErrInfo.Wrap(err)
}
@ -72,7 +82,9 @@ func encodePeerIdentity(pi *identity.PeerIdentity) []byte {
return chain
}
func decodePeerIdentity(chain []byte) (*identity.PeerIdentity, error) {
func decodePeerIdentity(ctx context.Context, chain []byte) (_ *identity.PeerIdentity, err error) {
defer mon.Task()(&ctx)(&err)
var certs []*x509.Certificate
for len(chain) > 0 {
var raw asn1.RawValue

View File

@ -26,7 +26,9 @@ func (db *DB) Orders() orders.DB { return db.info.Orders() }
func (db *InfoDB) Orders() orders.DB { return &ordersdb{db} }
// Enqueue inserts order to the unsent list
func (db *ordersdb) Enqueue(ctx context.Context, info *orders.Info) error {
func (db *ordersdb) Enqueue(ctx context.Context, info *orders.Info) (err error) {
defer mon.Task()(&ctx)(&err)
certdb := db.CertDB()
uplinkCertID, err := certdb.Include(ctx, info.Uplink)
@ -64,6 +66,7 @@ func (db *ordersdb) Enqueue(ctx context.Context, info *orders.Info) error {
// ListUnsent returns orders that haven't been sent yet.
func (db *ordersdb) ListUnsent(ctx context.Context, limit int) (_ []*orders.Info, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
rows, err := db.db.Query(`
@ -105,7 +108,7 @@ func (db *ordersdb) ListUnsent(ctx context.Context, limit int) (_ []*orders.Info
return nil, ErrInfo.Wrap(err)
}
info.Uplink, err = decodePeerIdentity(uplinkIdentity)
info.Uplink, err = decodePeerIdentity(ctx, uplinkIdentity)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
@ -118,7 +121,8 @@ func (db *ordersdb) ListUnsent(ctx context.Context, limit int) (_ []*orders.Info
// ListUnsentBySatellite returns orders that haven't been sent yet grouped by satellite.
// Does not return uplink identity.
func (db *ordersdb) ListUnsentBySatellite(ctx context.Context) (map[storj.NodeID][]*orders.Info, error) {
func (db *ordersdb) ListUnsentBySatellite(ctx context.Context) (_ map[storj.NodeID][]*orders.Info, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
// TODO: add some limiting
@ -165,7 +169,8 @@ func (db *ordersdb) ListUnsentBySatellite(ctx context.Context) (map[storj.NodeID
}
// Archive marks order as being handled.
func (db *ordersdb) Archive(ctx context.Context, satellite storj.NodeID, serial storj.SerialNumber, status orders.Status) error {
func (db *ordersdb) Archive(ctx context.Context, satellite storj.NodeID, serial storj.SerialNumber, status orders.Status) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
result, err := db.db.Exec(`
@ -174,15 +179,15 @@ func (db *ordersdb) Archive(ctx context.Context, satellite storj.NodeID, serial
order_limit_serialized, order_serialized,
uplink_cert_id,
status, archived_at
) SELECT
) SELECT
satellite_id, serial_number,
order_limit_serialized, order_serialized,
order_limit_serialized, order_serialized,
uplink_cert_id,
?, ?
FROM unsent_order
WHERE satellite_id = ? AND serial_number = ?;
DELETE FROM unsent_order
DELETE FROM unsent_order
WHERE satellite_id = ? AND serial_number = ?;
`, int(status), time.Now(), satellite, serial, satellite, serial)
if err != nil {
@ -201,11 +206,12 @@ func (db *ordersdb) Archive(ctx context.Context, satellite storj.NodeID, serial
}
// ListArchived returns orders that have been sent.
func (db *ordersdb) ListArchived(ctx context.Context, limit int) ([]*orders.ArchivedInfo, error) {
func (db *ordersdb) ListArchived(ctx context.Context, limit int) (_ []*orders.ArchivedInfo, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
rows, err := db.db.Query(`
SELECT order_limit_serialized, order_serialized, certificate.peer_identity,
SELECT order_limit_serialized, order_serialized, certificate.peer_identity,
status, archived_at
FROM order_archive
INNER JOIN certificate on order_archive.uplink_cert_id = certificate.cert_id
@ -250,7 +256,7 @@ func (db *ordersdb) ListArchived(ctx context.Context, limit int) ([]*orders.Arch
return nil, ErrInfo.Wrap(err)
}
info.Uplink, err = decodePeerIdentity(uplinkIdentity)
info.Uplink, err = decodePeerIdentity(ctx, uplinkIdentity)
if err != nil {
return nil, ErrInfo.Wrap(err)
}

View File

@ -25,7 +25,8 @@ func (db *DB) PieceInfo() pieces.DB { return db.info.PieceInfo() }
func (db *InfoDB) PieceInfo() pieces.DB { return &pieceinfo{db} }
// Add inserts piece information into the database.
func (db *pieceinfo) Add(ctx context.Context, info *pieces.Info) error {
func (db *pieceinfo) Add(ctx context.Context, info *pieces.Info) (err error) {
defer mon.Task()(&ctx)(&err)
certdb := db.CertDB()
certid, err := certdb.Include(ctx, info.Uplink)
if err != nil {
@ -49,7 +50,8 @@ func (db *pieceinfo) Add(ctx context.Context, info *pieces.Info) error {
}
// Get gets piece information by satellite id and piece id.
func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (*pieces.Info, error) {
func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (_ *pieces.Info, err error) {
defer mon.Task()(&ctx)(&err)
info := &pieces.Info{}
info.SatelliteID = satelliteID
info.PieceID = pieceID
@ -58,7 +60,7 @@ func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID
var uplinkIdentity []byte
db.mu.Lock()
err := db.db.QueryRowContext(ctx, db.Rebind(`
err = db.db.QueryRowContext(ctx, db.Rebind(`
SELECT piece_size, piece_expiration, uplink_piece_hash, certificate.peer_identity
FROM pieceinfo
INNER JOIN certificate ON pieceinfo.uplink_cert_id = certificate.cert_id
@ -76,7 +78,7 @@ func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID
return nil, ErrInfo.Wrap(err)
}
info.Uplink, err = decodePeerIdentity(uplinkIdentity)
info.Uplink, err = decodePeerIdentity(ctx, uplinkIdentity)
if err != nil {
return nil, ErrInfo.Wrap(err)
}
@ -85,12 +87,13 @@ func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID
}
// Delete deletes piece information.
func (db *pieceinfo) Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error {
func (db *pieceinfo) Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
_, err := db.db.ExecContext(ctx, db.Rebind(`
DELETE FROM pieceinfo
WHERE satellite_id = ?
_, err = db.db.ExecContext(ctx, db.Rebind(`
DELETE FROM pieceinfo
WHERE satellite_id = ?
AND piece_id = ?
`), satelliteID, pieceID)
@ -98,13 +101,14 @@ func (db *pieceinfo) Delete(ctx context.Context, satelliteID storj.NodeID, piece
}
// DeleteFailed marks piece as a failed deletion.
func (db *pieceinfo) DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, now time.Time) error {
func (db *pieceinfo) DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, now time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
_, err := db.db.ExecContext(ctx, db.Rebind(`
UPDATE pieceinfo
_, err = db.db.ExecContext(ctx, db.Rebind(`
UPDATE pieceinfo
SET deletion_failed_at = ?
WHERE satellite_id = ?
WHERE satellite_id = ?
AND piece_id = ?
`), now, satelliteID, pieceID)
@ -113,6 +117,7 @@ func (db *pieceinfo) DeleteFailed(ctx context.Context, satelliteID storj.NodeID,
// GetExpired gets pieceinformation identites that are expired.
func (db *pieceinfo) GetExpired(ctx context.Context, expiredAt time.Time, limit int64) (infos []pieces.ExpiredInfo, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
rows, err := db.db.QueryContext(ctx, db.Rebind(`
@ -138,17 +143,18 @@ func (db *pieceinfo) GetExpired(ctx context.Context, expiredAt time.Time, limit
}
// SpaceUsed calculates disk space used by all pieces
func (db *pieceinfo) SpaceUsed(ctx context.Context) (int64, error) {
func (db *pieceinfo) SpaceUsed(ctx context.Context) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
var sum *int64
err := db.db.QueryRowContext(ctx, db.Rebind(`
var sum sql.NullInt64
err = db.db.QueryRowContext(ctx, db.Rebind(`
SELECT SUM(piece_size)
FROM pieceinfo
`)).Scan(&sum)
if err == sql.ErrNoRows || sum == nil {
if err == sql.ErrNoRows || !sum.Valid {
return 0, nil
}
return *sum, err
return sum.Int64, err
}

View File

@ -24,22 +24,24 @@ func (db *DB) UsedSerials() piecestore.UsedSerials { return db.info.UsedSerials(
func (db *InfoDB) UsedSerials() piecestore.UsedSerials { return &usedSerials{db} }
// Add adds a serial to the database.
func (db *usedSerials) Add(ctx context.Context, satelliteID storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) error {
func (db *usedSerials) Add(ctx context.Context, satelliteID storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
_, err := db.db.Exec(`
INSERT INTO
used_serial(satellite_id, serial_number, expiration)
_, err = db.db.Exec(`
INSERT INTO
used_serial(satellite_id, serial_number, expiration)
VALUES(?, ?, ?)`, satelliteID, serialNumber, expiration)
return ErrInfo.Wrap(err)
}
// DeleteExpired deletes expired serial numbers
func (db *usedSerials) DeleteExpired(ctx context.Context, now time.Time) error {
func (db *usedSerials) DeleteExpired(ctx context.Context, now time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
_, err := db.db.Exec(`DELETE FROM used_serial WHERE expiration < ?`, now)
_, err = db.db.Exec(`DELETE FROM used_serial WHERE expiration < ?`, now)
return ErrInfo.Wrap(err)
}
@ -47,6 +49,7 @@ func (db *usedSerials) DeleteExpired(ctx context.Context, now time.Time) error {
// IterateAll iterates all serials.
// Note, this will lock the database and should only be used during startup.
func (db *usedSerials) IterateAll(ctx context.Context, fn piecestore.SerialNumberFn) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
rows, err := db.db.Query(`SELECT satellite_id, serial_number, expiration FROM used_serial`)

View File

@ -10,6 +10,7 @@ import (
"sync"
"github.com/zeebo/errs"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/auth/signing"
"storj.io/storj/pkg/identity"
@ -19,6 +20,7 @@ import (
// Error is the default error class
var Error = errs.Class("trust:")
var mon = monkit.Package()
// Pool implements different peer verifications.
type Pool struct {
@ -74,7 +76,8 @@ func NewPool(kademlia *kademlia.Kademlia, trustAll bool, trustedSatelliteIDs str
}
// VerifySatelliteID checks whether id corresponds to a trusted satellite.
func (pool *Pool) VerifySatelliteID(ctx context.Context, id storj.NodeID) error {
func (pool *Pool) VerifySatelliteID(ctx context.Context, id storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
if pool.trustAllSatellites {
return nil
}
@ -90,14 +93,17 @@ func (pool *Pool) VerifySatelliteID(ctx context.Context, id storj.NodeID) error
}
// VerifyUplinkID verifides whether id corresponds to a trusted uplink.
func (pool *Pool) VerifyUplinkID(ctx context.Context, id storj.NodeID) error {
func (pool *Pool) VerifyUplinkID(ctx context.Context, id storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
// trusting all the uplinks for now
return nil
}
// GetSignee gets the corresponding signee for verifying signatures.
// It ignores passed in ctx cancellation to avoid miscaching between concurrent requests.
func (pool *Pool) GetSignee(ctx context.Context, id storj.NodeID) (signing.Signee, error) {
func (pool *Pool) GetSignee(ctx context.Context, id storj.NodeID) (_ signing.Signee, err error) {
defer mon.Task()(&ctx)(&err)
// lookup peer identity with id
pool.mu.RLock()
info, ok := pool.trustedSatellites[id]