From 02cbf1e72a3be1a3d81543c122d299d154a5135c Mon Sep 17 00:00:00 2001 From: Moby von Briesen Date: Tue, 6 Oct 2020 12:14:25 -0400 Subject: [PATCH] storagenode/orders: Add V1 orders file V1 allows the storagenode to continue reading orders from an unsent/archived orders file, even if orders in the middle are corrupted. Change-Id: Iea4117d55c05ceeb77f47d5c973e5ba95da46c66 --- storagenode/orders/ordersfile/common.go | 128 ++++++----- storagenode/orders/ordersfile/v0.go | 34 ++- storagenode/orders/ordersfile/v1.go | 268 ++++++++++++++++++++++++ storagenode/orders/service_test.go | 2 +- storagenode/orders/store.go | 21 +- storagenode/orders/store_test.go | 159 +++++++++++++- 6 files changed, 548 insertions(+), 64 deletions(-) create mode 100644 storagenode/orders/ordersfile/v1.go diff --git a/storagenode/orders/ordersfile/common.go b/storagenode/orders/ordersfile/common.go index 06facf7b6..58e549a15 100644 --- a/storagenode/orders/ordersfile/common.go +++ b/storagenode/orders/ordersfile/common.go @@ -11,16 +11,22 @@ import ( "strings" "time" - "github.com/spacemonkeygo/monkit/v3" "github.com/zeebo/errs" - "go.uber.org/zap" "storj.io/common/pb" "storj.io/common/storj" "storj.io/storj/private/date" ) +// Version is a type for defining different file versions. +type Version string + const ( + // V0 is the first orders file version. It stores orders and limits with no checksum. + V0 = Version("v0") + // V1 is the second orders file version. It includes a checksum for each entry so that file corruption is handled better. + V1 = Version("v1") + unsentFilePrefix = "unsent-orders-" archiveFilePrefix = "archived-orders-" ) @@ -28,8 +34,8 @@ const ( var ( // Error identifies errors with orders files. Error = errs.Class("ordersfile") - - mon = monkit.Package() + // ErrEntryCorrupt is returned when a corrupt entry is found. + ErrEntryCorrupt = errs.Class("ordersfile corrupt entry") ) // Info contains full information about an order. @@ -51,26 +57,28 @@ type Readable interface { } // OpenWritableUnsent creates or opens for appending the unsent orders file for a given satellite ID and creation hour. -func OpenWritableUnsent(log *zap.Logger, unsentDir string, satelliteID storj.NodeID, creationTime time.Time) (Writable, error) { - fileName := unsentFileName(satelliteID, creationTime) +func OpenWritableUnsent(unsentDir string, satelliteID storj.NodeID, creationTime time.Time) (Writable, error) { + // if V0 file already exists, use that. Otherwise use V1 file. + versionToUse := V0 + fileName := UnsentFileName(satelliteID, creationTime, V0) filePath := filepath.Join(unsentDir, fileName) - - // create file if not exists or append - f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return nil, Error.Wrap(err) + if _, err := os.Stat(filePath); os.IsNotExist(err) { + fileName = UnsentFileName(satelliteID, creationTime, V1) + filePath = filepath.Join(unsentDir, fileName) + versionToUse = V1 } - return &fileV0{ - log: log.Named("writable V0 orders file"), - f: f, - }, nil + if versionToUse == V0 { + return OpenWritableV0(filePath) + } + return OpenWritableV1(filePath, satelliteID, creationTime) } // UnsentInfo contains information relevant to an unsent orders file, as well as information necessary to open it for reading. type UnsentInfo struct { SatelliteID storj.NodeID CreatedAtHour time.Time + Version Version } // ArchivedInfo contains information relevant to an archived orders file, as well as information necessary to open it for reading. 
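For orientation, the effect of the new Version type on file names looks roughly like the sketch below. It is illustrative only: the UnsentFileName helper is introduced later in this patch, and satelliteID/createdAt stand in for real values.

    // Hypothetical usage sketch, not part of the patch:
    // V0 keeps the old file name, V1 appends a ".v1" suffix.
    name0 := ordersfile.UnsentFileName(satelliteID, createdAt, ordersfile.V0)
    // e.g. "unsent-orders-<satelliteID>-1601992800000000000"
    name1 := ordersfile.UnsentFileName(satelliteID, createdAt, ordersfile.V1)
    // e.g. "unsent-orders-<satelliteID>-1601992800000000000.v1"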
@@ -79,11 +87,12 @@ type ArchivedInfo struct {
 	CreatedAtHour time.Time
 	ArchivedAt    time.Time
 	StatusText    string
+	Version       Version
 }
 
 // GetUnsentInfo returns a new UnsentInfo which can be used to get information about and read from an unsent orders file.
 func GetUnsentInfo(info os.FileInfo) (*UnsentInfo, error) {
-	satelliteID, createdAtHour, err := getUnsentFileInfo(info.Name())
+	satelliteID, createdAtHour, version, err := getUnsentFileInfo(info.Name())
 	if err != nil {
 		return nil, err
 	}
@@ -91,12 +100,13 @@ func GetUnsentInfo(info os.FileInfo) (*UnsentInfo, error) {
 	return &UnsentInfo{
 		SatelliteID:   satelliteID,
 		CreatedAtHour: createdAtHour,
+		Version:       version,
 	}, nil
 }
 
 // GetArchivedInfo returns a new ArchivedInfo which can be used to get information about and read from an archived orders file.
 func GetArchivedInfo(info os.FileInfo) (*ArchivedInfo, error) {
-	satelliteID, createdAtHour, archivedAt, statusText, err := getArchivedFileInfo(info.Name())
+	satelliteID, createdAtHour, archivedAt, statusText, version, err := getArchivedFileInfo(info.Name())
 	if err != nil {
 		return nil, err
 	}
@@ -106,113 +116,125 @@ func GetArchivedInfo(info os.FileInfo) (*ArchivedInfo, error) {
 		CreatedAtHour: createdAtHour,
 		ArchivedAt:    archivedAt,
 		StatusText:    statusText,
+		Version:       version,
 	}, nil
 }
 
 // OpenReadable opens for reading the unsent or archived orders file at a given path.
 // It assumes the path has already been validated with GetUnsentInfo or GetArchivedInfo.
-func OpenReadable(log *zap.Logger, path string) (Readable, error) {
-	f, err := os.Open(path)
-	if err != nil {
-		return nil, Error.Wrap(err)
+func OpenReadable(path string, version Version) (Readable, error) {
	if version == V0 {
+		return OpenReadableV0(path)
 	}
-
-	return &fileV0{
-		log: log.Named("readable V0 orders file"),
-		f:   f,
-	}, nil
+	return OpenReadableV1(path)
 }
 
 // MoveUnsent moves an unsent orders file to the archived orders file directory.
-func MoveUnsent(unsentDir, archiveDir string, satelliteID storj.NodeID, createdAtHour, archivedAt time.Time, status pb.SettlementWithWindowResponse_Status) error {
-	oldFilePath := filepath.Join(unsentDir, unsentFileName(satelliteID, createdAtHour))
-	newFilePath := filepath.Join(archiveDir, archiveFileName(satelliteID, createdAtHour, archivedAt, status))
+func MoveUnsent(unsentDir, archiveDir string, satelliteID storj.NodeID, createdAtHour, archivedAt time.Time, status pb.SettlementWithWindowResponse_Status, version Version) error {
+	oldFilePath := filepath.Join(unsentDir, UnsentFileName(satelliteID, createdAtHour, version))
+	newFilePath := filepath.Join(archiveDir, ArchiveFileName(satelliteID, createdAtHour, archivedAt, status, version))
 
 	return Error.Wrap(os.Rename(oldFilePath, newFilePath))
 }
 
-// it expects the file name to be in the format "unsent-orders-<satelliteID>-<createdAtHour>".
-func getUnsentFileInfo(filename string) (satellite storj.NodeID, createdHour time.Time, err error) {
+// it expects the file name to be in the format "unsent-orders-<satelliteID>-<createdAtHour>.<version>".
+// V0 will not have ".<version>" at the end of the filename.
+func getUnsentFileInfo(filename string) (satellite storj.NodeID, createdHour time.Time, version Version, err error) {
+	filename, version = getVersion(filename)
+
 	if !strings.HasPrefix(filename, unsentFilePrefix) {
-		return storj.NodeID{}, time.Time{}, Error.New("invalid path: %q", filename)
+		return storj.NodeID{}, time.Time{}, version, Error.New("invalid path: %q", filename)
 	}
 
 	// chop off prefix to get satellite ID and created hours
 	infoStr := filename[len(unsentFilePrefix):]
 	infoSlice := strings.Split(infoStr, "-")
 	if len(infoSlice) != 2 {
-		return storj.NodeID{}, time.Time{}, Error.New("invalid path: %q", filename)
+		return storj.NodeID{}, time.Time{}, version, Error.New("invalid path: %q", filename)
 	}
 
 	satelliteIDStr := infoSlice[0]
 	satelliteID, err := storj.NodeIDFromString(satelliteIDStr)
 	if err != nil {
-		return storj.NodeID{}, time.Time{}, Error.New("invalid path: %q", filename)
+		return storj.NodeID{}, time.Time{}, version, Error.New("invalid path: %q", filename)
 	}
 
 	timeStr := infoSlice[1]
 	createdHourUnixNano, err := strconv.ParseInt(timeStr, 10, 64)
 	if err != nil {
-		return satelliteID, time.Time{}, Error.Wrap(err)
+		return satelliteID, time.Time{}, version, Error.Wrap(err)
 	}
 	createdAtHour := time.Unix(0, createdHourUnixNano)
 
-	return satelliteID, createdAtHour, nil
+	return satelliteID, createdAtHour, version, nil
 }
 
 // getArchivedFileInfo gets the archived at time from an archive file name.
-// it expects the file name to be in the format "archived-orders-<satelliteID>-<createdAtHour>-<archivedAtTime>-<status>".
-func getArchivedFileInfo(name string) (satelliteID storj.NodeID, createdAtHour, archivedAt time.Time, status string, err error) {
+// it expects the file name to be in the format "archived-orders-<satelliteID>-<createdAtHour>-<archivedAtTime>-<status>.<version>".
+// V0 will not have ".<version>" at the end of the filename.
+func getArchivedFileInfo(name string) (satelliteID storj.NodeID, createdAtHour, archivedAt time.Time, status string, version Version, err error) {
+	name, version = getVersion(name)
+
 	if !strings.HasPrefix(name, archiveFilePrefix) {
-		return storj.NodeID{}, time.Time{}, time.Time{}, "", Error.New("invalid path: %q", name)
+		return storj.NodeID{}, time.Time{}, time.Time{}, "", version, Error.New("invalid path: %q", name)
 	}
 
 	// chop off prefix to get satellite ID, created hour, archive time, and status
 	infoStr := name[len(archiveFilePrefix):]
 	infoSlice := strings.Split(infoStr, "-")
 	if len(infoSlice) != 4 {
-		return storj.NodeID{}, time.Time{}, time.Time{}, "", Error.New("invalid path: %q", name)
+		return storj.NodeID{}, time.Time{}, time.Time{}, "", version, Error.New("invalid path: %q", name)
 	}
 
 	satelliteIDStr := infoSlice[0]
 	satelliteID, err = storj.NodeIDFromString(satelliteIDStr)
 	if err != nil {
-		return storj.NodeID{}, time.Time{}, time.Time{}, "", Error.New("invalid path: %q", name)
+		return storj.NodeID{}, time.Time{}, time.Time{}, "", version, Error.New("invalid path: %q", name)
 	}
 
 	createdAtStr := infoSlice[1]
 	createdHourUnixNano, err := strconv.ParseInt(createdAtStr, 10, 64)
 	if err != nil {
-		return satelliteID, time.Time{}, time.Time{}, "", Error.New("invalid path: %q", name)
+		return satelliteID, time.Time{}, time.Time{}, "", version, Error.New("invalid path: %q", name)
 	}
 	createdAtHour = time.Unix(0, createdHourUnixNano)
 
 	archivedAtStr := infoSlice[2]
 	archivedAtUnixNano, err := strconv.ParseInt(archivedAtStr, 10, 64)
 	if err != nil {
-		return satelliteID, createdAtHour, time.Time{}, "", Error.New("invalid path: %q", name)
+		return satelliteID, createdAtHour, time.Time{}, "", version, Error.New("invalid path: %q", name)
 	}
 	archivedAt = time.Unix(0,
archivedAtUnixNano) status = infoSlice[3] - return satelliteID, createdAtHour, archivedAt, status, nil + return satelliteID, createdAtHour, archivedAt, status, version, nil } -func unsentFileName(satelliteID storj.NodeID, creationTime time.Time) string { - return fmt.Sprintf("%s%s-%s", +// UnsentFileName gets the filename of an unsent file. +func UnsentFileName(satelliteID storj.NodeID, creationTime time.Time, version Version) string { + filename := fmt.Sprintf("%s%s-%s", unsentFilePrefix, satelliteID, getCreationHourString(creationTime), ) + if version != V0 { + filename = fmt.Sprintf("%s.%s", filename, version) + } + return filename } -func archiveFileName(satelliteID storj.NodeID, creationTime, archiveTime time.Time, status pb.SettlementWithWindowResponse_Status) string { - return fmt.Sprintf("%s%s-%s-%s-%s", +// ArchiveFileName gets the filename of an archived file. +func ArchiveFileName(satelliteID storj.NodeID, creationTime, archiveTime time.Time, status pb.SettlementWithWindowResponse_Status, version Version) string { + filename := fmt.Sprintf("%s%s-%s-%s-%s", archiveFilePrefix, satelliteID, getCreationHourString(creationTime), strconv.FormatInt(archiveTime.UnixNano(), 10), pb.SettlementWithWindowResponse_Status_name[int32(status)], ) + if version != V0 { + filename = fmt.Sprintf("%s.%s", filename, version) + } + return filename } func getCreationHourString(t time.Time) string { @@ -220,3 +242,13 @@ func getCreationHourString(t time.Time) string { timeStr := strconv.FormatInt(creationHour, 10) return timeStr } + +func getVersion(filename string) (trimmed string, version Version) { + ext := filepath.Ext(filename) + if ext == "."+string(V1) { + version = V1 + trimmed = strings.TrimSuffix(filename, ext) + return trimmed, V1 + } + return filename, V0 +} diff --git a/storagenode/orders/ordersfile/v0.go b/storagenode/orders/ordersfile/v0.go index 810cbe3fc..5b276401b 100644 --- a/storagenode/orders/ordersfile/v0.go +++ b/storagenode/orders/ordersfile/v0.go @@ -9,15 +9,35 @@ import ( "io" "os" - "go.uber.org/zap" - "storj.io/common/pb" ) // fileV0 is a version 0 orders file. type fileV0 struct { - log *zap.Logger - f *os.File + f *os.File +} + +// OpenWritableV0 opens for writing the unsent or archived orders file at a given path. +func OpenWritableV0(path string) (Writable, error) { + // create file if not exists or append + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, Error.Wrap(err) + } + return &fileV0{ + f: f, + }, nil +} + +// OpenReadableV0 opens for reading the unsent or archived orders file at a given path. +func OpenReadableV0(path string) (Readable, error) { + f, err := os.Open(path) + if err != nil { + return nil, Error.Wrap(err) + } + return &fileV0{ + f: f, + }, nil } // Append writes limit and order to the file as @@ -55,12 +75,8 @@ func (of *fileV0) Append(info *Info) error { // ReadOne reads one entry from the file. func (of *fileV0) ReadOne() (info *Info, err error) { defer func() { - // if error is unexpected EOF, file is corrupted. - // V0 files do not handle corruption, so just return EOF so caller thinks we have reached the end of the file. 
if errors.Is(err, io.ErrUnexpectedEOF) { - of.log.Warn("Unexpected EOF while reading archived order file", zap.Error(err)) - mon.Meter("orders_archive_file_corrupted").Mark64(1) - err = io.EOF + err = ErrEntryCorrupt.Wrap(err) } }() diff --git a/storagenode/orders/ordersfile/v1.go b/storagenode/orders/ordersfile/v1.go new file mode 100644 index 000000000..856108fcd --- /dev/null +++ b/storagenode/orders/ordersfile/v1.go @@ -0,0 +1,268 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package ordersfile + +import ( + "bufio" + "bytes" + "encoding/binary" + "errors" + "hash/crc32" + "io" + "os" + "time" + + "github.com/zeebo/errs" + + "storj.io/common/memory" + "storj.io/common/pb" + "storj.io/common/storj" + "storj.io/storj/private/date" +) + +var ( + // fileMagic used to identify header of file. + // "0ddba11 acc01ade5". + fileMagic = [8]byte{0x0d, 0xdb, 0xa1, 0x1a, 0xcc, 0x01, 0xad, 0xe5} + + // entryHeader is 8 bytes that appears before every order/limit in a V1 file. + // "5ca1ab1e ba5eba11". + entryHeader = [8]byte{0x5c, 0xa1, 0xab, 0x1e, 0xba, 0x5e, 0xba, 0x11} + // entryFooter is 8 bytes that appears after every order/limit in a V1 file. + // "feed 1 f00d 1 c0ffee". + entryFooter = [8]byte{0xfe, 0xed, 0x1f, 0x00, 0xd1, 0xc0, 0xff, 0xee} +) + +// fileV1 is a version 1 orders file. +type fileV1 struct { + f *os.File + br *bufio.Reader +} + +// OpenWritableV1 opens for writing the unsent or archived orders file at a given path. +// If the file is new, the file header is written. +func OpenWritableV1(path string, satelliteID storj.NodeID, creationTime time.Time) (Writable, error) { + // create file if not exists or append + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, Error.Wrap(err) + } + + of := &fileV1{ + f: f, + } + + currentPos, err := of.f.Seek(0, io.SeekCurrent) + if err != nil { + return nil, Error.Wrap(err) + } + if currentPos == 0 { + err = of.writeHeader(satelliteID, creationTime) + if err != nil { + return of, err + } + } + + return of, nil +} + +// writeHeader writes file header as [filemagic][satellite ID][creation hour]. +func (of *fileV1) writeHeader(satelliteID storj.NodeID, creationTime time.Time) error { + toWrite := fileMagic[:] + toWrite = append(toWrite, satelliteID.Bytes()...) + creationHour := date.TruncateToHourInNano(creationTime) + creationHourBytes := [8]byte{} + binary.LittleEndian.PutUint64(creationHourBytes[:], uint64(creationHour)) + toWrite = append(toWrite, creationHourBytes[:]...) + + if _, err := of.f.Write(toWrite); err != nil { + return Error.New("Couldn't write file header: %w", err) + } + + return nil +} + +// OpenReadableV1 opens for reading the unsent or archived orders file at a given path. +func OpenReadableV1(path string) (Readable, error) { + f, err := os.Open(path) + if err != nil { + return nil, Error.Wrap(err) + } + return &fileV1{ + f: f, + // buffered reader is used to search for entryHeader that precedes limit and order. + br: bufio.NewReader(f), + }, nil +} + +// Append writes limit and order to the file as +// [entryHeader][limitSize][limitBytes][orderSize][orderBytes][checksum][entryFooter]. 
+func (of *fileV1) Append(info *Info) error { + toWrite := entryHeader[:] + + limitSerialized, err := pb.Marshal(info.Limit) + if err != nil { + return Error.Wrap(err) + } + orderSerialized, err := pb.Marshal(info.Order) + if err != nil { + return Error.Wrap(err) + } + + limitSizeBytes := [2]byte{} + binary.LittleEndian.PutUint16(limitSizeBytes[:], uint16(len(limitSerialized))) + + orderSizeBytes := [2]byte{} + binary.LittleEndian.PutUint16(orderSizeBytes[:], uint16(len(orderSerialized))) + + toWrite = append(toWrite, limitSizeBytes[:]...) + toWrite = append(toWrite, limitSerialized...) + toWrite = append(toWrite, orderSizeBytes[:]...) + toWrite = append(toWrite, orderSerialized...) + checksumInt := crc32.ChecksumIEEE(toWrite[len(entryHeader):]) + + checksumBytes := [4]byte{} + binary.LittleEndian.PutUint32(checksumBytes[:], checksumInt) + toWrite = append(toWrite, checksumBytes[:]...) + + toWrite = append(toWrite, entryFooter[:]...) + + if _, err = of.f.Write(toWrite); err != nil { + return Error.New("Couldn't write serialized order and limit: %w", err) + } + + return nil +} + +// ReadOne reads one entry from the file. +// It returns ErrEntryCorrupt upon finding a corrupt limit/order combo. On next call after a corrupt entry, it will find the next valid order. +func (of *fileV1) ReadOne() (info *Info, err error) { + // attempt to read an order/limit; if corrupted, keep trying until EOF or uncorrupted pair found. + // start position will be the position of the of.f cursor minus the number of unread buffered bytes in of.br + startPosition, err := of.f.Seek(0, io.SeekCurrent) + if err != nil { + return nil, Error.Wrap(err) + } + startPosition -= int64(of.br.Buffered()) + + defer func() { + // Treat all non-EOF errors as corrupt entry errors so that ReadOne is called again. 
+		if err != nil && !errors.Is(err, io.EOF) {
+			// seek to just after where we started at the beginning of ReadOne
+			_, seekErr := of.f.Seek(startPosition+1, io.SeekStart)
+			if seekErr != nil {
+				err = errs.Combine(err, seekErr)
+			}
+			of.br.Reset(of.f)
+
+			err = ErrEntryCorrupt.Wrap(err)
+		}
+	}()
+
+	err = of.gotoNextEntry()
+	if err != nil {
+		return nil, err
+	}
+
+	limitSizeBytes := [2]byte{}
+	_, err = io.ReadFull(of.br, limitSizeBytes[:])
+	if err != nil {
+		return nil, Error.Wrap(err)
+	}
+	limitSize := binary.LittleEndian.Uint16(limitSizeBytes[:])
+	limitSerialized := make([]byte, limitSize)
+	_, err = io.ReadFull(of.br, limitSerialized)
+	if err != nil {
+		return nil, Error.Wrap(err)
+	}
+	limit := &pb.OrderLimit{}
+	err = pb.Unmarshal(limitSerialized, limit)
+	if err != nil {
+		return nil, Error.Wrap(err)
+	}
+
+	orderSizeBytes := [2]byte{}
+	_, err = io.ReadFull(of.br, orderSizeBytes[:])
+	if err != nil {
+		return nil, Error.Wrap(err)
+	}
+	orderSize := binary.LittleEndian.Uint16(orderSizeBytes[:])
+	orderSerialized := make([]byte, orderSize)
+	_, err = io.ReadFull(of.br, orderSerialized)
+	if err != nil {
+		return nil, Error.Wrap(err)
+	}
+	order := &pb.Order{}
+	err = pb.Unmarshal(orderSerialized, order)
+	if err != nil {
+		return nil, Error.Wrap(err)
+	}
+
+	// read checksum
+	checksumBytes := [4]byte{}
+	_, err = io.ReadFull(of.br, checksumBytes[:])
+	if err != nil {
+		return nil, Error.Wrap(err)
+	}
+	expectedChecksum := binary.LittleEndian.Uint32(checksumBytes[:])
+
+	actualChecksum := uint32(0)
+	actualChecksum = crc32.Update(actualChecksum, crc32.IEEETable, limitSizeBytes[:])
+	actualChecksum = crc32.Update(actualChecksum, crc32.IEEETable, limitSerialized)
+	actualChecksum = crc32.Update(actualChecksum, crc32.IEEETable, orderSizeBytes[:])
+	actualChecksum = crc32.Update(actualChecksum, crc32.IEEETable, orderSerialized)
+
+	if expectedChecksum != actualChecksum {
+		return nil, Error.New("checksum does not match")
+	}
+
+	footerBytes := [len(entryFooter)]byte{}
+	_, err = io.ReadFull(of.br, footerBytes[:])
+	if err != nil {
+		return nil, Error.Wrap(err)
+	}
+	if !bytes.Equal(entryFooter[:], footerBytes[:]) {
+		return nil, Error.New("footer bytes do not match")
+	}
+
+	return &Info{
+		Limit: limit,
+		Order: order,
+	}, nil
+}
+
+func (of *fileV1) gotoNextEntry() error {
+	// search file for next occurrence of entry header, or until EOF
+	for {
+		searchBufSize := 2 * memory.KiB.Int()
+		nextBufferBytes, err := of.br.Peek(searchBufSize)
+		// if the buffered reader hits an EOF, the buffered data may still
+		// contain a full entry, so do not return unless there is definitely no entry
+		if errors.Is(err, io.EOF) && len(nextBufferBytes) <= len(entryHeader) {
+			return err
+		} else if err != nil && !errors.Is(err, io.EOF) {
+			return Error.Wrap(err)
+		}
+
+		i := bytes.Index(nextBufferBytes, entryHeader[:])
+		if i > -1 {
+			_, err = of.br.Discard(i + len(entryHeader))
+			if err != nil {
+				return Error.Wrap(err)
+			}
+			break
+		}
+		// entry header not found; discard all but last (len(entryHeader)-1) bytes for next iteration
+		_, err = of.br.Discard(len(nextBufferBytes) - len(entryHeader) + 1)
+		if err != nil {
+			return Error.Wrap(err)
+		}
+	}
+	return nil
+}
+
+// Close closes the file.
+func (of *fileV1) Close() error { + return of.f.Close() +} diff --git a/storagenode/orders/service_test.go b/storagenode/orders/service_test.go index 270a50a5d..3c43a6b1d 100644 --- a/storagenode/orders/service_test.go +++ b/storagenode/orders/service_test.go @@ -320,7 +320,7 @@ func TestCleanArchiveFileStore(t *testing.T) { require.NoError(t, err) // archive one order yesterday, one today - unsentInfo := orders.UnsentInfo{} + unsentInfo := orders.UnsentInfo{Version: ordersfile.V1} unsentInfo.CreatedAtHour = createdAt0.Truncate(time.Hour) err = node.OrdersStore.Archive(satellite, unsentInfo, yesterday, pb.SettlementWithWindowResponse_ACCEPTED) require.NoError(t, err) diff --git a/storagenode/orders/store.go b/storagenode/orders/store.go index 8bb4d6a7e..5675a8a25 100644 --- a/storagenode/orders/store.go +++ b/storagenode/orders/store.go @@ -109,7 +109,7 @@ func (store *FileStore) BeginEnqueue(satelliteID storj.NodeID, createdAt time.Ti } // write out the data - of, err := ordersfile.OpenWritableUnsent(store.log, store.unsentDir, info.Limit.SatelliteId, info.Limit.OrderCreation) + of, err := ordersfile.OpenWritableUnsent(store.unsentDir, info.Limit.SatelliteId, info.Limit.OrderCreation) if err != nil { return OrderError.Wrap(err) } @@ -172,6 +172,7 @@ func (store *FileStore) Enqueue(info *ordersfile.Info) (err error) { // UnsentInfo is a struct containing a window of orders for a satellite and order creation hour. type UnsentInfo struct { CreatedAtHour time.Time + Version ordersfile.Version InfoList []*ordersfile.Info } @@ -219,9 +220,10 @@ func (store *FileStore) ListUnsentBySatellite(now time.Time) (infoMap map[storj. newUnsentInfo := UnsentInfo{ CreatedAtHour: fileInfo.CreatedAtHour, + Version: fileInfo.Version, } - of, err := ordersfile.OpenReadable(store.log, path) + of, err := ordersfile.OpenReadable(path, fileInfo.Version) if err != nil { return OrderError.Wrap(err) } @@ -237,6 +239,12 @@ func (store *FileStore) ListUnsentBySatellite(now time.Time) (infoMap map[storj. 
if errs.Is(err, io.EOF) { break } + // if last entry read is corrupt, attempt to read again + if ordersfile.ErrEntryCorrupt.Has(err) { + store.log.Warn("Corrupted order detected in orders file", zap.Error(err)) + mon.Meter("orders_unsent_file_corrupted").Mark64(1) + continue + } return err } @@ -267,6 +275,7 @@ func (store *FileStore) Archive(satelliteID storj.NodeID, unsentInfo UnsentInfo, unsentInfo.CreatedAtHour, archivedAt, status, + unsentInfo.Version, )) } @@ -290,7 +299,7 @@ func (store *FileStore) ListArchived() ([]*ArchivedInfo, error) { if err != nil { return OrderError.Wrap(err) } - of, err := ordersfile.OpenReadable(store.log, path) + of, err := ordersfile.OpenReadable(path, fileInfo.Version) if err != nil { return OrderError.Wrap(err) } @@ -312,6 +321,12 @@ func (store *FileStore) ListArchived() ([]*ArchivedInfo, error) { if errs.Is(err, io.EOF) { break } + // if last entry read is corrupt, attempt to read again + if ordersfile.ErrEntryCorrupt.Has(err) { + store.log.Warn("Corrupted order detected in orders file", zap.Error(err)) + mon.Meter("orders_archive_file_corrupted").Mark64(1) + continue + } return err } diff --git a/storagenode/orders/store_test.go b/storagenode/orders/store_test.go index 013e40227..878e383ff 100644 --- a/storagenode/orders/store_test.go +++ b/storagenode/orders/store_test.go @@ -243,7 +243,7 @@ func TestOrdersStore_ListUnsentBySatellite_Ongoing(t *testing.T) { require.Len(t, unsent, 1) } -func TestOrdersStore_CorruptUnsent(t *testing.T) { +func TestOrdersStore_CorruptUnsentV0(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() dirName := ctx.Dir("test-orders") @@ -273,8 +273,82 @@ func TestOrdersStore_CorruptUnsent(t *testing.T) { Amount: 1, }, } - // store two orders for the same window + // store two orders for the same window using deprecated V0 + unsentFileName := ordersfile.UnsentFileName(satellite, now, ordersfile.V0) + unsentDir := filepath.Join(dirName, "unsent") + unsentFilePath := filepath.Join(unsentDir, unsentFileName) + of, err := ordersfile.OpenWritableV0(unsentFilePath) + require.NoError(t, err) + require.NoError(t, of.Append(info)) + require.NoError(t, of.Append(info)) + require.NoError(t, of.Close()) + + // check that we can see both orders tomorrow + unsent, err = ordersStore.ListUnsentBySatellite(tomorrow) + require.NoError(t, err) + require.Len(t, unsent, 1) + require.Len(t, unsent[satellite].InfoList, 2) + + // corrupt unsent orders file by removing the last byte + err = filepath.Walk(unsentDir, func(path string, info os.FileInfo, err error) error { + require.NoError(t, err) + if info.IsDir() { + return nil + } + err = os.Truncate(path, info.Size()-1) + return err + }) + require.NoError(t, err) + + // add another order, which we shouldn't see for V0 since it is after the corrupted one + of, err = ordersfile.OpenWritableV0(unsentFilePath) + require.NoError(t, err) + require.NoError(t, of.Append(info)) + require.NoError(t, of.Close()) + + // only the second order should be corrupted, so we should still see one order + unsent, err = ordersStore.ListUnsentBySatellite(tomorrow) + require.NoError(t, err) + require.Len(t, unsent, 1) + require.Len(t, unsent[satellite].InfoList, 1) +} + +func TestOrdersStore_CorruptUnsentV1(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + dirName := ctx.Dir("test-orders") + now := time.Now() + satellite := testrand.NodeID() + tomorrow := now.Add(24 * time.Hour) + + // make order limit grace period 1 hour + ordersStore, err := orders.NewFileStore(zaptest.NewLogger(t), dirName, 
time.Hour) + require.NoError(t, err) + + // empty store means no orders can be listed + unsent, err := ordersStore.ListUnsentBySatellite(tomorrow) + require.NoError(t, err) + require.Len(t, unsent, 0) + + sn1 := testrand.SerialNumber() + sn2 := testrand.SerialNumber() + sn3 := testrand.SerialNumber() + info := &ordersfile.Info{ + Limit: &pb.OrderLimit{ + SerialNumber: sn1, + SatelliteId: satellite, + Action: pb.PieceAction_GET, + OrderCreation: now, + }, + Order: &pb.Order{ + SerialNumber: sn1, + Amount: 1, + }, + } + // store sn1 and sn2 in the same window require.NoError(t, ordersStore.Enqueue(info)) + info.Limit.SerialNumber = sn2 + info.Order.SerialNumber = sn2 require.NoError(t, ordersStore.Enqueue(info)) // check that we can see both orders tomorrow @@ -294,11 +368,90 @@ func TestOrdersStore_CorruptUnsent(t *testing.T) { }) require.NoError(t, err) - // only the second order should be corrupted, so we should still see one order + // only the second order should be corrupted, so we should still see one order (sn1) unsent, err = ordersStore.ListUnsentBySatellite(tomorrow) require.NoError(t, err) require.Len(t, unsent, 1) require.Len(t, unsent[satellite].InfoList, 1) + require.EqualValues(t, sn1, unsent[satellite].InfoList[0].Order.SerialNumber) + + // add another order, sn3, to the same window + info.Limit.SerialNumber = sn3 + info.Order.SerialNumber = sn3 + require.NoError(t, ordersStore.Enqueue(info)) + + // only the second order should be corrupted, so we should still see first and last orders (sn1, sn3) + unsent, err = ordersStore.ListUnsentBySatellite(tomorrow) + require.NoError(t, err) + require.Len(t, unsent, 1) + require.Len(t, unsent[satellite].InfoList, 2) + require.Equal(t, ordersfile.V1, unsent[satellite].Version) + require.EqualValues(t, sn1, unsent[satellite].InfoList[0].Order.SerialNumber) + require.EqualValues(t, sn3, unsent[satellite].InfoList[1].Order.SerialNumber) +} + +func TestOrdersStore_V0ToV1(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + dirName := ctx.Dir("test-orders") + now := time.Now() + satellite := testrand.NodeID() + tomorrow := now.Add(24 * time.Hour) + + // make order limit grace period 1 hour + ordersStore, err := orders.NewFileStore(zaptest.NewLogger(t), dirName, time.Hour) + require.NoError(t, err) + + // empty store means no orders can be listed + unsent, err := ordersStore.ListUnsentBySatellite(tomorrow) + require.NoError(t, err) + require.Len(t, unsent, 0) + + sn1 := testrand.SerialNumber() + sn2 := testrand.SerialNumber() + info := &ordersfile.Info{ + Limit: &pb.OrderLimit{ + SerialNumber: sn1, + SatelliteId: satellite, + Action: pb.PieceAction_GET, + OrderCreation: now, + }, + Order: &pb.Order{ + SerialNumber: sn1, + Amount: 1, + }, + } + // store sn1 and sn2 in the same window + // sn1 is stored with deprecated V0, so sn2 should also be stored with V0 even when Enqueue() is used + unsentFileName := ordersfile.UnsentFileName(satellite, now, ordersfile.V0) + unsentDir := filepath.Join(dirName, "unsent") + unsentFilePath := filepath.Join(unsentDir, unsentFileName) + of, err := ordersfile.OpenWritableV0(unsentFilePath) + require.NoError(t, err) + + require.NoError(t, of.Append(info)) + info.Limit.SerialNumber = sn2 + info.Order.SerialNumber = sn2 + require.NoError(t, of.Append(info)) + require.NoError(t, of.Close()) + + // check that we can see both orders tomorrow + unsent, err = ordersStore.ListUnsentBySatellite(tomorrow) + require.NoError(t, err) + require.Len(t, unsent, 1) + require.Len(t, unsent[satellite].InfoList, 2) + 
require.Equal(t, ordersfile.V0, unsent[satellite].Version) + + // archive file to free up window + require.NoError(t, ordersStore.Archive(satellite, unsent[satellite], time.Now(), pb.SettlementWithWindowResponse_ACCEPTED)) + // new file should be created with version V1 + require.NoError(t, ordersStore.Enqueue(info)) + + unsent, err = ordersStore.ListUnsentBySatellite(tomorrow) + require.NoError(t, err) + require.Len(t, unsent, 1) + require.Len(t, unsent[satellite].InfoList, 1) + require.Equal(t, ordersfile.V1, unsent[satellite].Version) } func verifyInfosEqual(t *testing.T, a, b *ordersfile.Info) {
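Taken together, the V1 format and the new ErrEntryCorrupt class change how callers drain an orders file. The sketch below summarizes the read loop used by ListUnsentBySatellite and ListArchived above; it is a simplified, hypothetical helper (readAll is not part of the patch, and imports of "errors", "io", and the ordersfile package are assumed).

    // readAll drains one orders file, skipping corrupt entries the way the store does.
    // For V1 files, ReadOne returns ErrEntryCorrupt when a checksum or footer check fails
    // and resumes at the next entry header on the following call, so later orders are
    // still recovered. V0 files generally lose the orders that follow a corrupt entry
    // (see TestOrdersStore_CorruptUnsentV0).
    func readAll(of ordersfile.Readable) ([]*ordersfile.Info, error) {
        var infos []*ordersfile.Info
        for {
            info, err := of.ReadOne()
            if err != nil {
                if errors.Is(err, io.EOF) {
                    return infos, nil
                }
                if ordersfile.ErrEntryCorrupt.Has(err) {
                    // corrupt entry: log or count it, then keep reading
                    continue
                }
                return infos, err
            }
            infos = append(infos, info)
        }
    }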