satellite/orders: 3-phase rollout

This adds a config flag, orders.window-endpoint-rollout-phase,
which can take one of three values: phase1, phase2, or phase3.
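
As a rough sketch only (using the standard library flag package for
illustration; the satellite actually wires the value through its own
config system), the new flag type parses these values like this:

    package main

    import (
        "flag"
        "fmt"

        "storj.io/storj/satellite/orders"
    )

    func main() {
        // start at phase1, matching the flag's default in the satellite config
        phase := orders.WindowEndpointRolloutPhase1

        // *WindowEndpointRolloutPhase implements flag.Value via Set/String,
        // so it accepts "phase1", "phase2", or "phase3" (case-insensitive)
        flag.Var(&phase, "orders.window-endpoint-rollout-phase",
            "rollout phase for the windowed endpoint")
        flag.Parse()

        fmt.Println(phase) // phase1, phase2, or phase3
    }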

In phase1, the current orders endpoint continues to work as
usual, and the windowed orders endpoint uses the same backend
as the current one, additionally writing zero-amount rollup
rows so that each settlement window is handled entirely by a
single backend.

In phase2, the current orders endpoint is disabled and the
windowed orders endpoint continues to use the same backend.

In phase3, the current orders endpoint is still disabled and
the windowed orders endpoint uses the new backend that requires
much less database traffic and state.
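
Condensed from the endpoint changes below (a sketch, not the full
code), the phase gates the two endpoints roughly like this:

    // Settlement is the current (non-windowed) endpoint; it only serves phase1.
    func (endpoint *Endpoint) Settlement(stream pb.DRPCOrders_SettlementStream) (err error) {
        switch endpoint.windowEndpointRolloutPhase {
        case WindowEndpointRolloutPhase1:
            // keep serving the existing settlement logic
        case WindowEndpointRolloutPhase2, WindowEndpointRolloutPhase3:
            return rpcstatus.Error(rpcstatus.Unavailable, "endpoint disabled")
        default:
            return rpcstatus.Error(rpcstatus.Internal, "invalid window endpoint rollout phase")
        }
        // ... existing settlement code ...
    }

    // SettlementWithWindow is the windowed endpoint; it is available in every
    // phase but only switches to the new backend in phase3.
    func (endpoint *Endpoint) SettlementWithWindow(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) {
        switch endpoint.windowEndpointRolloutPhase {
        case WindowEndpointRolloutPhase1, WindowEndpointRolloutPhase2:
            return endpoint.SettlementWithWindowMigration(stream) // old backend plus zero-amount markers
        case WindowEndpointRolloutPhase3:
            return endpoint.SettlementWithWindowFinal(stream) // new backend, no queue
        default:
            return rpcstatus.Error(rpcstatus.Internal, "invalid window endpoint rollout phase")
        }
    }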

The intention is to deploy in phase1, roll out code to nodes
so that they use the windowed endpoint, switch to phase2, wait
a couple of days for all existing orders to expire, and then
switch to phase3.

Additionally, it fixes a bug in the windowed endpoint where
bucket bandwidth was settled even for rejected windows, so a
node could submit a bunch of orders and rack up charges for a
bucket.
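
The fix, condensed from the endpoint diff below (surrounding window
bookkeeping elided), guards the bucket update on an accepted
settlement:

    // only credit bucket bandwidth when the window settlement was actually
    // accepted (and not already processed), so rejected submissions can no
    // longer inflate a bucket's charges
    if status == pb.SettlementWithWindowResponse_ACCEPTED && !alreadyProcessed {
        for bucketIDAction, amount := range bucketSettled {
            err = endpoint.DB.UpdateBucketBandwidthSettle(ctx,
                bucketIDAction.projectID, []byte(bucketIDAction.bucketname),
                bucketIDAction.action, amount, time.Unix(0, window),
            )
            // ...
        }
    }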

Change-Id: Ifdc10e09ae1645159cbec7ace687dcb2d594c76d
Jeff Wendling 2020-07-21 12:53:32 -04:00
parent 935f44ddb7
commit 85a74b47e7
8 changed files with 481 additions and 100 deletions

View File

@@ -467,11 +467,12 @@ func (planet *Planet) newSatellites(count int, satelliteDatabases satellitedbtes
},
},
Orders: orders.Config{
Expiration: 7 * 24 * time.Hour,
SettlementBatchSize: 10,
FlushBatchSize: 10,
FlushInterval: defaultInterval,
NodeStatusLogging: true,
Expiration: 7 * 24 * time.Hour,
SettlementBatchSize: 10,
FlushBatchSize: 10,
FlushInterval: defaultInterval,
NodeStatusLogging: true,
WindowEndpointRolloutPhase: orders.WindowEndpointRolloutPhase1,
},
Checker: checker.Config{
Interval: defaultInterval,

View File

@@ -61,7 +61,7 @@ func NewChore(log *zap.Logger, db orders.DB, config Config) *Chore {
func (chore *Chore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return chore.Loop.Run(ctx, func(ctx context.Context) error {
err := chore.RunOnce(ctx, time.Now())
err := chore.runOnceNow(ctx, time.Now)
if err != nil {
chore.log.Error("error flushing reported rollups", zap.Error(err))
}
@@ -79,8 +79,20 @@ func (chore *Chore) Close() error {
func (chore *Chore) RunOnce(ctx context.Context, now time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
return chore.runOnceNow(ctx, func() time.Time { return now })
}
// runOnceNow runs the helper repeatedly, calling the nowFn each time it runs it. It does that
// until the helper returns that it is done or an error occurs.
//
// This function exists because tests want to use RunOnce and have a single fixed time for
// reproducibility, but the chore loop wants to use whatever time.Now is every time the helper
// is run.
func (chore *Chore) runOnceNow(ctx context.Context, nowFn func() time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
for {
done, err := chore.runOnceHelper(ctx, now)
done, err := chore.runOnceHelper(ctx, nowFn())
if err != nil {
return errs.Wrap(err)
}
@@ -90,7 +102,7 @@ func (chore *Chore) RunOnce(ctx context.Context, now time.Time) (err error) {
}
}
func (chore *Chore) readWork(ctx context.Context, now time.Time, queue orders.Queue) (
func (chore *Chore) readWork(ctx context.Context, queue orders.Queue) (
bucketRollups []orders.BucketBandwidthRollup,
storagenodeRollups []orders.StoragenodeBandwidthRollup,
consumedSerials []orders.ConsumedSerial,
@@ -217,15 +229,13 @@ func (chore *Chore) runOnceHelper(ctx context.Context, now time.Time) (done bool
)
// Read the work we should insert.
bucketRollups, storagenodeRollups, consumedSerials, done, err = chore.readWork(ctx, now, queue)
bucketRollups, storagenodeRollups, consumedSerials, done, err = chore.readWork(ctx, queue)
if err != nil {
return errs.Wrap(err)
}
// Now that we have work, write it all in its own transaction.
return errs.Wrap(chore.db.WithTransaction(ctx, func(ctx context.Context, tx orders.Transaction) error {
now := time.Now()
if err := tx.UpdateBucketBandwidthBatch(ctx, now, bucketRollups); err != nil {
return errs.Wrap(err)
}

View File

@@ -321,6 +321,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Orders.DB,
peer.DB.NodeAPIVersion(),
config.Orders.SettlementBatchSize,
config.Orders.WindowEndpointRolloutPhase,
)
peer.Orders.Service = orders.NewService(
peer.Log.Named("orders:service"),

View File

@@ -198,21 +198,23 @@ type ProcessOrderResponse struct {
//
// architecture: Endpoint
type Endpoint struct {
log *zap.Logger
satelliteSignee signing.Signee
DB DB
nodeAPIVersionDB nodeapiversion.DB
settlementBatchSize int
log *zap.Logger
satelliteSignee signing.Signee
DB DB
nodeAPIVersionDB nodeapiversion.DB
settlementBatchSize int
windowEndpointRolloutPhase WindowEndpointRolloutPhase
}
// NewEndpoint new orders receiving endpoint.
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPIVersionDB nodeapiversion.DB, settlementBatchSize int) *Endpoint {
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPIVersionDB nodeapiversion.DB, settlementBatchSize int, windowEndpointRolloutPhase WindowEndpointRolloutPhase) *Endpoint {
return &Endpoint{
log: log,
satelliteSignee: satelliteSignee,
DB: db,
nodeAPIVersionDB: nodeAPIVersionDB,
settlementBatchSize: settlementBatchSize,
log: log,
satelliteSignee: satelliteSignee,
DB: db,
nodeAPIVersionDB: nodeAPIVersionDB,
settlementBatchSize: settlementBatchSize,
windowEndpointRolloutPhase: windowEndpointRolloutPhase,
}
}
@@ -239,6 +241,14 @@ func (endpoint *Endpoint) Settlement(stream pb.DRPCOrders_SettlementStream) (err
ctx := stream.Context()
defer mon.Task()(&ctx)(&err)
switch endpoint.windowEndpointRolloutPhase {
case WindowEndpointRolloutPhase1:
case WindowEndpointRolloutPhase2, WindowEndpointRolloutPhase3:
return rpcstatus.Error(rpcstatus.Unavailable, "endpoint disabled")
default:
return rpcstatus.Error(rpcstatus.Internal, "invalid window endpoint rollout phase")
}
peer, err := identity.PeerIdentityFromContext(ctx)
if err != nil {
return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
@@ -390,14 +400,129 @@ type bucketIDAction struct {
// Only one window is processed at a time.
// Batches are atomic, all orders are settled successfully or they all fail.
func (endpoint *Endpoint) SettlementWithWindow(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) {
switch endpoint.windowEndpointRolloutPhase {
case WindowEndpointRolloutPhase1, WindowEndpointRolloutPhase2:
return endpoint.SettlementWithWindowMigration(stream)
case WindowEndpointRolloutPhase3:
return endpoint.SettlementWithWindowFinal(stream)
default:
return rpcstatus.Error(rpcstatus.Internal, "invalid window endpoint rollout phase")
}
}
// SettlementWithWindowMigration implements phase 1 and phase 2 of the windowed order rollout where
// it uses the same backend as the non-windowed settlement and inserts entries containing 0 for
// the window which ensures that it is either entirely handled by the queue or entirely handled by
// the phase 3 endpoint.
func (endpoint *Endpoint) SettlementWithWindowMigration(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) {
ctx := stream.Context()
defer mon.Task()(&ctx)(&err)
// TODO: remove once the storagenode side of this endpoint is implemented
if true {
return rpcstatus.Error(rpcstatus.Unimplemented, "endpoint not supporrted")
peer, err := identity.PeerIdentityFromContext(ctx)
if err != nil {
endpoint.log.Debug("err peer identity from context", zap.Error(err))
return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
}
err = endpoint.nodeAPIVersionDB.UpdateVersionAtLeast(ctx, peer.ID, nodeapiversion.HasWindowedOrders)
if err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
log := endpoint.log.Named(peer.ID.String())
log.Debug("SettlementWithWindow")
var receivedCount int
var window int64
var actions = map[pb.PieceAction]struct{}{}
var requests []*ProcessOrderRequest
var finished bool
for !finished {
requests = requests[:0]
for len(requests) < endpoint.settlementBatchSize {
request, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
finished = true
break
}
log.Debug("err streaming order request", zap.Error(err))
return rpcstatus.Error(rpcstatus.Unknown, err.Error())
}
receivedCount++
orderLimit := request.Limit
if orderLimit == nil {
log.Debug("request.OrderLimit is nil")
continue
}
order := request.Order
if order == nil {
log.Debug("request.Order is nil")
continue
}
if window == 0 {
window = date.TruncateToHourInNano(orderLimit.OrderCreation)
}
// don't process orders that aren't valid
if !endpoint.isValid(ctx, log, order, orderLimit, peer.ID, window) {
continue
}
actions[orderLimit.Action] = struct{}{}
requests = append(requests, &ProcessOrderRequest{
Order: order,
OrderLimit: orderLimit,
})
}
// process all of the orders in the old way
_, err = endpoint.DB.ProcessOrders(ctx, requests)
if err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
}
// if we received no valid orders, then respond with rejected
if len(actions) == 0 || window == 0 {
return stream.SendAndClose(&pb.SettlementWithWindowResponse{
Status: pb.SettlementWithWindowResponse_REJECTED,
})
}
// insert zero rows for every action involved in the set of orders. this prevents
// many problems (double spends and underspends) by ensuring that any window is
// either handled entirely by the queue or entirely with the phase 3 windowed endpoint.
windowTime := time.Unix(0, window)
for action := range actions {
if err := endpoint.DB.UpdateStoragenodeBandwidthSettle(ctx, peer.ID, action, 0, windowTime); err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
}
log.Debug("orders processed",
zap.Int("total orders received", receivedCount),
zap.Time("window", windowTime),
)
return stream.SendAndClose(&pb.SettlementWithWindowResponse{
Status: pb.SettlementWithWindowResponse_ACCEPTED,
})
}
// SettlementWithWindowFinal processes all orders that were created in a 1 hour window.
// Only one window is processed at a time.
// Batches are atomic, all orders are settled successfully or they all fail.
func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) {
ctx := stream.Context()
defer mon.Task()(&ctx)(&err)
peer, err := identity.PeerIdentityFromContext(ctx)
if err != nil {
endpoint.log.Debug("err peer identity from context", zap.Error(err))
@@ -495,7 +620,7 @@ func (endpoint *Endpoint) SettlementWithWindow(stream pb.DRPCOrders_SettlementWi
zap.String("status", status.String()),
)
if !alreadyProcessed {
if status == pb.SettlementWithWindowResponse_ACCEPTED && !alreadyProcessed {
for bucketIDAction, amount := range bucketSettled {
err = endpoint.DB.UpdateBucketBandwidthSettle(ctx,
bucketIDAction.projectID, []byte(bucketIDAction.bucketname), bucketIDAction.action, amount, time.Unix(0, window),

View File

@@ -4,35 +4,58 @@
package orders_test
import (
"io"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/orders"
)
func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
t.Skip("endpoint currently disabled")
func runTestWithPhases(t *testing.T, fn func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet)) {
run := func(phase orders.WindowEndpointRolloutPhase) func(t *testing.T) {
return func(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(_ *zap.Logger, _ int, config *satellite.Config) {
config.Orders.WindowEndpointRolloutPhase = phase
},
},
}, fn)
}
}
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
t.Run("Phase1", run(orders.WindowEndpointRolloutPhase1))
t.Run("Phase2", run(orders.WindowEndpointRolloutPhase2))
t.Run("Phase3", run(orders.WindowEndpointRolloutPhase3))
}
func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
runTestWithPhases(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
ordersDB := satellite.Orders.DB
storagenode := planet.StorageNodes[0]
now := time.Now().UTC()
now := time.Now()
projectID := testrand.UUID()
bucketname := "testbucket"
bucketID := storj.JoinPaths(projectID.String(), bucketname)
// stop any async flushes because we want to be sure when some values are
// written to avoid races
satellite.Orders.Chore.Loop.Pause()
satellite.Accounting.ReportedRollup.Loop.Pause()
// confirm storagenode and bucket bandwidth tables start empty
snbw, err := ordersDB.GetStorageNodeBandwidth(ctx, satellite.ID(), time.Time{}, now)
@@ -42,17 +65,6 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(0), bucketbw)
// create serial number to use in test
serialNumber1 := testrand.SerialNumber()
bucketID := storj.JoinPaths(projectID.String(), bucketname)
err = ordersDB.CreateSerialInfo(ctx, serialNumber1, []byte(bucketID), now.AddDate(1, 0, 10))
serialNumber2 := testrand.SerialNumber()
require.NoError(t, err)
err = ordersDB.CreateSerialInfo(ctx, serialNumber2, []byte(bucketID), now.AddDate(1, 0, 10))
require.NoError(t, err)
piecePublicKey, piecePrivateKey, err := storj.NewPieceKey()
require.NoError(t, err)
var testCases = []struct {
name string
dataAmount int64
@@ -64,6 +76,18 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
}
for _, tt := range testCases {
// create serial number to use in test. must be unique for each run.
serialNumber1 := testrand.SerialNumber()
err = ordersDB.CreateSerialInfo(ctx, serialNumber1, []byte(bucketID), now.AddDate(1, 0, 10))
require.NoError(t, err)
serialNumber2 := testrand.SerialNumber()
err = ordersDB.CreateSerialInfo(ctx, serialNumber2, []byte(bucketID), now.AddDate(1, 0, 10))
require.NoError(t, err)
piecePublicKey, piecePrivateKey, err := storj.NewPieceKey()
require.NoError(t, err)
// create signed orderlimit or order to test with
limit1 := &pb.OrderLimit{
SerialNumber: serialNumber1,
@@ -79,11 +103,13 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
}
orderLimit1, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit1)
require.NoError(t, err)
order1, err := signing.SignUplinkOrder(ctx, piecePrivateKey, &pb.Order{
SerialNumber: serialNumber1,
Amount: tt.dataAmount,
})
require.NoError(t, err)
limit2 := &pb.OrderLimit{
SerialNumber: serialNumber2,
SatelliteId: satellite.ID(),
@@ -98,6 +124,7 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
}
orderLimit2, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit2)
require.NoError(t, err)
order2, err := signing.SignUplinkOrder(ctx, piecePrivateKey, &pb.Order{
SerialNumber: serialNumber2,
Amount: tt.dataAmount,
@@ -107,8 +134,12 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
// create connection between storagenode and satellite
conn, err := storagenode.Dialer.DialNodeURL(ctx, storj.NodeURL{ID: satellite.ID(), Address: satellite.Addr()})
require.NoError(t, err)
defer ctx.Check(conn.Close)
stream, err := pb.NewDRPCOrdersClient(conn).SettlementWithWindow(ctx)
require.NoError(t, err)
defer ctx.Check(stream.Close)
// storagenode settles an order and orderlimit
err = stream.Send(&pb.SettlementRequest{
Limit: orderLimit1,
@@ -123,14 +154,25 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
resp, err := stream.CloseAndRecv()
require.NoError(t, err)
settled := map[int32]int64{int32(pb.PieceAction_PUT): tt.settledAmt}
require.Equal(t, &pb.SettlementWithWindowResponse{Status: pb.SettlementWithWindowResponse_ACCEPTED, ActionSettled: settled}, resp)
// the settled amount is only returned during phase3
var settled map[int32]int64
if satellite.Config.Orders.WindowEndpointRolloutPhase == orders.WindowEndpointRolloutPhase3 {
settled = map[int32]int64{int32(pb.PieceAction_PUT): tt.settledAmt}
}
require.Equal(t, &pb.SettlementWithWindowResponse{
Status: pb.SettlementWithWindowResponse_ACCEPTED,
ActionSettled: settled,
}, resp)
// trigger and wait for all of the chores necessary to flush the orders
assert.NoError(t, satellite.Accounting.ReportedRollup.RunOnce(ctx, tt.orderCreation))
satellite.Orders.Chore.Loop.TriggerWait()
// assert all the right stuff is in the satellite storagenode and bucket bandwidth tables
snbw, err = ordersDB.GetStorageNodeBandwidth(ctx, storagenode.ID(), time.Time{}, tt.orderCreation)
require.NoError(t, err)
require.EqualValues(t, tt.settledAmt, snbw)
satellite.Orders.Chore.Loop.TriggerWait()
newBbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, tt.orderCreation)
require.NoError(t, err)
require.EqualValues(t, tt.settledAmt, newBbw)
@@ -138,53 +180,52 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
})
}
func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) {
t.Skip("endpoint currently disabled")
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
runTestWithPhases(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
const dataAmount int64 = 50
satellite := planet.Satellites[0]
ordersDB := satellite.Orders.DB
storagenode := planet.StorageNodes[0]
now := time.Now().UTC()
now := time.Now()
projectID := testrand.UUID()
bucketname := "testbucket"
bucketID := storj.JoinPaths(projectID.String(), bucketname)
// stop any async flushes because we want to be sure when some values are
// written to avoid races
satellite.Orders.Chore.Loop.Pause()
satellite.Accounting.ReportedRollup.Loop.Pause()
// confirm storagenode and bucket bandwidth tables start empty
snbw, err := ordersDB.GetStorageNodeBandwidth(ctx, satellite.ID(), time.Time{}, now)
require.NoError(t, err)
require.EqualValues(t, 0, snbw)
bucketbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, now)
require.NoError(t, err)
require.EqualValues(t, 0, bucketbw)
// create serial number to use in test
serialNumber := testrand.SerialNumber()
bucketID := storj.JoinPaths(projectID.String(), bucketname)
err = ordersDB.CreateSerialInfo(ctx, serialNumber, []byte(bucketID), now.AddDate(1, 0, 10))
require.NoError(t, err)
piecePublicKey, piecePrivateKey, err := storj.NewPieceKey()
require.NoError(t, err)
var testCases = []struct {
name string
serialNumber storj.SerialNumber
dataAmount int64
expectedStatus pb.SettlementWithWindowResponse_Status
}{
{"first settlement", serialNumber, int64(50), pb.SettlementWithWindowResponse_ACCEPTED},
{"settle the same a second time, matches first", serialNumber, int64(50), pb.SettlementWithWindowResponse_ACCEPTED},
{"settle a third time, doesn't match first", serialNumber, int64(0), pb.SettlementWithWindowResponse_REJECTED},
{"first settlement", dataAmount, pb.SettlementWithWindowResponse_ACCEPTED},
{"settle the same a second time, matches first", dataAmount, pb.SettlementWithWindowResponse_ACCEPTED},
{"settle a third time, doesn't match first", dataAmount + 1, pb.SettlementWithWindowResponse_REJECTED},
}
for _, tt := range testCases {
// create signed orderlimit or order to test with
limit := &pb.OrderLimit{
SerialNumber: tt.serialNumber,
SerialNumber: serialNumber,
SatelliteId: satellite.ID(),
UplinkPublicKey: piecePublicKey,
StorageNodeId: storagenode.ID(),
@@ -197,8 +238,9 @@ func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) {
}
orderLimit, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit)
require.NoError(t, err)
order, err := signing.SignUplinkOrder(ctx, piecePrivateKey, &pb.Order{
SerialNumber: tt.serialNumber,
SerialNumber: serialNumber,
Amount: tt.dataAmount,
})
require.NoError(t, err)
@@ -206,8 +248,12 @@ func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) {
// create connection between storagenode and satellite
conn, err := storagenode.Dialer.DialNodeURL(ctx, storj.NodeURL{ID: satellite.ID(), Address: satellite.Addr()})
require.NoError(t, err)
defer ctx.Check(conn.Close)
stream, err := pb.NewDRPCOrdersClient(conn).SettlementWithWindow(ctx)
require.NoError(t, err)
defer ctx.Check(stream.Close)
// storagenode settles an order and orderlimit
err = stream.Send(&pb.SettlementRequest{
Limit: orderLimit,
@@ -216,71 +262,76 @@ func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) {
require.NoError(t, err)
resp, err := stream.CloseAndRecv()
require.NoError(t, err)
settled := map[int32]int64{int32(pb.PieceAction_PUT): tt.dataAmount}
if tt.expectedStatus == pb.SettlementWithWindowResponse_REJECTED {
require.Equal(t, &pb.SettlementWithWindowResponse{Status: tt.expectedStatus, ActionSettled: nil}, resp)
} else {
require.Equal(t, &pb.SettlementWithWindowResponse{Status: tt.expectedStatus, ActionSettled: settled}, resp)
}
// assert all the right stuff is in the satellite storagenode and bucket bandwidth tables
snbw, err = ordersDB.GetStorageNodeBandwidth(ctx, storagenode.ID(), time.Time{}, time.Now().UTC())
require.NoError(t, err)
if tt.expectedStatus == pb.SettlementWithWindowResponse_REJECTED {
require.NotEqual(t, tt.dataAmount, snbw)
} else {
require.Equal(t, tt.dataAmount, snbw)
}
// wait for rollup_write_cache to flush, this on average takes 1ms to sleep to complete
satellite.Orders.Chore.Loop.TriggerWait()
newBbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, time.Now().UTC())
require.NoError(t, err)
if tt.expectedStatus == pb.SettlementWithWindowResponse_REJECTED {
require.NotEqual(t, tt.dataAmount, newBbw)
} else {
require.Equal(t, tt.dataAmount, newBbw)
expected := new(pb.SettlementWithWindowResponse)
switch {
case satellite.Config.Orders.WindowEndpointRolloutPhase != orders.WindowEndpointRolloutPhase3:
expected.Status = pb.SettlementWithWindowResponse_ACCEPTED
expected.ActionSettled = nil
case tt.expectedStatus == pb.SettlementWithWindowResponse_ACCEPTED:
expected.Status = pb.SettlementWithWindowResponse_ACCEPTED
expected.ActionSettled = map[int32]int64{int32(pb.PieceAction_PUT): tt.dataAmount}
default:
expected.Status = pb.SettlementWithWindowResponse_REJECTED
expected.ActionSettled = nil
}
require.Equal(t, expected, resp)
// flush all the chores
assert.NoError(t, satellite.Accounting.ReportedRollup.RunOnce(ctx, now))
satellite.Orders.Chore.Loop.TriggerWait()
// assert all the right stuff is in the satellite storagenode and bucket bandwidth tables
snbw, err = ordersDB.GetStorageNodeBandwidth(ctx, storagenode.ID(), time.Time{}, now)
require.NoError(t, err)
require.Equal(t, dataAmount, snbw)
newBbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, now)
require.NoError(t, err)
require.Equal(t, dataAmount, newBbw)
}
})
}
func TestSettlementWithWindowEndpointErrors(t *testing.T) {
t.Skip("endpoint currently disabled")
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
runTestWithPhases(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
ordersDB := satellite.Orders.DB
storagenode := planet.StorageNodes[0]
now := time.Now().UTC()
now := time.Now()
projectID := testrand.UUID()
bucketname := "testbucket"
bucketID := storj.JoinPaths(projectID.String(), bucketname)
// stop any async flushes because we want to be sure when some values are
// written to avoid races
satellite.Orders.Chore.Loop.Pause()
satellite.Accounting.ReportedRollup.Loop.Pause()
// confirm storagenode and bucket bandwidth tables start empty
snbw, err := ordersDB.GetStorageNodeBandwidth(ctx, satellite.ID(), time.Time{}, now)
require.NoError(t, err)
require.EqualValues(t, 0, snbw)
bucketbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, now)
require.NoError(t, err)
require.EqualValues(t, 0, bucketbw)
// create serial number to use in test
serialNumber1 := testrand.SerialNumber()
serialNumber2 := testrand.SerialNumber()
bucketID := storj.JoinPaths(projectID.String(), bucketname)
err = ordersDB.CreateSerialInfo(ctx, serialNumber1, []byte(bucketID), now.AddDate(1, 0, 10))
require.NoError(t, err)
serialNumber2 := testrand.SerialNumber()
err = ordersDB.CreateSerialInfo(ctx, serialNumber2, []byte(bucketID), now.AddDate(1, 0, 10))
require.NoError(t, err)
piecePublicKey1, piecePrivateKey1, err := storj.NewPieceKey()
require.NoError(t, err)
_, piecePrivateKey2, err := storj.NewPieceKey()
require.NoError(t, err)
limit := pb.OrderLimit{
SerialNumber: serialNumber1,
SatelliteId: satellite.ID(),
@@ -295,16 +346,19 @@ func TestSettlementWithWindowEndpointErrors(t *testing.T) {
}
orderLimit1, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), &limit)
require.NoError(t, err)
order1, err := signing.SignUplinkOrder(ctx, piecePrivateKey1, &pb.Order{
SerialNumber: serialNumber1,
Amount: int64(50),
})
require.NoError(t, err)
order2, err := signing.SignUplinkOrder(ctx, piecePrivateKey1, &pb.Order{
SerialNumber: serialNumber2,
Amount: int64(50),
})
require.NoError(t, err)
order3, err := signing.SignUplinkOrder(ctx, piecePrivateKey2, &pb.Order{
SerialNumber: serialNumber2,
Amount: int64(50),
@@ -327,28 +381,154 @@ func TestSettlementWithWindowEndpointErrors(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
conn, err := storagenode.Dialer.DialNodeURL(ctx, storj.NodeURL{ID: satellite.ID(), Address: satellite.Addr()})
require.NoError(t, err)
defer ctx.Check(conn.Close)
stream, err := pb.NewDRPCOrdersClient(conn).SettlementWithWindow(ctx)
require.NoError(t, err)
defer ctx.Check(stream.Close)
err = stream.Send(&pb.SettlementRequest{
Limit: tt.orderLimit,
Order: tt.order,
})
require.NoError(t, err)
resp, err := stream.CloseAndRecv()
require.NoError(t, err)
require.Equal(t, &pb.SettlementWithWindowResponse{Status: pb.SettlementWithWindowResponse_REJECTED, ActionSettled: nil}, resp)
require.Equal(t, &pb.SettlementWithWindowResponse{
Status: pb.SettlementWithWindowResponse_REJECTED,
ActionSettled: nil,
}, resp)
// flush all the chores
assert.NoError(t, satellite.Accounting.ReportedRollup.RunOnce(ctx, now))
satellite.Orders.Chore.Loop.TriggerWait()
// assert no data was added to satellite storagenode or bucket bandwidth tables
snbw, err = ordersDB.GetStorageNodeBandwidth(ctx, storagenode.ID(), time.Time{}, time.Now().UTC())
snbw, err = ordersDB.GetStorageNodeBandwidth(ctx, storagenode.ID(), time.Time{}, now)
require.NoError(t, err)
require.EqualValues(t, 0, snbw)
// wait for rollup_write_cache to flush
satellite.Orders.Chore.Loop.TriggerWait()
newBbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, time.Now().UTC())
newBbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, now)
require.NoError(t, err)
require.EqualValues(t, 0, newBbw)
})
}
})
}
func TestSettlementEndpointSingleOrder(t *testing.T) {
runTestWithPhases(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
const dataAmount int64 = 50
satellite := planet.Satellites[0]
ordersDB := satellite.Orders.DB
storagenode := planet.StorageNodes[0]
now := time.Now()
projectID := testrand.UUID()
bucketname := "testbucket"
bucketID := storj.JoinPaths(projectID.String(), bucketname)
// stop any async flushes because we want to be sure when some values are
// written to avoid races
satellite.Orders.Chore.Loop.Pause()
satellite.Accounting.ReportedRollup.Loop.Pause()
// confirm storagenode and bucket bandwidth tables start empty
snbw, err := ordersDB.GetStorageNodeBandwidth(ctx, satellite.ID(), time.Time{}, now)
require.NoError(t, err)
require.EqualValues(t, 0, snbw)
bucketbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, now)
require.NoError(t, err)
require.EqualValues(t, 0, bucketbw)
// create serial number to use in test
serialNumber := testrand.SerialNumber()
err = ordersDB.CreateSerialInfo(ctx, serialNumber, []byte(bucketID), now.AddDate(1, 0, 10))
require.NoError(t, err)
piecePublicKey, piecePrivateKey, err := storj.NewPieceKey()
require.NoError(t, err)
// create signed orderlimit or order to test with
limit := &pb.OrderLimit{
SerialNumber: serialNumber,
SatelliteId: satellite.ID(),
UplinkPublicKey: piecePublicKey,
StorageNodeId: storagenode.ID(),
PieceId: storj.NewPieceID(),
Action: pb.PieceAction_PUT,
Limit: 1000,
PieceExpiration: time.Time{},
OrderCreation: now,
OrderExpiration: now.Add(24 * time.Hour),
}
orderLimit, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit)
require.NoError(t, err)
order, err := signing.SignUplinkOrder(ctx, piecePrivateKey, &pb.Order{
SerialNumber: serialNumber,
Amount: dataAmount,
})
require.NoError(t, err)
// create connection between storagenode and satellite
conn, err := storagenode.Dialer.DialNodeURL(ctx, storj.NodeURL{ID: satellite.ID(), Address: satellite.Addr()})
require.NoError(t, err)
defer ctx.Check(conn.Close)
stream, err := pb.NewDRPCOrdersClient(conn).Settlement(ctx)
require.NoError(t, err)
defer ctx.Check(stream.Close)
// storagenode settles an order and orderlimit
var resp *pb.SettlementResponse
if satellite.Config.Orders.WindowEndpointRolloutPhase == orders.WindowEndpointRolloutPhase1 {
err = stream.Send(&pb.SettlementRequest{
Limit: orderLimit,
Order: order,
})
require.NoError(t, err)
require.NoError(t, stream.CloseSend())
resp, err = stream.Recv()
require.NoError(t, err)
} else {
// in phase2 and phase3, the endpoint is disabled. depending on how fast the
// server sends that error message, we may see an io.EOF on the Send call, or
// we may see no error at all. In either case, we have to call stream.Recv to
// see the actual error. gRPC semantics are funky.
err = stream.Send(&pb.SettlementRequest{
Limit: orderLimit,
Order: order,
})
if err != io.EOF {
require.NoError(t, err)
}
require.NoError(t, stream.CloseSend())
_, err = stream.Recv()
require.Error(t, err)
require.Equal(t, rpcstatus.Unavailable, rpcstatus.Code(err))
return
}
require.Equal(t, &pb.SettlementResponse{
SerialNumber: serialNumber,
Status: pb.SettlementResponse_ACCEPTED,
}, resp)
// flush all the chores
assert.NoError(t, satellite.Accounting.ReportedRollup.RunOnce(ctx, now))
satellite.Orders.Chore.Loop.TriggerWait()
// assert all the right stuff is in the satellite storagenode and bucket bandwidth tables
snbw, err = ordersDB.GetStorageNodeBandwidth(ctx, storagenode.ID(), time.Time{}, now)
require.NoError(t, err)
require.Equal(t, dataAmount, snbw)
newBbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, now)
require.NoError(t, err)
require.Equal(t, dataAmount, newBbw)
})
}

View File

@@ -29,12 +29,13 @@ var ErrDownloadFailedNotEnoughPieces = errs.Class("not enough pieces for downloa
// Config is a configuration struct for orders Service.
type Config struct {
Expiration time.Duration `help:"how long until an order expires" default:"48h"` // 2 days
SettlementBatchSize int `help:"how many orders to batch per transaction" default:"250"`
FlushBatchSize int `help:"how many items in the rollups write cache before they are flushed to the database" devDefault:"20" releaseDefault:"10000"`
FlushInterval time.Duration `help:"how often to flush the rollups write cache to the database" devDefault:"30s" releaseDefault:"1m"`
ReportedRollupsReadBatchSize int `help:"how many records to read in a single transaction when calculating billable bandwidth" default:"1000"`
NodeStatusLogging bool `hidden:"true" help:"deprecated, log the offline/disqualification status of nodes" default:"false"`
Expiration time.Duration `help:"how long until an order expires" default:"48h"` // 2 days
SettlementBatchSize int `help:"how many orders to batch per transaction" default:"250"`
FlushBatchSize int `help:"how many items in the rollups write cache before they are flushed to the database" devDefault:"20" releaseDefault:"10000"`
FlushInterval time.Duration `help:"how often to flush the rollups write cache to the database" devDefault:"30s" releaseDefault:"1m"`
ReportedRollupsReadBatchSize int `help:"how many records to read in a single transaction when calculating billable bandwidth" default:"1000"`
NodeStatusLogging bool `hidden:"true" help:"deprecated, log the offline/disqualification status of nodes" default:"false"`
WindowEndpointRolloutPhase WindowEndpointRolloutPhase `help:"rollout phase for the windowed endpoint" default:"phase1"`
}
// BucketsDB returns information about buckets.

View File

@@ -0,0 +1,60 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package orders
import (
"fmt"
"strings"
"github.com/zeebo/errs"
)
// WindowEndpointRolloutPhase controls the phase of the new orders endpoint rollout.
type WindowEndpointRolloutPhase int
const (
// WindowEndpointRolloutPhase1 is when both the old and new endpoint are enabled and
// the new endpoint places orders in the queue just like the old endpoint.
WindowEndpointRolloutPhase1 WindowEndpointRolloutPhase = 1 + iota
// WindowEndpointRolloutPhase2 is when the old endpoint is disabled and the new endpoint
// places orders in the queue just like the old endpoint used to.
WindowEndpointRolloutPhase2
// WindowEndpointRolloutPhase3 is when the old endpoint is disabled and the new endpoint
// does not use a queue and just does direct insertion of rollup values.
WindowEndpointRolloutPhase3
)
// String provides a human readable form of the rollout phase.
func (phase WindowEndpointRolloutPhase) String() string {
switch phase {
case WindowEndpointRolloutPhase1:
return "phase1"
case WindowEndpointRolloutPhase2:
return "phase2"
case WindowEndpointRolloutPhase3:
return "phase3"
default:
return fmt.Sprintf("WindowEndpointRolloutPhase(%d)", int(phase))
}
}
// Set implements flag.Value interface.
func (phase *WindowEndpointRolloutPhase) Set(s string) error {
switch strings.ToLower(s) {
case "phase1":
*phase = WindowEndpointRolloutPhase1
case "phase2":
*phase = WindowEndpointRolloutPhase2
case "phase3":
*phase = WindowEndpointRolloutPhase3
default:
return errs.New("invalid window endpoint rollout phase: %q", s)
}
return nil
}
// Type implements pflag.Value.
func (WindowEndpointRolloutPhase) Type() string { return "orders.WindowEndpointRolloutPhase" }

View File

@@ -451,6 +451,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# how many orders to batch per transaction
# orders.settlement-batch-size: 250
# rollout phase for the windowed endpoint
# orders.window-endpoint-rollout-phase: phase1
# disable node cache
# overlay.node-selection-cache.disabled: false