storagenode/orders: Refactor orders store

Abstract details of writing and reading data to/from orders files so
that adding V1 and future maintenance are easier.

Change-Id: I85f4a91761293de1a782e197bc9e09db228933c9
This commit is contained in:
Moby von Briesen 2020-10-01 18:52:22 -04:00
parent bd177bff03
commit fbf2c0b242
10 changed files with 433 additions and 285 deletions

View File

@ -18,6 +18,7 @@ import (
"storj.io/common/testrand"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/orders/ordersfile"
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
)
@ -46,7 +47,7 @@ func TestDB(t *testing.T) {
piecePublicKey, piecePrivateKey, err := storj.NewPieceKey()
require.NoError(t, err)
infos := make([]*orders.Info, 2)
infos := make([]*ordersfile.Info, 2)
for i := 0; i < len(infos); i++ {
serialNumber := testrand.SerialNumber()
@ -70,7 +71,7 @@ func TestDB(t *testing.T) {
})
require.NoError(t, err)
infos[i] = &orders.Info{
infos[i] = &ordersfile.Info{
Limit: limit,
Order: order,
}
@ -86,7 +87,7 @@ func TestDB(t *testing.T) {
unsent, err := ordersdb.ListUnsent(ctx, 100)
require.NoError(t, err)
require.Empty(t, cmp.Diff([]*orders.Info{infos[0]}, unsent, cmp.Comparer(pb.Equal)))
require.Empty(t, cmp.Diff([]*ordersfile.Info{infos[0]}, unsent, cmp.Comparer(pb.Equal)))
// Another add
err = ordersdb.Enqueue(ctx, infos[1])
@ -95,14 +96,14 @@ func TestDB(t *testing.T) {
unsent, err = ordersdb.ListUnsent(ctx, 100)
require.NoError(t, err)
require.Empty(t,
cmp.Diff([]*orders.Info{infos[0], infos[1]}, unsent, cmp.Comparer(pb.Equal)),
cmp.Diff([]*ordersfile.Info{infos[0], infos[1]}, unsent, cmp.Comparer(pb.Equal)),
)
// list by group
unsentGrouped, err := ordersdb.ListUnsentBySatellite(ctx)
require.NoError(t, err)
expectedGrouped := map[storj.NodeID][]*orders.Info{
expectedGrouped := map[storj.NodeID][]*ordersfile.Info{
satellite0.ID: {
{Limit: infos[0].Limit, Order: infos[0].Order},
{Limit: infos[1].Limit, Order: infos[1].Order},
@ -196,7 +197,7 @@ func TestDB_Trivial(t *testing.T) {
before := now.Add(-time.Second)
{ // Ensure Enqueue works at all
err := db.Orders().Enqueue(ctx, &orders.Info{
err := db.Orders().Enqueue(ctx, &ordersfile.Info{
Order: &pb.Order{},
Limit: &pb.OrderLimit{
SatelliteId: satelliteID,

View File

@ -0,0 +1,222 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package ordersfile
import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/private/date"
)
const (
	// unsentFilePrefix is the filename prefix for order files that have not yet been settled with a satellite.
	unsentFilePrefix = "unsent-orders-"
	// archiveFilePrefix is the filename prefix for order files that have been settled and moved to the archive.
	archiveFilePrefix = "archived-orders-"
)

var (
	// Error identifies errors with orders files.
	Error = errs.Class("ordersfile")

	// mon collects package-level metrics (e.g. the corrupted-file meter in ReadOne).
	mon = monkit.Package()
)
// Info contains full information about an order.
type Info struct {
	Limit *pb.OrderLimit
	Order *pb.Order
}

// Writable defines an interface for a write-only orders file.
type Writable interface {
	// Append adds one order entry to the end of the file.
	Append(*Info) error
	// Close closes the file.
	Close() error
}

// Readable defines an interface for a read-only orders file.
type Readable interface {
	// ReadOne reads the next order entry from the file.
	ReadOne() (*Info, error)
	// Close closes the file.
	Close() error
}
// OpenWritableUnsent creates or opens for appending the unsent orders file for a given satellite ID and creation hour.
func OpenWritableUnsent(log *zap.Logger, unsentDir string, satelliteID storj.NodeID, creationTime time.Time) (Writable, error) {
	// each satellite/creation-hour pair maps to its own append-only file
	path := filepath.Join(unsentDir, unsentFileName(satelliteID, creationTime))

	// open for appending, creating the file if it does not exist yet
	file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	writable := &fileV0{
		log: log.Named("writable V0 orders file"),
		f:   file,
	}
	return writable, nil
}
// UnsentInfo contains information relevant to an unsent orders file, as well as information necessary to open it for reading.
type UnsentInfo struct {
	SatelliteID   storj.NodeID // satellite ID parsed from the file name
	CreatedAtHour time.Time    // creation hour encoded in the file name
}
// ArchivedInfo contains information relevant to an archived orders file, as well as information necessary to open it for reading.
type ArchivedInfo struct {
	SatelliteID   storj.NodeID // satellite ID parsed from the file name
	CreatedAtHour time.Time    // creation hour encoded in the file name
	ArchivedAt    time.Time    // time the file was archived, encoded in the file name
	StatusText    string       // settlement status text from the file name (e.g. ACCEPTED/REJECTED)
}
// GetUnsentInfo returns a new UnsentInfo which can be used to get information about and read from an unsent orders file.
func GetUnsentInfo(info os.FileInfo) (*UnsentInfo, error) {
	id, hour, err := getUnsentFileInfo(info.Name())
	if err != nil {
		return nil, err
	}
	unsent := &UnsentInfo{
		SatelliteID:   id,
		CreatedAtHour: hour,
	}
	return unsent, nil
}
// GetArchivedInfo returns a new ArchivedInfo which can be used to get information about and read from an archived orders file.
func GetArchivedInfo(info os.FileInfo) (*ArchivedInfo, error) {
	id, createdHour, archivedTime, statusText, err := getArchivedFileInfo(info.Name())
	if err != nil {
		return nil, err
	}
	archived := &ArchivedInfo{
		SatelliteID:   id,
		CreatedAtHour: createdHour,
		ArchivedAt:    archivedTime,
		StatusText:    statusText,
	}
	return archived, nil
}
// OpenReadable opens for reading the unsent or archived orders file at a given path.
// It assumes the path has already been validated with GetUnsentInfo or GetArchivedInfo.
func OpenReadable(log *zap.Logger, path string) (Readable, error) {
	file, openErr := os.Open(path)
	if openErr != nil {
		return nil, Error.Wrap(openErr)
	}
	readable := &fileV0{
		log: log.Named("readable V0 orders file"),
		f:   file,
	}
	return readable, nil
}
// MoveUnsent moves an unsent orders file to the archived orders file directory.
func MoveUnsent(unsentDir, archiveDir string, satelliteID storj.NodeID, createdAtHour, archivedAt time.Time, status pb.SettlementWithWindowResponse_Status) error {
	src := filepath.Join(unsentDir, unsentFileName(satelliteID, createdAtHour))
	dst := filepath.Join(archiveDir, archiveFileName(satelliteID, createdAtHour, archivedAt, status))
	// a rename both relocates the file and records archive metadata in the new name
	err := os.Rename(src, dst)
	return Error.Wrap(err)
}
// getUnsentFileInfo gets the satellite ID and created hour from an unsent file name.
// It expects the file name to be in the format "unsent-orders-<satelliteID>-<createdAtHour>".
func getUnsentFileInfo(filename string) (satellite storj.NodeID, createdHour time.Time, err error) {
	if !strings.HasPrefix(filename, unsentFilePrefix) {
		return storj.NodeID{}, time.Time{}, Error.New("invalid path: %q", filename)
	}
	// chop off prefix to get satellite ID and created hour
	infoStr := filename[len(unsentFilePrefix):]
	infoSlice := strings.Split(infoStr, "-")
	if len(infoSlice) != 2 {
		return storj.NodeID{}, time.Time{}, Error.New("invalid path: %q", filename)
	}

	satelliteID, err := storj.NodeIDFromString(infoSlice[0])
	if err != nil {
		return storj.NodeID{}, time.Time{}, Error.New("invalid path: %q", filename)
	}

	// created hour is encoded as unix nanoseconds; use the same uniform
	// "invalid path" error as the other parse failures above, and return
	// zero values rather than a partially-populated result on error.
	createdHourUnixNano, err := strconv.ParseInt(infoSlice[1], 10, 64)
	if err != nil {
		return storj.NodeID{}, time.Time{}, Error.New("invalid path: %q", filename)
	}
	return satelliteID, time.Unix(0, createdHourUnixNano), nil
}
// getArchivedFileInfo gets the satellite ID, creation hour, archived-at time, and status from an archive file name.
// it expects the file name to be in the format "archived-orders-<satelliteID>-<createdAtHour>-<archivedAtTime>-<status>".
func getArchivedFileInfo(name string) (satelliteID storj.NodeID, createdAtHour, archivedAt time.Time, status string, err error) {
	if !strings.HasPrefix(name, archiveFilePrefix) {
		return storj.NodeID{}, time.Time{}, time.Time{}, "", Error.New("invalid path: %q", name)
	}
	// chop off prefix to get satellite ID, created hour, archive time, and status
	infoStr := name[len(archiveFilePrefix):]
	infoSlice := strings.Split(infoStr, "-")
	if len(infoSlice) != 4 {
		return storj.NodeID{}, time.Time{}, time.Time{}, "", Error.New("invalid path: %q", name)
	}
	satelliteIDStr := infoSlice[0]
	satelliteID, err = storj.NodeIDFromString(satelliteIDStr)
	if err != nil {
		return storj.NodeID{}, time.Time{}, time.Time{}, "", Error.New("invalid path: %q", name)
	}
	// created hour is encoded as unix nanoseconds
	createdAtStr := infoSlice[1]
	createdHourUnixNano, err := strconv.ParseInt(createdAtStr, 10, 64)
	if err != nil {
		return satelliteID, time.Time{}, time.Time{}, "", Error.New("invalid path: %q", name)
	}
	createdAtHour = time.Unix(0, createdHourUnixNano)
	// archived-at time is also encoded as unix nanoseconds
	archivedAtStr := infoSlice[2]
	archivedAtUnixNano, err := strconv.ParseInt(archivedAtStr, 10, 64)
	if err != nil {
		return satelliteID, createdAtHour, time.Time{}, "", Error.New("invalid path: %q", name)
	}
	archivedAt = time.Unix(0, archivedAtUnixNano)
	// status is returned as raw text; interpretation is left to the caller
	status = infoSlice[3]
	return satelliteID, createdAtHour, archivedAt, status, nil
}
// unsentFileName builds the file name for an unsent orders file:
// "unsent-orders-<satelliteID>-<createdAtHour>".
func unsentFileName(satelliteID storj.NodeID, creationTime time.Time) string {
	return unsentFilePrefix + satelliteID.String() + "-" + getCreationHourString(creationTime)
}
// archiveFileName builds the file name for an archived orders file:
// "archived-orders-<satelliteID>-<createdAtHour>-<archivedAtTime>-<status>".
func archiveFileName(satelliteID storj.NodeID, creationTime, archiveTime time.Time, status pb.SettlementWithWindowResponse_Status) string {
	archivedNanos := strconv.FormatInt(archiveTime.UnixNano(), 10)
	statusName := pb.SettlementWithWindowResponse_Status_name[int32(status)]
	return archiveFilePrefix + satelliteID.String() + "-" +
		getCreationHourString(creationTime) + "-" +
		archivedNanos + "-" +
		statusName
}
// getCreationHourString returns t truncated to the hour, formatted as unix nanoseconds in base 10.
func getCreationHourString(t time.Time) string {
	return strconv.FormatInt(date.TruncateToHourInNano(t), 10)
}

View File

@ -0,0 +1,109 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package ordersfile
import (
"encoding/binary"
"errors"
"io"
"os"
"go.uber.org/zap"
"storj.io/common/pb"
)
// fileV0 is a version 0 orders file.
// It implements both Writable and Readable; entries are stored back to back
// as length-prefixed serialized (limit, order) pairs.
type fileV0 struct {
	log *zap.Logger
	f   *os.File
}
// Append writes limit and order to the file as
// [limitSize][limitBytes][orderSize][orderBytes].
func (of *fileV0) Append(info *Info) error {
	limitSerialized, err := pb.Marshal(info.Limit)
	if err != nil {
		return Error.Wrap(err)
	}
	orderSerialized, err := pb.Marshal(info.Order)
	if err != nil {
		return Error.Wrap(err)
	}
	limitSizeBytes := [4]byte{}
	binary.LittleEndian.PutUint32(limitSizeBytes[:], uint32(len(limitSerialized)))
	orderSizeBytes := [4]byte{}
	binary.LittleEndian.PutUint32(orderSizeBytes[:], uint32(len(orderSerialized)))

	// pre-size the buffer to its exact final length (two 4-byte little-endian
	// size prefixes plus both payloads) so the appends below never reallocate.
	toWrite := make([]byte, 0, 4+len(limitSerialized)+4+len(orderSerialized))
	toWrite = append(toWrite, limitSizeBytes[:]...)
	toWrite = append(toWrite, limitSerialized...)
	toWrite = append(toWrite, orderSizeBytes[:]...)
	toWrite = append(toWrite, orderSerialized...)

	// single Write call, so a torn entry can only result from an actual I/O error
	if _, err = of.f.Write(toWrite); err != nil {
		return Error.New("Couldn't write serialized order and limit: %w", err)
	}
	return nil
}
// ReadOne reads one entry from the file.
// It returns io.EOF (wrapped) at a clean end of file, and converts an
// io.ErrUnexpectedEOF (a truncated/corrupted trailing entry) into io.EOF
// so callers simply stop reading.
func (of *fileV0) ReadOne() (info *Info, err error) {
	defer func() {
		// if error is unexpected EOF, file is corrupted.
		// V0 files do not handle corruption, so just return EOF so caller thinks we have reached the end of the file.
		// NOTE(review): the message and meter name say "archived", but fileV0 is also
		// used for unsent files via OpenReadable — confirm whether a separate
		// unsent-file corruption meter is still wanted here.
		if errors.Is(err, io.ErrUnexpectedEOF) {
			of.log.Warn("Unexpected EOF while reading archived order file", zap.Error(err))
			mon.Meter("orders_archive_file_corrupted").Mark64(1)
			err = io.EOF
		}
	}()

	// read the 4-byte little-endian size prefix of the serialized limit
	sizeBytes := [4]byte{}
	_, err = io.ReadFull(of.f, sizeBytes[:])
	if err != nil {
		return nil, Error.Wrap(err)
	}
	limitSize := binary.LittleEndian.Uint32(sizeBytes[:])
	limitSerialized := make([]byte, limitSize)
	_, err = io.ReadFull(of.f, limitSerialized)
	if err != nil {
		return nil, Error.Wrap(err)
	}
	limit := &pb.OrderLimit{}
	err = pb.Unmarshal(limitSerialized, limit)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	// read the size prefix and payload of the serialized order
	_, err = io.ReadFull(of.f, sizeBytes[:])
	if err != nil {
		return nil, Error.Wrap(err)
	}
	orderSize := binary.LittleEndian.Uint32(sizeBytes[:])
	orderSerialized := make([]byte, orderSize)
	_, err = io.ReadFull(of.f, orderSerialized)
	if err != nil {
		return nil, Error.Wrap(err)
	}
	order := &pb.Order{}
	err = pb.Unmarshal(orderSerialized, order)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	return &Info{
		Limit: limit,
		Order: order,
	}, nil
}
// Close closes the underlying file handle.
func (of *fileV0) Close() error {
	closeErr := of.f.Close()
	return closeErr
}

View File

@ -20,6 +20,7 @@ import (
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/storagenode/orders/ordersfile"
"storj.io/storj/storagenode/trust"
)
@ -32,12 +33,6 @@ var (
mon = monkit.Package()
)
// Info contains full information about an order.
type Info struct {
Limit *pb.OrderLimit
Order *pb.Order
}
// ArchivedInfo contains full information about an archived order.
type ArchivedInfo struct {
Limit *pb.OrderLimit
@ -69,11 +64,11 @@ type ArchiveRequest struct {
// architecture: Database
type DB interface {
// Enqueue inserts order to the list of orders needing to be sent to the satellite.
Enqueue(ctx context.Context, info *Info) error
Enqueue(ctx context.Context, info *ordersfile.Info) error
// ListUnsent returns orders that haven't been sent yet.
ListUnsent(ctx context.Context, limit int) ([]*Info, error)
ListUnsent(ctx context.Context, limit int) ([]*ordersfile.Info, error)
// ListUnsentBySatellite returns orders that haven't been sent yet grouped by satellite.
ListUnsentBySatellite(ctx context.Context) (map[storj.NodeID][]*Info, error)
ListUnsentBySatellite(ctx context.Context) (map[storj.NodeID][]*ordersfile.Info, error)
// Archive marks order as being handled.
Archive(ctx context.Context, archivedAt time.Time, requests ...ArchiveRequest) error
@ -241,7 +236,7 @@ func (service *Service) sendOrdersFromDB(ctx context.Context) (hasOrders bool) {
}
// Settle uploads orders to the satellite.
func (service *Service) Settle(ctx context.Context, satelliteID storj.NodeID, orders []*Info, requests chan ArchiveRequest) {
func (service *Service) Settle(ctx context.Context, satelliteID storj.NodeID, orders []*ordersfile.Info, requests chan ArchiveRequest) {
log := service.log.Named(satelliteID.String())
err := service.settle(ctx, log, satelliteID, orders, requests)
if err != nil {
@ -291,7 +286,7 @@ func (service *Service) handleBatches(ctx context.Context, requests chan Archive
return nil
}
func (service *Service) settle(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*Info, requests chan ArchiveRequest) (err error) {
func (service *Service) settle(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*ordersfile.Info, requests chan ArchiveRequest) (err error) {
defer mon.Task()(&ctx)(&err)
log.Info("sending", zap.Int("count", len(orders)))
@ -429,7 +424,7 @@ func (service *Service) sendOrdersFromFileStore(ctx context.Context, now time.Ti
return nil
}
err = service.ordersStore.Archive(satelliteID, unsentInfo.CreatedAtHour, time.Now().UTC(), status)
err = service.ordersStore.Archive(satelliteID, unsentInfo, time.Now().UTC(), status)
if err != nil {
log.Error("failed to archive orders", zap.Error(err))
return nil
@ -448,7 +443,7 @@ func (service *Service) sendOrdersFromFileStore(ctx context.Context, now time.Ti
}
}
func (service *Service) settleWindow(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*Info) (status pb.SettlementWithWindowResponse_Status, err error) {
func (service *Service) settleWindow(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*ordersfile.Info) (status pb.SettlementWithWindowResponse_Status, err error) {
defer mon.Task()(&ctx)(&err)
log.Info("sending", zap.Int("count", len(orders)))

View File

@ -19,6 +19,7 @@ import (
"storj.io/storj/satellite/overlay"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/orders/ordersfile"
)
// TODO remove when db is removed.
@ -52,7 +53,7 @@ func TestOrderDBSettle(t *testing.T) {
}
signedOrder, err := signing.SignUplinkOrder(ctx, piecePrivateKey, order)
require.NoError(t, err)
order0 := &orders.Info{
order0 := &ordersfile.Info{
Limit: orderLimit,
Order: signedOrder,
}
@ -150,7 +151,7 @@ func TestOrderFileStoreAndDBSettle(t *testing.T) {
}
signedOrder, err := signing.SignUplinkOrder(ctx, piecePrivateKey, order)
require.NoError(t, err)
order0 := &orders.Info{
order0 := &ordersfile.Info{
Limit: orderLimit,
Order: signedOrder,
}
@ -221,14 +222,14 @@ func TestCleanArchiveDB(t *testing.T) {
serialNumber0 := testrand.SerialNumber()
serialNumber1 := testrand.SerialNumber()
order0 := &orders.Info{
order0 := &ordersfile.Info{
Limit: &pb.OrderLimit{
SatelliteId: satellite,
SerialNumber: serialNumber0,
},
Order: &pb.Order{},
}
order1 := &orders.Info{
order1 := &ordersfile.Info{
Limit: &pb.OrderLimit{
SatelliteId: satellite,
SerialNumber: serialNumber1,
@ -295,7 +296,7 @@ func TestCleanArchiveFileStore(t *testing.T) {
serialNumber1 := testrand.SerialNumber()
createdAt1 := now.Add(-24 * time.Hour)
order0 := &orders.Info{
order0 := &ordersfile.Info{
Limit: &pb.OrderLimit{
SatelliteId: satellite,
SerialNumber: serialNumber0,
@ -303,7 +304,7 @@ func TestCleanArchiveFileStore(t *testing.T) {
},
Order: &pb.Order{},
}
order1 := &orders.Info{
order1 := &ordersfile.Info{
Limit: &pb.OrderLimit{
SatelliteId: satellite,
SerialNumber: serialNumber1,
@ -319,9 +320,12 @@ func TestCleanArchiveFileStore(t *testing.T) {
require.NoError(t, err)
// archive one order yesterday, one today
err = node.OrdersStore.Archive(satellite, createdAt0.Truncate(time.Hour), yesterday, pb.SettlementWithWindowResponse_ACCEPTED)
unsentInfo := orders.UnsentInfo{}
unsentInfo.CreatedAtHour = createdAt0.Truncate(time.Hour)
err = node.OrdersStore.Archive(satellite, unsentInfo, yesterday, pb.SettlementWithWindowResponse_ACCEPTED)
require.NoError(t, err)
err = node.OrdersStore.Archive(satellite, createdAt1.Truncate(time.Hour), now, pb.SettlementWithWindowResponse_ACCEPTED)
unsentInfo.CreatedAtHour = createdAt1.Truncate(time.Hour)
err = node.OrdersStore.Archive(satellite, unsentInfo, now, pb.SettlementWithWindowResponse_ACCEPTED)
require.NoError(t, err)
archived, err := node.OrdersStore.ListArchived()

View File

@ -4,13 +4,9 @@
package orders
import (
"encoding/binary"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
@ -20,11 +16,7 @@ import (
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/private/date"
)
const (
unsentFilePrefix = "unsent-orders-"
archiveFilePrefix = "archived-orders-"
"storj.io/storj/storagenode/orders/ordersfile"
)
// activeWindow represents a window with active operations waiting to finish to enqueue
@ -80,7 +72,7 @@ func NewFileStore(log *zap.Logger, ordersDir string, orderLimitGracePeriod time.
// BeginEnqueue returns a function that can be called to enqueue the passed in Info. If the Info
// is too old to be enqueued, then an error is returned.
func (store *FileStore) BeginEnqueue(satelliteID storj.NodeID, createdAt time.Time) (commit func(*Info) error, err error) {
func (store *FileStore) BeginEnqueue(satelliteID storj.NodeID, createdAt time.Time) (commit func(*ordersfile.Info) error, err error) {
store.unsentMu.Lock()
defer store.unsentMu.Unlock()
store.activeMu.Lock()
@ -96,7 +88,7 @@ func (store *FileStore) BeginEnqueue(satelliteID storj.NodeID, createdAt time.Ti
// record that there is an operation in flight for this window
store.enqueueStartedLocked(satelliteID, createdAt)
return func(info *Info) error {
return func(info *ordersfile.Info) error {
// always acquire the activeMu after the unsentMu to avoid deadlocks
store.unsentMu.Lock()
defer store.unsentMu.Unlock()
@ -117,17 +109,17 @@ func (store *FileStore) BeginEnqueue(satelliteID storj.NodeID, createdAt time.Ti
}
// write out the data
f, err := store.getUnsentFile(info.Limit.SatelliteId, info.Limit.OrderCreation)
of, err := ordersfile.OpenWritableUnsent(store.log, store.unsentDir, info.Limit.SatelliteId, info.Limit.OrderCreation)
if err != nil {
return OrderError.Wrap(err)
}
defer func() {
err = errs.Combine(err, OrderError.Wrap(f.Close()))
err = errs.Combine(err, OrderError.Wrap(of.Close()))
}()
err = writeLimitAndOrder(f, info.Limit, info.Order)
err = of.Append(info)
if err != nil {
return err
return OrderError.Wrap(err)
}
return nil
@ -169,7 +161,7 @@ func (store *FileStore) hasActiveEnqueue(satelliteID storj.NodeID, createdAt tim
// Enqueue inserts order to be sent at the end of the unsent file for a particular creation hour.
// It ensures the order is not being queued after the order limit grace period.
func (store *FileStore) Enqueue(info *Info) (err error) {
func (store *FileStore) Enqueue(info *ordersfile.Info) (err error) {
commit, err := store.BeginEnqueue(info.Limit.SatelliteId, info.Limit.OrderCreation)
if err != nil {
return err
@ -180,7 +172,7 @@ func (store *FileStore) Enqueue(info *Info) (err error) {
// UnsentInfo is a struct containing a window of orders for a satellite and order creation hour.
type UnsentInfo struct {
CreatedAtHour time.Time
InfoList []*Info
InfoList []*ordersfile.Info
}
// ListUnsentBySatellite returns one window of orders that haven't been sent yet, grouped by satellite.
@ -203,63 +195,55 @@ func (store *FileStore) ListUnsentBySatellite(now time.Time) (infoMap map[storj.
if info.IsDir() {
return nil
}
satelliteID, createdAtHour, err := getUnsentFileInfo(info.Name())
fileInfo, err := ordersfile.GetUnsentInfo(info)
if err != nil {
return err
errList = errs.Combine(errList, OrderError.Wrap(err))
return nil
}
// if we already have orders for this satellite, ignore the file
if _, ok := infoMap[satelliteID]; ok {
if _, ok := infoMap[fileInfo.SatelliteID]; ok {
return nil
}
// if orders can still be added to file, ignore it. We add an hour because that's
// the newest order that could be added to that window.
if now.Sub(createdAtHour.Add(time.Hour)) <= store.orderLimitGracePeriod {
if now.Sub(fileInfo.CreatedAtHour.Add(time.Hour)) <= store.orderLimitGracePeriod {
return nil
}
// if there are still active orders for the time, ignore it.
if store.hasActiveEnqueue(satelliteID, createdAtHour) {
if store.hasActiveEnqueue(fileInfo.SatelliteID, fileInfo.CreatedAtHour) {
return nil
}
newUnsentInfo := UnsentInfo{
CreatedAtHour: createdAtHour,
CreatedAtHour: fileInfo.CreatedAtHour,
}
f, err := os.Open(path)
of, err := ordersfile.OpenReadable(store.log, path)
if err != nil {
return OrderError.Wrap(err)
}
defer func() {
err = errs.Combine(err, OrderError.Wrap(f.Close()))
err = errs.Combine(err, OrderError.Wrap(of.Close()))
}()
for {
// if at any point we see an unexpected EOF error, return what orders we could read successfully with no error
// this behavior ensures that we will attempt to archive corrupted files instead of continually failing to read them
limit, order, err := readLimitAndOrder(f)
newInfo, err := of.ReadOne()
if err != nil {
if errs.Is(err, io.EOF) {
break
}
if errs.Is(err, io.ErrUnexpectedEOF) {
store.log.Warn("Unexpected EOF while reading unsent order file", zap.Error(err))
mon.Meter("orders_unsent_file_corrupted").Mark64(1)
break
}
return err
}
newInfo := &Info{
Limit: limit,
Order: order,
}
newUnsentInfo.InfoList = append(newUnsentInfo.InfoList, newInfo)
}
infoMap[satelliteID] = newUnsentInfo
infoMap[fileInfo.SatelliteID] = newUnsentInfo
return nil
})
if err != nil {
@ -269,29 +253,21 @@ func (store *FileStore) ListUnsentBySatellite(now time.Time) (infoMap map[storj.
return infoMap, errList
}
// Archive moves a file from "unsent" to "archive". The filename/path changes from
// unsent/unsent-orders-<satelliteID>-<createdAtHour>
// to
// archive/archived-orders-<satelliteID>-<createdAtHour>-<archivedTime>-<ACCEPTED/REJECTED>.
func (store *FileStore) Archive(satelliteID storj.NodeID, createdAtHour, archivedAt time.Time, status pb.SettlementWithWindowResponse_Status) error {
// Archive moves a file from "unsent" to "archive".
func (store *FileStore) Archive(satelliteID storj.NodeID, unsentInfo UnsentInfo, archivedAt time.Time, status pb.SettlementWithWindowResponse_Status) error {
store.unsentMu.Lock()
defer store.unsentMu.Unlock()
store.archiveMu.Lock()
defer store.archiveMu.Unlock()
oldFileName := unsentFilePrefix + satelliteID.String() + "-" + getCreationHourString(createdAtHour)
oldFilePath := filepath.Join(store.unsentDir, oldFileName)
newFileName := fmt.Sprintf("%s%s-%s-%s-%s",
archiveFilePrefix,
satelliteID.String(),
getCreationHourString(createdAtHour),
strconv.FormatInt(archivedAt.UnixNano(), 10),
pb.SettlementWithWindowResponse_Status_name[int32(status)],
)
newFilePath := filepath.Join(store.archiveDir, newFileName)
return OrderError.Wrap(os.Rename(oldFilePath, newFilePath))
return OrderError.Wrap(ordersfile.MoveUnsent(
store.unsentDir,
store.archiveDir,
satelliteID,
unsentInfo.CreatedAtHour,
archivedAt,
status,
))
}
// ListArchived returns orders that have been sent.
@ -309,46 +285,41 @@ func (store *FileStore) ListArchived() ([]*ArchivedInfo, error) {
if info.IsDir() {
return nil
}
_, _, archivedAt, statusText, err := getArchivedFileInfo(info.Name())
fileInfo, err := ordersfile.GetArchivedInfo(info)
if err != nil {
return err
return OrderError.Wrap(err)
}
of, err := ordersfile.OpenReadable(store.log, path)
if err != nil {
return OrderError.Wrap(err)
}
defer func() {
err = errs.Combine(err, OrderError.Wrap(of.Close()))
}()
status := StatusUnsent
switch statusText {
switch fileInfo.StatusText {
case pb.SettlementWithWindowResponse_ACCEPTED.String():
status = StatusAccepted
case pb.SettlementWithWindowResponse_REJECTED.String():
status = StatusRejected
}
f, err := os.Open(path)
if err != nil {
return OrderError.Wrap(err)
}
defer func() {
err = errs.Combine(err, OrderError.Wrap(f.Close()))
}()
for {
limit, order, err := readLimitAndOrder(f)
info, err := of.ReadOne()
if err != nil {
if errs.Is(err, io.EOF) {
break
}
if errs.Is(err, io.ErrUnexpectedEOF) {
store.log.Warn("Unexpected EOF while reading archived order file", zap.Error(err))
mon.Meter("orders_archive_file_corrupted").Mark64(1)
break
}
return err
}
newInfo := &ArchivedInfo{
Limit: limit,
Order: order,
Limit: info.Limit,
Order: info.Order,
Status: status,
ArchivedAt: archivedAt,
ArchivedAt: fileInfo.ArchivedAt,
}
archivedList = append(archivedList, newInfo)
}
@ -376,12 +347,12 @@ func (store *FileStore) CleanArchive(deleteBefore time.Time) error {
if info.IsDir() {
return nil
}
_, _, archivedAt, _, err := getArchivedFileInfo(info.Name())
fileInfo, err := ordersfile.GetArchivedInfo(info)
if err != nil {
errList = errs.Combine(errList, err)
return nil
}
if archivedAt.Before(deleteBefore) {
if fileInfo.ArchivedAt.Before(deleteBefore) {
return OrderError.Wrap(os.Remove(path))
}
return nil
@ -405,160 +376,3 @@ func (store *FileStore) ensureDirectories() error {
}
return nil
}
// getUnsentFile creates or gets the order limit file for appending unsent orders to.
// There is a different file for each satellite and creation hour.
// It expects the caller to lock the store's mutex before calling, and to handle closing the returned file.
func (store *FileStore) getUnsentFile(satelliteID storj.NodeID, creationTime time.Time) (*os.File, error) {
fileName := unsentFilePrefix + satelliteID.String() + "-" + getCreationHourString(creationTime)
filePath := filepath.Join(store.unsentDir, fileName)
// create file if not exists or append
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, OrderError.Wrap(err)
}
return f, nil
}
func getCreationHourString(t time.Time) string {
creationHour := date.TruncateToHourInNano(t)
timeStr := strconv.FormatInt(creationHour, 10)
return timeStr
}
// getUnsentFileInfo gets the satellite ID and created hour from a filename.
// it expects the file name to be in the format "unsent-orders-<satelliteID>-<createdAtHour>".
func getUnsentFileInfo(name string) (satellite storj.NodeID, createdHour time.Time, err error) {
if !strings.HasPrefix(name, unsentFilePrefix) {
return storj.NodeID{}, time.Time{}, OrderError.New("Not a valid unsent order file name: %s", name)
}
// chop off prefix to get satellite ID and created hours
infoStr := name[len(unsentFilePrefix):]
infoSlice := strings.Split(infoStr, "-")
if len(infoSlice) != 2 {
return storj.NodeID{}, time.Time{}, OrderError.New("Not a valid unsent order file name: %s", name)
}
satelliteIDStr := infoSlice[0]
satelliteID, err := storj.NodeIDFromString(satelliteIDStr)
if err != nil {
return storj.NodeID{}, time.Time{}, OrderError.New("Not a valid unsent order file name: %s", name)
}
timeStr := infoSlice[1]
createdHourUnixNano, err := strconv.ParseInt(timeStr, 10, 64)
if err != nil {
return satelliteID, time.Time{}, OrderError.Wrap(err)
}
createdAtHour := time.Unix(0, createdHourUnixNano)
return satelliteID, createdAtHour, nil
}
// getArchivedFileInfo gets the archived at time from an archive file name.
// it expects the file name to be in the format "archived-orders-<satelliteID>-<createdAtHour>-<archviedAtTime>-<status>".
func getArchivedFileInfo(name string) (satelliteID storj.NodeID, createdAtHour, archivedAt time.Time, status string, err error) {
if !strings.HasPrefix(name, archiveFilePrefix) {
return storj.NodeID{}, time.Time{}, time.Time{}, "", OrderError.New("Not a valid archived order file name: %s", name)
}
// chop off prefix to get satellite ID, created hour, archive time, and status
infoStr := name[len(archiveFilePrefix):]
infoSlice := strings.Split(infoStr, "-")
if len(infoSlice) != 4 {
return storj.NodeID{}, time.Time{}, time.Time{}, "", OrderError.New("Not a valid archived order file name: %s", name)
}
satelliteIDStr := infoSlice[0]
satelliteID, err = storj.NodeIDFromString(satelliteIDStr)
if err != nil {
return storj.NodeID{}, time.Time{}, time.Time{}, "", OrderError.New("Not a valid archived order file name: %s", name)
}
createdAtStr := infoSlice[1]
createdHourUnixNano, err := strconv.ParseInt(createdAtStr, 10, 64)
if err != nil {
return satelliteID, time.Time{}, time.Time{}, "", OrderError.New("Not a valid archived order file name: %s", name)
}
createdAtHour = time.Unix(0, createdHourUnixNano)
archivedAtStr := infoSlice[2]
archivedAtUnixNano, err := strconv.ParseInt(archivedAtStr, 10, 64)
if err != nil {
return satelliteID, createdAtHour, time.Time{}, "", OrderError.New("Not a valid archived order file name: %s", name)
}
archivedAt = time.Unix(0, archivedAtUnixNano)
status = infoSlice[3]
return satelliteID, createdAtHour, archivedAt, status, nil
}
// writeLimitAndOrder writes limit and order to the file as
// [limitSize][limitBytes][orderSize][orderBytes]
// It expects the caller to have locked the mutex.
func writeLimitAndOrder(f io.Writer, limit *pb.OrderLimit, order *pb.Order) error {
toWrite := []byte{}
limitSerialized, err := pb.Marshal(limit)
if err != nil {
return OrderError.Wrap(err)
}
orderSerialized, err := pb.Marshal(order)
if err != nil {
return OrderError.Wrap(err)
}
limitSizeBytes := [4]byte{}
binary.LittleEndian.PutUint32(limitSizeBytes[:], uint32(len(limitSerialized)))
orderSizeBytes := [4]byte{}
binary.LittleEndian.PutUint32(orderSizeBytes[:], uint32(len(orderSerialized)))
toWrite = append(toWrite, limitSizeBytes[:]...)
toWrite = append(toWrite, limitSerialized...)
toWrite = append(toWrite, orderSizeBytes[:]...)
toWrite = append(toWrite, orderSerialized...)
if _, err = f.Write(toWrite); err != nil {
return OrderError.New("Error writing serialized order size: %w", err)
}
return nil
}
// readLimitAndOrder reads the next limit and order from the file and returns them.
func readLimitAndOrder(f io.Reader) (*pb.OrderLimit, *pb.Order, error) {
sizeBytes := [4]byte{}
_, err := io.ReadFull(f, sizeBytes[:])
if err != nil {
return nil, nil, OrderError.Wrap(err)
}
limitSize := binary.LittleEndian.Uint32(sizeBytes[:])
limitSerialized := make([]byte, limitSize)
_, err = io.ReadFull(f, limitSerialized)
if err != nil {
return nil, nil, OrderError.Wrap(err)
}
limit := &pb.OrderLimit{}
err = pb.Unmarshal(limitSerialized, limit)
if err != nil {
return nil, nil, OrderError.Wrap(err)
}
_, err = io.ReadFull(f, sizeBytes[:])
if err != nil {
return nil, nil, OrderError.Wrap(err)
}
orderSize := binary.LittleEndian.Uint32(sizeBytes[:])
orderSerialized := make([]byte, orderSize)
_, err = io.ReadFull(f, orderSerialized)
if err != nil {
return nil, nil, OrderError.Wrap(err)
}
order := &pb.Order{}
err = pb.Unmarshal(orderSerialized, order)
if err != nil {
return nil, nil, OrderError.Wrap(err)
}
return limit, order, nil
}

View File

@ -17,6 +17,7 @@ import (
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/orders/ordersfile"
)
func TestOrdersStore_Enqueue_GracePeriodFailure(t *testing.T) {
@ -31,7 +32,7 @@ func TestOrdersStore_Enqueue_GracePeriodFailure(t *testing.T) {
// adding order before grace period should result in an error
newSN := testrand.SerialNumber()
newInfo := &orders.Info{
newInfo := &ordersfile.Info{
Limit: &pb.OrderLimit{
SerialNumber: newSN,
SatelliteId: testrand.NodeID(),
@ -129,7 +130,7 @@ func TestOrdersStore_ListUnsentBySatellite(t *testing.T) {
archivedAt = archiveTime2
status = status2
}
err = ordersStore.Archive(satelliteID, unsentSatList.CreatedAtHour, archivedAt, status)
err = ordersStore.Archive(satelliteID, unsentSatList, archivedAt, status)
require.NoError(t, err)
}
}
@ -194,7 +195,7 @@ func TestOrdersStore_ListUnsentBySatellite_Ongoing(t *testing.T) {
// store an order that can be listed
sn := testrand.SerialNumber()
require.NoError(t, ordersStore.Enqueue(&orders.Info{
require.NoError(t, ordersStore.Enqueue(&ordersfile.Info{
Limit: &pb.OrderLimit{
SerialNumber: sn,
SatelliteId: satellite,
@ -223,7 +224,7 @@ func TestOrdersStore_ListUnsentBySatellite_Ongoing(t *testing.T) {
// commit the order
sn = testrand.SerialNumber()
require.NoError(t, commit(&orders.Info{
require.NoError(t, commit(&ordersfile.Info{
Limit: &pb.OrderLimit{
SerialNumber: sn,
SatelliteId: satellite,
@ -260,7 +261,7 @@ func TestOrdersStore_CorruptUnsent(t *testing.T) {
require.Len(t, unsent, 0)
sn := testrand.SerialNumber()
info := &orders.Info{
info := &ordersfile.Info{
Limit: &pb.OrderLimit{
SerialNumber: sn,
SatelliteId: satellite,
@ -300,7 +301,7 @@ func TestOrdersStore_CorruptUnsent(t *testing.T) {
require.Len(t, unsent[satellite].InfoList, 1)
}
func verifyInfosEqual(t *testing.T, a, b *orders.Info) {
func verifyInfosEqual(t *testing.T, a, b *ordersfile.Info) {
t.Helper()
require.NotNil(t, a)
@ -333,13 +334,13 @@ func verifyArchivedInfosEqual(t *testing.T, a, b *orders.ArchivedInfo) {
require.Equal(t, a.ArchivedAt.UTC(), b.ArchivedAt.UTC())
}
func storeNewOrders(ordersStore *orders.FileStore, numSatellites, numOrdersPerSatPerTime int, createdAtTimes []time.Time) (map[storj.SerialNumber]*orders.Info, error) {
func storeNewOrders(ordersStore *orders.FileStore, numSatellites, numOrdersPerSatPerTime int, createdAtTimes []time.Time) (map[storj.SerialNumber]*ordersfile.Info, error) {
actions := []pb.PieceAction{
pb.PieceAction_GET,
pb.PieceAction_PUT_REPAIR,
pb.PieceAction_GET_AUDIT,
}
originalInfos := make(map[storj.SerialNumber]*orders.Info)
originalInfos := make(map[storj.SerialNumber]*ordersfile.Info)
for i := 0; i < numSatellites; i++ {
satellite := testrand.NodeID()
@ -350,7 +351,7 @@ func storeNewOrders(ordersStore *orders.FileStore, numSatellites, numOrdersPerSa
sn := testrand.SerialNumber()
action := actions[j%len(actions)]
newInfo := &orders.Info{
newInfo := &ordersfile.Info{
Limit: &pb.OrderLimit{
SerialNumber: sn,
SatelliteId: satellite,

View File

@ -30,6 +30,7 @@ import (
"storj.io/storj/storagenode/bandwidth"
"storj.io/storj/storagenode/monitor"
"storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/orders/ordersfile"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/piecestore/usedserials"
"storj.io/storj/storagenode/retain"
@ -674,7 +675,7 @@ func (endpoint *Endpoint) beginSaveOrder(limit *pb.OrderLimit) (_commit func(ctx
return
}
err = commit(&orders.Info{Limit: limit, Order: order})
err = commit(&ordersfile.Info{Limit: limit, Order: order})
if err != nil {
endpoint.log.Error("failed to add order", zap.Error(err))
} else {

View File

@ -15,6 +15,7 @@ import (
"storj.io/common/storj"
"storj.io/storj/private/tagsql"
"storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/orders/ordersfile"
)
// ErrOrders represents errors from the ordersdb database.
@ -28,7 +29,7 @@ type ordersDB struct {
}
// Enqueue inserts order to the unsent list.
func (db *ordersDB) Enqueue(ctx context.Context, info *orders.Info) (err error) {
func (db *ordersDB) Enqueue(ctx context.Context, info *ordersfile.Info) (err error) {
defer mon.Task()(&ctx)(&err)
limitSerialized, err := pb.Marshal(info.Limit)
@ -61,7 +62,7 @@ func (db *ordersDB) Enqueue(ctx context.Context, info *orders.Info) (err error)
// which have not. In case of database or other system error, the method will
// stop without any further processing and will return an error without any
// order.
func (db *ordersDB) ListUnsent(ctx context.Context, limit int) (_ []*orders.Info, err error) {
func (db *ordersDB) ListUnsent(ctx context.Context, limit int) (_ []*ordersfile.Info, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := db.QueryContext(ctx, `
@ -79,7 +80,7 @@ func (db *ordersDB) ListUnsent(ctx context.Context, limit int) (_ []*orders.Info
var unmarshalErrors errs.Group
defer func() { err = errs.Combine(err, unmarshalErrors.Err(), rows.Close()) }()
var infos []*orders.Info
var infos []*ordersfile.Info
for rows.Next() {
var limitSerialized []byte
var orderSerialized []byte
@ -89,7 +90,7 @@ func (db *ordersDB) ListUnsent(ctx context.Context, limit int) (_ []*orders.Info
return nil, ErrOrders.Wrap(err)
}
var info orders.Info
var info ordersfile.Info
info.Limit = &pb.OrderLimit{}
info.Order = &pb.Order{}
@ -120,7 +121,7 @@ func (db *ordersDB) ListUnsent(ctx context.Context, limit int) (_ []*orders.Info
// which have not. In case of database or other system error, the method will
// stop without any further processing and will return an error without any
// order.
func (db *ordersDB) ListUnsentBySatellite(ctx context.Context) (_ map[storj.NodeID][]*orders.Info, err error) {
func (db *ordersDB) ListUnsentBySatellite(ctx context.Context) (_ map[storj.NodeID][]*ordersfile.Info, err error) {
defer mon.Task()(&ctx)(&err)
// TODO: add some limiting
@ -138,7 +139,7 @@ func (db *ordersDB) ListUnsentBySatellite(ctx context.Context) (_ map[storj.Node
var unmarshalErrors errs.Group
defer func() { err = errs.Combine(err, unmarshalErrors.Err(), rows.Close()) }()
infos := map[storj.NodeID][]*orders.Info{}
infos := map[storj.NodeID][]*ordersfile.Info{}
for rows.Next() {
var limitSerialized []byte
var orderSerialized []byte
@ -148,7 +149,7 @@ func (db *ordersDB) ListUnsentBySatellite(ctx context.Context) (_ map[storj.Node
return nil, ErrOrders.Wrap(err)
}
var info orders.Info
var info ordersfile.Info
info.Limit = &pb.OrderLimit{}
info.Order = &pb.Order{}

View File

@ -24,7 +24,7 @@ import (
"storj.io/common/uuid"
"storj.io/storj/storage/filestore"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/orders/ordersfile"
"storj.io/storj/storagenode/storagenodedb"
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
)
@ -92,7 +92,7 @@ func testConcurrency(t *testing.T, ctx *testcontext.Context, db *storagenodedb.D
t.Fatal(err)
}
ordersMap := make(map[string]orders.Info)
ordersMap := make(map[string]ordersfile.Info)
err = createOrders(t, ctx, ordersMap, 1000)
require.NoError(t, err)
@ -104,7 +104,7 @@ func testConcurrency(t *testing.T, ctx *testcontext.Context, db *storagenodedb.D
})
}
func insertOrders(t *testing.T, ctx *testcontext.Context, db *storagenodedb.DB, ordersMap map[string]orders.Info) (err error) {
func insertOrders(t *testing.T, ctx *testcontext.Context, db *storagenodedb.DB, ordersMap map[string]ordersfile.Info) (err error) {
var wg sync.WaitGroup
for _, order := range ordersMap {
wg.Add(1)
@ -116,13 +116,13 @@ func insertOrders(t *testing.T, ctx *testcontext.Context, db *storagenodedb.DB,
return nil
}
func insertOrder(t *testing.T, ctx *testcontext.Context, db *storagenodedb.DB, wg *sync.WaitGroup, order *orders.Info) {
func insertOrder(t *testing.T, ctx *testcontext.Context, db *storagenodedb.DB, wg *sync.WaitGroup, order *ordersfile.Info) {
defer wg.Done()
err := db.Orders().Enqueue(ctx, order)
require.NoError(t, err)
}
func verifyOrders(t *testing.T, ctx *testcontext.Context, db *storagenodedb.DB, orders map[string]orders.Info) (err error) {
func verifyOrders(t *testing.T, ctx *testcontext.Context, db *storagenodedb.DB, orders map[string]ordersfile.Info) (err error) {
dbOrders, _ := db.Orders().ListUnsent(ctx, 10000)
found := 0
for _, order := range orders {
@ -137,7 +137,7 @@ func verifyOrders(t *testing.T, ctx *testcontext.Context, db *storagenodedb.DB,
return nil
}
func createOrders(t *testing.T, ctx *testcontext.Context, orders map[string]orders.Info, count int) (err error) {
func createOrders(t *testing.T, ctx *testcontext.Context, orders map[string]ordersfile.Info, count int) (err error) {
for i := 0; i < count; i++ {
key, err := uuid.New()
if err != nil {
@ -149,7 +149,7 @@ func createOrders(t *testing.T, ctx *testcontext.Context, orders map[string]orde
return nil
}
func createOrder(t *testing.T, ctx *testcontext.Context) (info *orders.Info) {
func createOrder(t *testing.T, ctx *testcontext.Context) (info *ordersfile.Info) {
storageNodeIdentity := testidentity.MustPregeneratedSignedIdentity(0, storj.LatestIDVersion())
satelliteIdentity := testidentity.MustPregeneratedSignedIdentity(1, storj.LatestIDVersion())
@ -179,7 +179,7 @@ func createOrder(t *testing.T, ctx *testcontext.Context) (info *orders.Info) {
})
require.NoError(t, err)
return &orders.Info{
return &ordersfile.Info{
Limit: limit,
Order: order,
}