storagenode/orders: add archive cleanup to orders service (#2821)

This PR introduces functionality for routine deletion of archived orders.

The user may specify an interval at which to run archive cleanup and a TTL for archived items. During each cleanup, all archived items that have reached the TTL are deleted.

The archive cleanup job and the order sender are combined into a new orders service.
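
For reference, the new configuration surface is small. A minimal sketch of setting it up programmatically, using the field names introduced in the diff below (the values mirror the defaults, and log, transport, ordersDB, and trustPool are placeholders for the caller's existing dependencies, not names from this change):

cfg := orders.Config{
	SenderInterval:  time.Hour,           // how often unsent orders are pushed to satellites
	SenderTimeout:   time.Hour,           // timeout for a single send run
	CleanupInterval: 24 * time.Hour,      // how often the archive cleanup runs
	ArchiveTTL:      45 * 24 * time.Hour, // archived orders older than this are deleted during cleanup
}
service := orders.NewService(log, transport, ordersDB, trustPool, cfg)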
Cameron 2019-08-22 10:33:14 -04:00 committed by GitHub
parent df29699641
commit 3d9441999a
9 changed files with 180 additions and 63 deletions

View File

@ -106,9 +106,11 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
ExpirationGracePeriod: 0,
MaxConcurrentRequests: 100,
OrderLimitGracePeriod: time.Hour,
Sender: orders.SenderConfig{
Interval: time.Hour,
Timeout: time.Hour,
Orders: orders.Config{
SenderInterval: time.Hour,
SenderTimeout: time.Hour,
CleanupInterval: time.Hour,
ArchiveTTL: time.Hour,
},
Monitor: monitor.Config{
MinimumBandwidth: 100 * memory.MB,

View File

@ -32,7 +32,7 @@ func TestSendingReceivingOrders(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
planet.Satellites[0].Audit.Service.Loop.Stop()
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Sender.Loop.Pause()
storageNode.Storage2.Orders.Sender.Pause()
}
expectedData := testrand.Bytes(50 * memory.KiB)
@ -53,7 +53,7 @@ func TestSendingReceivingOrders(t *testing.T) {
sumArchived := 0
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Sender.Loop.TriggerWait()
storageNode.Storage2.Orders.Sender.TriggerWait()
infos, err := storageNode.DB.Orders().ListUnsent(ctx, 10)
require.NoError(t, err)
@ -76,7 +76,7 @@ func TestUnableToSendOrders(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
planet.Satellites[0].Audit.Service.Loop.Stop()
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Sender.Loop.Pause()
storageNode.Storage2.Orders.Sender.Pause()
}
expectedData := testrand.Bytes(50 * memory.KiB)
@ -99,7 +99,7 @@ func TestUnableToSendOrders(t *testing.T) {
sumUnsent := 0
sumArchived := 0
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Sender.Loop.TriggerWait()
storageNode.Storage2.Orders.Sender.TriggerWait()
infos, err := storageNode.DB.Orders().ListUnsent(ctx, 10)
require.NoError(t, err)
@ -123,7 +123,7 @@ func TestUploadDownloadBandwidth(t *testing.T) {
planet.Satellites[0].Audit.Service.Loop.Stop()
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Sender.Loop.Pause()
storageNode.Storage2.Orders.Sender.Pause()
}
expectedData := testrand.Bytes(50 * memory.KiB)
@ -153,7 +153,7 @@ func TestUploadDownloadBandwidth(t *testing.T) {
}
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Sender.Loop.TriggerWait()
storageNode.Storage2.Orders.Sender.TriggerWait()
}
projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx)

View File

@ -25,7 +25,7 @@ func TestCollector(t *testing.T) {
// stop collector, so we can run it manually
storageNode.Collector.Loop.Pause()
// stop order sender because we will stop satellite later
storageNode.Storage2.Sender.Loop.Pause()
storageNode.Storage2.Orders.Sender.Pause()
}
expectedData := testrand.Bytes(100 * memory.KiB)

View File

@ -113,7 +113,8 @@ func TestDB(t *testing.T) {
require.Empty(t, cmp.Diff(expectedGrouped, unsentGrouped, cmp.Comparer(pb.Equal)))
// test archival
err = ordersdb.Archive(ctx, orders.ArchiveRequest{
archivedAt := time.Now().UTC()
err = ordersdb.Archive(ctx, archivedAt, orders.ArchiveRequest{
Satellite: satellite0.ID,
Serial: infos[0].Limit.SerialNumber,
Status: orders.StatusAccepted,
@ -121,7 +122,7 @@ func TestDB(t *testing.T) {
require.NoError(t, err)
// duplicate archive
err = ordersdb.Archive(ctx, orders.ArchiveRequest{
err = ordersdb.Archive(ctx, archivedAt, orders.ArchiveRequest{
Satellite: satellite0.ID,
Serial: infos[0].Limit.SerialNumber,
Status: orders.StatusRejected,
@ -133,7 +134,7 @@ func TestDB(t *testing.T) {
)
// one new archive and one duplicated
err = ordersdb.Archive(ctx, orders.ArchiveRequest{
err = ordersdb.Archive(ctx, archivedAt, orders.ArchiveRequest{
Satellite: satellite0.ID,
Serial: infos[0].Limit.SerialNumber,
Status: orders.StatusRejected,
@ -221,7 +222,7 @@ func TestDB_Trivial(t *testing.T) {
}
{ // Ensure Archive works at all
err := db.Orders().Archive(ctx, orders.ArchiveRequest{satelliteID, serial, orders.StatusAccepted})
err := db.Orders().Archive(ctx, time.Now().UTC(), orders.ArchiveRequest{satelliteID, serial, orders.StatusAccepted})
require.NoError(t, err)
}

View File

@ -71,102 +71,129 @@ type DB interface {
ListUnsentBySatellite(ctx context.Context) (map[storj.NodeID][]*Info, error)
// Archive marks order as being handled.
Archive(ctx context.Context, requests ...ArchiveRequest) error
Archive(ctx context.Context, archivedAt time.Time, requests ...ArchiveRequest) error
// ListArchived returns orders that have been sent.
ListArchived(ctx context.Context, limit int) ([]*ArchivedInfo, error)
// CleanArchive deletes all entries older than ttl
CleanArchive(ctx context.Context, ttl time.Duration) (int, error)
}
// SenderConfig defines configuration for sending orders.
type SenderConfig struct {
Interval time.Duration `help:"duration between sending" default:"1h0m0s"`
Timeout time.Duration `help:"timeout for sending" default:"1h0m0s"`
// Config defines configuration for sending orders.
type Config struct {
SenderInterval time.Duration `help:"duration between sending" default:"1h0m0s"`
SenderTimeout time.Duration `help:"timeout for sending" default:"1h0m0s"`
CleanupInterval time.Duration `help:"duration between archive cleanups" default:"24h0m0s"`
ArchiveTTL time.Duration `help:"length of time to archive orders before deletion" default:"1080h0m0s"` // 45 days
}
// Sender sends every interval unsent orders to the satellite.
type Sender struct {
// Service sends every interval unsent orders to the satellite.
type Service struct {
log *zap.Logger
config SenderConfig
config Config
transport transport.Client
orders DB
trust *trust.Pool
Loop sync2.Cycle
Sender sync2.Cycle
Cleanup sync2.Cycle
}
// NewSender creates an order sender.
func NewSender(log *zap.Logger, transport transport.Client, orders DB, trust *trust.Pool, config SenderConfig) *Sender {
return &Sender{
// NewService creates an order service.
func NewService(log *zap.Logger, transport transport.Client, orders DB, trust *trust.Pool, config Config) *Service {
return &Service{
log: log,
transport: transport,
orders: orders,
config: config,
trust: trust,
Loop: *sync2.NewCycle(config.Interval),
Sender: *sync2.NewCycle(config.SenderInterval),
Cleanup: *sync2.NewCycle(config.CleanupInterval),
}
}
// Run sends orders on every interval to the appropriate satellites.
func (sender *Sender) Run(ctx context.Context) (err error) {
func (service *Service) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return sender.Loop.Run(ctx, sender.runOnce)
var group errgroup.Group
service.Sender.Start(ctx, &group, service.sendOrders)
service.Cleanup.Start(ctx, &group, service.cleanArchive)
return group.Wait()
}
func (sender *Sender) runOnce(ctx context.Context) (err error) {
func (service *Service) cleanArchive(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
sender.log.Debug("sending")
service.log.Debug("cleaning")
deleted, err := service.orders.CleanArchive(ctx, service.config.ArchiveTTL)
if err != nil {
service.log.Error("cleaning archive", zap.Error(err))
return nil
}
service.log.Debug("cleanup finished", zap.Int("items deleted", deleted))
return nil
}
func (service *Service) sendOrders(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
service.log.Debug("sending")
const batchSize = 1000
ordersBySatellite, err := sender.orders.ListUnsentBySatellite(ctx)
ordersBySatellite, err := service.orders.ListUnsentBySatellite(ctx)
if err != nil {
if ordersBySatellite == nil {
sender.log.Error("listing orders", zap.Error(err))
service.log.Error("listing orders", zap.Error(err))
return nil
}
sender.log.Warn("DB contains invalid marshalled orders", zap.Error(err))
service.log.Warn("DB contains invalid marshalled orders", zap.Error(err))
}
requests := make(chan ArchiveRequest, batchSize)
var batchGroup errgroup.Group
batchGroup.Go(func() error { return sender.handleBatches(ctx, requests) })
batchGroup.Go(func() error { return service.handleBatches(ctx, requests) })
if len(ordersBySatellite) > 0 {
var group errgroup.Group
ctx, cancel := context.WithTimeout(ctx, sender.config.Timeout)
ctx, cancel := context.WithTimeout(ctx, service.config.SenderTimeout)
defer cancel()
for satelliteID, orders := range ordersBySatellite {
satelliteID, orders := satelliteID, orders
group.Go(func() error {
sender.Settle(ctx, satelliteID, orders, requests)
service.Settle(ctx, satelliteID, orders, requests)
return nil
})
}
_ = group.Wait() // doesn't return errors
} else {
sender.log.Debug("no orders to send")
service.log.Debug("no orders to send")
}
close(requests)
return batchGroup.Wait()
err = batchGroup.Wait()
if err != nil {
service.log.Error("archiving orders", zap.Error(err))
}
return nil
}
// Settle uploads orders to the satellite.
func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orders []*Info, requests chan ArchiveRequest) {
log := sender.log.Named(satelliteID.String())
err := sender.settle(ctx, log, satelliteID, orders, requests)
func (service *Service) Settle(ctx context.Context, satelliteID storj.NodeID, orders []*Info, requests chan ArchiveRequest) {
log := service.log.Named(satelliteID.String())
err := service.settle(ctx, log, satelliteID, orders, requests)
if err != nil {
log.Error("failed to settle orders", zap.Error(err))
}
}
func (sender *Sender) handleBatches(ctx context.Context, requests chan ArchiveRequest) (err error) {
func (service *Service) handleBatches(ctx context.Context, requests chan ArchiveRequest) (err error) {
defer mon.Task()(&ctx)(&err)
// In case anything goes wrong, discard everything from the channel.
@ -183,29 +210,29 @@ func (sender *Sender) handleBatches(ctx context.Context, requests chan ArchiveRe
continue
}
if err := sender.orders.Archive(ctx, buffer...); err != nil {
if err := service.orders.Archive(ctx, time.Now().UTC(), buffer...); err != nil {
if !OrderNotFoundError.Has(err) {
return err
}
sender.log.Warn("some unsent order aren't in the DB", zap.Error(err))
service.log.Warn("some unsent order aren't in the DB", zap.Error(err))
}
buffer = buffer[:0]
}
if len(buffer) > 0 {
return sender.orders.Archive(ctx, buffer...)
return service.orders.Archive(ctx, time.Now().UTC(), buffer...)
}
return nil
}
func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*Info, requests chan ArchiveRequest) (err error) {
func (service *Service) settle(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*Info, requests chan ArchiveRequest) (err error) {
defer mon.Task()(&ctx)(&err)
log.Info("sending", zap.Int("count", len(orders)))
defer log.Info("finished")
address, err := sender.trust.GetAddress(ctx, satelliteID)
address, err := service.trust.GetAddress(ctx, satelliteID)
if err != nil {
return OrderError.New("unable to get satellite address: %v", err)
}
@ -217,7 +244,7 @@ func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID s
},
}
conn, err := sender.transport.DialNode(ctx, &satellite)
conn, err := service.transport.DialNode(ctx, &satellite)
if err != nil {
return OrderError.New("unable to connect to the satellite: %v", err)
}
@ -305,7 +332,8 @@ func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID s
}
// Close stops the sending service.
func (sender *Sender) Close() error {
sender.Loop.Close()
func (service *Service) Close() error {
service.Sender.Close()
service.Cleanup.Close()
return nil
}
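
Putting this together, the storagenode peer (see the storagenode/peer.go hunks further down) wires and runs the combined service roughly as follows; a condensed sketch with error handling elided:

peer.Storage2.Orders = orders.NewService(
	log.Named("orders"),
	peer.Transport,
	peer.DB.Orders(),
	peer.Storage2.Trust,
	config.Storage2.Orders,
)

group.Go(func() error {
	// Run blocks, driving both the Sender and Cleanup cycles until ctx is canceled.
	return errs2.IgnoreCanceled(peer.Storage2.Orders.Run(ctx))
})

// ... and on shutdown:
_ = peer.Storage2.Orders.Close() // stops both cycles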

View File

@ -0,0 +1,86 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package orders_test
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/internal/testrand"
"storj.io/storj/pkg/pb"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/orders"
)
func TestCleanArchive(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
StorageNode: func(index int, config *storagenode.Config) {
config.Storage2.Orders.ArchiveTTL = 24 * time.Hour
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
planet.Satellites[0].Audit.Service.Loop.Stop()
satellite := planet.Satellites[0].ID()
node := planet.StorageNodes[0]
service := node.Storage2.Orders
service.Sender.Pause()
service.Cleanup.Pause()
serialNumber0 := testrand.SerialNumber()
serialNumber1 := testrand.SerialNumber()
order0 := &orders.Info{
Limit: &pb.OrderLimit{
SatelliteId: satellite,
SerialNumber: serialNumber0,
},
Order: &pb.Order{},
}
order1 := &orders.Info{
Limit: &pb.OrderLimit{
SatelliteId: satellite,
SerialNumber: serialNumber1,
},
Order: &pb.Order{},
}
// enter orders into unsent_orders
err := node.DB.Orders().Enqueue(ctx, order0)
require.NoError(t, err)
err = node.DB.Orders().Enqueue(ctx, order1)
require.NoError(t, err)
yesterday := time.Now().UTC().Add(-24 * time.Hour)
now := time.Now().UTC()
// archive one order yesterday, one today
err = node.DB.Orders().Archive(ctx, yesterday, orders.ArchiveRequest{
Satellite: satellite,
Serial: serialNumber0,
Status: orders.StatusAccepted,
})
require.NoError(t, err)
err = node.DB.Orders().Archive(ctx, now, orders.ArchiveRequest{
Satellite: satellite,
Serial: serialNumber1,
Status: orders.StatusAccepted,
})
require.NoError(t, err)
// trigger cleanup
service.Cleanup.TriggerWait()
archived, err := node.DB.Orders().ListArchived(ctx, 10)
require.NoError(t, err)
require.Len(t, archived, 1)
require.Equal(t, archived[0].Limit.SerialNumber, serialNumber1)
})
}

View File

@ -133,7 +133,7 @@ type Peer struct {
Endpoint *piecestore.Endpoint
Inspector *inspector.Endpoint
Monitor *monitor.Service
Sender *orders.Sender
Orders *orders.Service
}
Vouchers *vouchers.Service
@ -297,12 +297,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
}
pb.RegisterPiecestoreServer(peer.Server.GRPC(), peer.Storage2.Endpoint)
peer.Storage2.Sender = orders.NewSender(
log.Named("piecestore:orderssender"),
peer.Storage2.Orders = orders.NewService(
log.Named("orders"),
peer.Transport,
peer.DB.Orders(),
peer.Storage2.Trust,
config.Storage2.Sender,
config.Storage2.Orders,
)
}
@ -403,7 +403,7 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
return errs2.IgnoreCanceled(peer.Collector.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.Sender.Run(ctx))
return errs2.IgnoreCanceled(peer.Storage2.Orders.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.Monitor.Run(ctx))
@ -463,8 +463,8 @@ func (peer *Peer) Close() error {
if peer.Storage2.Monitor != nil {
errlist.Add(peer.Storage2.Monitor.Close())
}
if peer.Storage2.Sender != nil {
errlist.Add(peer.Storage2.Sender.Close())
if peer.Storage2.Orders != nil {
errlist.Add(peer.Storage2.Orders.Close())
}
if peer.Storage2.CacheService != nil {
errlist.Add(peer.Storage2.CacheService.Close())

View File

@ -64,7 +64,7 @@ type Config struct {
RetainTimeBuffer time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"1h0m0s"`
Monitor monitor.Config
Sender orders.SenderConfig
Orders orders.Config
}
// Endpoint implements uploading, downloading and deleting for a storage node.

View File

@ -181,7 +181,7 @@ func (db *ordersDB) ListUnsentBySatellite(ctx context.Context) (_ map[storj.Node
// follow with the next ones without interrupting the operation and it will
// return an error of the class orders.OrderNotFoundError. Any other error, will
// abort the operation, rolling back the transaction.
func (db *ordersDB) Archive(ctx context.Context, requests ...orders.ArchiveRequest) (err error) {
func (db *ordersDB) Archive(ctx context.Context, archivedAt time.Time, requests ...orders.ArchiveRequest) (err error) {
defer mon.Task()(&ctx)(&err)
txn, err := db.Begin()
@ -205,7 +205,7 @@ func (db *ordersDB) Archive(ctx context.Context, requests ...orders.ArchiveReque
}()
for _, req := range requests {
err := db.archiveOne(ctx, txn, req)
err := db.archiveOne(ctx, txn, archivedAt, req)
if err != nil {
if orders.OrderNotFoundError.Has(err) {
notFoundErrs.Add(err)
@ -220,7 +220,7 @@ func (db *ordersDB) Archive(ctx context.Context, requests ...orders.ArchiveReque
}
// archiveOne marks order as being handled.
func (db *ordersDB) archiveOne(ctx context.Context, txn *sql.Tx, req orders.ArchiveRequest) (err error) {
func (db *ordersDB) archiveOne(ctx context.Context, txn *sql.Tx, archivedAt time.Time, req orders.ArchiveRequest) (err error) {
defer mon.Task()(&ctx)(&err)
result, err := txn.Exec(`
@ -239,7 +239,7 @@ func (db *ordersDB) archiveOne(ctx context.Context, txn *sql.Tx, req orders.Arch
DELETE FROM unsent_order
WHERE satellite_id = ? AND serial_number = ?;
`, int(req.Status), time.Now().UTC(), req.Satellite, req.Serial, req.Satellite, req.Serial)
`, int(req.Status), archivedAt, req.Satellite, req.Serial, req.Satellite, req.Serial)
if err != nil {
return ErrOrders.Wrap(err)
}
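
The CleanArchive implementation for the SQLite-backed store is not part of this diff. A plausible sketch, assuming the archived rows live in an order_archive table with an archived_at column (both names are assumptions, not taken from the diff):

// CleanArchive deletes all archived orders older than ttl and reports how many rows were removed.
// Sketch only; table and column names are assumed.
func (db *ordersDB) CleanArchive(ctx context.Context, ttl time.Duration) (_ int, err error) {
	defer mon.Task()(&ctx)(&err)

	deleteBefore := time.Now().UTC().Add(-ttl)
	result, err := db.Exec(`
		DELETE FROM order_archive
		WHERE archived_at <= ?
	`, deleteBefore)
	if err != nil {
		return 0, ErrOrders.Wrap(err)
	}
	count, err := result.RowsAffected()
	if err != nil {
		return 0, ErrOrders.Wrap(err)
	}
	return int(count), nil
}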