storagenode: No FATAL error when unsent orders aren't found (#2801)

* pkg/process: Fatal show complete error information
  Change the general process execution function to not using the sugared
  logger for outputting the full error information.
  Delete some unreachable code because Zap logger Fatal method calls exit
  1 internally.
* storagenode/storagenodedb: Add info to error
  Add more information to an error returned due to some data
  inconsistency.
* storagenode/orders: Don't use sugared logger
  Don't use sugar logger and provide better contextualized error messages
  in settle method.
* storagenode/orders: Add some log fields to error msgs
  Add some relevant log fields to some logged errors of the sender settle
  method.
* satellite/orders: Remove always nil error from debug
  Remove an error which as logged in debug level which was always nil and
  makes the logic that used this variable clear.
* storagenode/orders: Don't return error Archiving unsent
  Don't stop the process which archive unsent orders if some of them
  aren't found the DB because it cause the Storage Node to stop with a
  fatal error.
This commit is contained in:
Ivan Fraixedes 2019-08-16 16:53:22 +02:00 committed by GitHub
parent b1abbe5ce3
commit e47b8ed131
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 150 additions and 58 deletions

View File

@ -282,11 +282,11 @@ func cleanup(cmd *cobra.Command) {
err = workErr
if err != nil {
logger.Sugar().Fatal(err)
_ = logger.Sync()
os.Exit(1)
// This function call os.Exit(1)
logger.Fatal("Unrecoverable error", zap.Error(err))
}
return err
return nil
}
}

View File

