89bdb20a62
In production we are seeing ~115 storage nodes (out of ~6,500) are not using the new SettlementWithWindow endpoint (but they are upgraded to > v1.12). We analyzed data being reported by monkit for the nodes who were above version 1.11 but were not successfully submitting orders to the new endpoint. The nodes fell into a few categories: 1. Always fail to list orders from the db; never get to try sending orders from the filestore 2. Successfully list/send orders from the db; never get to calling satellite endpoint for submitting filestore orders 3. Successfully list/send orders from the db; successfully list filestore orders, but satellite endpoint fails (with "unauthenticated" drpc error) The code change here add the following to address these issues: - modify the query for ordersDB.listUnsentBySatellite so that we no longer select expired orders from the unsent_orders table - always process any orders that are in the ordersDB and also any orders stored in the filestore - add monkit monitoring to filestore.ListUnsentBySatellite so that we can see the failures/successes Change-Id: I0b473e5d75252e7ab5fa6b5c204ed260ab5094ec
582 lines
18 KiB
Go
582 lines
18 KiB
Go
// Copyright (C) 2020 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package orders_test
|
|
|
|
import (
|
|
"os"
|
|
"path/filepath"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/zap/zaptest"
|
|
|
|
"storj.io/common/pb"
|
|
"storj.io/common/storj"
|
|
"storj.io/common/testcontext"
|
|
"storj.io/common/testrand"
|
|
"storj.io/storj/private/testplanet"
|
|
"storj.io/storj/storagenode/orders"
|
|
"storj.io/storj/storagenode/orders/ordersfile"
|
|
)
|
|
|
|
func TestOrdersStore_Enqueue_GracePeriodFailure(t *testing.T) {
|
|
ctx := testcontext.New(t)
|
|
defer ctx.Cleanup()
|
|
dirName := ctx.Dir("test-orders")
|
|
now := time.Now()
|
|
|
|
// make order limit grace period 24 hours
|
|
ordersStore, err := orders.NewFileStore(zaptest.NewLogger(t), dirName, 24*time.Hour)
|
|
require.NoError(t, err)
|
|
|
|
// adding order before grace period should result in an error
|
|
newSN := testrand.SerialNumber()
|
|
newInfo := &ordersfile.Info{
|
|
Limit: &pb.OrderLimit{
|
|
SerialNumber: newSN,
|
|
SatelliteId: testrand.NodeID(),
|
|
Action: pb.PieceAction_GET,
|
|
OrderCreation: now.Add(-48 * time.Hour),
|
|
OrderExpiration: now.Add(time.Hour),
|
|
},
|
|
Order: &pb.Order{
|
|
SerialNumber: newSN,
|
|
Amount: 10,
|
|
},
|
|
}
|
|
err = ordersStore.Enqueue(newInfo)
|
|
require.Error(t, err)
|
|
}
|
|
|
|
func TestOrdersStore_ListUnsentBySatellite(t *testing.T) {
|
|
ctx := testcontext.New(t)
|
|
defer ctx.Cleanup()
|
|
dirName := ctx.Dir("test-orders")
|
|
now := time.Now()
|
|
|
|
// make order limit grace period 12 hours
|
|
ordersStore, err := orders.NewFileStore(zaptest.NewLogger(t), dirName, 12*time.Hour)
|
|
require.NoError(t, err)
|
|
|
|
// for each satellite, make three orders from four hours ago, three from two hours ago, and three from now.
|
|
numSatellites := 3
|
|
createdTimes := []time.Time{
|
|
now.Add(-4 * time.Hour),
|
|
now.Add(-2 * time.Hour),
|
|
now,
|
|
}
|
|
serialsPerSatPerTime := 3
|
|
|
|
originalInfos, err := storeNewOrders(ordersStore, numSatellites, serialsPerSatPerTime, createdTimes)
|
|
require.NoError(t, err)
|
|
|
|
// 3 times:
|
|
// list unsent orders - should receive data from all satellites the first two times, and nothing the last time.
|
|
// archive unsent orders
|
|
expectedArchivedInfos := make(map[storj.SerialNumber]*orders.ArchivedInfo)
|
|
|
|
archiveTime1 := now.Add(-2 * time.Hour)
|
|
archiveTime2 := now
|
|
status1 := pb.SettlementWithWindowResponse_ACCEPTED
|
|
status2 := pb.SettlementWithWindowResponse_REJECTED
|
|
for i := 0; i < 3; i++ {
|
|
// This should return all the orders created more than 1 hour before "now".
|
|
unsentMap, err := ordersStore.ListUnsentBySatellite(ctx, now.Add(12*time.Hour))
|
|
require.NoError(t, err)
|
|
|
|
// on last iteration, expect nothing returned
|
|
if i == 2 {
|
|
require.Len(t, unsentMap, 0)
|
|
break
|
|
}
|
|
|
|
// go through order limits and make sure information is accurate
|
|
require.Len(t, unsentMap, numSatellites)
|
|
for satelliteID, unsentSatList := range unsentMap {
|
|
require.Len(t, unsentSatList.InfoList, serialsPerSatPerTime)
|
|
|
|
for _, unsentInfo := range unsentSatList.InfoList {
|
|
// "new" orders should not be returned
|
|
require.True(t, unsentInfo.Limit.OrderCreation.Before(createdTimes[2]))
|
|
sn := unsentInfo.Limit.SerialNumber
|
|
originalInfo := originalInfos[sn]
|
|
|
|
verifyInfosEqual(t, unsentInfo, originalInfo)
|
|
// expect that creation hour is consistent with order
|
|
require.True(t, unsentSatList.CreatedAtHour.Equal(unsentInfo.Limit.OrderCreation.Truncate(time.Hour)))
|
|
|
|
// add to archive batch
|
|
// create
|
|
archivedAt := archiveTime1
|
|
orderStatus := orders.StatusAccepted
|
|
if i == 1 {
|
|
archivedAt = archiveTime2
|
|
orderStatus = orders.StatusRejected
|
|
}
|
|
newArchivedInfo := &orders.ArchivedInfo{
|
|
Limit: unsentInfo.Limit,
|
|
Order: unsentInfo.Order,
|
|
Status: orderStatus,
|
|
ArchivedAt: archivedAt,
|
|
}
|
|
expectedArchivedInfos[unsentInfo.Limit.SerialNumber] = newArchivedInfo
|
|
}
|
|
|
|
// archive unsent file
|
|
archivedAt := archiveTime1
|
|
status := status1
|
|
if i == 1 {
|
|
archivedAt = archiveTime2
|
|
status = status2
|
|
}
|
|
err = ordersStore.Archive(satelliteID, unsentSatList, archivedAt, status)
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
|
|
// list archived, expect everything from first two created at time buckets
|
|
archived, err := ordersStore.ListArchived()
|
|
require.NoError(t, err)
|
|
require.Len(t, archived, numSatellites*serialsPerSatPerTime*2)
|
|
for _, archivedInfo := range archived {
|
|
sn := archivedInfo.Limit.SerialNumber
|
|
expectedInfo := expectedArchivedInfos[sn]
|
|
verifyArchivedInfosEqual(t, expectedInfo, archivedInfo)
|
|
|
|
// one of the batches should be "accepted" and the other should be "rejected"
|
|
if archivedInfo.ArchivedAt.Round(0).Equal(archiveTime2.Round(0)) {
|
|
require.Equal(t, archivedInfo.Status, orders.StatusRejected)
|
|
} else {
|
|
require.Equal(t, archivedInfo.Status, orders.StatusAccepted)
|
|
}
|
|
}
|
|
|
|
// clean archive for anything older than 30 minutes
|
|
err = ordersStore.CleanArchive(now.Add(-30 * time.Minute))
|
|
require.NoError(t, err)
|
|
|
|
// list archived, expect only recent archived batch (other was cleaned)
|
|
archived, err = ordersStore.ListArchived()
|
|
require.NoError(t, err)
|
|
require.Len(t, archived, numSatellites*serialsPerSatPerTime)
|
|
for _, archivedInfo := range archived {
|
|
sn := archivedInfo.Limit.SerialNumber
|
|
expectedInfo := expectedArchivedInfos[sn]
|
|
verifyArchivedInfosEqual(t, expectedInfo, archivedInfo)
|
|
require.Equal(t, archivedInfo.ArchivedAt.Round(0), archiveTime2.Round(0))
|
|
require.Equal(t, archivedInfo.Status, orders.StatusRejected)
|
|
}
|
|
|
|
// clean archive for everything before now, expect list to return nothing
|
|
err = ordersStore.CleanArchive(now.Add(time.Nanosecond))
|
|
require.NoError(t, err)
|
|
archived, err = ordersStore.ListArchived()
|
|
require.NoError(t, err)
|
|
require.Len(t, archived, 0)
|
|
}
|
|
|
|
func TestOrdersStore_ListUnsentBySatellite_Ongoing(t *testing.T) {
|
|
ctx := testcontext.New(t)
|
|
defer ctx.Cleanup()
|
|
dirName := ctx.Dir("test-orders")
|
|
now := time.Now()
|
|
satellite := testrand.NodeID()
|
|
tomorrow := now.Add(24 * time.Hour)
|
|
|
|
// make order limit grace period 1 hour
|
|
ordersStore, err := orders.NewFileStore(zaptest.NewLogger(t), dirName, time.Hour)
|
|
require.NoError(t, err)
|
|
|
|
// empty store means no orders can be listed
|
|
unsent, err := ordersStore.ListUnsentBySatellite(ctx, tomorrow)
|
|
require.NoError(t, err)
|
|
require.Len(t, unsent, 0)
|
|
|
|
// store an order that can be listed
|
|
sn := testrand.SerialNumber()
|
|
require.NoError(t, ordersStore.Enqueue(&ordersfile.Info{
|
|
Limit: &pb.OrderLimit{
|
|
SerialNumber: sn,
|
|
SatelliteId: satellite,
|
|
Action: pb.PieceAction_GET,
|
|
OrderCreation: now,
|
|
},
|
|
Order: &pb.Order{
|
|
SerialNumber: sn,
|
|
Amount: 1,
|
|
},
|
|
}))
|
|
|
|
// check that we can list it tomorrow
|
|
unsent, err = ordersStore.ListUnsentBySatellite(ctx, tomorrow)
|
|
require.NoError(t, err)
|
|
require.Len(t, unsent, 1)
|
|
|
|
// begin an enqueue in the bucket
|
|
commit, err := ordersStore.BeginEnqueue(satellite, now)
|
|
require.NoError(t, err)
|
|
|
|
// we should no longer be able to list that window
|
|
unsent, err = ordersStore.ListUnsentBySatellite(ctx, tomorrow)
|
|
require.NoError(t, err)
|
|
require.Len(t, unsent, 0)
|
|
|
|
// commit the order
|
|
sn = testrand.SerialNumber()
|
|
require.NoError(t, commit(&ordersfile.Info{
|
|
Limit: &pb.OrderLimit{
|
|
SerialNumber: sn,
|
|
SatelliteId: satellite,
|
|
Action: pb.PieceAction_GET,
|
|
OrderCreation: now,
|
|
},
|
|
Order: &pb.Order{
|
|
SerialNumber: sn,
|
|
Amount: 1,
|
|
},
|
|
}))
|
|
|
|
// check that we can list it tomorrow
|
|
unsent, err = ordersStore.ListUnsentBySatellite(ctx, tomorrow)
|
|
require.NoError(t, err)
|
|
require.Len(t, unsent, 1)
|
|
}
|
|
|
|
func TestOrdersDB_ListUnsentBySatellite_Expired(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
satellitePeer := planet.Satellites[0]
|
|
storagenodePeer := planet.StorageNodes[0]
|
|
storageNodeOrdersDB := storagenodePeer.DB.Orders()
|
|
now := time.Now().UTC()
|
|
testSerialNumber := testrand.SerialNumber()
|
|
// Setup: add one order that is not expired
|
|
require.NoError(t, storageNodeOrdersDB.Enqueue(ctx, &ordersfile.Info{
|
|
Limit: &pb.OrderLimit{
|
|
SerialNumber: testSerialNumber,
|
|
SatelliteId: satellitePeer.ID(),
|
|
Action: pb.PieceAction_GET,
|
|
OrderCreation: now,
|
|
OrderExpiration: now.Add(5 * time.Hour),
|
|
},
|
|
Order: &pb.Order{
|
|
SerialNumber: testSerialNumber,
|
|
Amount: 100,
|
|
},
|
|
}))
|
|
testSerialNumber2 := testrand.SerialNumber()
|
|
// Setup: add one order that IS expired
|
|
require.NoError(t, storageNodeOrdersDB.Enqueue(ctx, &ordersfile.Info{
|
|
Limit: &pb.OrderLimit{
|
|
SerialNumber: testSerialNumber2,
|
|
SatelliteId: satellitePeer.ID(),
|
|
Action: pb.PieceAction_GET,
|
|
OrderExpiration: now.Add(-5 * time.Hour),
|
|
},
|
|
Order: &pb.Order{
|
|
SerialNumber: testSerialNumber2,
|
|
Amount: 20,
|
|
},
|
|
}))
|
|
|
|
// Confirm that when you list unsent orders that expired orders are not returned
|
|
unsentOrdersBySA, err := storageNodeOrdersDB.ListUnsentBySatellite(ctx)
|
|
require.NoError(t, err)
|
|
require.Equal(t, len(unsentOrdersBySA), 1)
|
|
// there should only be 1 unsent order, since the other order is expired
|
|
require.Equal(t, len(unsentOrdersBySA[satellitePeer.ID()]), 1)
|
|
// the unsent order should be the unexpired order
|
|
require.Equal(t, unsentOrdersBySA[satellitePeer.ID()][0].Limit.SerialNumber, testSerialNumber)
|
|
})
|
|
}
|
|
|
|
func TestOrdersStore_CorruptUnsentV0(t *testing.T) {
|
|
ctx := testcontext.New(t)
|
|
defer ctx.Cleanup()
|
|
dirName := ctx.Dir("test-orders")
|
|
now := time.Now()
|
|
satellite := testrand.NodeID()
|
|
tomorrow := now.Add(24 * time.Hour)
|
|
|
|
// make order limit grace period 1 hour
|
|
ordersStore, err := orders.NewFileStore(zaptest.NewLogger(t), dirName, time.Hour)
|
|
require.NoError(t, err)
|
|
|
|
// empty store means no orders can be listed
|
|
unsent, err := ordersStore.ListUnsentBySatellite(ctx, tomorrow)
|
|
require.NoError(t, err)
|
|
require.Len(t, unsent, 0)
|
|
|
|
sn := testrand.SerialNumber()
|
|
info := &ordersfile.Info{
|
|
Limit: &pb.OrderLimit{
|
|
SerialNumber: sn,
|
|
SatelliteId: satellite,
|
|
Action: pb.PieceAction_GET,
|
|
OrderCreation: now,
|
|
},
|
|
Order: &pb.Order{
|
|
SerialNumber: sn,
|
|
Amount: 1,
|
|
},
|
|
}
|
|
// store two orders for the same window using deprecated V0
|
|
unsentFileName := ordersfile.UnsentFileName(satellite, now, ordersfile.V0)
|
|
unsentDir := filepath.Join(dirName, "unsent")
|
|
unsentFilePath := filepath.Join(unsentDir, unsentFileName)
|
|
of, err := ordersfile.OpenWritableV0(unsentFilePath)
|
|
require.NoError(t, err)
|
|
require.NoError(t, of.Append(info))
|
|
require.NoError(t, of.Append(info))
|
|
require.NoError(t, of.Close())
|
|
|
|
// check that we can see both orders tomorrow
|
|
unsent, err = ordersStore.ListUnsentBySatellite(ctx, tomorrow)
|
|
require.NoError(t, err)
|
|
require.Len(t, unsent, 1)
|
|
require.Len(t, unsent[satellite].InfoList, 2)
|
|
|
|
// corrupt unsent orders file by removing the last byte
|
|
err = filepath.Walk(unsentDir, func(path string, info os.FileInfo, err error) error {
|
|
require.NoError(t, err)
|
|
if info.IsDir() {
|
|
return nil
|
|
}
|
|
err = os.Truncate(path, info.Size()-1)
|
|
return err
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// add another order, which we shouldn't see for V0 since it is after the corrupted one
|
|
of, err = ordersfile.OpenWritableV0(unsentFilePath)
|
|
require.NoError(t, err)
|
|
require.NoError(t, of.Append(info))
|
|
require.NoError(t, of.Close())
|
|
|
|
// only the second order should be corrupted, so we should still see one order
|
|
unsent, err = ordersStore.ListUnsentBySatellite(ctx, tomorrow)
|
|
require.NoError(t, err)
|
|
require.Len(t, unsent, 1)
|
|
require.Len(t, unsent[satellite].InfoList, 1)
|
|
}
|
|
|
|
func TestOrdersStore_CorruptUnsentV1(t *testing.T) {
|
|
ctx := testcontext.New(t)
|
|
defer ctx.Cleanup()
|
|
dirName := ctx.Dir("test-orders")
|
|
now := time.Now()
|
|
satellite := testrand.NodeID()
|
|
tomorrow := now.Add(24 * time.Hour)
|
|
|
|
// make order limit grace period 1 hour
|
|
ordersStore, err := orders.NewFileStore(zaptest.NewLogger(t), dirName, time.Hour)
|
|
require.NoError(t, err)
|
|
|
|
// empty store means no orders can be listed
|
|
unsent, err := ordersStore.ListUnsentBySatellite(ctx, tomorrow)
|
|
require.NoError(t, err)
|
|
require.Len(t, unsent, 0)
|
|
|
|
sn1 := testrand.SerialNumber()
|
|
sn2 := testrand.SerialNumber()
|
|
sn3 := testrand.SerialNumber()
|
|
info := &ordersfile.Info{
|
|
Limit: &pb.OrderLimit{
|
|
SerialNumber: sn1,
|
|
SatelliteId: satellite,
|
|
Action: pb.PieceAction_GET,
|
|
OrderCreation: now,
|
|
},
|
|
Order: &pb.Order{
|
|
SerialNumber: sn1,
|
|
Amount: 1,
|
|
},
|
|
}
|
|
// store sn1 and sn2 in the same window
|
|
require.NoError(t, ordersStore.Enqueue(info))
|
|
info.Limit.SerialNumber = sn2
|
|
info.Order.SerialNumber = sn2
|
|
require.NoError(t, ordersStore.Enqueue(info))
|
|
|
|
// check that we can see both orders tomorrow
|
|
unsent, err = ordersStore.ListUnsentBySatellite(ctx, tomorrow)
|
|
require.NoError(t, err)
|
|
require.Len(t, unsent, 1)
|
|
require.Len(t, unsent[satellite].InfoList, 2)
|
|
|
|
// corrupt unsent orders file by removing the last byte
|
|
err = filepath.Walk(filepath.Join(dirName, "unsent"), func(path string, info os.FileInfo, err error) error {
|
|
require.NoError(t, err)
|
|
if info.IsDir() {
|
|
return nil
|
|
}
|
|
err = os.Truncate(path, info.Size()-1)
|
|
return err
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// only the second order should be corrupted, so we should still see one order (sn1)
|
|
unsent, err = ordersStore.ListUnsentBySatellite(ctx, tomorrow)
|
|
require.NoError(t, err)
|
|
require.Len(t, unsent, 1)
|
|
require.Len(t, unsent[satellite].InfoList, 1)
|
|
require.EqualValues(t, sn1, unsent[satellite].InfoList[0].Order.SerialNumber)
|
|
|
|
// add another order, sn3, to the same window
|
|
info.Limit.SerialNumber = sn3
|
|
info.Order.SerialNumber = sn3
|
|
require.NoError(t, ordersStore.Enqueue(info))
|
|
|
|
// only the second order should be corrupted, so we should still see first and last orders (sn1, sn3)
|
|
unsent, err = ordersStore.ListUnsentBySatellite(ctx, tomorrow)
|
|
require.NoError(t, err)
|
|
require.Len(t, unsent, 1)
|
|
require.Len(t, unsent[satellite].InfoList, 2)
|
|
require.Equal(t, ordersfile.V1, unsent[satellite].Version)
|
|
require.EqualValues(t, sn1, unsent[satellite].InfoList[0].Order.SerialNumber)
|
|
require.EqualValues(t, sn3, unsent[satellite].InfoList[1].Order.SerialNumber)
|
|
}
|
|
|
|
func TestOrdersStore_V0ToV1(t *testing.T) {
|
|
ctx := testcontext.New(t)
|
|
defer ctx.Cleanup()
|
|
dirName := ctx.Dir("test-orders")
|
|
now := time.Now()
|
|
satellite := testrand.NodeID()
|
|
tomorrow := now.Add(24 * time.Hour)
|
|
|
|
// make order limit grace period 1 hour
|
|
ordersStore, err := orders.NewFileStore(zaptest.NewLogger(t), dirName, time.Hour)
|
|
require.NoError(t, err)
|
|
|
|
// empty store means no orders can be listed
|
|
unsent, err := ordersStore.ListUnsentBySatellite(ctx, tomorrow)
|
|
require.NoError(t, err)
|
|
require.Len(t, unsent, 0)
|
|
|
|
sn1 := testrand.SerialNumber()
|
|
sn2 := testrand.SerialNumber()
|
|
info := &ordersfile.Info{
|
|
Limit: &pb.OrderLimit{
|
|
SerialNumber: sn1,
|
|
SatelliteId: satellite,
|
|
Action: pb.PieceAction_GET,
|
|
OrderCreation: now,
|
|
},
|
|
Order: &pb.Order{
|
|
SerialNumber: sn1,
|
|
Amount: 1,
|
|
},
|
|
}
|
|
// store sn1 and sn2 in the same window
|
|
// sn1 is stored with deprecated V0, so sn2 should also be stored with V0 even when Enqueue() is used
|
|
unsentFileName := ordersfile.UnsentFileName(satellite, now, ordersfile.V0)
|
|
unsentDir := filepath.Join(dirName, "unsent")
|
|
unsentFilePath := filepath.Join(unsentDir, unsentFileName)
|
|
of, err := ordersfile.OpenWritableV0(unsentFilePath)
|
|
require.NoError(t, err)
|
|
|
|
require.NoError(t, of.Append(info))
|
|
info.Limit.SerialNumber = sn2
|
|
info.Order.SerialNumber = sn2
|
|
require.NoError(t, of.Append(info))
|
|
require.NoError(t, of.Close())
|
|
|
|
// check that we can see both orders tomorrow
|
|
unsent, err = ordersStore.ListUnsentBySatellite(ctx, tomorrow)
|
|
require.NoError(t, err)
|
|
require.Len(t, unsent, 1)
|
|
require.Len(t, unsent[satellite].InfoList, 2)
|
|
require.Equal(t, ordersfile.V0, unsent[satellite].Version)
|
|
|
|
// archive file to free up window
|
|
require.NoError(t, ordersStore.Archive(satellite, unsent[satellite], time.Now(), pb.SettlementWithWindowResponse_ACCEPTED))
|
|
// new file should be created with version V1
|
|
require.NoError(t, ordersStore.Enqueue(info))
|
|
|
|
unsent, err = ordersStore.ListUnsentBySatellite(ctx, tomorrow)
|
|
require.NoError(t, err)
|
|
require.Len(t, unsent, 1)
|
|
require.Len(t, unsent[satellite].InfoList, 1)
|
|
require.Equal(t, ordersfile.V1, unsent[satellite].Version)
|
|
}
|
|
|
|
func verifyInfosEqual(t *testing.T, a, b *ordersfile.Info) {
|
|
t.Helper()
|
|
|
|
require.NotNil(t, a)
|
|
require.NotNil(t, b)
|
|
|
|
require.Equal(t, a.Limit.SerialNumber, b.Limit.SerialNumber)
|
|
require.Equal(t, a.Limit.SatelliteId, b.Limit.SatelliteId)
|
|
require.Equal(t, a.Limit.OrderExpiration.UTC(), b.Limit.OrderExpiration.UTC())
|
|
require.Equal(t, a.Limit.Action, b.Limit.Action)
|
|
|
|
require.Equal(t, a.Order.Amount, b.Order.Amount)
|
|
require.Equal(t, a.Order.SerialNumber, b.Order.SerialNumber)
|
|
}
|
|
|
|
func verifyArchivedInfosEqual(t *testing.T, a, b *orders.ArchivedInfo) {
|
|
t.Helper()
|
|
|
|
require.NotNil(t, a)
|
|
require.NotNil(t, b)
|
|
|
|
require.Equal(t, a.Limit.SerialNumber, b.Limit.SerialNumber)
|
|
require.Equal(t, a.Limit.SatelliteId, b.Limit.SatelliteId)
|
|
require.Equal(t, a.Limit.OrderExpiration.UTC(), b.Limit.OrderExpiration.UTC())
|
|
require.Equal(t, a.Limit.Action, b.Limit.Action)
|
|
|
|
require.Equal(t, a.Order.Amount, b.Order.Amount)
|
|
require.Equal(t, a.Order.SerialNumber, b.Order.SerialNumber)
|
|
|
|
require.Equal(t, a.Status, b.Status)
|
|
require.Equal(t, a.ArchivedAt.UTC(), b.ArchivedAt.UTC())
|
|
}
|
|
|
|
func storeNewOrders(ordersStore *orders.FileStore, numSatellites, numOrdersPerSatPerTime int, createdAtTimes []time.Time) (map[storj.SerialNumber]*ordersfile.Info, error) {
|
|
actions := []pb.PieceAction{
|
|
pb.PieceAction_GET,
|
|
pb.PieceAction_PUT_REPAIR,
|
|
pb.PieceAction_GET_AUDIT,
|
|
}
|
|
originalInfos := make(map[storj.SerialNumber]*ordersfile.Info)
|
|
for i := 0; i < numSatellites; i++ {
|
|
satellite := testrand.NodeID()
|
|
|
|
for _, createdAt := range createdAtTimes {
|
|
for j := 0; j < numOrdersPerSatPerTime; j++ {
|
|
expiration := time.Now().Add(time.Hour)
|
|
amount := testrand.Int63n(1000)
|
|
sn := testrand.SerialNumber()
|
|
action := actions[j%len(actions)]
|
|
|
|
newInfo := &ordersfile.Info{
|
|
Limit: &pb.OrderLimit{
|
|
SerialNumber: sn,
|
|
SatelliteId: satellite,
|
|
Action: action,
|
|
OrderCreation: createdAt,
|
|
OrderExpiration: expiration,
|
|
},
|
|
Order: &pb.Order{
|
|
SerialNumber: sn,
|
|
Amount: amount,
|
|
},
|
|
}
|
|
originalInfos[sn] = newInfo
|
|
|
|
// store the new info in the orders store
|
|
err := ordersStore.Enqueue(newInfo)
|
|
if err != nil {
|
|
return originalInfos, err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return originalInfos, nil
|
|
}
|