diff --git a/storagenode/orders/store.go b/storagenode/orders/store.go index 967ecc3d3..0890cea71 100644 --- a/storagenode/orders/store.go +++ b/storagenode/orders/store.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strconv" + "strings" "sync" "time" @@ -19,8 +20,8 @@ import ( ) const ( - unsentStagingFileName = "unsent-orders-staging" - unsentReadyFilePrefix = "unsent-orders-ready-" + unsentFilePrefix = "unsent-orders-" + archiveFilePrefix = "archived-orders-" ) // FileStore implements the orders.Store interface by appending orders to flat files. @@ -28,29 +29,43 @@ type FileStore struct { ordersDir string unsentDir string archiveDir string - mu sync.Mutex + // mutex for unsent directory + unsentMu sync.Mutex + // mutex for archive directory + archiveMu sync.Mutex + + // how long after OrderLimit creation date are OrderLimits no longer accepted (piecestore Config) + orderLimitGracePeriod time.Duration } // NewFileStore creates a new orders file store. -func NewFileStore(ordersDir string) *FileStore { +func NewFileStore(ordersDir string, orderLimitGracePeriod time.Duration) *FileStore { return &FileStore{ ordersDir: ordersDir, unsentDir: filepath.Join(ordersDir, "unsent"), archiveDir: filepath.Join(ordersDir, "archive"), + + orderLimitGracePeriod: orderLimitGracePeriod, } } -// Enqueue inserts order to the list of orders needing to be sent to the satellite. +// Enqueue inserts order to be sent at the end of the unsent file for a particular creation hour. +// It assumes the order is not being queued after the order limit grace period. func (store *FileStore) Enqueue(info *Info) (err error) { - store.mu.Lock() - defer store.mu.Unlock() + store.unsentMu.Lock() + defer store.unsentMu.Unlock() - f, err := store.getUnsentStagingFile() + // if the grace period has already passed, do not enqueue this order + if store.gracePeriodPassed(info.Limit.OrderCreation.Truncate(time.Hour)) { + return OrderError.New("grace period passed for order limit") + } + + f, err := store.getUnsentFile(info.Limit.SatelliteId, info.Limit.OrderCreation) if err != nil { return OrderError.Wrap(err) } defer func() { - err = errs.Combine(err, f.Close()) + err = errs.Combine(err, OrderError.Wrap(f.Close())) }() err = writeLimit(f, info.Limit) @@ -64,36 +79,53 @@ func (store *FileStore) Enqueue(info *Info) (err error) { return nil } -// ListUnsentBySatellite returns orders that haven't been sent yet grouped by satellite. -// It copies the staging file to a read-only "ready to send" file first. -// It should never be called concurrently with DeleteReadyToSendFiles. -func (store *FileStore) ListUnsentBySatellite() (infoMap map[storj.NodeID][]*Info, err error) { - err = store.convertUnsentStagingToReady() - if err != nil { - return infoMap, err - } +// UnsentInfo is a struct containing a window of orders for a satellite and order creation hour. +type UnsentInfo struct { + CreatedAtHour time.Time + InfoList []*Info +} + +// ListUnsentBySatellite returns one window of orders that haven't been sent yet, grouped by satellite. +// It only reads files where the order limit grace period has passed, meaning no new orders will be appended. +// There is a separate window for each created at hour, so if a satellite has 2 windows, `ListUnsentBySatellite` +// needs to be called twice, with calls to `DeleteUnsentFile` in between each call, to see all unsent orders. +func (store *FileStore) ListUnsentBySatellite() (infoMap map[storj.NodeID]UnsentInfo, err error) { + store.unsentMu.Lock() + defer store.unsentMu.Unlock() var errList error - infoMap = make(map[storj.NodeID][]*Info) + infoMap = make(map[storj.NodeID]UnsentInfo) err = filepath.Walk(store.unsentDir, func(path string, info os.FileInfo, err error) error { if err != nil { - errList = errs.Combine(errList, err) + errList = errs.Combine(errList, OrderError.Wrap(err)) return nil } if info.IsDir() { return nil } - if info.Name() == unsentStagingFileName { + satelliteID, createdAtHour, err := getUnsentFileInfo(info.Name()) + if err != nil { + return err + } + // if we already have orders for this satellite, ignore the file + if _, ok := infoMap[satelliteID]; ok { return nil } + // if orders can still be added to file, ignore it. + if !store.gracePeriodPassed(createdAtHour) { + return nil + } + newUnsentInfo := UnsentInfo{ + CreatedAtHour: createdAtHour, + } f, err := os.Open(path) if err != nil { return OrderError.Wrap(err) } defer func() { - err = errs.Combine(err, f.Close()) + err = errs.Combine(err, OrderError.Wrap(f.Close())) }() for { @@ -113,10 +145,10 @@ func (store *FileStore) ListUnsentBySatellite() (infoMap map[storj.NodeID][]*Inf Limit: limit, Order: order, } - infoList := infoMap[limit.SatelliteId] - infoList = append(infoList, newInfo) - infoMap[limit.SatelliteId] = infoList + newUnsentInfo.InfoList = append(newUnsentInfo.InfoList, newInfo) } + + infoMap[satelliteID] = newUnsentInfo return nil }) if err != nil { @@ -126,13 +158,56 @@ func (store *FileStore) ListUnsentBySatellite() (infoMap map[storj.NodeID][]*Inf return infoMap, errList } -// DeleteReadyToSendFiles deletes all non-staging files in the "unsent" directory. -// It should be called after the order limits have been sent. -// It should never be called concurrently with ListUnsentBySatellite. -func (store *FileStore) DeleteReadyToSendFiles() (err error) { - var errList error +// DeleteUnsentFile deletes an unsent-orders file for a satellite ID and created hour. +func (store *FileStore) DeleteUnsentFile(satelliteID storj.NodeID, createdAtHour time.Time) error { + store.unsentMu.Lock() + defer store.unsentMu.Unlock() - err = filepath.Walk(store.unsentDir, func(path string, info os.FileInfo, err error) error { + fileName := unsentFilePrefix + satelliteID.String() + "-" + getCreationHourString(createdAtHour) + filePath := filepath.Join(store.unsentDir, fileName) + + return OrderError.Wrap(os.Remove(filePath)) +} + +// Archive marks order as being settled. +func (store *FileStore) Archive(archivedAt time.Time, requests ...*ArchivedInfo) error { + store.archiveMu.Lock() + defer store.archiveMu.Unlock() + + f, err := store.getNewArchiveFile(archivedAt) + if err != nil { + return OrderError.Wrap(err) + } + defer func() { + err = errs.Combine(err, OrderError.Wrap(f.Close())) + }() + + for _, info := range requests { + err = writeStatus(f, info.Status) + if err != nil { + return err + } + err = writeLimit(f, info.Limit) + if err != nil { + return err + } + err = writeOrder(f, info.Order) + if err != nil { + return err + } + } + return nil +} + +// ListArchived returns orders that have been sent. +func (store *FileStore) ListArchived() ([]*ArchivedInfo, error) { + store.archiveMu.Lock() + defer store.archiveMu.Unlock() + + var errList error + archivedList := []*ArchivedInfo{} + + err := filepath.Walk(store.archiveDir, func(path string, info os.FileInfo, err error) error { if err != nil { errList = errs.Combine(errList, OrderError.Wrap(err)) return nil @@ -140,46 +215,86 @@ func (store *FileStore) DeleteReadyToSendFiles() (err error) { if info.IsDir() { return nil } - if info.Name() == unsentStagingFileName { - return nil + archivedAt, err := getArchivedFileInfo(info.Name()) + if err != nil { + return err } - // delete all non-staging files - return OrderError.Wrap(os.Remove(path)) + + f, err := os.Open(path) + if err != nil { + return OrderError.Wrap(err) + } + defer func() { + err = errs.Combine(err, OrderError.Wrap(f.Close())) + }() + + for { + status, err := readStatus(f) + if err != nil { + if errs.Is(err, io.EOF) { + break + } + return err + } + limit, err := readLimit(f) + if err != nil { + return err + } + order, err := readOrder(f) + if err != nil { + return err + } + + newInfo := &ArchivedInfo{ + Limit: limit, + Order: order, + Status: status, + ArchivedAt: archivedAt, + } + archivedList = append(archivedList, newInfo) + } + return nil }) if err != nil { errList = errs.Combine(errList, err) } - return errList + return archivedList, errList } -// convertUnsentStagingToReady converts the unsent staging file to be read only, and renames it. -func (store *FileStore) convertUnsentStagingToReady() error { - // lock mutex so no one tries to write to the file while we do this - store.mu.Lock() - defer store.mu.Unlock() +// CleanArchive deletes all entries archvied before the provided time. +func (store *FileStore) CleanArchive(deleteBefore time.Time) error { + store.archiveMu.Lock() + defer store.archiveMu.Unlock() - oldFileName := unsentStagingFileName - oldFilePath := filepath.Join(store.unsentDir, oldFileName) - if _, err := os.Stat(oldFilePath); os.IsNotExist(err) { + // we want to delete everything older than ttl + var errList error + err := filepath.Walk(store.archiveDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + errList = errs.Combine(errList, OrderError.Wrap(err)) + return nil + } + if info.IsDir() { + return nil + } + archivedAt, err := getArchivedFileInfo(info.Name()) + if err != nil { + errList = errs.Combine(errList, err) + return nil + } + if archivedAt.Before(deleteBefore) { + return OrderError.Wrap(os.Remove(path)) + } return nil - } + }) - // set file to readonly - err := os.Chmod(oldFilePath, 0444) - if err != nil { - return err - } - // make new file suffix the current time in case there are other "ready" files already - timeStr := strconv.FormatInt(time.Now().UnixNano(), 10) - newFilePath := filepath.Join(store.unsentDir, unsentReadyFilePrefix+timeStr) - // rename file - return os.Rename(oldFilePath, newFilePath) + return errs.Combine(errList, err) } -// getUnsentStagingFile creates or gets the order limit file for appending unsent orders to. -// it expects the caller to lock the store's mutex before calling, and to handle closing the returned file. -func (store *FileStore) getUnsentStagingFile() (*os.File, error) { +// getUnsentFile creates or gets the order limit file for appending unsent orders to. +// There is a different file for each satellite and creation hour. +// It expects the caller to lock the store's mutex before calling, and to handle closing the returned file. +func (store *FileStore) getUnsentFile(satelliteID storj.NodeID, creationTime time.Time) (*os.File, error) { if _, err := os.Stat(store.unsentDir); os.IsNotExist(err) { err = os.Mkdir(store.unsentDir, 0700) if err != nil { @@ -187,7 +302,7 @@ func (store *FileStore) getUnsentStagingFile() (*os.File, error) { } } - fileName := unsentStagingFileName + fileName := unsentFilePrefix + satelliteID.String() + "-" + getCreationHourString(creationTime) filePath := filepath.Join(store.unsentDir, fileName) // create file if not exists or append f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) @@ -197,6 +312,85 @@ func (store *FileStore) getUnsentStagingFile() (*os.File, error) { return f, nil } +func getCreationHourString(t time.Time) string { + creationHour := t.Truncate(time.Hour) + timeStr := strconv.FormatInt(creationHour.UnixNano(), 10) + return timeStr +} + +// gracePeriodPassed determines whether enough time has passed that no new orders will be added to a file. +func (store *FileStore) gracePeriodPassed(createdHour time.Time) bool { + canSendCutoff := time.Now().Add(-store.orderLimitGracePeriod) + // add one hour to include order limits in file added at end of createdHour + return createdHour.Add(time.Hour).Before(canSendCutoff) +} + +// getNewArchiveFile creates the order limit file for appending archived orders to. +// it expects the caller to lock the store's mutex before calling, and to handle closing the returned file. +func (store *FileStore) getNewArchiveFile(archivedAt time.Time) (*os.File, error) { + if _, err := os.Stat(store.archiveDir); os.IsNotExist(err) { + err = os.Mkdir(store.archiveDir, 0700) + if err != nil { + return nil, OrderError.Wrap(err) + } + } + + // suffix of filename is the archivedAt time + timeStr := strconv.FormatInt(archivedAt.UnixNano(), 10) + newFilePath := filepath.Join(store.archiveDir, archiveFilePrefix+timeStr) + // create file if not exists or append + f, err := os.OpenFile(newFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, OrderError.Wrap(err) + } + return f, nil +} + +// getUnsentFileInfo gets the satellite ID and created hour from a filename. +// it expects the file name to be in the format "unsent-orders--" +func getUnsentFileInfo(name string) (satellite storj.NodeID, createdHour time.Time, err error) { + if !strings.HasPrefix(name, unsentFilePrefix) { + return storj.NodeID{}, time.Time{}, OrderError.New("Not a valid unsent order file name: %s", name) + } + // chop off prefix to get satellite ID and created hours + infoStr := name[len(unsentFilePrefix):] + infoSlice := strings.Split(infoStr, "-") + if len(infoSlice) != 2 { + return storj.NodeID{}, time.Time{}, OrderError.New("Not a valid unsent order file name: %s", name) + } + + satelliteIDStr := infoSlice[0] + satelliteID, err := storj.NodeIDFromString(satelliteIDStr) + if err != nil { + return storj.NodeID{}, time.Time{}, OrderError.New("Not a valid unsent order file name: %s", name) + } + + timeStr := infoSlice[1] + createdHourUnixNano, err := strconv.ParseInt(timeStr, 10, 64) + if err != nil { + return satelliteID, time.Time{}, OrderError.Wrap(err) + } + createdAtHour := time.Unix(0, createdHourUnixNano) + + return satelliteID, createdAtHour, nil +} + +// getArchivedFileInfo gets the archived at time from an archive file name. +// it expects the file name to be in the format "archived-orders-" +func getArchivedFileInfo(name string) (time.Time, error) { + if !strings.HasPrefix(name, archiveFilePrefix) { + return time.Time{}, OrderError.New("Not a valid archived file name: %s", name) + } + // chop off prefix to get archived at string + archivedAtStr := name[len(archiveFilePrefix):] + archivedAtUnixNano, err := strconv.ParseInt(archivedAtStr, 10, 64) + if err != nil { + return time.Time{}, OrderError.Wrap(err) + } + archivedAt := time.Unix(0, archivedAtUnixNano) + return archivedAt, nil +} + // writeLimit writes the size of the order limit bytes, followed by the order limit bytes. // it expects the caller to have locked the mutex. func writeLimit(f io.Writer, limit *pb.OrderLimit) error { @@ -277,3 +471,22 @@ func readOrder(f io.Reader) (*pb.Order, error) { } return order, nil } + +// writeStatus writes the satellite response status of an archived order. +// it expects the caller to have locked the mutex. +func writeStatus(f io.Writer, status Status) error { + if _, err := f.Write([]byte{byte(status)}); err != nil { + return OrderError.New("Error writing status: %w", err) + } + return nil +} + +// readStatus reads the status of an archived order limit. +func readStatus(f io.Reader) (Status, error) { + statusBytes := [1]byte{} + _, err := io.ReadFull(f, statusBytes[:]) + if err != nil { + return 0, OrderError.Wrap(err) + } + return Status(statusBytes[0]), nil +} diff --git a/storagenode/orders/store_test.go b/storagenode/orders/store_test.go index d1c27af3e..f30afe6eb 100644 --- a/storagenode/orders/store_test.go +++ b/storagenode/orders/store_test.go @@ -1,7 +1,7 @@ // Copyright (C) 2020 Storj Labs, Inc. // See LICENSE for copying information. -package orders_test +package orders import ( "testing" @@ -13,7 +13,6 @@ import ( "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" - "storj.io/storj/storagenode/orders" ) func TestOrdersStore(t *testing.T) { @@ -21,79 +20,137 @@ func TestOrdersStore(t *testing.T) { defer ctx.Cleanup() dirName := ctx.Dir("test-orders") - ordersStore := orders.NewFileStore(dirName) + // make order limit grace period 24 hours + ordersStore := NewFileStore(dirName, 24*time.Hour) + // adding order before grace period should result in an error + newSN := testrand.SerialNumber() + newInfo := &Info{ + Limit: &pb.OrderLimit{ + SerialNumber: newSN, + SatelliteId: testrand.NodeID(), + Action: pb.PieceAction_GET, + OrderCreation: time.Now().Add(-48 * time.Hour), + OrderExpiration: time.Now().Add(time.Hour), + }, + Order: &pb.Order{ + SerialNumber: newSN, + Amount: 10, + }, + } + err := ordersStore.Enqueue(newInfo) + require.Error(t, err) + + // for each satellite, make three orders from four hours ago, three from two hours ago, and three from now. numSatellites := 3 - serialsPerSat := 5 - originalInfos, err := storeNewOrders(ordersStore, numSatellites, serialsPerSat) + createdTimes := []time.Time{ + time.Now().Add(-4 * time.Hour), + time.Now().Add(-2 * time.Hour), + time.Now(), + } + serialsPerSatPerTime := 3 + + originalInfos, err := storeNewOrders(ordersStore, numSatellites, serialsPerSatPerTime, createdTimes) require.NoError(t, err) - unsentMap, err := ordersStore.ListUnsentBySatellite() - require.NoError(t, err) + // update grace period so that some of the order limits are considered created before the grace period + ordersStore.orderLimitGracePeriod = time.Hour - // go through order limits and make sure information is accurate - require.Len(t, unsentMap, numSatellites) - for _, unsentSatList := range unsentMap { - require.Len(t, unsentSatList, serialsPerSat) + // 3 times: + // list unsent orders - should receive data from all satellites the first two times, and nothing the last time. + // add listed orders to batches for archival + // delete unsent file for each returned satellite/createdAt bucket + originalArchivedInfos := make(map[storj.SerialNumber]*ArchivedInfo) + archiveBatches := [][]*ArchivedInfo{} + archiveTime1 := time.Now().Add(-2 * time.Hour) + archiveTime2 := time.Now() + for i := 0; i < 3; i++ { + unsentMap, err := ordersStore.ListUnsentBySatellite() + require.NoError(t, err) - for _, unsentInfo := range unsentSatList { - sn := unsentInfo.Limit.SerialNumber - originalInfo := originalInfos[sn] - - verifyInfosEqual(t, unsentInfo, originalInfo) + // on last iteration, expect nothing returned + if i == 2 { + require.Len(t, unsentMap, 0) + break } - } + archiveBatch := []*ArchivedInfo{} + // go through order limits and make sure information is accurate + require.Len(t, unsentMap, numSatellites) + for satelliteID, unsentSatList := range unsentMap { + require.Len(t, unsentSatList.InfoList, serialsPerSatPerTime) - // add some more orders and list again - // we should see everything we added before, plus the new ones - newInfos, err := storeNewOrders(ordersStore, numSatellites, serialsPerSat) - require.NoError(t, err) - for sn, info := range newInfos { - originalInfos[sn] = info - } - // because we have stored two times, we have twice as many serials... - require.Len(t, originalInfos, 2*numSatellites*serialsPerSat) + for _, unsentInfo := range unsentSatList.InfoList { + // "new" orders should not be returned + require.True(t, unsentInfo.Limit.OrderCreation.Before(createdTimes[2])) + sn := unsentInfo.Limit.SerialNumber + originalInfo := originalInfos[sn] - unsentMap, err = ordersStore.ListUnsentBySatellite() - require.NoError(t, err) - // ...and twice as many satellites. - require.Len(t, unsentMap, 2*numSatellites) - for _, unsentSatList := range unsentMap { - require.Len(t, unsentSatList, serialsPerSat) + verifyInfosEqual(t, unsentInfo, originalInfo) + // expect that creation hour is consistent with order + require.Equal(t, unsentSatList.CreatedAtHour.UTC(), unsentInfo.Limit.OrderCreation.Truncate(time.Hour).UTC()) - for _, unsentInfo := range unsentSatList { - sn := unsentInfo.Limit.SerialNumber - originalInfo := originalInfos[sn] + // add to archive batch + archivedAt := archiveTime1 + if i == 1 { + archivedAt = archiveTime2 + } + newArchivedInfo := &ArchivedInfo{ + Limit: unsentInfo.Limit, + Order: unsentInfo.Order, + Status: StatusAccepted, + ArchivedAt: archivedAt, + } + originalArchivedInfos[unsentInfo.Limit.SerialNumber] = newArchivedInfo + archiveBatch = append(archiveBatch, newArchivedInfo) + } - verifyInfosEqual(t, unsentInfo, originalInfo) + // delete unsent file for this satellite/creation hour + err = ordersStore.DeleteUnsentFile(satelliteID, unsentSatList.CreatedAtHour) + require.NoError(t, err) } + // add archive batch to archiveBatches + archiveBatches = append(archiveBatches, archiveBatch) + } + // archive first batch two hours ago, archive second batch now + err = ordersStore.Archive(archiveTime1, archiveBatches[0]...) + require.NoError(t, err) + err = ordersStore.Archive(archiveTime2, archiveBatches[1]...) + require.NoError(t, err) + + // list archived, expect everything from first two created at time buckets + archived, err := ordersStore.ListArchived() + require.NoError(t, err) + require.Len(t, archived, numSatellites*serialsPerSatPerTime*2) + for _, archivedInfo := range archived { + sn := archivedInfo.Limit.SerialNumber + originalInfo := originalArchivedInfos[sn] + verifyArchivedInfosEqual(t, archivedInfo, originalInfo) } - // now, add another order, delete ready to send files, and list - // we should only see the new order - originalInfos, err = storeNewOrders(ordersStore, 1, 1) + // clean archive for anything older than 30 minutes + err = ordersStore.CleanArchive(time.Now().Add(-30 * time.Minute)) require.NoError(t, err) - err = ordersStore.DeleteReadyToSendFiles() + // list archived, expect only recent archived batch (other was cleaned) + archived, err = ordersStore.ListArchived() require.NoError(t, err) - - unsentMap, err = ordersStore.ListUnsentBySatellite() - require.NoError(t, err) - require.Len(t, unsentMap, 1) - for _, unsentSatList := range unsentMap { - require.Len(t, unsentSatList, 1) - for _, unsentInfo := range unsentSatList { - sn := unsentInfo.Limit.SerialNumber - originalInfo := originalInfos[sn] - - verifyInfosEqual(t, unsentInfo, originalInfo) - } + require.Len(t, archived, numSatellites*serialsPerSatPerTime) + for _, archivedInfo := range archived { + sn := archivedInfo.Limit.SerialNumber + originalInfo := originalArchivedInfos[sn] + verifyArchivedInfosEqual(t, archivedInfo, originalInfo) + require.Equal(t, archivedInfo.ArchivedAt.Round(0), archiveTime2.Round(0)) } - // TODO test order archival + // clean archive for everything before now, expect list to return nothing + err = ordersStore.CleanArchive(time.Now()) + require.NoError(t, err) + archived, err = ordersStore.ListArchived() + require.NoError(t, err) + require.Len(t, archived, 0) } -func verifyInfosEqual(t *testing.T, a, b *orders.Info) { +func verifyInfosEqual(t *testing.T, a, b *Info) { t.Helper() require.NotNil(t, a) @@ -109,46 +166,63 @@ func verifyInfosEqual(t *testing.T, a, b *orders.Info) { } -func storeNewOrders(ordersStore *orders.FileStore, numSatellites, numOrdersPerSatellite int) (map[storj.SerialNumber]*orders.Info, error) { +func verifyArchivedInfosEqual(t *testing.T, a, b *ArchivedInfo) { + t.Helper() + + require.NotNil(t, a) + require.NotNil(t, b) + + require.Equal(t, a.Limit.SerialNumber, b.Limit.SerialNumber) + require.Equal(t, a.Limit.SatelliteId, b.Limit.SatelliteId) + require.Equal(t, a.Limit.OrderExpiration.UTC(), b.Limit.OrderExpiration.UTC()) + require.Equal(t, a.Limit.Action, b.Limit.Action) + + require.Equal(t, a.Order.Amount, b.Order.Amount) + require.Equal(t, a.Order.SerialNumber, b.Order.SerialNumber) + + require.Equal(t, a.Status, b.Status) + require.Equal(t, a.ArchivedAt.UTC(), b.ArchivedAt.UTC()) +} + +func storeNewOrders(ordersStore *FileStore, numSatellites, numOrdersPerSatPerTime int, createdAtTimes []time.Time) (map[storj.SerialNumber]*Info, error) { actions := []pb.PieceAction{ pb.PieceAction_GET, pb.PieceAction_PUT_REPAIR, pb.PieceAction_GET_AUDIT, } - originalInfos := make(map[storj.SerialNumber]*orders.Info) + originalInfos := make(map[storj.SerialNumber]*Info) for i := 0; i < numSatellites; i++ { satellite := testrand.NodeID() - // for each satellite, half of the orders will expire in an hour - // and half will expire in three hours. - for j := 0; j < numOrdersPerSatellite; j++ { - expiration := time.Now().Add(time.Hour) - if j < 2 { - expiration = time.Now().Add(3 * time.Hour) - } - amount := testrand.Int63n(1000) - sn := testrand.SerialNumber() - action := actions[j%len(actions)] + for _, createdAt := range createdAtTimes { + for j := 0; j < numOrdersPerSatPerTime; j++ { + expiration := time.Now().Add(time.Hour) + amount := testrand.Int63n(1000) + sn := testrand.SerialNumber() + action := actions[j%len(actions)] - newInfo := &orders.Info{ - Limit: &pb.OrderLimit{ - SerialNumber: sn, - SatelliteId: satellite, - Action: action, - OrderExpiration: expiration, - }, - Order: &pb.Order{ - SerialNumber: sn, - Amount: amount, - }, - } - originalInfos[sn] = newInfo + newInfo := &Info{ + Limit: &pb.OrderLimit{ + SerialNumber: sn, + SatelliteId: satellite, + Action: action, + OrderCreation: createdAt, + OrderExpiration: expiration, + }, + Order: &pb.Order{ + SerialNumber: sn, + Amount: amount, + }, + } + originalInfos[sn] = newInfo - // store the new info in the orders store - err := ordersStore.Enqueue(newInfo) - if err != nil { - return originalInfos, err + // store the new info in the orders store + err := ordersStore.Enqueue(newInfo) + if err != nil { + return originalInfos, err + } } + } } return originalInfos, nil