@ -177,8 +177,8 @@ func (endpoint *Endpoint) Settlement(stream pb.Orders_SettlementServer) (err err
}
return nil
}()
if rejectErr != err {
log.Debug("order limit/order verification failed", zap.Stringer("serial", orderLimit.SerialNumber), zap.Error(err), zap.Error(rejectErr))
if rejectErr != nil {
log.Debug("order limit/order verification failed", zap.Stringer("serial", orderLimit.SerialNumber), zap.Error(rejectErr))
err := monitoredSettlementStreamSend(ctx, stream, &pb.SettlementResponse{
SerialNumber: orderLimit.SerialNumber,
Status: pb.SettlementResponse_REJECTED,

View File

@ -34,8 +34,6 @@ func TestDB(t *testing.T) {
piece := storj.NewPieceID()
serialNumber := testrand.SerialNumber()
// basic test
emptyUnsent, err := ordersdb.ListUnsent(ctx, 100)
require.NoError(t, err)
@ -50,42 +48,57 @@ func TestDB(t *testing.T) {
piecePublicKey, piecePrivateKey, err := storj.NewPieceKey()
require.NoError(t, err)
limit, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite0), &pb.OrderLimit{
SerialNumber: serialNumber,
SatelliteId: satellite0.ID,
UplinkPublicKey: piecePublicKey,
StorageNodeId: storagenode.ID,
PieceId: piece,
Limit: 100,
Action: pb.PieceAction_GET,
OrderCreation: now.AddDate(0, 0, -1),
PieceExpiration: now,
OrderExpiration: now,
})
require.NoError(t, err)
infos := make([]*orders.Info, 2)
for i := 0; i < len(infos); i++ {
order, err := signing.SignUplinkOrder(ctx, piecePrivateKey, &pb.Order{
SerialNumber: serialNumber,
Amount: 50,
})
require.NoError(t, err)
serialNumber := testrand.SerialNumber()
limit, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite0), &pb.OrderLimit{
SerialNumber: serialNumber,
SatelliteId: satellite0.ID,
UplinkPublicKey: piecePublicKey,
StorageNodeId: storagenode.ID,
PieceId: piece,
Limit: 100,
Action: pb.PieceAction_GET,
OrderCreation: now.AddDate(0, 0, -1),
PieceExpiration: now,
OrderExpiration: now,
})
require.NoError(t, err)
info := &orders.Info{
Limit: limit,
Order: order,
order, err := signing.SignUplinkOrder(ctx, piecePrivateKey, &pb.Order{
SerialNumber: serialNumber,
Amount: 50,
})
require.NoError(t, err)
infos[i] = &orders.Info{
Limit: limit,
Order: order,
}
}
// basic add
err = ordersdb.Enqueue(ctx, info)
err = ordersdb.Enqueue(ctx, infos[0])
require.NoError(t, err)
// duplicate add
err = ordersdb.Enqueue(ctx, info)
err = ordersdb.Enqueue(ctx, infos[0])
require.Error(t, err, "duplicate add")
unsent, err := ordersdb.ListUnsent(ctx, 100)
require.NoError(t, err)
require.Empty(t, cmp.Diff([]*orders.Info{info}, unsent, cmp.Comparer(pb.Equal)))
require.Empty(t, cmp.Diff([]*orders.Info{infos[0]}, unsent, cmp.Comparer(pb.Equal)))
// Another add
err = ordersdb.Enqueue(ctx, infos[1])
require.NoError(t, err)
unsent, err = ordersdb.ListUnsent(ctx, 100)
require.NoError(t, err)
require.Empty(t,
cmp.Diff([]*orders.Info{infos[0], infos[1]}, unsent, cmp.Comparer(pb.Equal)),
)
// list by group
unsentGrouped, err := ordersdb.ListUnsentBySatellite(ctx)
@ -93,18 +106,47 @@ func TestDB(t *testing.T) {
expectedGrouped := map[storj.NodeID][]*orders.Info{
satellite0.ID: {
{Limit: limit, Order: order},
{Limit: infos[0].Limit, Order: infos[0].Order},
{Limit: infos[1].Limit, Order: infos[1].Order},
},
}
require.Empty(t, cmp.Diff(expectedGrouped, unsentGrouped, cmp.Comparer(pb.Equal)))
// test archival
err = ordersdb.Archive(ctx, orders.ArchiveRequest{satellite0.ID, serialNumber, orders.StatusAccepted})
err = ordersdb.Archive(ctx, orders.ArchiveRequest{
Satellite: satellite0.ID,
Serial: infos[0].Limit.SerialNumber,
Status: orders.StatusAccepted,
})
require.NoError(t, err)
// duplicate archive
err = ordersdb.Archive(ctx, orders.ArchiveRequest{satellite0.ID, serialNumber, orders.StatusRejected})
err = ordersdb.Archive(ctx, orders.ArchiveRequest{
Satellite: satellite0.ID,
Serial: infos[0].Limit.SerialNumber,
Status: orders.StatusRejected,
})
require.Error(t, err)
require.True(t,
orders.OrderNotFoundError.Has(err),
"expected orders.OrderNotFoundError class",
)
// one new archive and one duplicated
err = ordersdb.Archive(ctx, orders.ArchiveRequest{
Satellite: satellite0.ID,
Serial: infos[0].Limit.SerialNumber,
Status: orders.StatusRejected,
}, orders.ArchiveRequest{
Satellite: satellite0.ID,
Serial: infos[1].Limit.SerialNumber,
Status: orders.StatusRejected,
})
require.Error(t, err)
require.True(t,
orders.OrderNotFoundError.Has(err),
"expected ErrUnsentOrderNotFoundError class",
)
// shouldn't be in unsent list
unsent, err = ordersdb.ListUnsent(ctx, 100)
@ -114,16 +156,23 @@ func TestDB(t *testing.T) {
// it should now be in the archive
archived, err := ordersdb.ListArchived(ctx, 100)
require.NoError(t, err)
require.Len(t, archived, 1)
require.Len(t, archived, 2)
require.Empty(t, cmp.Diff([]*orders.ArchivedInfo{
{
Limit: limit,
Order: order,
Limit: infos[0].Limit,
Order: infos[0].Order,
Status: orders.StatusAccepted,
ArchivedAt: archived[0].ArchivedAt,
},
{
Limit: infos[1].Limit,
Order: infos[1].Order,
Status: orders.StatusRejected,
ArchivedAt: archived[1].ArchivedAt,
},
}, archived, cmp.Comparer(pb.Equal)))
// with 1 hour ttl, archived order should not be deleted
@ -134,7 +183,7 @@ func TestDB(t *testing.T) {
// with 1 nanosecond ttl, archived order should be deleted
n, err = db.Orders().CleanArchive(ctx, time.Nanosecond)
require.NoError(t, err)
require.Equal(t, 1, n)
require.Equal(t, 2, n)
})
}

View File

@ -23,6 +23,8 @@ import (
var (
// OrderError represents errors with orders
OrderError = errs.Class("order")
// OrderNotFoundError is the error returned when an order is not found
OrderNotFoundError = errs.Class("order not found")
mon = monkit.Package()
)
@ -178,7 +180,11 @@ func (sender *Sender) handleBatches(ctx context.Context, requests chan ArchiveRe
}
if err := sender.orders.Archive(ctx, buffer...); err != nil {
return err
if !OrderNotFoundError.Has(err) {
return err
}
sender.log.Warn("some unsent order aren't in the DB", zap.Error(err))
}
buffer = buffer[:0]
}
@ -222,25 +228,38 @@ func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID s
return OrderError.New("failed to start settlement: %v", err)
}
var group errgroup.Group
var (
errList errs.Group
group errgroup.Group
)
group.Go(func() error {
for _, order := range orders {
err := client.Send(&pb.SettlementRequest{
req := pb.SettlementRequest{
Limit: order.Limit,
Order: order.Order,
})
}
err := client.Send(&req)
if err != nil {
return err
err = OrderError.New("sending settlement agreements returned an error: %v", err)
log.Error("gRPC client when sending new orders settlements",
zap.Error(err),
zap.Any("request", req),
)
errList.Add(err)
return nil
}
}
return client.CloseSend()
err := client.CloseSend()
if err != nil {
err = OrderError.New("CloseSend settlement agreements returned an error: %v", err)
log.Error("gRPC client error when closing sender ", zap.Error(err))
errList.Add(err)
}
return nil
})
var errList errs.Group
errHandle := func(cls errs.Class, format string, args ...interface{}) {
log.Sugar().Errorf(format, args...)
errList.Add(cls.New(format, args...))
}
for {
response, err := client.Recv()
if err != nil {
@ -248,7 +267,9 @@ func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID s
break
}
errHandle(OrderError, "failed to receive response: %v", err)
err = OrderError.New("failed to receive settlement response: %v", err)
log.Error("gRPC client error when receiveing new order settlements", zap.Error(err))
errList.Add(err)
break
}
@ -259,7 +280,11 @@ func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID s
case pb.SettlementResponse_REJECTED:
status = StatusRejected
default:
errHandle(OrderError, "unexpected response: %v", response.Status)
err := OrderError.New("unexpected settlement status response: %d", response.Status)
log.Error("gRPC client received a unexpected new orders setlement status",
zap.Error(err), zap.Any("response", response),
)
errList.Add(err)
continue
}
@ -270,10 +295,8 @@ func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID s
}
}
if err := group.Wait(); err != nil {
errHandle(OrderError, "sending agreements returned an error: %v", err)
}
// Errors of this group are reported to errList so it always return nil
_ = group.Wait()
return errList.Err()
}

