storagenode/orders: Add V1 orders file

V1 allows the storagenode to continue reading orders from an
unsent/archived orders file, even if orders in the middle are corrupted.

Change-Id: Iea4117d55c05ceeb77f47d5c973e5ba95da46c66
Moby von Briesen 2020-10-06 12:14:25 -04:00 committed by Maximillian von Briesen
parent 59d85aab5b
commit 02cbf1e72a
6 changed files with 548 additions and 64 deletions
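In rough terms, V1 wraps each serialized limit/order pair in known magic bytes plus a CRC-32, so a reader that hits a corrupt entry can report it, scan forward to the next entry header, and resync instead of giving up on the rest of the file. The standalone sketch below is illustrative only (encodeV1Entry and the sample payloads are hypothetical); the magic constants, little-endian size fields, and IEEE CRC-32 mirror the fileV1.Append implementation added in this commit.

package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// Magic byte sequences copied from the new fileV1 code in this commit.
var (
	entryHeader = [8]byte{0x5c, 0xa1, 0xab, 0x1e, 0xba, 0x5e, 0xba, 0x11}
	entryFooter = [8]byte{0xfe, 0xed, 0x1f, 0x00, 0xd1, 0xc0, 0xff, 0xee}
)

// encodeV1Entry (hypothetical helper) lays out one entry as
// [entryHeader][limitSize][limitBytes][orderSize][orderBytes][checksum][entryFooter].
func encodeV1Entry(limitBytes, orderBytes []byte) []byte {
	out := append([]byte(nil), entryHeader[:]...)

	var size [2]byte
	binary.LittleEndian.PutUint16(size[:], uint16(len(limitBytes)))
	out = append(out, size[:]...)
	out = append(out, limitBytes...)

	binary.LittleEndian.PutUint16(size[:], uint16(len(orderBytes)))
	out = append(out, size[:]...)
	out = append(out, orderBytes...)

	// the checksum covers everything after the entry header
	var sum [4]byte
	binary.LittleEndian.PutUint32(sum[:], crc32.ChecksumIEEE(out[len(entryHeader):]))
	out = append(out, sum[:]...)

	return append(out, entryFooter[:]...)
}

func main() {
	entry := encodeV1Entry([]byte("serialized limit"), []byte("serialized order"))
	fmt.Printf("entry is %d bytes; header % x, footer % x\n", len(entry), entry[:8], entry[len(entry)-8:])
}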


@@ -11,16 +11,22 @@ import (
"strings"
"time"
-"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
-"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/private/date"
)
// Version is a type for defining different file versions.
type Version string
const (
// V0 is the first orders file version. It stores orders and limits with no checksum.
V0 = Version("v0")
// V1 is the second orders file version. It includes a checksum for each entry so that file corruption is handled better.
V1 = Version("v1")
unsentFilePrefix = "unsent-orders-"
archiveFilePrefix = "archived-orders-"
)
@@ -28,8 +34,8 @@ const (
var (
// Error identifies errors with orders files.
Error = errs.Class("ordersfile")
-mon = monkit.Package()
// ErrEntryCorrupt is returned when a corrupt entry is found.
ErrEntryCorrupt = errs.Class("ordersfile corrupt entry")
)
// Info contains full information about an order.
@@ -51,26 +57,28 @@ type Readable interface {
}
// OpenWritableUnsent creates or opens for appending the unsent orders file for a given satellite ID and creation hour.
-func OpenWritableUnsent(log *zap.Logger, unsentDir string, satelliteID storj.NodeID, creationTime time.Time) (Writable, error) {
-fileName := unsentFileName(satelliteID, creationTime)
func OpenWritableUnsent(unsentDir string, satelliteID storj.NodeID, creationTime time.Time) (Writable, error) {
// if V0 file already exists, use that. Otherwise use V1 file.
versionToUse := V0
fileName := UnsentFileName(satelliteID, creationTime, V0)
filePath := filepath.Join(unsentDir, fileName)
-// create file if not exists or append
-f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
-if err != nil {
-return nil, Error.Wrap(err)
-}
-return &fileV0{
-log: log.Named("writable V0 orders file"),
-f: f,
-}, nil
if _, err := os.Stat(filePath); os.IsNotExist(err) {
fileName = UnsentFileName(satelliteID, creationTime, V1)
filePath = filepath.Join(unsentDir, fileName)
versionToUse = V1
}
if versionToUse == V0 {
return OpenWritableV0(filePath)
}
return OpenWritableV1(filePath, satelliteID, creationTime)
}
// UnsentInfo contains information relevant to an unsent orders file, as well as information necessary to open it for reading.
type UnsentInfo struct {
SatelliteID storj.NodeID
CreatedAtHour time.Time
Version Version
}
// ArchivedInfo contains information relevant to an archived orders file, as well as information necessary to open it for reading.
@@ -79,11 +87,12 @@ type ArchivedInfo struct {
CreatedAtHour time.Time
ArchivedAt time.Time
StatusText string
Version Version
}
// GetUnsentInfo returns a new UnsentInfo which can be used to get information about and read from an unsent orders file.
func GetUnsentInfo(info os.FileInfo) (*UnsentInfo, error) {
-satelliteID, createdAtHour, err := getUnsentFileInfo(info.Name())
satelliteID, createdAtHour, version, err := getUnsentFileInfo(info.Name())
if err != nil {
return nil, err
}
@@ -91,12 +100,13 @@ func GetUnsentInfo(info os.FileInfo) (*UnsentInfo, error) {
return &UnsentInfo{
SatelliteID: satelliteID,
CreatedAtHour: createdAtHour,
Version: version,
}, nil
}
// GetArchivedInfo returns a new ArchivedInfo which can be used to get information about and read from an archived orders file.
func GetArchivedInfo(info os.FileInfo) (*ArchivedInfo, error) {
-satelliteID, createdAtHour, archivedAt, statusText, err := getArchivedFileInfo(info.Name())
satelliteID, createdAtHour, archivedAt, statusText, version, err := getArchivedFileInfo(info.Name())
if err != nil {
return nil, err
}
@@ -106,113 +116,125 @@ func GetArchivedInfo(info os.FileInfo) (*ArchivedInfo, error) {
CreatedAtHour: createdAtHour,
ArchivedAt: archivedAt,
StatusText: statusText,
Version: version,
}, nil
}
// OpenReadable opens for reading the unsent or archived orders file at a given path.
// It assumes the path has already been validated with GetUnsentInfo or GetArchivedInfo.
-func OpenReadable(log *zap.Logger, path string) (Readable, error) {
-f, err := os.Open(path)
-if err != nil {
-return nil, Error.Wrap(err)
-}
-return &fileV0{
-log: log.Named("readable V0 orders file"),
-f: f,
-}, nil
func OpenReadable(path string, version Version) (Readable, error) {
if version == V0 {
return OpenReadableV0(path)
}
return OpenReadableV1(path)
}
// MoveUnsent moves an unsent orders file to the archived orders file directory.
-func MoveUnsent(unsentDir, archiveDir string, satelliteID storj.NodeID, createdAtHour, archivedAt time.Time, status pb.SettlementWithWindowResponse_Status) error {
func MoveUnsent(unsentDir, archiveDir string, satelliteID storj.NodeID, createdAtHour, archivedAt time.Time, status pb.SettlementWithWindowResponse_Status, version Version) error {
-oldFilePath := filepath.Join(unsentDir, unsentFileName(satelliteID, createdAtHour))
oldFilePath := filepath.Join(unsentDir, UnsentFileName(satelliteID, createdAtHour, version))
-newFilePath := filepath.Join(archiveDir, archiveFileName(satelliteID, createdAtHour, archivedAt, status))
newFilePath := filepath.Join(archiveDir, ArchiveFileName(satelliteID, createdAtHour, archivedAt, status, version))
return Error.Wrap(os.Rename(oldFilePath, newFilePath))
}
-// it expects the file name to be in the format "unsent-orders-<satelliteID>-<createdAtHour>".
-func getUnsentFileInfo(filename string) (satellite storj.NodeID, createdHour time.Time, err error) {
// it expects the file name to be in the format "unsent-orders-<satelliteID>-<createdAtHour>.<version>".
// V0 will not have ".<version>" at the end of the filename.
func getUnsentFileInfo(filename string) (satellite storj.NodeID, createdHour time.Time, version Version, err error) {
filename, version = getVersion(filename)
if !strings.HasPrefix(filename, unsentFilePrefix) {
-return storj.NodeID{}, time.Time{}, Error.New("invalid path: %q", filename)
return storj.NodeID{}, time.Time{}, version, Error.New("invalid path: %q", filename)
}
// chop off prefix to get satellite ID and created hours
infoStr := filename[len(unsentFilePrefix):]
infoSlice := strings.Split(infoStr, "-")
if len(infoSlice) != 2 {
-return storj.NodeID{}, time.Time{}, Error.New("invalid path: %q", filename)
return storj.NodeID{}, time.Time{}, version, Error.New("invalid path: %q", filename)
}
satelliteIDStr := infoSlice[0]
satelliteID, err := storj.NodeIDFromString(satelliteIDStr)
if err != nil {
-return storj.NodeID{}, time.Time{}, Error.New("invalid path: %q", filename)
return storj.NodeID{}, time.Time{}, version, Error.New("invalid path: %q", filename)
}
timeStr := infoSlice[1]
createdHourUnixNano, err := strconv.ParseInt(timeStr, 10, 64)
if err != nil {
-return satelliteID, time.Time{}, Error.Wrap(err)
return satelliteID, time.Time{}, version, Error.Wrap(err)
}
createdAtHour := time.Unix(0, createdHourUnixNano)
-return satelliteID, createdAtHour, nil
return satelliteID, createdAtHour, version, nil
}
// getArchivedFileInfo gets the archived at time from an archive file name.
-// it expects the file name to be in the format "archived-orders-<satelliteID>-<createdAtHour>-<archviedAtTime>-<status>".
-func getArchivedFileInfo(name string) (satelliteID storj.NodeID, createdAtHour, archivedAt time.Time, status string, err error) {
// it expects the file name to be in the format "archived-orders-<satelliteID>-<createdAtHour>-<archviedAtTime>-<status>.<version>".
// V0 will not have ".<version>" at the end of the filename.
func getArchivedFileInfo(name string) (satelliteID storj.NodeID, createdAtHour, archivedAt time.Time, status string, version Version, err error) {
name, version = getVersion(name)
if !strings.HasPrefix(name, archiveFilePrefix) {
-return storj.NodeID{}, time.Time{}, time.Time{}, "", Error.New("invalid path: %q", name)
return storj.NodeID{}, time.Time{}, time.Time{}, "", version, Error.New("invalid path: %q", name)
}
// chop off prefix to get satellite ID, created hour, archive time, and status
infoStr := name[len(archiveFilePrefix):]
infoSlice := strings.Split(infoStr, "-")
if len(infoSlice) != 4 {
-return storj.NodeID{}, time.Time{}, time.Time{}, "", Error.New("invalid path: %q", name)
return storj.NodeID{}, time.Time{}, time.Time{}, "", version, Error.New("invalid path: %q", name)
}
satelliteIDStr := infoSlice[0]
satelliteID, err = storj.NodeIDFromString(satelliteIDStr)
if err != nil {
-return storj.NodeID{}, time.Time{}, time.Time{}, "", Error.New("invalid path: %q", name)
return storj.NodeID{}, time.Time{}, time.Time{}, "", version, Error.New("invalid path: %q", name)
}
createdAtStr := infoSlice[1]
createdHourUnixNano, err := strconv.ParseInt(createdAtStr, 10, 64)
if err != nil {
-return satelliteID, time.Time{}, time.Time{}, "", Error.New("invalid path: %q", name)
return satelliteID, time.Time{}, time.Time{}, "", version, Error.New("invalid path: %q", name)
}
createdAtHour = time.Unix(0, createdHourUnixNano)
archivedAtStr := infoSlice[2]
archivedAtUnixNano, err := strconv.ParseInt(archivedAtStr, 10, 64)
if err != nil {
-return satelliteID, createdAtHour, time.Time{}, "", Error.New("invalid path: %q", name)
return satelliteID, createdAtHour, time.Time{}, "", version, Error.New("invalid path: %q", name)
}
archivedAt = time.Unix(0, archivedAtUnixNano)
status = infoSlice[3]
-return satelliteID, createdAtHour, archivedAt, status, nil
return satelliteID, createdAtHour, archivedAt, status, version, nil
}
// UnsentFileName gets the filename of an unsent file.
-func unsentFileName(satelliteID storj.NodeID, creationTime time.Time) string {
-return fmt.Sprintf("%s%s-%s",
func UnsentFileName(satelliteID storj.NodeID, creationTime time.Time, version Version) string {
filename := fmt.Sprintf("%s%s-%s",
unsentFilePrefix,
satelliteID,
getCreationHourString(creationTime),
)
if version != V0 {
filename = fmt.Sprintf("%s.%s", filename, version)
}
return filename
}
// ArchiveFileName gets the filename of an archived file.
-func archiveFileName(satelliteID storj.NodeID, creationTime, archiveTime time.Time, status pb.SettlementWithWindowResponse_Status) string {
-return fmt.Sprintf("%s%s-%s-%s-%s",
func ArchiveFileName(satelliteID storj.NodeID, creationTime, archiveTime time.Time, status pb.SettlementWithWindowResponse_Status, version Version) string {
filename := fmt.Sprintf("%s%s-%s-%s-%s",
archiveFilePrefix,
satelliteID,
getCreationHourString(creationTime),
strconv.FormatInt(archiveTime.UnixNano(), 10),
pb.SettlementWithWindowResponse_Status_name[int32(status)],
)
if version != V0 {
filename = fmt.Sprintf("%s.%s", filename, version)
}
return filename
}
func getCreationHourString(t time.Time) string {
@@ -220,3 +242,13 @@ func getCreationHourString(t time.Time) string {
timeStr := strconv.FormatInt(creationHour, 10)
return timeStr
}
func getVersion(filename string) (trimmed string, version Version) {
ext := filepath.Ext(filename)
if ext == "."+string(V1) {
version = V1
trimmed = strings.TrimSuffix(filename, ext)
return trimmed, V1
}
return filename, V0
}


@@ -9,17 +9,37 @@ import (
"io"
"os"
-"go.uber.org/zap"
"storj.io/common/pb"
)
// fileV0 is a version 0 orders file.
type fileV0 struct {
-log *zap.Logger
f *os.File
}
// OpenWritableV0 opens for writing the unsent or archived orders file at a given path.
func OpenWritableV0(path string) (Writable, error) {
// create file if not exists or append
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, Error.Wrap(err)
}
return &fileV0{
f: f,
}, nil
}
// OpenReadableV0 opens for reading the unsent or archived orders file at a given path.
func OpenReadableV0(path string) (Readable, error) {
f, err := os.Open(path)
if err != nil {
return nil, Error.Wrap(err)
}
return &fileV0{
f: f,
}, nil
}
// Append writes limit and order to the file as
// [limitSize][limitBytes][orderSize][orderBytes].
func (of *fileV0) Append(info *Info) error {
@@ -55,12 +75,8 @@ func (of *fileV0) Append(info *Info) error {
// ReadOne reads one entry from the file.
func (of *fileV0) ReadOne() (info *Info, err error) {
defer func() {
-// if error is unexpected EOF, file is corrupted.
-// V0 files do not handle corruption, so just return EOF so caller thinks we have reached the end of the file.
if errors.Is(err, io.ErrUnexpectedEOF) {
-of.log.Warn("Unexpected EOF while reading archived order file", zap.Error(err))
-mon.Meter("orders_archive_file_corrupted").Mark64(1)
-err = io.EOF
err = ErrEntryCorrupt.Wrap(err)
}
}()


@@ -0,0 +1,268 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package ordersfile
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"hash/crc32"
"io"
"os"
"time"
"github.com/zeebo/errs"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/private/date"
)
var (
// fileMagic used to identify header of file.
// "0ddba11 acc01ade5".
fileMagic = [8]byte{0x0d, 0xdb, 0xa1, 0x1a, 0xcc, 0x01, 0xad, 0xe5}
// entryHeader is 8 bytes that appears before every order/limit in a V1 file.
// "5ca1ab1e ba5eba11".
entryHeader = [8]byte{0x5c, 0xa1, 0xab, 0x1e, 0xba, 0x5e, 0xba, 0x11}
// entryFooter is 8 bytes that appears after every order/limit in a V1 file.
// "feed 1 f00d 1 c0ffee".
entryFooter = [8]byte{0xfe, 0xed, 0x1f, 0x00, 0xd1, 0xc0, 0xff, 0xee}
)
// fileV1 is a version 1 orders file.
type fileV1 struct {
f *os.File
br *bufio.Reader
}
// OpenWritableV1 opens for writing the unsent or archived orders file at a given path.
// If the file is new, the file header is written.
func OpenWritableV1(path string, satelliteID storj.NodeID, creationTime time.Time) (Writable, error) {
// create file if not exists or append
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, Error.Wrap(err)
}
of := &fileV1{
f: f,
}
currentPos, err := of.f.Seek(0, io.SeekCurrent)
if err != nil {
return nil, Error.Wrap(err)
}
if currentPos == 0 {
err = of.writeHeader(satelliteID, creationTime)
if err != nil {
return of, err
}
}
return of, nil
}
// writeHeader writes file header as [filemagic][satellite ID][creation hour].
func (of *fileV1) writeHeader(satelliteID storj.NodeID, creationTime time.Time) error {
toWrite := fileMagic[:]
toWrite = append(toWrite, satelliteID.Bytes()...)
creationHour := date.TruncateToHourInNano(creationTime)
creationHourBytes := [8]byte{}
binary.LittleEndian.PutUint64(creationHourBytes[:], uint64(creationHour))
toWrite = append(toWrite, creationHourBytes[:]...)
if _, err := of.f.Write(toWrite); err != nil {
return Error.New("Couldn't write file header: %w", err)
}
return nil
}
// OpenReadableV1 opens for reading the unsent or archived orders file at a given path.
func OpenReadableV1(path string) (Readable, error) {
f, err := os.Open(path)
if err != nil {
return nil, Error.Wrap(err)
}
return &fileV1{
f: f,
// buffered reader is used to search for entryHeader that precedes limit and order.
br: bufio.NewReader(f),
}, nil
}
// Append writes limit and order to the file as
// [entryHeader][limitSize][limitBytes][orderSize][orderBytes][checksum][entryFooter].
func (of *fileV1) Append(info *Info) error {
toWrite := entryHeader[:]
limitSerialized, err := pb.Marshal(info.Limit)
if err != nil {
return Error.Wrap(err)
}
orderSerialized, err := pb.Marshal(info.Order)
if err != nil {
return Error.Wrap(err)
}
limitSizeBytes := [2]byte{}
binary.LittleEndian.PutUint16(limitSizeBytes[:], uint16(len(limitSerialized)))
orderSizeBytes := [2]byte{}
binary.LittleEndian.PutUint16(orderSizeBytes[:], uint16(len(orderSerialized)))
toWrite = append(toWrite, limitSizeBytes[:]...)
toWrite = append(toWrite, limitSerialized...)
toWrite = append(toWrite, orderSizeBytes[:]...)
toWrite = append(toWrite, orderSerialized...)
checksumInt := crc32.ChecksumIEEE(toWrite[len(entryHeader):])
checksumBytes := [4]byte{}
binary.LittleEndian.PutUint32(checksumBytes[:], checksumInt)
toWrite = append(toWrite, checksumBytes[:]...)
toWrite = append(toWrite, entryFooter[:]...)
if _, err = of.f.Write(toWrite); err != nil {
return Error.New("Couldn't write serialized order and limit: %w", err)
}
return nil
}
// ReadOne reads one entry from the file.
// It returns ErrEntryCorrupt upon finding a corrupt limit/order combo. On next call after a corrupt entry, it will find the next valid order.
func (of *fileV1) ReadOne() (info *Info, err error) {
// attempt to read an order/limit; if corrupted, keep trying until EOF or uncorrupted pair found.
// start position will be the position of the of.f cursor minus the number of unread buffered bytes in of.br
startPosition, err := of.f.Seek(0, io.SeekCurrent)
if err != nil {
return nil, Error.Wrap(err)
}
startPosition -= int64(of.br.Buffered())
defer func() {
// Treat all non-EOF errors as corrupt entry errors so that ReadOne is called again.
if err != nil && !errors.Is(err, io.EOF) {
// seek to just after where we started at the beginning of ReadOne
_, seekErr := of.f.Seek(startPosition+1, io.SeekStart)
if seekErr != nil {
err = errs.Combine(err, seekErr)
}
of.br.Reset(of.f)
err = ErrEntryCorrupt.Wrap(err)
}
}()
err = of.gotoNextEntry()
if err != nil {
return nil, err
}
limitSizeBytes := [2]byte{}
_, err = io.ReadFull(of.br, limitSizeBytes[:])
if err != nil {
return nil, Error.Wrap(err)
}
limitSize := binary.LittleEndian.Uint16(limitSizeBytes[:])
limitSerialized := make([]byte, limitSize)
_, err = io.ReadFull(of.br, limitSerialized)
if err != nil {
return nil, Error.Wrap(err)
}
limit := &pb.OrderLimit{}
err = pb.Unmarshal(limitSerialized, limit)
if err != nil {
return nil, Error.Wrap(err)
}
orderSizeBytes := [2]byte{}
_, err = io.ReadFull(of.br, orderSizeBytes[:])
if err != nil {
return nil, Error.Wrap(err)
}
orderSize := binary.LittleEndian.Uint16(orderSizeBytes[:])
orderSerialized := make([]byte, orderSize)
_, err = io.ReadFull(of.br, orderSerialized)
if err != nil {
return nil, Error.Wrap(err)
}
order := &pb.Order{}
err = pb.Unmarshal(orderSerialized, order)
if err != nil {
return nil, Error.Wrap(err)
}
// read checksum
checksumBytes := [4]byte{}
_, err = io.ReadFull(of.br, checksumBytes[:])
if err != nil {
return nil, Error.Wrap(err)
}
expectedChecksum := binary.LittleEndian.Uint32(checksumBytes[:])
actualChecksum := uint32(0)
actualChecksum = crc32.Update(actualChecksum, crc32.IEEETable, limitSizeBytes[:])
actualChecksum = crc32.Update(actualChecksum, crc32.IEEETable, limitSerialized)
actualChecksum = crc32.Update(actualChecksum, crc32.IEEETable, orderSizeBytes[:])
actualChecksum = crc32.Update(actualChecksum, crc32.IEEETable, orderSerialized)
if expectedChecksum != actualChecksum {
return nil, Error.New("checksum does not match")
}
footerBytes := [len(entryFooter)]byte{}
_, err = io.ReadFull(of.br, footerBytes[:])
if err != nil {
return nil, Error.Wrap(err)
}
if !bytes.Equal(entryFooter[:], footerBytes[:]) {
return nil, Error.New("footer bytes do not match")
}
return &Info{
Limit: limit,
Order: order,
}, nil
}
func (of *fileV1) gotoNextEntry() error {
// search file for next occurrence of entry header, or until EOF
for {
searchBufSize := 2 * memory.KiB.Int()
nextBufferBytes, err := of.br.Peek(searchBufSize)
// if the buffered reader hits an EOF, the buffered data may still
// contain a full entry, so do not return unless there is definitely no entry
if errors.Is(err, io.EOF) && len(nextBufferBytes) <= len(entryHeader) {
return err
} else if err != nil && !errors.Is(err, io.EOF) {
return Error.Wrap(err)
}
i := bytes.Index(nextBufferBytes, entryHeader[:])
if i > -1 {
_, err = of.br.Discard(i + len(entryHeader))
if err != nil {
return Error.Wrap(err)
}
break
}
// entry header not found; discard all but last (len(entryHeader)-1) bytes for next iteration
_, err = of.br.Discard(len(nextBufferBytes) - len(entryHeader) + 1)
if err != nil {
return Error.Wrap(err)
}
}
return nil
}
// Close closes the file.
func (of *fileV1) Close() error {
return of.f.Close()
}
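The intended consumption pattern is a loop that treats ErrEntryCorrupt as a soft error and keeps reading, which is what the store code further down in this commit does. A minimal sketch of that loop, assuming the storj.io/storj/storagenode/orders/ordersfile import path; drainOrders is a hypothetical helper, not part of the change:

package example

import (
	"errors"
	"io"

	"storj.io/storj/storagenode/orders/ordersfile"
)

// drainOrders reads every intact entry from an orders file, skipping entries
// that ReadOne reports as corrupt (ReadOne has already resynced to the next
// entry header before returning).
func drainOrders(of ordersfile.Readable) ([]*ordersfile.Info, error) {
	var infos []*ordersfile.Info
	for {
		info, err := of.ReadOne()
		if err != nil {
			if errors.Is(err, io.EOF) {
				return infos, nil
			}
			if ordersfile.ErrEntryCorrupt.Has(err) {
				// corrupt entry: drop it and keep reading the rest of the file
				continue
			}
			return infos, err
		}
		infos = append(infos, info)
	}
}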


@@ -320,7 +320,7 @@ func TestCleanArchiveFileStore(t *testing.T) {
require.NoError(t, err)
// archive one order yesterday, one today
-unsentInfo := orders.UnsentInfo{}
unsentInfo := orders.UnsentInfo{Version: ordersfile.V1}
unsentInfo.CreatedAtHour = createdAt0.Truncate(time.Hour)
err = node.OrdersStore.Archive(satellite, unsentInfo, yesterday, pb.SettlementWithWindowResponse_ACCEPTED)
require.NoError(t, err)


@@ -109,7 +109,7 @@ func (store *FileStore) BeginEnqueue(satelliteID storj.NodeID, createdAt time.Ti
}
// write out the data
-of, err := ordersfile.OpenWritableUnsent(store.log, store.unsentDir, info.Limit.SatelliteId, info.Limit.OrderCreation)
of, err := ordersfile.OpenWritableUnsent(store.unsentDir, info.Limit.SatelliteId, info.Limit.OrderCreation)
if err != nil {
return OrderError.Wrap(err)
}
@@ -172,6 +172,7 @@ func (store *FileStore) Enqueue(info *ordersfile.Info) (err error) {
// UnsentInfo is a struct containing a window of orders for a satellite and order creation hour.
type UnsentInfo struct {
CreatedAtHour time.Time
Version ordersfile.Version
InfoList []*ordersfile.Info
}
@@ -219,9 +220,10 @@ func (store *FileStore) ListUnsentBySatellite(now time.Time) (infoMap map[storj.
newUnsentInfo := UnsentInfo{
CreatedAtHour: fileInfo.CreatedAtHour,
Version: fileInfo.Version,
}
-of, err := ordersfile.OpenReadable(store.log, path)
of, err := ordersfile.OpenReadable(path, fileInfo.Version)
if err != nil {
return OrderError.Wrap(err)
}
@@ -237,6 +239,12 @@ func (store *FileStore) ListUnsentBySatellite(now time.Time) (infoMap map[storj.
if errs.Is(err, io.EOF) {
break
}
// if last entry read is corrupt, attempt to read again
if ordersfile.ErrEntryCorrupt.Has(err) {
store.log.Warn("Corrupted order detected in orders file", zap.Error(err))
mon.Meter("orders_unsent_file_corrupted").Mark64(1)
continue
}
return err
}
@@ -267,6 +275,7 @@ func (store *FileStore) Archive(satelliteID storj.NodeID, unsentInfo UnsentInfo,
unsentInfo.CreatedAtHour,
archivedAt,
status,
unsentInfo.Version,
))
}
@@ -290,7 +299,7 @@ func (store *FileStore) ListArchived() ([]*ArchivedInfo, error) {
if err != nil {
return OrderError.Wrap(err)
}
-of, err := ordersfile.OpenReadable(store.log, path)
of, err := ordersfile.OpenReadable(path, fileInfo.Version)
if err != nil {
return OrderError.Wrap(err)
}
@@ -312,6 +321,12 @@ func (store *FileStore) ListArchived() ([]*ArchivedInfo, error) {
if errs.Is(err, io.EOF) {
break
}
// if last entry read is corrupt, attempt to read again
if ordersfile.ErrEntryCorrupt.Has(err) {
store.log.Warn("Corrupted order detected in orders file", zap.Error(err))
mon.Meter("orders_archive_file_corrupted").Mark64(1)
continue
}
return err
}


@@ -243,7 +243,7 @@ func TestOrdersStore_ListUnsentBySatellite_Ongoing(t *testing.T) {
require.Len(t, unsent, 1)
}
-func TestOrdersStore_CorruptUnsent(t *testing.T) {
func TestOrdersStore_CorruptUnsentV0(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
dirName := ctx.Dir("test-orders")
@@ -273,8 +273,82 @@ func TestOrdersStore_CorruptUnsent(t *testing.T) {
Amount: 1,
},
}
-// store two orders for the same window
// store two orders for the same window using deprecated V0
unsentFileName := ordersfile.UnsentFileName(satellite, now, ordersfile.V0)
unsentDir := filepath.Join(dirName, "unsent")
unsentFilePath := filepath.Join(unsentDir, unsentFileName)
of, err := ordersfile.OpenWritableV0(unsentFilePath)
require.NoError(t, err)
require.NoError(t, of.Append(info))
require.NoError(t, of.Append(info))
require.NoError(t, of.Close())
// check that we can see both orders tomorrow
unsent, err = ordersStore.ListUnsentBySatellite(tomorrow)
require.NoError(t, err)
require.Len(t, unsent, 1)
require.Len(t, unsent[satellite].InfoList, 2)
// corrupt unsent orders file by removing the last byte
err = filepath.Walk(unsentDir, func(path string, info os.FileInfo, err error) error {
require.NoError(t, err)
if info.IsDir() {
return nil
}
err = os.Truncate(path, info.Size()-1)
return err
})
require.NoError(t, err)
// add another order, which we shouldn't see for V0 since it is after the corrupted one
of, err = ordersfile.OpenWritableV0(unsentFilePath)
require.NoError(t, err)
require.NoError(t, of.Append(info))
require.NoError(t, of.Close())
// only the second order should be corrupted, so we should still see one order
unsent, err = ordersStore.ListUnsentBySatellite(tomorrow)
require.NoError(t, err)
require.Len(t, unsent, 1)
require.Len(t, unsent[satellite].InfoList, 1)
}
func TestOrdersStore_CorruptUnsentV1(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
dirName := ctx.Dir("test-orders")
now := time.Now()
satellite := testrand.NodeID()
tomorrow := now.Add(24 * time.Hour)
// make order limit grace period 1 hour
ordersStore, err := orders.NewFileStore(zaptest.NewLogger(t), dirName, time.Hour)
require.NoError(t, err)
// empty store means no orders can be listed
unsent, err := ordersStore.ListUnsentBySatellite(tomorrow)
require.NoError(t, err)
require.Len(t, unsent, 0)
sn1 := testrand.SerialNumber()
sn2 := testrand.SerialNumber()
sn3 := testrand.SerialNumber()
info := &ordersfile.Info{
Limit: &pb.OrderLimit{
SerialNumber: sn1,
SatelliteId: satellite,
Action: pb.PieceAction_GET,
OrderCreation: now,
},
Order: &pb.Order{
SerialNumber: sn1,
Amount: 1,
},
}
// store sn1 and sn2 in the same window
require.NoError(t, ordersStore.Enqueue(info))
info.Limit.SerialNumber = sn2
info.Order.SerialNumber = sn2
require.NoError(t, ordersStore.Enqueue(info))
// check that we can see both orders tomorrow
@@ -294,11 +368,90 @@ func TestOrdersStore_CorruptUnsent(t *testing.T) {
})
require.NoError(t, err)
-// only the second order should be corrupted, so we should still see one order
// only the second order should be corrupted, so we should still see one order (sn1)
unsent, err = ordersStore.ListUnsentBySatellite(tomorrow)
require.NoError(t, err)
require.Len(t, unsent, 1)
require.Len(t, unsent[satellite].InfoList, 1)
require.EqualValues(t, sn1, unsent[satellite].InfoList[0].Order.SerialNumber)
// add another order, sn3, to the same window
info.Limit.SerialNumber = sn3
info.Order.SerialNumber = sn3
require.NoError(t, ordersStore.Enqueue(info))
// only the second order should be corrupted, so we should still see first and last orders (sn1, sn3)
unsent, err = ordersStore.ListUnsentBySatellite(tomorrow)
require.NoError(t, err)
require.Len(t, unsent, 1)
require.Len(t, unsent[satellite].InfoList, 2)
require.Equal(t, ordersfile.V1, unsent[satellite].Version)
require.EqualValues(t, sn1, unsent[satellite].InfoList[0].Order.SerialNumber)
require.EqualValues(t, sn3, unsent[satellite].InfoList[1].Order.SerialNumber)
}
func TestOrdersStore_V0ToV1(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
dirName := ctx.Dir("test-orders")
now := time.Now()
satellite := testrand.NodeID()
tomorrow := now.Add(24 * time.Hour)
// make order limit grace period 1 hour
ordersStore, err := orders.NewFileStore(zaptest.NewLogger(t), dirName, time.Hour)
require.NoError(t, err)
// empty store means no orders can be listed
unsent, err := ordersStore.ListUnsentBySatellite(tomorrow)
require.NoError(t, err)
require.Len(t, unsent, 0)
sn1 := testrand.SerialNumber()
sn2 := testrand.SerialNumber()
info := &ordersfile.Info{
Limit: &pb.OrderLimit{
SerialNumber: sn1,
SatelliteId: satellite,
Action: pb.PieceAction_GET,
OrderCreation: now,
},
Order: &pb.Order{
SerialNumber: sn1,
Amount: 1,
},
}
// store sn1 and sn2 in the same window
// sn1 is stored with deprecated V0, so sn2 should also be stored with V0 even when Enqueue() is used
unsentFileName := ordersfile.UnsentFileName(satellite, now, ordersfile.V0)
unsentDir := filepath.Join(dirName, "unsent")
unsentFilePath := filepath.Join(unsentDir, unsentFileName)
of, err := ordersfile.OpenWritableV0(unsentFilePath)
require.NoError(t, err)
require.NoError(t, of.Append(info))
info.Limit.SerialNumber = sn2
info.Order.SerialNumber = sn2
require.NoError(t, of.Append(info))
require.NoError(t, of.Close())
// check that we can see both orders tomorrow
unsent, err = ordersStore.ListUnsentBySatellite(tomorrow)
require.NoError(t, err)
require.Len(t, unsent, 1)
require.Len(t, unsent[satellite].InfoList, 2)
require.Equal(t, ordersfile.V0, unsent[satellite].Version)
// archive file to free up window
require.NoError(t, ordersStore.Archive(satellite, unsent[satellite], time.Now(), pb.SettlementWithWindowResponse_ACCEPTED))
// new file should be created with version V1
require.NoError(t, ordersStore.Enqueue(info))
unsent, err = ordersStore.ListUnsentBySatellite(tomorrow)
require.NoError(t, err)
require.Len(t, unsent, 1)
require.Len(t, unsent[satellite].InfoList, 1)
require.Equal(t, ordersfile.V1, unsent[satellite].Version)
} }
func verifyInfosEqual(t *testing.T, a, b *ordersfile.Info) {