satellite/orders: Fix for V3-2529: Release v0.19.0 storage nodes can't submit orders, duplicate key value violates unique constraint (#2900)
* V3-2529: Add DB savepoint to fix issue with postgres. Add test force a rejected order Co-Authored-By: Ivan Fraixedes <ivan@fraixed.es> * Update satellite/satellitedb/orders.go
This commit is contained in:
parent
24a36999ba
commit
4ede12a2ab
@ -22,6 +22,7 @@ import (
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/orders"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
snorders "storj.io/storj/storagenode/orders"
|
||||
"storj.io/storj/uplink"
|
||||
)
|
||||
|
||||
@ -69,6 +70,67 @@ func TestSendingReceivingOrders(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestSendingReceivingDuplicateOrders(t *testing.T) {
|
||||
// test happy path
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
planet.Satellites[0].Audit.Service.Loop.Stop()
|
||||
for _, storageNode := range planet.StorageNodes {
|
||||
storageNode.Storage2.Orders.Sender.Pause()
|
||||
}
|
||||
|
||||
expectedData := testrand.Bytes(50 * memory.KiB)
|
||||
|
||||
redundancy := noLongTailRedundancy(planet)
|
||||
err := planet.Uplinks[0].UploadWithConfig(ctx, planet.Satellites[0], &redundancy, "testbucket", "test/path", expectedData)
|
||||
require.NoError(t, err)
|
||||
|
||||
sumBeforeSend := 0
|
||||
usedOne := false
|
||||
for _, storageNode := range planet.StorageNodes {
|
||||
infos, err := storageNode.DB.Orders().ListUnsent(ctx, 10)
|
||||
require.NoError(t, err)
|
||||
sumBeforeSend += len(infos)
|
||||
|
||||
if len(infos) > 0 && !usedOne {
|
||||
_, err := planet.Satellites[0].DB.Orders().UseSerialNumber(ctx, infos[0].Order.SerialNumber, infos[0].Limit.StorageNodeId)
|
||||
require.NoError(t, err)
|
||||
usedOne = true
|
||||
}
|
||||
|
||||
}
|
||||
require.NotZero(t, sumBeforeSend)
|
||||
|
||||
sumUnsent := 0
|
||||
rejected := 0
|
||||
accepted := 0
|
||||
|
||||
for _, storageNode := range planet.StorageNodes {
|
||||
storageNode.Storage2.Orders.Sender.TriggerWait()
|
||||
|
||||
infos, err := storageNode.DB.Orders().ListUnsent(ctx, 10)
|
||||
require.NoError(t, err)
|
||||
sumUnsent += len(infos)
|
||||
|
||||
archivedInfos, err := storageNode.DB.Orders().ListArchived(ctx, sumBeforeSend)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, archived := range archivedInfos {
|
||||
if archived.Status == snorders.StatusRejected {
|
||||
rejected++
|
||||
} else if archived.Status == snorders.StatusAccepted {
|
||||
accepted++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
require.Zero(t, sumUnsent)
|
||||
require.Equal(t, 1, rejected)
|
||||
require.Equal(t, sumBeforeSend-1, accepted)
|
||||
})
|
||||
}
|
||||
|
||||
func TestUnableToSendOrders(t *testing.T) {
|
||||
// test sending when satellite is unavailable
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
|
@ -266,18 +266,32 @@ func (db *ordersDB) ProcessOrders(ctx context.Context, requests []*orders.Proces
|
||||
// the case where the order has already been processed. Duplicates and previously
|
||||
// processed orders are rejected
|
||||
for _, request := range requests {
|
||||
insert := "INSERT INTO used_serials (serial_number_id, storage_node_id) SELECT id, ? FROM serial_numbers WHERE serial_number = ?"
|
||||
// avoid the PG error "current transaction is aborted, commands ignored until end of transaction block" if the below insert fails due any constraint.
|
||||
// see https://www.postgresql.org/message-id/13131805-BCBB-42DF-953B-27EE36AAF213%40yahoo.com
|
||||
_, err = tx.Exec("savepoint sp")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
insert := "INSERT INTO used_serials (serial_number_id, storage_node_id) SELECT id, ? FROM serial_numbers WHERE serial_number = ?"
|
||||
|
||||
_, err = tx.Exec(db.db.Rebind(insert), request.OrderLimit.StorageNodeId.Bytes(), request.OrderLimit.SerialNumber.Bytes())
|
||||
if err != nil {
|
||||
if pgutil.IsConstraintError(err) || sqliteutil.IsConstraintError(err) {
|
||||
reject(request.OrderLimit.SerialNumber)
|
||||
// rollback to the savepoint before the insert failed
|
||||
_, err = tx.Exec("rollback to savepoint sp")
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
} else {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
}
|
||||
_, err = tx.Exec("release savepoint sp")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// call to get all the bucket IDs
|
||||
|
Loading…
Reference in New Issue
Block a user