storj/storagenode/orders/store.go
Egon Elbre 10372afbe4 ci: fix lint errors
Change-Id: Ib5893440807811f77175ccd347aa3f8ca9cccbdf
2021-05-17 13:37:31 +00:00

401 lines
12 KiB
Go

// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package orders
import (
"context"
"io"
"os"
"path/filepath"
"sync"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/private/date"
"storj.io/storj/storagenode/orders/ordersfile"
)
// activeWindow represents a window with active operations waiting to finish to enqueue
// their orders. It is the key type of the FileStore.active map.
type activeWindow struct {
// satelliteID is the satellite the pending orders belong to.
satelliteID storj.NodeID
// timestamp is the order creation time truncated to the hour, as produced by
// date.TruncateToHourInNano.
timestamp int64
}
// FileStore implements the orders.Store interface by appending orders to flat files.
type FileStore struct {
log *zap.Logger
// ordersDir is the root directory; NewFileStore sets unsentDir and archiveDir to its
// "unsent" and "archive" subdirectories.
ordersDir string
unsentDir string
archiveDir string
// Lock ordering: always acquire the activeMu after the unsentMu to avoid deadlocks. If
// someone acquires activeMu before unsentMu, then you can be in a situation where two
// goroutines are waiting for each other forever.

// activeMu is the mutex for the active map.
activeMu sync.Mutex
// active counts the in-flight enqueue operations per satellite and creation-hour window.
active map[activeWindow]int
// unsentMu is the mutex for the unsent directory.
unsentMu sync.Mutex
// archiveMu is the mutex for the archive directory.
archiveMu sync.Mutex
// orderLimitGracePeriod is how long after the OrderLimit creation date OrderLimits are
// no longer accepted (piecestore Config).
orderLimitGracePeriod time.Duration
}
// NewFileStore builds a FileStore rooted at ordersDir and makes sure the "unsent" and
// "archive" subdirectories it works with exist. If the directories cannot be prepared,
// the error is returned and no store is handed out.
func NewFileStore(log *zap.Logger, ordersDir string, orderLimitGracePeriod time.Duration) (*FileStore, error) {
	store := new(FileStore)
	store.log = log
	store.ordersDir = ordersDir
	store.unsentDir = filepath.Join(ordersDir, "unsent")
	store.archiveDir = filepath.Join(ordersDir, "archive")
	store.active = map[activeWindow]int{}
	store.orderLimitGracePeriod = orderLimitGracePeriod

	if err := store.ensureDirectories(); err != nil {
		return nil, err
	}
	return store, nil
}
// BeginEnqueue registers an in-flight enqueue for the window that createdAt falls into and
// returns a commit function that finishes it.
//
// If createdAt is older than the order limit grace period, nothing is registered and an
// error is returned instead of a commit function.
//
// Calling commit always releases the in-flight registration. Passing nil aborts the
// enqueue without writing anything and returns no error. Passing an Info appends it to the
// unsent file of its window; the Info must carry the same satellite ID and order creation
// time that BeginEnqueue was called with, otherwise an error is returned. Errors from
// opening, appending to, or closing the unsent file are reported by commit.
func (store *FileStore) BeginEnqueue(satelliteID storj.NodeID, createdAt time.Time) (commit func(*ordersfile.Info) error, err error) {
	// always acquire the activeMu after the unsentMu to avoid deadlocks
	store.unsentMu.Lock()
	defer store.unsentMu.Unlock()
	store.activeMu.Lock()
	defer store.activeMu.Unlock()

	// if the order is older than the grace period, reject it. We don't check against what
	// window the order would go into to make the calculation more predictable: if the order
	// is older than the grace limit, it will not be accepted.
	if time.Since(createdAt) > store.orderLimitGracePeriod {
		return nil, OrderError.New("grace period passed for order limit")
	}

	// record that there is an operation in flight for this window
	store.enqueueStartedLocked(satelliteID, createdAt)

	// The result of the closure is named so that the deferred Close below can fold its
	// error into the value that is actually returned to the caller.
	return func(info *ordersfile.Info) (err error) {
		// always acquire the activeMu after the unsentMu to avoid deadlocks
		store.unsentMu.Lock()
		defer store.unsentMu.Unlock()
		store.activeMu.Lock()
		defer store.activeMu.Unlock()

		// always remove the in flight operation
		defer store.enqueueFinishedLocked(satelliteID, createdAt)

		// caller wants to abort; free file for sending and return with no error
		if info == nil {
			return nil
		}

		// check that the info matches what the enqueue was begun with
		if info.Limit.SatelliteId != satelliteID || !info.Limit.OrderCreation.Equal(createdAt) {
			return OrderError.New("invalid info passed in to enqueue commit")
		}

		// write out the data
		of, err := ordersfile.OpenWritableUnsent(store.unsentDir, info.Limit.SatelliteId, info.Limit.OrderCreation)
		if err != nil {
			return OrderError.Wrap(err)
		}
		defer func() {
			// a failed Close on a file we appended to must not go unnoticed
			err = errs.Combine(err, OrderError.Wrap(of.Close()))
		}()

		err = of.Append(info)
		if err != nil {
			return OrderError.Wrap(err)
		}
		return nil
	}, nil
}
// enqueueStartedLocked bumps the count of pending orders for the hour window that
// createdAt falls into. It does no locking itself; BeginEnqueue calls it while holding
// activeMu.
func (store *FileStore) enqueueStartedLocked(satelliteID storj.NodeID, createdAt time.Time) {
	key := activeWindow{
		satelliteID: satelliteID,
		timestamp:   date.TruncateToHourInNano(createdAt),
	}
	store.active[key]++
}
// enqueueFinishedLocked lowers the count of pending orders for the hour window that
// createdAt falls into and drops the window from the map once nothing is pending. It does
// no locking itself; the commit function returned by BeginEnqueue calls it while holding
// activeMu.
func (store *FileStore) enqueueFinishedLocked(satelliteID storj.NodeID, createdAt time.Time) {
	key := activeWindow{
		satelliteID: satelliteID,
		timestamp:   date.TruncateToHourInNano(createdAt),
	}

	store.active[key]--
	if store.active[key] > 0 {
		// other operations are still in flight for this window
		return
	}
	delete(store.active, key)
}
// hasActiveEnqueue reports whether at least one enqueue is still in flight for the hour
// window that createdAt falls into. It takes activeMu for the duration of the lookup.
func (store *FileStore) hasActiveEnqueue(satelliteID storj.NodeID, createdAt time.Time) bool {
	key := activeWindow{
		satelliteID: satelliteID,
		timestamp:   date.TruncateToHourInNano(createdAt),
	}

	store.activeMu.Lock()
	pending := store.active[key]
	store.activeMu.Unlock()

	return pending > 0
}
// Enqueue appends the order to the unsent file for its creation hour. It is BeginEnqueue
// followed immediately by the commit, so an order whose limit is past the grace period is
// rejected with the error from BeginEnqueue.
func (store *FileStore) Enqueue(info *ordersfile.Info) (err error) {
	satelliteID, createdAt := info.Limit.SatelliteId, info.Limit.OrderCreation

	commit, err := store.BeginEnqueue(satelliteID, createdAt)
	if err == nil {
		err = commit(info)
	}
	return err
}
// UnsentInfo is a struct containing a window of orders for a satellite and order creation hour.
type UnsentInfo struct {
// CreatedAtHour is the creation hour of the window, taken from the unsent file's info.
CreatedAtHour time.Time
// Version is the orders file version, as passed to ordersfile.OpenReadable when the
// window is read and to ordersfile.MoveUnsent when it is archived.
Version ordersfile.Version
// InfoList holds the orders that could be read from the window's file.
InfoList []*ordersfile.Info
}
// ListUnsentBySatellite returns one window of orders that haven't been sent yet, grouped by satellite.
// It only reads files where the order limit grace period has passed, meaning no new orders will be appended,
// and skips windows that still have an enqueue in flight.
// There is a separate window for each created at hour, so if a satellite has 2 windows, `ListUnsentBySatellite`
// needs to be called twice, with calls to `Archive` in between each call, to see all unsent orders.
//
// Errors for individual directory entries (walk errors, unparsable file names) are collected
// and returned alongside whatever could be listed. Errors from opening, reading or closing
// an orders file stop the walk and are returned as well. Corrupt entries are logged, counted
// in a metric and skipped; a file that ends unexpectedly yields the orders read so far.
func (store *FileStore) ListUnsentBySatellite(ctx context.Context, now time.Time) (infoMap map[storj.NodeID]UnsentInfo, err error) {
	defer mon.Task()(&ctx)(&err)
	// shouldn't be necessary, but acquire archiveMu to ensure we do not attempt to archive files during list
	store.archiveMu.Lock()
	defer store.archiveMu.Unlock()

	var errList error
	infoMap = make(map[storj.NodeID]UnsentInfo)

	// The callback's result is named so that the deferred Close can fold its error into
	// the value that is actually returned to filepath.Walk.
	err = filepath.Walk(store.unsentDir, func(path string, info os.FileInfo, walkErr error) (err error) {
		if walkErr != nil {
			errList = errs.Combine(errList, OrderError.Wrap(walkErr))
			return nil //nolint: nilerr // errors are collected separately
		}
		if info.IsDir() {
			return nil
		}

		fileInfo, err := ordersfile.GetUnsentInfo(info)
		if err != nil {
			errList = errs.Combine(errList, OrderError.Wrap(err))
			return nil //nolint: nilerr // errors are collected separately
		}

		// if we already have orders for this satellite, ignore the file
		if _, ok := infoMap[fileInfo.SatelliteID]; ok {
			return nil
		}

		// if orders can still be added to file, ignore it. We add an hour because that's
		// the newest order that could be added to that window.
		if now.Sub(fileInfo.CreatedAtHour.Add(time.Hour)) <= store.orderLimitGracePeriod {
			return nil
		}

		// if there are still active orders for the time, ignore it.
		if store.hasActiveEnqueue(fileInfo.SatelliteID, fileInfo.CreatedAtHour) {
			return nil
		}

		newUnsentInfo := UnsentInfo{
			CreatedAtHour: fileInfo.CreatedAtHour,
			Version:       fileInfo.Version,
		}

		of, err := ordersfile.OpenReadable(path, fileInfo.Version)
		if err != nil {
			return OrderError.Wrap(err)
		}
		defer func() {
			err = errs.Combine(err, OrderError.Wrap(of.Close()))
		}()

		for {
			// if at any point we see an unexpected EOF error, return what orders we could read successfully with no error
			// this behavior ensures that we will attempt to archive corrupted files instead of continually failing to read them
			newInfo, readErr := of.ReadOne()
			if readErr != nil {
				if errs.Is(readErr, io.EOF) {
					break
				}
				// if last entry read is corrupt, attempt to read again
				if ordersfile.ErrEntryCorrupt.Has(readErr) {
					store.log.Warn("Corrupted order detected in orders file", zap.Error(readErr))
					mon.Meter("orders_unsent_file_corrupted").Mark64(1)

					// if the error is unexpected EOF, we want the metrics and logs, but there
					// is no use in trying to read from the file again.
					if errs.Is(readErr, io.ErrUnexpectedEOF) {
						break
					}
					continue
				}
				return OrderError.Wrap(readErr)
			}

			newUnsentInfo.InfoList = append(newUnsentInfo.InfoList, newInfo)
		}

		infoMap[fileInfo.SatelliteID] = newUnsentInfo
		return nil
	})
	if err != nil {
		errList = errs.Combine(errList, err)
	}

	return infoMap, errList
}
// Archive relocates the unsent file of the given satellite and window into the archive
// directory, recording archivedAt and the settlement status. Both unsentMu and archiveMu
// are held while the file is moved.
func (store *FileStore) Archive(satelliteID storj.NodeID, unsentInfo UnsentInfo, archivedAt time.Time, status pb.SettlementWithWindowResponse_Status) error {
	store.unsentMu.Lock()
	defer store.unsentMu.Unlock()
	store.archiveMu.Lock()
	defer store.archiveMu.Unlock()

	moveErr := ordersfile.MoveUnsent(
		store.unsentDir,
		store.archiveDir,
		satelliteID,
		unsentInfo.CreatedAtHour,
		archivedAt,
		status,
		unsentInfo.Version,
	)
	return OrderError.Wrap(moveErr)
}
// ListArchived returns orders that have been sent, read from every file in the archive
// directory. Each order carries the status and archive time of the file it was found in.
//
// Walk errors for individual directory entries are collected and returned alongside
// whatever could be listed. Errors from parsing a file name, or from opening, reading or
// closing an orders file, stop the walk and are returned as well. Corrupt entries are
// logged, counted in a metric and skipped; a file that ends unexpectedly yields the orders
// read so far.
func (store *FileStore) ListArchived() ([]*ArchivedInfo, error) {
	store.archiveMu.Lock()
	defer store.archiveMu.Unlock()

	var errList error
	archivedList := []*ArchivedInfo{}

	// The callback's result is named so that the deferred Close can fold its error into
	// the value that is actually returned to filepath.Walk.
	err := filepath.Walk(store.archiveDir, func(path string, info os.FileInfo, walkErr error) (err error) {
		if walkErr != nil {
			errList = errs.Combine(errList, OrderError.Wrap(walkErr))
			return nil //nolint: nilerr // errors are collected separately
		}
		if info.IsDir() {
			return nil
		}

		fileInfo, err := ordersfile.GetArchivedInfo(info)
		if err != nil {
			return OrderError.Wrap(err)
		}

		of, err := ordersfile.OpenReadable(path, fileInfo.Version)
		if err != nil {
			return OrderError.Wrap(err)
		}
		defer func() {
			err = errs.Combine(err, OrderError.Wrap(of.Close()))
		}()

		// any status text other than accepted/rejected is reported as unsent
		status := StatusUnsent
		switch fileInfo.StatusText {
		case pb.SettlementWithWindowResponse_ACCEPTED.String():
			status = StatusAccepted
		case pb.SettlementWithWindowResponse_REJECTED.String():
			status = StatusRejected
		}

		for {
			entry, readErr := of.ReadOne()
			if readErr != nil {
				if errs.Is(readErr, io.EOF) {
					break
				}
				// if last entry read is corrupt, attempt to read again
				if ordersfile.ErrEntryCorrupt.Has(readErr) {
					store.log.Warn("Corrupted order detected in orders file", zap.Error(readErr))
					mon.Meter("orders_archive_file_corrupted").Mark64(1)

					// if the error is unexpected EOF, we want the metrics and logs, but there
					// is no use in trying to read from the file again.
					if errs.Is(readErr, io.ErrUnexpectedEOF) {
						break
					}
					continue
				}
				return OrderError.Wrap(readErr)
			}

			archivedList = append(archivedList, &ArchivedInfo{
				Limit:      entry.Limit,
				Order:      entry.Order,
				Status:     status,
				ArchivedAt: fileInfo.ArchivedAt,
			})
		}

		return nil
	})
	if err != nil {
		errList = errs.Combine(errList, err)
	}

	return archivedList, errList
}
// CleanArchive deletes all entries archived before the provided time.
//
// Walk errors and files whose archive info cannot be parsed are collected and reported
// without stopping the walk; a failed removal stops the walk. Everything is combined into
// the returned error.
func (store *FileStore) CleanArchive(deleteBefore time.Time) error {
	store.archiveMu.Lock()
	defer store.archiveMu.Unlock()

	var collected error
	walkErr := filepath.Walk(store.archiveDir, func(path string, entry os.FileInfo, entryErr error) error {
		if entryErr != nil {
			collected = errs.Combine(collected, OrderError.Wrap(entryErr))
			return nil //nolint: nilerr // errors are collected separately
		}
		if entry.IsDir() {
			return nil
		}

		archived, infoErr := ordersfile.GetArchivedInfo(entry)
		if infoErr != nil {
			collected = errs.Combine(collected, infoErr)
			return nil //nolint: nilerr // errors are collected separately
		}

		// keep everything that is not older than the cutoff
		if !archived.ArchivedAt.Before(deleteBefore) {
			return nil
		}
		return OrderError.Wrap(os.Remove(path))
	})

	return errs.Combine(collected, walkErr)
}
// ensureDirectories makes sure the unsent and archive directories exist, creating them
// (and any missing parents) with mode 0700 when they do not.
//
// os.MkdirAll is called directly instead of after an os.Stat check: it already does
// nothing for an existing directory, and it reports the cases a "stat, then create if
// missing" check lets through silently, such as the path existing as a regular file or
// not being accessible.
func (store *FileStore) ensureDirectories() error {
	for _, dir := range []string{store.unsentDir, store.archiveDir} {
		if err := os.MkdirAll(dir, 0700); err != nil {
			return OrderError.Wrap(err)
		}
	}
	return nil
}