View File

@ -146,6 +146,11 @@ func (db *ordersdb) ListUnsentBySatellite(ctx context.Context) (_ map[storj.Node
}
// Archive marks order as being handled.
//
// If any of the request contains an order which doesn't exist the method will
// follow with the next ones without interrupting the operation and it will
// return an error of the class orders.OrderNotFoundError. Any other error, will
// abort the operation, rolling back the transaction.
func (db *ordersdb) Archive(ctx context.Context, requests ...orders.ArchiveRequest) (err error) {
defer mon.Task()(&ctx)(&err)
@ -153,9 +158,17 @@ func (db *ordersdb) Archive(ctx context.Context, requests ...orders.ArchiveReque
if err != nil {
return ErrInfo.Wrap(err)
}
var notFoundErrs errs.Group
defer func() {
if err == nil {
err = txn.Commit()
if err == nil {
if len(notFoundErrs) > 0 {
// Return a class error to allow to the caler to identify this case
err = orders.OrderNotFoundError.New(notFoundErrs.Err().Error())
}
}
} else {
err = errs.Combine(err, txn.Rollback())
}
@ -164,7 +177,12 @@ func (db *ordersdb) Archive(ctx context.Context, requests ...orders.ArchiveReque
for _, req := range requests {
err := db.archiveOne(ctx, txn, req)
if err != nil {
return ErrInfo.Wrap(err)
if orders.OrderNotFoundError.Has(err) {
notFoundErrs.Add(err)
continue
}
return err
}
}
@ -201,7 +219,9 @@ func (db *ordersdb) archiveOne(ctx context.Context, txn *sql.Tx, req orders.Arch
return ErrInfo.Wrap(err)
}
if count == 0 {
return ErrInfo.New("order was not in unsent list")
return orders.OrderNotFoundError.New("satellite: %s, serial number: %s",
req.Satellite.String(), req.Serial.String(),
)
}
return nil