storagenode/orders: archive unsent order for untrusted satellite
The order service still tries to settle orders at all instances even when the satellite is marked as untrusted by the trust service, which will always fail because the trust cache no longer has record of the URL of the satellite, and it will keep retrying. This leaves a lot of "satellite is untrusted" errors in the logs. There has been several complaints on the forum because this was happening a lot for the stefanlite and I expect it will be the same issue for the decommisioned satellites US2 and EUN-1 once the forget-satellite command is run to clean up the satellite. This change allows the order service to archive unsent orders available for any untrusted satellite, and will not attempt to settle the order. https://github.com/storj/storj/issues/6262 Change-Id: If0f7f1783587cd18fab8917d45948f22df5b1dcf
This commit is contained in:
parent
4e0f062cb5
commit
db7c6d38e5
@ -178,6 +178,12 @@ func (service *Service) SendOrders(ctx context.Context, now time.Time) {
|
|||||||
errorSatellites := make(map[storj.NodeID]struct{})
|
errorSatellites := make(map[storj.NodeID]struct{})
|
||||||
var errorSatellitesMu sync.Mutex
|
var errorSatellitesMu sync.Mutex
|
||||||
|
|
||||||
|
addErrorSatellite := func(satelliteID storj.NodeID) {
|
||||||
|
errorSatellitesMu.Lock()
|
||||||
|
defer errorSatellitesMu.Unlock()
|
||||||
|
errorSatellites[satelliteID] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
// Continue sending until there are no more windows to send, or all relevant satellites are offline.
|
// Continue sending until there are no more windows to send, or all relevant satellites are offline.
|
||||||
for {
|
for {
|
||||||
ordersBySatellite, err := service.ordersStore.ListUnsentBySatellite(ctx, now)
|
ordersBySatellite, err := service.ordersStore.ListUnsentBySatellite(ctx, now)
|
||||||
@ -202,15 +208,31 @@ func (service *Service) SendOrders(ctx context.Context, now time.Time) {
|
|||||||
|
|
||||||
group.Go(func() error {
|
group.Go(func() error {
|
||||||
log := service.log.Named(satelliteID.String())
|
log := service.log.Named(satelliteID.String())
|
||||||
status, err := service.settleWindow(ctx, log, satelliteID, unsentInfo.InfoList)
|
|
||||||
|
skipSettlement := false
|
||||||
|
nodeURL, err := service.trust.GetNodeURL(ctx, satelliteID)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("unable to get satellite address", zap.Error(err))
|
||||||
|
|
||||||
|
if !errs.Is(err, trust.ErrUntrusted) {
|
||||||
|
addErrorSatellite(satelliteID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
skipSettlement = true
|
||||||
|
}
|
||||||
|
|
||||||
|
status := pb.SettlementWithWindowResponse_REJECTED
|
||||||
|
if !skipSettlement {
|
||||||
|
status, err = service.settleWindow(ctx, log, nodeURL, unsentInfo.InfoList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// satellite returned an error, but settlement was not explicitly rejected; we want to retry later
|
// satellite returned an error, but settlement was not explicitly rejected; we want to retry later
|
||||||
errorSatellitesMu.Lock()
|
addErrorSatellite(satelliteID)
|
||||||
errorSatellites[satelliteID] = struct{}{}
|
|
||||||
errorSatellitesMu.Unlock()
|
|
||||||
log.Error("failed to settle orders for satellite", zap.String("satellite ID", satelliteID.String()), zap.Error(err))
|
log.Error("failed to settle orders for satellite", zap.String("satellite ID", satelliteID.String()), zap.Error(err))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
log.Warn("skipping order settlement for untrusted satellite. Order will be archived", zap.String("satellite ID", satelliteID.String()))
|
||||||
|
}
|
||||||
|
|
||||||
err = service.ordersStore.Archive(satelliteID, unsentInfo, time.Now().UTC(), status)
|
err = service.ordersStore.Archive(satelliteID, unsentInfo, time.Now().UTC(), status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -232,18 +254,13 @@ func (service *Service) SendOrders(ctx context.Context, now time.Time) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (service *Service) settleWindow(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*ordersfile.Info) (status pb.SettlementWithWindowResponse_Status, err error) {
|
func (service *Service) settleWindow(ctx context.Context, log *zap.Logger, nodeURL storj.NodeURL, orders []*ordersfile.Info) (status pb.SettlementWithWindowResponse_Status, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
log.Info("sending", zap.Int("count", len(orders)))
|
log.Info("sending", zap.Int("count", len(orders)))
|
||||||
defer log.Info("finished")
|
defer log.Info("finished")
|
||||||
|
|
||||||
nodeurl, err := service.trust.GetNodeURL(ctx, satelliteID)
|
conn, err := service.dialer.DialNodeURL(ctx, nodeURL)
|
||||||
if err != nil {
|
|
||||||
return 0, OrderError.New("unable to get satellite address: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
conn, err := service.dialer.DialNodeURL(ctx, nodeurl)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, OrderError.New("unable to connect to the satellite: %w", err)
|
return 0, OrderError.New("unable to connect to the satellite: %w", err)
|
||||||
}
|
}
|
||||||
@ -280,6 +297,11 @@ func (service *Service) settleWindow(ctx context.Context, log *zap.Logger, satel
|
|||||||
return res.Status, nil
|
return res.Status, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestSetLogger sets the logger.
|
||||||
|
func (service *Service) TestSetLogger(log *zap.Logger) {
|
||||||
|
service.log = log
|
||||||
|
}
|
||||||
|
|
||||||
// sleep for random interval in [0;maxSleep).
|
// sleep for random interval in [0;maxSleep).
|
||||||
// Returns an error if context was cancelled.
|
// Returns an error if context was cancelled.
|
||||||
func (service *Service) sleep(ctx context.Context) error {
|
func (service *Service) sleep(ctx context.Context) error {
|
||||||
|
@ -8,6 +8,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"go.uber.org/zap/zaptest/observer"
|
||||||
|
|
||||||
"storj.io/common/memory"
|
"storj.io/common/memory"
|
||||||
"storj.io/common/pb"
|
"storj.io/common/pb"
|
||||||
@ -124,6 +126,63 @@ func TestOrderFileStoreSettle(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestOrderFileStoreSettle_UntrustedSatellite(t *testing.T) {
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 2, StorageNodeCount: 1, UplinkCount: 1,
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
satellite := planet.Satellites[0]
|
||||||
|
satellite2 := planet.Satellites[1]
|
||||||
|
uplinkPeer := planet.Uplinks[0]
|
||||||
|
satellite.Audit.Worker.Loop.Pause()
|
||||||
|
node := planet.StorageNodes[0]
|
||||||
|
service := node.Storage2.Orders
|
||||||
|
service.Sender.Pause()
|
||||||
|
service.Cleanup.Pause()
|
||||||
|
tomorrow := time.Now().Add(24 * time.Hour)
|
||||||
|
|
||||||
|
// upload a file to generate an order on the storagenode
|
||||||
|
testData := testrand.Bytes(8 * memory.KiB)
|
||||||
|
require.NoError(t, uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path", testData))
|
||||||
|
testData2 := testrand.Bytes(8 * memory.KiB)
|
||||||
|
require.NoError(t, uplinkPeer.Upload(ctx, satellite2, "testbucket", "test/path", testData2))
|
||||||
|
|
||||||
|
require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))
|
||||||
|
|
||||||
|
// mark satellite2 as untrusted
|
||||||
|
require.NoError(t, node.Storage2.Trust.DeleteSatellite(ctx, satellite2.ID()))
|
||||||
|
|
||||||
|
toSend, err := node.OrdersStore.ListUnsentBySatellite(ctx, tomorrow)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, toSend, 2)
|
||||||
|
ordersForSat := toSend[satellite.ID()]
|
||||||
|
require.Len(t, ordersForSat.InfoList, 1)
|
||||||
|
ordersForSat2 := toSend[satellite2.ID()]
|
||||||
|
require.Len(t, ordersForSat2.InfoList, 1)
|
||||||
|
|
||||||
|
// create new observed logger
|
||||||
|
observedZapCore, observedLogs := observer.New(zap.DebugLevel)
|
||||||
|
observedLogger := zap.New(observedZapCore).Named("orders")
|
||||||
|
service.TestSetLogger(observedLogger)
|
||||||
|
// trigger order send
|
||||||
|
service.SendOrders(ctx, tomorrow)
|
||||||
|
|
||||||
|
// check that the untrusted satellite was skipped
|
||||||
|
require.NotZero(t, observedLogs.All())
|
||||||
|
skipLogs := observedLogs.FilterMessage("skipping order settlement for untrusted satellite. Order will be archived").All()
|
||||||
|
require.Len(t, skipLogs, 1)
|
||||||
|
logFields := observedLogs.FilterField(zap.String("satellite ID", satellite2.ID().String())).All()
|
||||||
|
require.Len(t, logFields, 1)
|
||||||
|
|
||||||
|
toSend, err = node.OrdersStore.ListUnsentBySatellite(ctx, tomorrow)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, toSend, 0)
|
||||||
|
|
||||||
|
archived, err := node.OrdersStore.ListArchived()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, archived, 2)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// TODO remove when db is removed.
|
// TODO remove when db is removed.
|
||||||
// TestOrderFileStoreAndDBSettle ensures that if orders exist in both DB and filestore, that the DB orders and filestore are both settled.
|
// TestOrderFileStoreAndDBSettle ensures that if orders exist in both DB and filestore, that the DB orders and filestore are both settled.
|
||||||
func TestOrderFileStoreAndDBSettle(t *testing.T) {
|
func TestOrderFileStoreAndDBSettle(t *testing.T) {
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
// Error is the default error class.
|
// Error is the default error class.
|
||||||
var (
|
var (
|
||||||
Error = errs.Class("trust")
|
Error = errs.Class("trust")
|
||||||
|
ErrUntrusted = Error.New("satellite is untrusted")
|
||||||
|
|
||||||
mon = monkit.Package()
|
mon = monkit.Package()
|
||||||
)
|
)
|
||||||
@ -176,6 +177,14 @@ func (pool *Pool) GetNodeURL(ctx context.Context, id storj.NodeID) (_ storj.Node
|
|||||||
return info.url, nil
|
return info.url, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsTrusted returns true if the satellite is trusted.
|
||||||
|
func (pool *Pool) IsTrusted(ctx context.Context, id storj.NodeID) bool {
|
||||||
|
defer mon.Task()(&ctx)(nil)
|
||||||
|
|
||||||
|
_, err := pool.getInfo(id)
|
||||||
|
return err == nil
|
||||||
|
}
|
||||||
|
|
||||||
// Refresh refreshes the set of trusted satellites in the pool. Concurrent
|
// Refresh refreshes the set of trusted satellites in the pool. Concurrent
|
||||||
// callers will be synchronized so only one proceeds at a time.
|
// callers will be synchronized so only one proceeds at a time.
|
||||||
func (pool *Pool) Refresh(ctx context.Context) error {
|
func (pool *Pool) Refresh(ctx context.Context) error {
|
||||||
@ -243,13 +252,26 @@ func (pool *Pool) Refresh(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteSatellite deletes a satellite from the pool.
|
||||||
|
func (pool *Pool) DeleteSatellite(ctx context.Context, id storj.NodeID) error {
|
||||||
|
pool.satellitesMu.Lock()
|
||||||
|
defer pool.satellitesMu.Unlock()
|
||||||
|
|
||||||
|
if _, ok := pool.satellites[id]; !ok {
|
||||||
|
return ErrUntrusted
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(pool.satellites, id)
|
||||||
|
return pool.satellitesDB.UpdateSatelliteStatus(ctx, id, satellites.Untrusted)
|
||||||
|
}
|
||||||
|
|
||||||
func (pool *Pool) getInfo(id storj.NodeID) (*satelliteInfoCache, error) {
|
func (pool *Pool) getInfo(id storj.NodeID) (*satelliteInfoCache, error) {
|
||||||
pool.satellitesMu.RLock()
|
pool.satellitesMu.RLock()
|
||||||
defer pool.satellitesMu.RUnlock()
|
defer pool.satellitesMu.RUnlock()
|
||||||
|
|
||||||
info, ok := pool.satellites[id]
|
info, ok := pool.satellites[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, Error.New("satellite %q is untrusted", id)
|
return nil, ErrUntrusted
|
||||||
}
|
}
|
||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -40,7 +39,7 @@ func TestPoolVerifySatelliteID(t *testing.T) {
|
|||||||
|
|
||||||
// Assert the ID is not trusted
|
// Assert the ID is not trusted
|
||||||
err := pool.VerifySatelliteID(context.Background(), id)
|
err := pool.VerifySatelliteID(context.Background(), id)
|
||||||
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
|
require.ErrorIs(t, err, trust.ErrUntrusted)
|
||||||
|
|
||||||
// Refresh the pool with the new trust entry
|
// Refresh the pool with the new trust entry
|
||||||
source.entries = []trust.Entry{
|
source.entries = []trust.Entry{
|
||||||
@ -64,7 +63,7 @@ func TestPoolVerifySatelliteID(t *testing.T) {
|
|||||||
|
|
||||||
// Assert the ID is no longer trusted
|
// Assert the ID is no longer trusted
|
||||||
err = pool.VerifySatelliteID(context.Background(), id)
|
err = pool.VerifySatelliteID(context.Background(), id)
|
||||||
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
|
require.ErrorIs(t, err, trust.ErrUntrusted)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -81,7 +80,7 @@ func TestPoolGetSignee(t *testing.T) {
|
|||||||
|
|
||||||
// ID is untrusted
|
// ID is untrusted
|
||||||
_, err := pool.GetSignee(context.Background(), id)
|
_, err := pool.GetSignee(context.Background(), id)
|
||||||
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
|
require.ErrorIs(t, err, trust.ErrUntrusted)
|
||||||
|
|
||||||
// Refresh the pool with the new trust entry
|
// Refresh the pool with the new trust entry
|
||||||
source.entries = []trust.Entry{{SatelliteURL: url}}
|
source.entries = []trust.Entry{{SatelliteURL: url}}
|
||||||
@ -265,7 +264,7 @@ func TestPoolGetAddress(t *testing.T) {
|
|||||||
|
|
||||||
// Assert the ID is not trusted
|
// Assert the ID is not trusted
|
||||||
nodeurl, err := pool.GetNodeURL(context.Background(), id)
|
nodeurl, err := pool.GetNodeURL(context.Background(), id)
|
||||||
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
|
require.ErrorIs(t, err, trust.ErrUntrusted)
|
||||||
require.Empty(t, nodeurl)
|
require.Empty(t, nodeurl)
|
||||||
|
|
||||||
// Refresh the pool with the new trust entry
|
// Refresh the pool with the new trust entry
|
||||||
|
Loading…
Reference in New Issue
Block a user