storagenode/orders: Add archival functionality to orders filestore
* Allow orders to be archived after being settled successfully with the satellite. * Allow for cleanup of orders that were archived before a certain time. * Rewrite other parts of the orders file store to work better with new design. Change-Id: I39bea96d80e66a324ec522745169bd6d8b351751
This commit is contained in:
parent
e8ef9b76bb
commit
be59727790
@ -9,6 +9,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -19,8 +20,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
unsentStagingFileName = "unsent-orders-staging"
|
unsentFilePrefix = "unsent-orders-"
|
||||||
unsentReadyFilePrefix = "unsent-orders-ready-"
|
archiveFilePrefix = "archived-orders-"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FileStore implements the orders.Store interface by appending orders to flat files.
|
// FileStore implements the orders.Store interface by appending orders to flat files.
|
||||||
@ -28,29 +29,43 @@ type FileStore struct {
|
|||||||
ordersDir string
|
ordersDir string
|
||||||
unsentDir string
|
unsentDir string
|
||||||
archiveDir string
|
archiveDir string
|
||||||
mu sync.Mutex
|
// mutex for unsent directory
|
||||||
|
unsentMu sync.Mutex
|
||||||
|
// mutex for archive directory
|
||||||
|
archiveMu sync.Mutex
|
||||||
|
|
||||||
|
// how long after OrderLimit creation date are OrderLimits no longer accepted (piecestore Config)
|
||||||
|
orderLimitGracePeriod time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFileStore creates a new orders file store.
|
// NewFileStore creates a new orders file store.
|
||||||
func NewFileStore(ordersDir string) *FileStore {
|
func NewFileStore(ordersDir string, orderLimitGracePeriod time.Duration) *FileStore {
|
||||||
return &FileStore{
|
return &FileStore{
|
||||||
ordersDir: ordersDir,
|
ordersDir: ordersDir,
|
||||||
unsentDir: filepath.Join(ordersDir, "unsent"),
|
unsentDir: filepath.Join(ordersDir, "unsent"),
|
||||||
archiveDir: filepath.Join(ordersDir, "archive"),
|
archiveDir: filepath.Join(ordersDir, "archive"),
|
||||||
|
|
||||||
|
orderLimitGracePeriod: orderLimitGracePeriod,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enqueue inserts order to the list of orders needing to be sent to the satellite.
|
// Enqueue inserts order to be sent at the end of the unsent file for a particular creation hour.
|
||||||
|
// It assumes the order is not being queued after the order limit grace period.
|
||||||
func (store *FileStore) Enqueue(info *Info) (err error) {
|
func (store *FileStore) Enqueue(info *Info) (err error) {
|
||||||
store.mu.Lock()
|
store.unsentMu.Lock()
|
||||||
defer store.mu.Unlock()
|
defer store.unsentMu.Unlock()
|
||||||
|
|
||||||
f, err := store.getUnsentStagingFile()
|
// if the grace period has already passed, do not enqueue this order
|
||||||
|
if store.gracePeriodPassed(info.Limit.OrderCreation.Truncate(time.Hour)) {
|
||||||
|
return OrderError.New("grace period passed for order limit")
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := store.getUnsentFile(info.Limit.SatelliteId, info.Limit.OrderCreation)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return OrderError.Wrap(err)
|
return OrderError.Wrap(err)
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
err = errs.Combine(err, f.Close())
|
err = errs.Combine(err, OrderError.Wrap(f.Close()))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
err = writeLimit(f, info.Limit)
|
err = writeLimit(f, info.Limit)
|
||||||
@ -64,36 +79,53 @@ func (store *FileStore) Enqueue(info *Info) (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListUnsentBySatellite returns orders that haven't been sent yet grouped by satellite.
|
// UnsentInfo is a struct containing a window of orders for a satellite and order creation hour.
|
||||||
// It copies the staging file to a read-only "ready to send" file first.
|
type UnsentInfo struct {
|
||||||
// It should never be called concurrently with DeleteReadyToSendFiles.
|
CreatedAtHour time.Time
|
||||||
func (store *FileStore) ListUnsentBySatellite() (infoMap map[storj.NodeID][]*Info, err error) {
|
InfoList []*Info
|
||||||
err = store.convertUnsentStagingToReady()
|
}
|
||||||
if err != nil {
|
|
||||||
return infoMap, err
|
// ListUnsentBySatellite returns one window of orders that haven't been sent yet, grouped by satellite.
|
||||||
}
|
// It only reads files where the order limit grace period has passed, meaning no new orders will be appended.
|
||||||
|
// There is a separate window for each created at hour, so if a satellite has 2 windows, `ListUnsentBySatellite`
|
||||||
|
// needs to be called twice, with calls to `DeleteUnsentFile` in between each call, to see all unsent orders.
|
||||||
|
func (store *FileStore) ListUnsentBySatellite() (infoMap map[storj.NodeID]UnsentInfo, err error) {
|
||||||
|
store.unsentMu.Lock()
|
||||||
|
defer store.unsentMu.Unlock()
|
||||||
|
|
||||||
var errList error
|
var errList error
|
||||||
infoMap = make(map[storj.NodeID][]*Info)
|
infoMap = make(map[storj.NodeID]UnsentInfo)
|
||||||
|
|
||||||
err = filepath.Walk(store.unsentDir, func(path string, info os.FileInfo, err error) error {
|
err = filepath.Walk(store.unsentDir, func(path string, info os.FileInfo, err error) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errList = errs.Combine(errList, err)
|
errList = errs.Combine(errList, OrderError.Wrap(err))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if info.IsDir() {
|
if info.IsDir() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if info.Name() == unsentStagingFileName {
|
satelliteID, createdAtHour, err := getUnsentFileInfo(info.Name())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// if we already have orders for this satellite, ignore the file
|
||||||
|
if _, ok := infoMap[satelliteID]; ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
// if orders can still be added to file, ignore it.
|
||||||
|
if !store.gracePeriodPassed(createdAtHour) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
newUnsentInfo := UnsentInfo{
|
||||||
|
CreatedAtHour: createdAtHour,
|
||||||
|
}
|
||||||
|
|
||||||
f, err := os.Open(path)
|
f, err := os.Open(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return OrderError.Wrap(err)
|
return OrderError.Wrap(err)
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
err = errs.Combine(err, f.Close())
|
err = errs.Combine(err, OrderError.Wrap(f.Close()))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@ -113,10 +145,10 @@ func (store *FileStore) ListUnsentBySatellite() (infoMap map[storj.NodeID][]*Inf
|
|||||||
Limit: limit,
|
Limit: limit,
|
||||||
Order: order,
|
Order: order,
|
||||||
}
|
}
|
||||||
infoList := infoMap[limit.SatelliteId]
|
newUnsentInfo.InfoList = append(newUnsentInfo.InfoList, newInfo)
|
||||||
infoList = append(infoList, newInfo)
|
|
||||||
infoMap[limit.SatelliteId] = infoList
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
infoMap[satelliteID] = newUnsentInfo
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -126,13 +158,56 @@ func (store *FileStore) ListUnsentBySatellite() (infoMap map[storj.NodeID][]*Inf
|
|||||||
return infoMap, errList
|
return infoMap, errList
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteReadyToSendFiles deletes all non-staging files in the "unsent" directory.
|
// DeleteUnsentFile deletes an unsent-orders file for a satellite ID and created hour.
|
||||||
// It should be called after the order limits have been sent.
|
func (store *FileStore) DeleteUnsentFile(satelliteID storj.NodeID, createdAtHour time.Time) error {
|
||||||
// It should never be called concurrently with ListUnsentBySatellite.
|
store.unsentMu.Lock()
|
||||||
func (store *FileStore) DeleteReadyToSendFiles() (err error) {
|
defer store.unsentMu.Unlock()
|
||||||
var errList error
|
|
||||||
|
|
||||||
err = filepath.Walk(store.unsentDir, func(path string, info os.FileInfo, err error) error {
|
fileName := unsentFilePrefix + satelliteID.String() + "-" + getCreationHourString(createdAtHour)
|
||||||
|
filePath := filepath.Join(store.unsentDir, fileName)
|
||||||
|
|
||||||
|
return OrderError.Wrap(os.Remove(filePath))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Archive marks order as being settled.
|
||||||
|
func (store *FileStore) Archive(archivedAt time.Time, requests ...*ArchivedInfo) error {
|
||||||
|
store.archiveMu.Lock()
|
||||||
|
defer store.archiveMu.Unlock()
|
||||||
|
|
||||||
|
f, err := store.getNewArchiveFile(archivedAt)
|
||||||
|
if err != nil {
|
||||||
|
return OrderError.Wrap(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
err = errs.Combine(err, OrderError.Wrap(f.Close()))
|
||||||
|
}()
|
||||||
|
|
||||||
|
for _, info := range requests {
|
||||||
|
err = writeStatus(f, info.Status)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = writeLimit(f, info.Limit)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = writeOrder(f, info.Order)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListArchived returns orders that have been sent.
|
||||||
|
func (store *FileStore) ListArchived() ([]*ArchivedInfo, error) {
|
||||||
|
store.archiveMu.Lock()
|
||||||
|
defer store.archiveMu.Unlock()
|
||||||
|
|
||||||
|
var errList error
|
||||||
|
archivedList := []*ArchivedInfo{}
|
||||||
|
|
||||||
|
err := filepath.Walk(store.archiveDir, func(path string, info os.FileInfo, err error) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errList = errs.Combine(errList, OrderError.Wrap(err))
|
errList = errs.Combine(errList, OrderError.Wrap(err))
|
||||||
return nil
|
return nil
|
||||||
@ -140,46 +215,86 @@ func (store *FileStore) DeleteReadyToSendFiles() (err error) {
|
|||||||
if info.IsDir() {
|
if info.IsDir() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if info.Name() == unsentStagingFileName {
|
archivedAt, err := getArchivedFileInfo(info.Name())
|
||||||
return nil
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
// delete all non-staging files
|
|
||||||
return OrderError.Wrap(os.Remove(path))
|
f, err := os.Open(path)
|
||||||
|
if err != nil {
|
||||||
|
return OrderError.Wrap(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
err = errs.Combine(err, OrderError.Wrap(f.Close()))
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
status, err := readStatus(f)
|
||||||
|
if err != nil {
|
||||||
|
if errs.Is(err, io.EOF) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
limit, err := readLimit(f)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
order, err := readOrder(f)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
newInfo := &ArchivedInfo{
|
||||||
|
Limit: limit,
|
||||||
|
Order: order,
|
||||||
|
Status: status,
|
||||||
|
ArchivedAt: archivedAt,
|
||||||
|
}
|
||||||
|
archivedList = append(archivedList, newInfo)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errList = errs.Combine(errList, err)
|
errList = errs.Combine(errList, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return errList
|
return archivedList, errList
|
||||||
}
|
}
|
||||||
|
|
||||||
// convertUnsentStagingToReady converts the unsent staging file to be read only, and renames it.
|
// CleanArchive deletes all entries archvied before the provided time.
|
||||||
func (store *FileStore) convertUnsentStagingToReady() error {
|
func (store *FileStore) CleanArchive(deleteBefore time.Time) error {
|
||||||
// lock mutex so no one tries to write to the file while we do this
|
store.archiveMu.Lock()
|
||||||
store.mu.Lock()
|
defer store.archiveMu.Unlock()
|
||||||
defer store.mu.Unlock()
|
|
||||||
|
|
||||||
oldFileName := unsentStagingFileName
|
// we want to delete everything older than ttl
|
||||||
oldFilePath := filepath.Join(store.unsentDir, oldFileName)
|
var errList error
|
||||||
if _, err := os.Stat(oldFilePath); os.IsNotExist(err) {
|
err := filepath.Walk(store.archiveDir, func(path string, info os.FileInfo, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
errList = errs.Combine(errList, OrderError.Wrap(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if info.IsDir() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
archivedAt, err := getArchivedFileInfo(info.Name())
|
||||||
|
if err != nil {
|
||||||
|
errList = errs.Combine(errList, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if archivedAt.Before(deleteBefore) {
|
||||||
|
return OrderError.Wrap(os.Remove(path))
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
})
|
||||||
|
|
||||||
// set file to readonly
|
return errs.Combine(errList, err)
|
||||||
err := os.Chmod(oldFilePath, 0444)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// make new file suffix the current time in case there are other "ready" files already
|
|
||||||
timeStr := strconv.FormatInt(time.Now().UnixNano(), 10)
|
|
||||||
newFilePath := filepath.Join(store.unsentDir, unsentReadyFilePrefix+timeStr)
|
|
||||||
// rename file
|
|
||||||
return os.Rename(oldFilePath, newFilePath)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// getUnsentStagingFile creates or gets the order limit file for appending unsent orders to.
|
// getUnsentFile creates or gets the order limit file for appending unsent orders to.
|
||||||
// it expects the caller to lock the store's mutex before calling, and to handle closing the returned file.
|
// There is a different file for each satellite and creation hour.
|
||||||
func (store *FileStore) getUnsentStagingFile() (*os.File, error) {
|
// It expects the caller to lock the store's mutex before calling, and to handle closing the returned file.
|
||||||
|
func (store *FileStore) getUnsentFile(satelliteID storj.NodeID, creationTime time.Time) (*os.File, error) {
|
||||||
if _, err := os.Stat(store.unsentDir); os.IsNotExist(err) {
|
if _, err := os.Stat(store.unsentDir); os.IsNotExist(err) {
|
||||||
err = os.Mkdir(store.unsentDir, 0700)
|
err = os.Mkdir(store.unsentDir, 0700)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -187,7 +302,7 @@ func (store *FileStore) getUnsentStagingFile() (*os.File, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fileName := unsentStagingFileName
|
fileName := unsentFilePrefix + satelliteID.String() + "-" + getCreationHourString(creationTime)
|
||||||
filePath := filepath.Join(store.unsentDir, fileName)
|
filePath := filepath.Join(store.unsentDir, fileName)
|
||||||
// create file if not exists or append
|
// create file if not exists or append
|
||||||
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
@ -197,6 +312,85 @@ func (store *FileStore) getUnsentStagingFile() (*os.File, error) {
|
|||||||
return f, nil
|
return f, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getCreationHourString(t time.Time) string {
|
||||||
|
creationHour := t.Truncate(time.Hour)
|
||||||
|
timeStr := strconv.FormatInt(creationHour.UnixNano(), 10)
|
||||||
|
return timeStr
|
||||||
|
}
|
||||||
|
|
||||||
|
// gracePeriodPassed determines whether enough time has passed that no new orders will be added to a file.
|
||||||
|
func (store *FileStore) gracePeriodPassed(createdHour time.Time) bool {
|
||||||
|
canSendCutoff := time.Now().Add(-store.orderLimitGracePeriod)
|
||||||
|
// add one hour to include order limits in file added at end of createdHour
|
||||||
|
return createdHour.Add(time.Hour).Before(canSendCutoff)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getNewArchiveFile creates the order limit file for appending archived orders to.
|
||||||
|
// it expects the caller to lock the store's mutex before calling, and to handle closing the returned file.
|
||||||
|
func (store *FileStore) getNewArchiveFile(archivedAt time.Time) (*os.File, error) {
|
||||||
|
if _, err := os.Stat(store.archiveDir); os.IsNotExist(err) {
|
||||||
|
err = os.Mkdir(store.archiveDir, 0700)
|
||||||
|
if err != nil {
|
||||||
|
return nil, OrderError.Wrap(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// suffix of filename is the archivedAt time
|
||||||
|
timeStr := strconv.FormatInt(archivedAt.UnixNano(), 10)
|
||||||
|
newFilePath := filepath.Join(store.archiveDir, archiveFilePrefix+timeStr)
|
||||||
|
// create file if not exists or append
|
||||||
|
f, err := os.OpenFile(newFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
|
if err != nil {
|
||||||
|
return nil, OrderError.Wrap(err)
|
||||||
|
}
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getUnsentFileInfo gets the satellite ID and created hour from a filename.
|
||||||
|
// it expects the file name to be in the format "unsent-orders-<satelliteID>-<createdAtHour>"
|
||||||
|
func getUnsentFileInfo(name string) (satellite storj.NodeID, createdHour time.Time, err error) {
|
||||||
|
if !strings.HasPrefix(name, unsentFilePrefix) {
|
||||||
|
return storj.NodeID{}, time.Time{}, OrderError.New("Not a valid unsent order file name: %s", name)
|
||||||
|
}
|
||||||
|
// chop off prefix to get satellite ID and created hours
|
||||||
|
infoStr := name[len(unsentFilePrefix):]
|
||||||
|
infoSlice := strings.Split(infoStr, "-")
|
||||||
|
if len(infoSlice) != 2 {
|
||||||
|
return storj.NodeID{}, time.Time{}, OrderError.New("Not a valid unsent order file name: %s", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
satelliteIDStr := infoSlice[0]
|
||||||
|
satelliteID, err := storj.NodeIDFromString(satelliteIDStr)
|
||||||
|
if err != nil {
|
||||||
|
return storj.NodeID{}, time.Time{}, OrderError.New("Not a valid unsent order file name: %s", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
timeStr := infoSlice[1]
|
||||||
|
createdHourUnixNano, err := strconv.ParseInt(timeStr, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return satelliteID, time.Time{}, OrderError.Wrap(err)
|
||||||
|
}
|
||||||
|
createdAtHour := time.Unix(0, createdHourUnixNano)
|
||||||
|
|
||||||
|
return satelliteID, createdAtHour, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getArchivedFileInfo gets the archived at time from an archive file name.
|
||||||
|
// it expects the file name to be in the format "archived-orders-<archviedAtTime>"
|
||||||
|
func getArchivedFileInfo(name string) (time.Time, error) {
|
||||||
|
if !strings.HasPrefix(name, archiveFilePrefix) {
|
||||||
|
return time.Time{}, OrderError.New("Not a valid archived file name: %s", name)
|
||||||
|
}
|
||||||
|
// chop off prefix to get archived at string
|
||||||
|
archivedAtStr := name[len(archiveFilePrefix):]
|
||||||
|
archivedAtUnixNano, err := strconv.ParseInt(archivedAtStr, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return time.Time{}, OrderError.Wrap(err)
|
||||||
|
}
|
||||||
|
archivedAt := time.Unix(0, archivedAtUnixNano)
|
||||||
|
return archivedAt, nil
|
||||||
|
}
|
||||||
|
|
||||||
// writeLimit writes the size of the order limit bytes, followed by the order limit bytes.
|
// writeLimit writes the size of the order limit bytes, followed by the order limit bytes.
|
||||||
// it expects the caller to have locked the mutex.
|
// it expects the caller to have locked the mutex.
|
||||||
func writeLimit(f io.Writer, limit *pb.OrderLimit) error {
|
func writeLimit(f io.Writer, limit *pb.OrderLimit) error {
|
||||||
@ -277,3 +471,22 @@ func readOrder(f io.Reader) (*pb.Order, error) {
|
|||||||
}
|
}
|
||||||
return order, nil
|
return order, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// writeStatus writes the satellite response status of an archived order.
|
||||||
|
// it expects the caller to have locked the mutex.
|
||||||
|
func writeStatus(f io.Writer, status Status) error {
|
||||||
|
if _, err := f.Write([]byte{byte(status)}); err != nil {
|
||||||
|
return OrderError.New("Error writing status: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// readStatus reads the status of an archived order limit.
|
||||||
|
func readStatus(f io.Reader) (Status, error) {
|
||||||
|
statusBytes := [1]byte{}
|
||||||
|
_, err := io.ReadFull(f, statusBytes[:])
|
||||||
|
if err != nil {
|
||||||
|
return 0, OrderError.Wrap(err)
|
||||||
|
}
|
||||||
|
return Status(statusBytes[0]), nil
|
||||||
|
}
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
// Copyright (C) 2020 Storj Labs, Inc.
|
// Copyright (C) 2020 Storj Labs, Inc.
|
||||||
// See LICENSE for copying information.
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
package orders_test
|
package orders
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
@ -13,7 +13,6 @@ import (
|
|||||||
"storj.io/common/storj"
|
"storj.io/common/storj"
|
||||||
"storj.io/common/testcontext"
|
"storj.io/common/testcontext"
|
||||||
"storj.io/common/testrand"
|
"storj.io/common/testrand"
|
||||||
"storj.io/storj/storagenode/orders"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestOrdersStore(t *testing.T) {
|
func TestOrdersStore(t *testing.T) {
|
||||||
@ -21,79 +20,137 @@ func TestOrdersStore(t *testing.T) {
|
|||||||
defer ctx.Cleanup()
|
defer ctx.Cleanup()
|
||||||
dirName := ctx.Dir("test-orders")
|
dirName := ctx.Dir("test-orders")
|
||||||
|
|
||||||
ordersStore := orders.NewFileStore(dirName)
|
// make order limit grace period 24 hours
|
||||||
|
ordersStore := NewFileStore(dirName, 24*time.Hour)
|
||||||
|
|
||||||
|
// adding order before grace period should result in an error
|
||||||
|
newSN := testrand.SerialNumber()
|
||||||
|
newInfo := &Info{
|
||||||
|
Limit: &pb.OrderLimit{
|
||||||
|
SerialNumber: newSN,
|
||||||
|
SatelliteId: testrand.NodeID(),
|
||||||
|
Action: pb.PieceAction_GET,
|
||||||
|
OrderCreation: time.Now().Add(-48 * time.Hour),
|
||||||
|
OrderExpiration: time.Now().Add(time.Hour),
|
||||||
|
},
|
||||||
|
Order: &pb.Order{
|
||||||
|
SerialNumber: newSN,
|
||||||
|
Amount: 10,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := ordersStore.Enqueue(newInfo)
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
// for each satellite, make three orders from four hours ago, three from two hours ago, and three from now.
|
||||||
numSatellites := 3
|
numSatellites := 3
|
||||||
serialsPerSat := 5
|
createdTimes := []time.Time{
|
||||||
originalInfos, err := storeNewOrders(ordersStore, numSatellites, serialsPerSat)
|
time.Now().Add(-4 * time.Hour),
|
||||||
|
time.Now().Add(-2 * time.Hour),
|
||||||
|
time.Now(),
|
||||||
|
}
|
||||||
|
serialsPerSatPerTime := 3
|
||||||
|
|
||||||
|
originalInfos, err := storeNewOrders(ordersStore, numSatellites, serialsPerSatPerTime, createdTimes)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
unsentMap, err := ordersStore.ListUnsentBySatellite()
|
// update grace period so that some of the order limits are considered created before the grace period
|
||||||
require.NoError(t, err)
|
ordersStore.orderLimitGracePeriod = time.Hour
|
||||||
|
|
||||||
// go through order limits and make sure information is accurate
|
// 3 times:
|
||||||
require.Len(t, unsentMap, numSatellites)
|
// list unsent orders - should receive data from all satellites the first two times, and nothing the last time.
|
||||||
for _, unsentSatList := range unsentMap {
|
// add listed orders to batches for archival
|
||||||
require.Len(t, unsentSatList, serialsPerSat)
|
// delete unsent file for each returned satellite/createdAt bucket
|
||||||
|
originalArchivedInfos := make(map[storj.SerialNumber]*ArchivedInfo)
|
||||||
|
archiveBatches := [][]*ArchivedInfo{}
|
||||||
|
archiveTime1 := time.Now().Add(-2 * time.Hour)
|
||||||
|
archiveTime2 := time.Now()
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
unsentMap, err := ordersStore.ListUnsentBySatellite()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
for _, unsentInfo := range unsentSatList {
|
// on last iteration, expect nothing returned
|
||||||
sn := unsentInfo.Limit.SerialNumber
|
if i == 2 {
|
||||||
originalInfo := originalInfos[sn]
|
require.Len(t, unsentMap, 0)
|
||||||
|
break
|
||||||
verifyInfosEqual(t, unsentInfo, originalInfo)
|
|
||||||
}
|
}
|
||||||
}
|
archiveBatch := []*ArchivedInfo{}
|
||||||
|
// go through order limits and make sure information is accurate
|
||||||
|
require.Len(t, unsentMap, numSatellites)
|
||||||
|
for satelliteID, unsentSatList := range unsentMap {
|
||||||
|
require.Len(t, unsentSatList.InfoList, serialsPerSatPerTime)
|
||||||
|
|
||||||
// add some more orders and list again
|
for _, unsentInfo := range unsentSatList.InfoList {
|
||||||
// we should see everything we added before, plus the new ones
|
// "new" orders should not be returned
|
||||||
newInfos, err := storeNewOrders(ordersStore, numSatellites, serialsPerSat)
|
require.True(t, unsentInfo.Limit.OrderCreation.Before(createdTimes[2]))
|
||||||
require.NoError(t, err)
|
sn := unsentInfo.Limit.SerialNumber
|
||||||
for sn, info := range newInfos {
|
originalInfo := originalInfos[sn]
|
||||||
originalInfos[sn] = info
|
|
||||||
}
|
|
||||||
// because we have stored two times, we have twice as many serials...
|
|
||||||
require.Len(t, originalInfos, 2*numSatellites*serialsPerSat)
|
|
||||||
|
|
||||||
unsentMap, err = ordersStore.ListUnsentBySatellite()
|
verifyInfosEqual(t, unsentInfo, originalInfo)
|
||||||
require.NoError(t, err)
|
// expect that creation hour is consistent with order
|
||||||
// ...and twice as many satellites.
|
require.Equal(t, unsentSatList.CreatedAtHour.UTC(), unsentInfo.Limit.OrderCreation.Truncate(time.Hour).UTC())
|
||||||
require.Len(t, unsentMap, 2*numSatellites)
|
|
||||||
for _, unsentSatList := range unsentMap {
|
|
||||||
require.Len(t, unsentSatList, serialsPerSat)
|
|
||||||
|
|
||||||
for _, unsentInfo := range unsentSatList {
|
// add to archive batch
|
||||||
sn := unsentInfo.Limit.SerialNumber
|
archivedAt := archiveTime1
|
||||||
originalInfo := originalInfos[sn]
|
if i == 1 {
|
||||||
|
archivedAt = archiveTime2
|
||||||
|
}
|
||||||
|
newArchivedInfo := &ArchivedInfo{
|
||||||
|
Limit: unsentInfo.Limit,
|
||||||
|
Order: unsentInfo.Order,
|
||||||
|
Status: StatusAccepted,
|
||||||
|
ArchivedAt: archivedAt,
|
||||||
|
}
|
||||||
|
originalArchivedInfos[unsentInfo.Limit.SerialNumber] = newArchivedInfo
|
||||||
|
archiveBatch = append(archiveBatch, newArchivedInfo)
|
||||||
|
}
|
||||||
|
|
||||||
verifyInfosEqual(t, unsentInfo, originalInfo)
|
// delete unsent file for this satellite/creation hour
|
||||||
|
err = ordersStore.DeleteUnsentFile(satelliteID, unsentSatList.CreatedAtHour)
|
||||||
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
// add archive batch to archiveBatches
|
||||||
|
archiveBatches = append(archiveBatches, archiveBatch)
|
||||||
|
}
|
||||||
|
// archive first batch two hours ago, archive second batch now
|
||||||
|
err = ordersStore.Archive(archiveTime1, archiveBatches[0]...)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = ordersStore.Archive(archiveTime2, archiveBatches[1]...)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// list archived, expect everything from first two created at time buckets
|
||||||
|
archived, err := ordersStore.ListArchived()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, archived, numSatellites*serialsPerSatPerTime*2)
|
||||||
|
for _, archivedInfo := range archived {
|
||||||
|
sn := archivedInfo.Limit.SerialNumber
|
||||||
|
originalInfo := originalArchivedInfos[sn]
|
||||||
|
verifyArchivedInfosEqual(t, archivedInfo, originalInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
// now, add another order, delete ready to send files, and list
|
// clean archive for anything older than 30 minutes
|
||||||
// we should only see the new order
|
err = ordersStore.CleanArchive(time.Now().Add(-30 * time.Minute))
|
||||||
originalInfos, err = storeNewOrders(ordersStore, 1, 1)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = ordersStore.DeleteReadyToSendFiles()
|
// list archived, expect only recent archived batch (other was cleaned)
|
||||||
|
archived, err = ordersStore.ListArchived()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
require.Len(t, archived, numSatellites*serialsPerSatPerTime)
|
||||||
unsentMap, err = ordersStore.ListUnsentBySatellite()
|
for _, archivedInfo := range archived {
|
||||||
require.NoError(t, err)
|
sn := archivedInfo.Limit.SerialNumber
|
||||||
require.Len(t, unsentMap, 1)
|
originalInfo := originalArchivedInfos[sn]
|
||||||
for _, unsentSatList := range unsentMap {
|
verifyArchivedInfosEqual(t, archivedInfo, originalInfo)
|
||||||
require.Len(t, unsentSatList, 1)
|
require.Equal(t, archivedInfo.ArchivedAt.Round(0), archiveTime2.Round(0))
|
||||||
for _, unsentInfo := range unsentSatList {
|
|
||||||
sn := unsentInfo.Limit.SerialNumber
|
|
||||||
originalInfo := originalInfos[sn]
|
|
||||||
|
|
||||||
verifyInfosEqual(t, unsentInfo, originalInfo)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO test order archival
|
// clean archive for everything before now, expect list to return nothing
|
||||||
|
err = ordersStore.CleanArchive(time.Now())
|
||||||
|
require.NoError(t, err)
|
||||||
|
archived, err = ordersStore.ListArchived()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, archived, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifyInfosEqual(t *testing.T, a, b *orders.Info) {
|
func verifyInfosEqual(t *testing.T, a, b *Info) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
require.NotNil(t, a)
|
require.NotNil(t, a)
|
||||||
@ -109,46 +166,63 @@ func verifyInfosEqual(t *testing.T, a, b *orders.Info) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func storeNewOrders(ordersStore *orders.FileStore, numSatellites, numOrdersPerSatellite int) (map[storj.SerialNumber]*orders.Info, error) {
|
func verifyArchivedInfosEqual(t *testing.T, a, b *ArchivedInfo) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
require.NotNil(t, a)
|
||||||
|
require.NotNil(t, b)
|
||||||
|
|
||||||
|
require.Equal(t, a.Limit.SerialNumber, b.Limit.SerialNumber)
|
||||||
|
require.Equal(t, a.Limit.SatelliteId, b.Limit.SatelliteId)
|
||||||
|
require.Equal(t, a.Limit.OrderExpiration.UTC(), b.Limit.OrderExpiration.UTC())
|
||||||
|
require.Equal(t, a.Limit.Action, b.Limit.Action)
|
||||||
|
|
||||||
|
require.Equal(t, a.Order.Amount, b.Order.Amount)
|
||||||
|
require.Equal(t, a.Order.SerialNumber, b.Order.SerialNumber)
|
||||||
|
|
||||||
|
require.Equal(t, a.Status, b.Status)
|
||||||
|
require.Equal(t, a.ArchivedAt.UTC(), b.ArchivedAt.UTC())
|
||||||
|
}
|
||||||
|
|
||||||
|
func storeNewOrders(ordersStore *FileStore, numSatellites, numOrdersPerSatPerTime int, createdAtTimes []time.Time) (map[storj.SerialNumber]*Info, error) {
|
||||||
actions := []pb.PieceAction{
|
actions := []pb.PieceAction{
|
||||||
pb.PieceAction_GET,
|
pb.PieceAction_GET,
|
||||||
pb.PieceAction_PUT_REPAIR,
|
pb.PieceAction_PUT_REPAIR,
|
||||||
pb.PieceAction_GET_AUDIT,
|
pb.PieceAction_GET_AUDIT,
|
||||||
}
|
}
|
||||||
originalInfos := make(map[storj.SerialNumber]*orders.Info)
|
originalInfos := make(map[storj.SerialNumber]*Info)
|
||||||
for i := 0; i < numSatellites; i++ {
|
for i := 0; i < numSatellites; i++ {
|
||||||
satellite := testrand.NodeID()
|
satellite := testrand.NodeID()
|
||||||
|
|
||||||
// for each satellite, half of the orders will expire in an hour
|
for _, createdAt := range createdAtTimes {
|
||||||
// and half will expire in three hours.
|
for j := 0; j < numOrdersPerSatPerTime; j++ {
|
||||||
for j := 0; j < numOrdersPerSatellite; j++ {
|
expiration := time.Now().Add(time.Hour)
|
||||||
expiration := time.Now().Add(time.Hour)
|
amount := testrand.Int63n(1000)
|
||||||
if j < 2 {
|
sn := testrand.SerialNumber()
|
||||||
expiration = time.Now().Add(3 * time.Hour)
|
action := actions[j%len(actions)]
|
||||||
}
|
|
||||||
amount := testrand.Int63n(1000)
|
|
||||||
sn := testrand.SerialNumber()
|
|
||||||
action := actions[j%len(actions)]
|
|
||||||
|
|
||||||
newInfo := &orders.Info{
|
newInfo := &Info{
|
||||||
Limit: &pb.OrderLimit{
|
Limit: &pb.OrderLimit{
|
||||||
SerialNumber: sn,
|
SerialNumber: sn,
|
||||||
SatelliteId: satellite,
|
SatelliteId: satellite,
|
||||||
Action: action,
|
Action: action,
|
||||||
OrderExpiration: expiration,
|
OrderCreation: createdAt,
|
||||||
},
|
OrderExpiration: expiration,
|
||||||
Order: &pb.Order{
|
},
|
||||||
SerialNumber: sn,
|
Order: &pb.Order{
|
||||||
Amount: amount,
|
SerialNumber: sn,
|
||||||
},
|
Amount: amount,
|
||||||
}
|
},
|
||||||
originalInfos[sn] = newInfo
|
}
|
||||||
|
originalInfos[sn] = newInfo
|
||||||
|
|
||||||
// store the new info in the orders store
|
// store the new info in the orders store
|
||||||
err := ordersStore.Enqueue(newInfo)
|
err := ordersStore.Enqueue(newInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return originalInfos, err
|
return originalInfos, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return originalInfos, nil
|
return originalInfos, nil
|
||||||
|
Loading…
Reference in New Issue
Block a user