satellite/orders: add tests for double sending the same order
Change-Id: If2fa7f035257df3b04f506f81aa8b2e0916f5033
This commit is contained in:
parent
2cd5eb7dac
commit
7baa59753a
@ -643,3 +643,90 @@ func TestRandomSampleLimits(t *testing.T) {
|
||||
assert.Equal(t, 0, nilCount)
|
||||
})
|
||||
}
|
||||
|
||||
func TestProcessOrders_DoubleSend(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
ordersDB := db.Orders()
|
||||
chore := reportedrollup.NewChore(zaptest.NewLogger(t), ordersDB, reportedrollup.Config{})
|
||||
serialNum := storj.SerialNumber{2}
|
||||
projectID, _ := uuid.New()
|
||||
now := time.Now()
|
||||
beforeRollup := now.Add(-time.Hour - time.Second)
|
||||
afterRollup := now.Add(time.Hour + time.Second)
|
||||
|
||||
// assertion helpers
|
||||
checkBucketBandwidth := func(bucket string, amount int64) {
|
||||
settled, err := ordersDB.GetBucketBandwidth(ctx, *projectID, []byte(bucket), beforeRollup, afterRollup)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, amount, settled)
|
||||
}
|
||||
checkStoragenodeBandwidth := func(node storj.NodeID, amount int64) {
|
||||
settled, err := ordersDB.GetStorageNodeBandwidth(ctx, node, beforeRollup, afterRollup)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, amount, settled)
|
||||
}
|
||||
|
||||
// setup: create serial number records
|
||||
err := ordersDB.CreateSerialInfo(ctx, serialNum, []byte(projectID.String()+"/b"), now.AddDate(0, 0, 1))
|
||||
require.NoError(t, err)
|
||||
|
||||
order := &orders.ProcessOrderRequest{
|
||||
Order: &pb.Order{
|
||||
SerialNumber: serialNum,
|
||||
Amount: 100,
|
||||
},
|
||||
OrderLimit: &pb.OrderLimit{
|
||||
SerialNumber: serialNum,
|
||||
StorageNodeId: storj.NodeID{1},
|
||||
Action: pb.PieceAction_PUT,
|
||||
OrderExpiration: now.AddDate(0, 0, 3),
|
||||
},
|
||||
}
|
||||
|
||||
// send the same order twice in the same request
|
||||
{
|
||||
actualResponses, err := ordersDB.ProcessOrders(ctx, []*orders.ProcessOrderRequest{order, order})
|
||||
require.NoError(t, err)
|
||||
expectedResponses := []*orders.ProcessOrderResponse{
|
||||
{
|
||||
SerialNumber: serialNum,
|
||||
Status: pb.SettlementResponse_ACCEPTED,
|
||||
},
|
||||
{
|
||||
SerialNumber: serialNum,
|
||||
Status: pb.SettlementResponse_REJECTED,
|
||||
},
|
||||
}
|
||||
assert.Equal(t, expectedResponses, actualResponses)
|
||||
}
|
||||
|
||||
// confirm the correct data from processing orders was written and consumed
|
||||
{
|
||||
require.NoError(t, chore.RunOnce(ctx, now))
|
||||
|
||||
checkBucketBandwidth("b", 100)
|
||||
checkStoragenodeBandwidth(storj.NodeID{1}, 100)
|
||||
}
|
||||
|
||||
// send the already sent and handled order again
|
||||
{
|
||||
actualResponses, err := ordersDB.ProcessOrders(ctx, []*orders.ProcessOrderRequest{order})
|
||||
require.NoError(t, err)
|
||||
expectedResponses := []*orders.ProcessOrderResponse{
|
||||
{
|
||||
SerialNumber: serialNum,
|
||||
Status: pb.SettlementResponse_ACCEPTED,
|
||||
},
|
||||
}
|
||||
assert.Equal(t, expectedResponses, actualResponses)
|
||||
}
|
||||
|
||||
// confirm the correct data from processing orders was written and consumed
|
||||
{
|
||||
require.NoError(t, chore.RunOnce(ctx, now))
|
||||
|
||||
checkBucketBandwidth("b", 100)
|
||||
checkStoragenodeBandwidth(storj.NodeID{1}, 100)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -279,6 +279,14 @@ func (db *ordersDB) ProcessOrders(ctx context.Context, requests []*orders.Proces
|
||||
serialNumArray := make([][]byte, 0, len(requests))
|
||||
settledArray := make([]int64, 0, len(requests))
|
||||
|
||||
// remove duplicate bucket_id, serial_number pairs sent in the same request.
|
||||
// postgres will complain.
|
||||
type requestKey struct {
|
||||
BucketID string
|
||||
SerialNumber storj.SerialNumber
|
||||
}
|
||||
seenRequests := make(map[requestKey]struct{})
|
||||
|
||||
for i, request := range requests {
|
||||
if bucketIDs[i] == nil {
|
||||
responses = append(responses, &orders.ProcessOrderResponse{
|
||||
@ -287,6 +295,21 @@ func (db *ordersDB) ProcessOrders(ctx context.Context, requests []*orders.Proces
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
// Filter duplicate requests and reject them.
|
||||
key := requestKey{
|
||||
BucketID: string(bucketIDs[i]),
|
||||
SerialNumber: request.Order.SerialNumber,
|
||||
}
|
||||
if _, seen := seenRequests[key]; seen {
|
||||
responses = append(responses, &orders.ProcessOrderResponse{
|
||||
SerialNumber: request.Order.SerialNumber,
|
||||
Status: pb.SettlementResponse_REJECTED,
|
||||
})
|
||||
continue
|
||||
}
|
||||
seenRequests[key] = struct{}{}
|
||||
|
||||
expiresAtArray = append(expiresAtArray, request.OrderLimit.OrderExpiration)
|
||||
bucketIDArray = append(bucketIDArray, bucketIDs[i])
|
||||
actionArray = append(actionArray, request.OrderLimit.Action)
|
||||
|
Loading…
Reference in New Issue
Block a user