From 85a74b47e75222902221fd3499c22982386c2c74 Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Tue, 21 Jul 2020 12:53:32 -0400 Subject: [PATCH] satellite/orders: 3-phase rollout This adds a config flag orders.window-endpoint-rollout-phase that can take on the values phase1, phase2 or phase3. In phase1, the current orders endpoint continues to work as usual, and the windowed orders endpoint uses the same backend as the current one (but also does a bit extra). In phase2, the current orders endpoint is disabled and the windowed orders endpoint continues to use the same backend. In phase3, the current orders endpoint is still disabled and the windowed orders endpoint uses the new backend that requires much less database traffic and state. The intention is to deploy in phase1, roll out code to nodes to have them use the windowed endpoint, switch to phase2, wait a couple days for all existing orders to expire, then switch to phase3. Additionally, it fixes a bug where a node could submit a bunch of orders and rack up charges for a bucket. Change-Id: Ifdc10e09ae1645159cbec7ace687dcb2d594c76d --- private/testplanet/satellite.go | 11 +- satellite/accounting/reportedrollup/chore.go | 22 +- satellite/api.go | 1 + satellite/orders/endpoint.go | 155 ++++++++- satellite/orders/endpoint_test.go | 316 +++++++++++++++---- satellite/orders/service.go | 13 +- satellite/orders/window_endpoint_phase.go | 60 ++++ scripts/testdata/satellite-config.yaml.lock | 3 + 8 files changed, 481 insertions(+), 100 deletions(-) create mode 100644 satellite/orders/window_endpoint_phase.go diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index bba4b2468..4c740f2a8 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -467,11 +467,12 @@ func (planet *Planet) newSatellites(count int, satelliteDatabases satellitedbtes }, }, Orders: orders.Config{ - Expiration: 7 * 24 * time.Hour, - SettlementBatchSize: 10, - FlushBatchSize: 10, - FlushInterval: defaultInterval, - NodeStatusLogging: true, + Expiration: 7 * 24 * time.Hour, + SettlementBatchSize: 10, + FlushBatchSize: 10, + FlushInterval: defaultInterval, + NodeStatusLogging: true, + WindowEndpointRolloutPhase: orders.WindowEndpointRolloutPhase1, }, Checker: checker.Config{ Interval: defaultInterval, diff --git a/satellite/accounting/reportedrollup/chore.go b/satellite/accounting/reportedrollup/chore.go index 9f8dc9ced..be350bdee 100644 --- a/satellite/accounting/reportedrollup/chore.go +++ b/satellite/accounting/reportedrollup/chore.go @@ -61,7 +61,7 @@ func NewChore(log *zap.Logger, db orders.DB, config Config) *Chore { func (chore *Chore) Run(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) return chore.Loop.Run(ctx, func(ctx context.Context) error { - err := chore.RunOnce(ctx, time.Now()) + err := chore.runOnceNow(ctx, time.Now) if err != nil { chore.log.Error("error flushing reported rollups", zap.Error(err)) } @@ -79,8 +79,20 @@ func (chore *Chore) Close() error { func (chore *Chore) RunOnce(ctx context.Context, now time.Time) (err error) { defer mon.Task()(&ctx)(&err) + return chore.runOnceNow(ctx, func() time.Time { return now }) +} + +// runOnceNow runs the helper repeatedly, calling the nowFn each time it runs it. It does that +// until the helper returns that it is done or an error occurs. +// +// This function exists because tests want to use RunOnce and have a single fixed time for +// reproducibility, but the chore loop wants to use whatever time.Now is every time the helper +// is run. 
+func (chore *Chore) runOnceNow(ctx context.Context, nowFn func() time.Time) (err error) { + defer mon.Task()(&ctx)(&err) + for { - done, err := chore.runOnceHelper(ctx, now) + done, err := chore.runOnceHelper(ctx, nowFn()) if err != nil { return errs.Wrap(err) } @@ -90,7 +102,7 @@ func (chore *Chore) RunOnce(ctx context.Context, now time.Time) (err error) { } } -func (chore *Chore) readWork(ctx context.Context, now time.Time, queue orders.Queue) ( +func (chore *Chore) readWork(ctx context.Context, queue orders.Queue) ( bucketRollups []orders.BucketBandwidthRollup, storagenodeRollups []orders.StoragenodeBandwidthRollup, consumedSerials []orders.ConsumedSerial, @@ -217,15 +229,13 @@ func (chore *Chore) runOnceHelper(ctx context.Context, now time.Time) (done bool ) // Read the work we should insert. - bucketRollups, storagenodeRollups, consumedSerials, done, err = chore.readWork(ctx, now, queue) + bucketRollups, storagenodeRollups, consumedSerials, done, err = chore.readWork(ctx, queue) if err != nil { return errs.Wrap(err) } // Now that we have work, write it all in its own transaction. return errs.Wrap(chore.db.WithTransaction(ctx, func(ctx context.Context, tx orders.Transaction) error { - now := time.Now() - if err := tx.UpdateBucketBandwidthBatch(ctx, now, bucketRollups); err != nil { return errs.Wrap(err) } diff --git a/satellite/api.go b/satellite/api.go index ac2aaa7b3..9de2a872f 100644 --- a/satellite/api.go +++ b/satellite/api.go @@ -321,6 +321,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, peer.Orders.DB, peer.DB.NodeAPIVersion(), config.Orders.SettlementBatchSize, + config.Orders.WindowEndpointRolloutPhase, ) peer.Orders.Service = orders.NewService( peer.Log.Named("orders:service"), diff --git a/satellite/orders/endpoint.go b/satellite/orders/endpoint.go index 39bf09dce..245299b76 100644 --- a/satellite/orders/endpoint.go +++ b/satellite/orders/endpoint.go @@ -198,21 +198,23 @@ type ProcessOrderResponse struct { // // architecture: Endpoint type Endpoint struct { - log *zap.Logger - satelliteSignee signing.Signee - DB DB - nodeAPIVersionDB nodeapiversion.DB - settlementBatchSize int + log *zap.Logger + satelliteSignee signing.Signee + DB DB + nodeAPIVersionDB nodeapiversion.DB + settlementBatchSize int + windowEndpointRolloutPhase WindowEndpointRolloutPhase } // NewEndpoint new orders receiving endpoint. 
-func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPIVersionDB nodeapiversion.DB, settlementBatchSize int) *Endpoint { +func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPIVersionDB nodeapiversion.DB, settlementBatchSize int, windowEndpointRolloutPhase WindowEndpointRolloutPhase) *Endpoint { return &Endpoint{ - log: log, - satelliteSignee: satelliteSignee, - DB: db, - nodeAPIVersionDB: nodeAPIVersionDB, - settlementBatchSize: settlementBatchSize, + log: log, + satelliteSignee: satelliteSignee, + DB: db, + nodeAPIVersionDB: nodeAPIVersionDB, + settlementBatchSize: settlementBatchSize, + windowEndpointRolloutPhase: windowEndpointRolloutPhase, } } @@ -239,6 +241,14 @@ func (endpoint *Endpoint) Settlement(stream pb.DRPCOrders_SettlementStream) (err ctx := stream.Context() defer mon.Task()(&ctx)(&err) + switch endpoint.windowEndpointRolloutPhase { + case WindowEndpointRolloutPhase1: + case WindowEndpointRolloutPhase2, WindowEndpointRolloutPhase3: + return rpcstatus.Error(rpcstatus.Unavailable, "endpoint disabled") + default: + return rpcstatus.Error(rpcstatus.Internal, "invalid window endpoint rollout phase") + } + peer, err := identity.PeerIdentityFromContext(ctx) if err != nil { return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) @@ -390,14 +400,129 @@ type bucketIDAction struct { // Only one window is processed at a time. // Batches are atomic, all orders are settled successfully or they all fail. func (endpoint *Endpoint) SettlementWithWindow(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) { + switch endpoint.windowEndpointRolloutPhase { + case WindowEndpointRolloutPhase1, WindowEndpointRolloutPhase2: + return endpoint.SettlementWithWindowMigration(stream) + case WindowEndpointRolloutPhase3: + return endpoint.SettlementWithWindowFinal(stream) + default: + return rpcstatus.Error(rpcstatus.Internal, "invalid window endpoint rollout phase") + } +} + +// SettlementWithWindowMigration implements phase 1 and phase 2 of the windowed order rollout where +// it uses the same backend as the non-windowed settlement and inserts entries containing 0 for +// the window which ensures that it is either entirely handled by the queue or entirely handled by +// the phase 3 endpoint. 
+func (endpoint *Endpoint) SettlementWithWindowMigration(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) { ctx := stream.Context() defer mon.Task()(&ctx)(&err) - // TODO: remove once the storagenode side of this endpoint is implemented - if true { - return rpcstatus.Error(rpcstatus.Unimplemented, "endpoint not supporrted") + peer, err := identity.PeerIdentityFromContext(ctx) + if err != nil { + endpoint.log.Debug("err peer identity from context", zap.Error(err)) + return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) } + err = endpoint.nodeAPIVersionDB.UpdateVersionAtLeast(ctx, peer.ID, nodeapiversion.HasWindowedOrders) + if err != nil { + return rpcstatus.Wrap(rpcstatus.Internal, err) + } + + log := endpoint.log.Named(peer.ID.String()) + log.Debug("SettlementWithWindow") + + var receivedCount int + var window int64 + var actions = map[pb.PieceAction]struct{}{} + var requests []*ProcessOrderRequest + var finished bool + + for !finished { + requests = requests[:0] + + for len(requests) < endpoint.settlementBatchSize { + request, err := stream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + finished = true + break + } + log.Debug("err streaming order request", zap.Error(err)) + return rpcstatus.Error(rpcstatus.Unknown, err.Error()) + } + receivedCount++ + + orderLimit := request.Limit + if orderLimit == nil { + log.Debug("request.OrderLimit is nil") + continue + } + + order := request.Order + if order == nil { + log.Debug("request.Order is nil") + continue + } + + if window == 0 { + window = date.TruncateToHourInNano(orderLimit.OrderCreation) + } + + // don't process orders that aren't valid + if !endpoint.isValid(ctx, log, order, orderLimit, peer.ID, window) { + continue + } + + actions[orderLimit.Action] = struct{}{} + + requests = append(requests, &ProcessOrderRequest{ + Order: order, + OrderLimit: orderLimit, + }) + } + + // process all of the orders in the old way + _, err = endpoint.DB.ProcessOrders(ctx, requests) + if err != nil { + return rpcstatus.Wrap(rpcstatus.Internal, err) + } + } + + // if we received no valid orders, then respond with rejected + if len(actions) == 0 || window == 0 { + return stream.SendAndClose(&pb.SettlementWithWindowResponse{ + Status: pb.SettlementWithWindowResponse_REJECTED, + }) + } + + // insert zero rows for every action involved in the set of orders. this prevents + // many problems (double spends and underspends) by ensuring that any window is + // either handled entirely by the queue or entirely with the phase 3 windowed endpoint. + windowTime := time.Unix(0, window) + for action := range actions { + if err := endpoint.DB.UpdateStoragenodeBandwidthSettle(ctx, peer.ID, action, 0, windowTime); err != nil { + return rpcstatus.Wrap(rpcstatus.Internal, err) + } + } + + log.Debug("orders processed", + zap.Int("total orders received", receivedCount), + zap.Time("window", windowTime), + ) + + return stream.SendAndClose(&pb.SettlementWithWindowResponse{ + Status: pb.SettlementWithWindowResponse_ACCEPTED, + }) +} + +// SettlementWithWindowFinal processes all orders that were created in a 1 hour window. +// Only one window is processed at a time. +// Batches are atomic, all orders are settled successfully or they all fail. 
+func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) { + ctx := stream.Context() + defer mon.Task()(&ctx)(&err) + peer, err := identity.PeerIdentityFromContext(ctx) if err != nil { endpoint.log.Debug("err peer identity from context", zap.Error(err)) @@ -495,7 +620,7 @@ func (endpoint *Endpoint) SettlementWithWindow(stream pb.DRPCOrders_SettlementWi zap.String("status", status.String()), ) - if !alreadyProcessed { + if status == pb.SettlementWithWindowResponse_ACCEPTED && !alreadyProcessed { for bucketIDAction, amount := range bucketSettled { err = endpoint.DB.UpdateBucketBandwidthSettle(ctx, bucketIDAction.projectID, []byte(bucketIDAction.bucketname), bucketIDAction.action, amount, time.Unix(0, window), diff --git a/satellite/orders/endpoint_test.go b/satellite/orders/endpoint_test.go index d62b1e3ef..256ec0911 100644 --- a/satellite/orders/endpoint_test.go +++ b/satellite/orders/endpoint_test.go @@ -4,35 +4,58 @@ package orders_test import ( + "io" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" "storj.io/common/pb" + "storj.io/common/rpc/rpcstatus" "storj.io/common/signing" "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" + "storj.io/storj/satellite" + "storj.io/storj/satellite/orders" ) -func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { - t.Skip("endpoint currently disabled") +func runTestWithPhases(t *testing.T, fn func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet)) { + run := func(phase orders.WindowEndpointRolloutPhase) func(t *testing.T) { + return func(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(_ *zap.Logger, _ int, config *satellite.Config) { + config.Orders.WindowEndpointRolloutPhase = phase + }, + }, + }, fn) + } + } - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + t.Run("Phase1", run(orders.WindowEndpointRolloutPhase1)) + t.Run("Phase2", run(orders.WindowEndpointRolloutPhase2)) + t.Run("Phase3", run(orders.WindowEndpointRolloutPhase3)) +} + +func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { + runTestWithPhases(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satellite := planet.Satellites[0] ordersDB := satellite.Orders.DB storagenode := planet.StorageNodes[0] - now := time.Now().UTC() + now := time.Now() projectID := testrand.UUID() bucketname := "testbucket" + bucketID := storj.JoinPaths(projectID.String(), bucketname) // stop any async flushes because we want to be sure when some values are // written to avoid races satellite.Orders.Chore.Loop.Pause() + satellite.Accounting.ReportedRollup.Loop.Pause() // confirm storagenode and bucket bandwidth tables start empty snbw, err := ordersDB.GetStorageNodeBandwidth(ctx, satellite.ID(), time.Time{}, now) @@ -42,17 +65,6 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(0), bucketbw) - // create serial number to use in test - serialNumber1 := testrand.SerialNumber() - bucketID := storj.JoinPaths(projectID.String(), bucketname) - err = ordersDB.CreateSerialInfo(ctx, serialNumber1, []byte(bucketID), now.AddDate(1, 0, 10)) - serialNumber2 := 
testrand.SerialNumber() - require.NoError(t, err) - err = ordersDB.CreateSerialInfo(ctx, serialNumber2, []byte(bucketID), now.AddDate(1, 0, 10)) - require.NoError(t, err) - piecePublicKey, piecePrivateKey, err := storj.NewPieceKey() - require.NoError(t, err) - var testCases = []struct { name string dataAmount int64 @@ -64,6 +76,18 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { } for _, tt := range testCases { + // create serial number to use in test. must be unique for each run. + serialNumber1 := testrand.SerialNumber() + err = ordersDB.CreateSerialInfo(ctx, serialNumber1, []byte(bucketID), now.AddDate(1, 0, 10)) + require.NoError(t, err) + + serialNumber2 := testrand.SerialNumber() + err = ordersDB.CreateSerialInfo(ctx, serialNumber2, []byte(bucketID), now.AddDate(1, 0, 10)) + require.NoError(t, err) + + piecePublicKey, piecePrivateKey, err := storj.NewPieceKey() + require.NoError(t, err) + // create signed orderlimit or order to test with limit1 := &pb.OrderLimit{ SerialNumber: serialNumber1, @@ -79,11 +103,13 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { } orderLimit1, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit1) require.NoError(t, err) + order1, err := signing.SignUplinkOrder(ctx, piecePrivateKey, &pb.Order{ SerialNumber: serialNumber1, Amount: tt.dataAmount, }) require.NoError(t, err) + limit2 := &pb.OrderLimit{ SerialNumber: serialNumber2, SatelliteId: satellite.ID(), @@ -98,6 +124,7 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { } orderLimit2, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit2) require.NoError(t, err) + order2, err := signing.SignUplinkOrder(ctx, piecePrivateKey, &pb.Order{ SerialNumber: serialNumber2, Amount: tt.dataAmount, @@ -107,8 +134,12 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { // create connection between storagenode and satellite conn, err := storagenode.Dialer.DialNodeURL(ctx, storj.NodeURL{ID: satellite.ID(), Address: satellite.Addr()}) require.NoError(t, err) + defer ctx.Check(conn.Close) + stream, err := pb.NewDRPCOrdersClient(conn).SettlementWithWindow(ctx) require.NoError(t, err) + defer ctx.Check(stream.Close) + // storagenode settles an order and orderlimit err = stream.Send(&pb.SettlementRequest{ Limit: orderLimit1, @@ -123,14 +154,25 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { resp, err := stream.CloseAndRecv() require.NoError(t, err) - settled := map[int32]int64{int32(pb.PieceAction_PUT): tt.settledAmt} - require.Equal(t, &pb.SettlementWithWindowResponse{Status: pb.SettlementWithWindowResponse_ACCEPTED, ActionSettled: settled}, resp) + // the settled amount is only returned during phase3 + var settled map[int32]int64 + if satellite.Config.Orders.WindowEndpointRolloutPhase == orders.WindowEndpointRolloutPhase3 { + settled = map[int32]int64{int32(pb.PieceAction_PUT): tt.settledAmt} + } + require.Equal(t, &pb.SettlementWithWindowResponse{ + Status: pb.SettlementWithWindowResponse_ACCEPTED, + ActionSettled: settled, + }, resp) + + // trigger and wait for all of the chores necessary to flush the orders + assert.NoError(t, satellite.Accounting.ReportedRollup.RunOnce(ctx, tt.orderCreation)) + satellite.Orders.Chore.Loop.TriggerWait() + // assert all the right stuff is in the satellite storagenode and bucket bandwidth tables snbw, err = ordersDB.GetStorageNodeBandwidth(ctx, storagenode.ID(), time.Time{}, tt.orderCreation) require.NoError(t, err) 
require.EqualValues(t, tt.settledAmt, snbw) - satellite.Orders.Chore.Loop.TriggerWait() newBbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, tt.orderCreation) require.NoError(t, err) require.EqualValues(t, tt.settledAmt, newBbw) @@ -138,53 +180,52 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { }) } func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) { - t.Skip("endpoint currently disabled") - - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + runTestWithPhases(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + const dataAmount int64 = 50 satellite := planet.Satellites[0] ordersDB := satellite.Orders.DB storagenode := planet.StorageNodes[0] - now := time.Now().UTC() + now := time.Now() projectID := testrand.UUID() bucketname := "testbucket" + bucketID := storj.JoinPaths(projectID.String(), bucketname) // stop any async flushes because we want to be sure when some values are // written to avoid races satellite.Orders.Chore.Loop.Pause() + satellite.Accounting.ReportedRollup.Loop.Pause() // confirm storagenode and bucket bandwidth tables start empty snbw, err := ordersDB.GetStorageNodeBandwidth(ctx, satellite.ID(), time.Time{}, now) require.NoError(t, err) require.EqualValues(t, 0, snbw) + bucketbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, now) require.NoError(t, err) require.EqualValues(t, 0, bucketbw) // create serial number to use in test serialNumber := testrand.SerialNumber() - bucketID := storj.JoinPaths(projectID.String(), bucketname) err = ordersDB.CreateSerialInfo(ctx, serialNumber, []byte(bucketID), now.AddDate(1, 0, 10)) require.NoError(t, err) + piecePublicKey, piecePrivateKey, err := storj.NewPieceKey() require.NoError(t, err) var testCases = []struct { name string - serialNumber storj.SerialNumber dataAmount int64 expectedStatus pb.SettlementWithWindowResponse_Status }{ - {"first settlement", serialNumber, int64(50), pb.SettlementWithWindowResponse_ACCEPTED}, - {"settle the same a second time, matches first", serialNumber, int64(50), pb.SettlementWithWindowResponse_ACCEPTED}, - {"settle a third time, doesn't match first", serialNumber, int64(0), pb.SettlementWithWindowResponse_REJECTED}, + {"first settlement", dataAmount, pb.SettlementWithWindowResponse_ACCEPTED}, + {"settle the same a second time, matches first", dataAmount, pb.SettlementWithWindowResponse_ACCEPTED}, + {"settle a third time, doesn't match first", dataAmount + 1, pb.SettlementWithWindowResponse_REJECTED}, } for _, tt := range testCases { // create signed orderlimit or order to test with limit := &pb.OrderLimit{ - SerialNumber: tt.serialNumber, + SerialNumber: serialNumber, SatelliteId: satellite.ID(), UplinkPublicKey: piecePublicKey, StorageNodeId: storagenode.ID(), @@ -197,8 +238,9 @@ func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) { } orderLimit, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit) require.NoError(t, err) + order, err := signing.SignUplinkOrder(ctx, piecePrivateKey, &pb.Order{ - SerialNumber: tt.serialNumber, + SerialNumber: serialNumber, Amount: tt.dataAmount, }) require.NoError(t, err) @@ -206,8 +248,12 @@ func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) { // create connection between storagenode and satellite conn, err := storagenode.Dialer.DialNodeURL(ctx, 
storj.NodeURL{ID: satellite.ID(), Address: satellite.Addr()}) require.NoError(t, err) + defer ctx.Check(conn.Close) + stream, err := pb.NewDRPCOrdersClient(conn).SettlementWithWindow(ctx) require.NoError(t, err) + defer ctx.Check(stream.Close) + // storagenode settles an order and orderlimit err = stream.Send(&pb.SettlementRequest{ Limit: orderLimit, @@ -216,71 +262,76 @@ func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) { require.NoError(t, err) resp, err := stream.CloseAndRecv() require.NoError(t, err) - settled := map[int32]int64{int32(pb.PieceAction_PUT): tt.dataAmount} - if tt.expectedStatus == pb.SettlementWithWindowResponse_REJECTED { - require.Equal(t, &pb.SettlementWithWindowResponse{Status: tt.expectedStatus, ActionSettled: nil}, resp) - } else { - require.Equal(t, &pb.SettlementWithWindowResponse{Status: tt.expectedStatus, ActionSettled: settled}, resp) - } - // assert all the right stuff is in the satellite storagenode and bucket bandwidth tables - snbw, err = ordersDB.GetStorageNodeBandwidth(ctx, storagenode.ID(), time.Time{}, time.Now().UTC()) - require.NoError(t, err) - if tt.expectedStatus == pb.SettlementWithWindowResponse_REJECTED { - require.NotEqual(t, tt.dataAmount, snbw) - } else { - require.Equal(t, tt.dataAmount, snbw) - } - // wait for rollup_write_cache to flush, this on average takes 1ms to sleep to complete - satellite.Orders.Chore.Loop.TriggerWait() - newBbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, time.Now().UTC()) - require.NoError(t, err) - if tt.expectedStatus == pb.SettlementWithWindowResponse_REJECTED { - require.NotEqual(t, tt.dataAmount, newBbw) - } else { - require.Equal(t, tt.dataAmount, newBbw) + expected := new(pb.SettlementWithWindowResponse) + switch { + case satellite.Config.Orders.WindowEndpointRolloutPhase != orders.WindowEndpointRolloutPhase3: + expected.Status = pb.SettlementWithWindowResponse_ACCEPTED + expected.ActionSettled = nil + case tt.expectedStatus == pb.SettlementWithWindowResponse_ACCEPTED: + expected.Status = pb.SettlementWithWindowResponse_ACCEPTED + expected.ActionSettled = map[int32]int64{int32(pb.PieceAction_PUT): tt.dataAmount} + default: + expected.Status = pb.SettlementWithWindowResponse_REJECTED + expected.ActionSettled = nil } + require.Equal(t, expected, resp) + + // flush all the chores + assert.NoError(t, satellite.Accounting.ReportedRollup.RunOnce(ctx, now)) + satellite.Orders.Chore.Loop.TriggerWait() + + // assert all the right stuff is in the satellite storagenode and bucket bandwidth tables + snbw, err = ordersDB.GetStorageNodeBandwidth(ctx, storagenode.ID(), time.Time{}, now) + require.NoError(t, err) + require.Equal(t, dataAmount, snbw) + + newBbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, now) + require.NoError(t, err) + require.Equal(t, dataAmount, newBbw) } }) } func TestSettlementWithWindowEndpointErrors(t *testing.T) { - t.Skip("endpoint currently disabled") - - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + runTestWithPhases(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satellite := planet.Satellites[0] ordersDB := satellite.Orders.DB storagenode := planet.StorageNodes[0] - now := time.Now().UTC() + now := time.Now() projectID := testrand.UUID() bucketname := "testbucket" + bucketID := storj.JoinPaths(projectID.String(), bucketname) // stop any async 
flushes because we want to be sure when some values are // written to avoid races satellite.Orders.Chore.Loop.Pause() + satellite.Accounting.ReportedRollup.Loop.Pause() // confirm storagenode and bucket bandwidth tables start empty snbw, err := ordersDB.GetStorageNodeBandwidth(ctx, satellite.ID(), time.Time{}, now) require.NoError(t, err) require.EqualValues(t, 0, snbw) + bucketbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, now) require.NoError(t, err) require.EqualValues(t, 0, bucketbw) // create serial number to use in test serialNumber1 := testrand.SerialNumber() - serialNumber2 := testrand.SerialNumber() - bucketID := storj.JoinPaths(projectID.String(), bucketname) err = ordersDB.CreateSerialInfo(ctx, serialNumber1, []byte(bucketID), now.AddDate(1, 0, 10)) require.NoError(t, err) + + serialNumber2 := testrand.SerialNumber() err = ordersDB.CreateSerialInfo(ctx, serialNumber2, []byte(bucketID), now.AddDate(1, 0, 10)) require.NoError(t, err) + piecePublicKey1, piecePrivateKey1, err := storj.NewPieceKey() require.NoError(t, err) + _, piecePrivateKey2, err := storj.NewPieceKey() require.NoError(t, err) + limit := pb.OrderLimit{ SerialNumber: serialNumber1, SatelliteId: satellite.ID(), @@ -295,16 +346,19 @@ func TestSettlementWithWindowEndpointErrors(t *testing.T) { } orderLimit1, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), &limit) require.NoError(t, err) + order1, err := signing.SignUplinkOrder(ctx, piecePrivateKey1, &pb.Order{ SerialNumber: serialNumber1, Amount: int64(50), }) require.NoError(t, err) + order2, err := signing.SignUplinkOrder(ctx, piecePrivateKey1, &pb.Order{ SerialNumber: serialNumber2, Amount: int64(50), }) require.NoError(t, err) + order3, err := signing.SignUplinkOrder(ctx, piecePrivateKey2, &pb.Order{ SerialNumber: serialNumber2, Amount: int64(50), @@ -327,28 +381,154 @@ func TestSettlementWithWindowEndpointErrors(t *testing.T) { t.Run(tt.name, func(t *testing.T) { conn, err := storagenode.Dialer.DialNodeURL(ctx, storj.NodeURL{ID: satellite.ID(), Address: satellite.Addr()}) require.NoError(t, err) + defer ctx.Check(conn.Close) + stream, err := pb.NewDRPCOrdersClient(conn).SettlementWithWindow(ctx) require.NoError(t, err) + defer ctx.Check(stream.Close) + err = stream.Send(&pb.SettlementRequest{ Limit: tt.orderLimit, Order: tt.order, }) require.NoError(t, err) + resp, err := stream.CloseAndRecv() require.NoError(t, err) - require.Equal(t, &pb.SettlementWithWindowResponse{Status: pb.SettlementWithWindowResponse_REJECTED, ActionSettled: nil}, resp) + require.Equal(t, &pb.SettlementWithWindowResponse{ + Status: pb.SettlementWithWindowResponse_REJECTED, + ActionSettled: nil, + }, resp) + + // flush all the chores + assert.NoError(t, satellite.Accounting.ReportedRollup.RunOnce(ctx, now)) + satellite.Orders.Chore.Loop.TriggerWait() // assert no data was added to satellite storagenode or bucket bandwidth tables - snbw, err = ordersDB.GetStorageNodeBandwidth(ctx, storagenode.ID(), time.Time{}, time.Now().UTC()) + snbw, err = ordersDB.GetStorageNodeBandwidth(ctx, storagenode.ID(), time.Time{}, now) require.NoError(t, err) require.EqualValues(t, 0, snbw) - // wait for rollup_write_cache to flush - satellite.Orders.Chore.Loop.TriggerWait() - newBbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, time.Now().UTC()) + newBbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, now) require.NoError(t, err) require.EqualValues(t, 0, 
newBbw) }) } }) } + +func TestSettlementEndpointSingleOrder(t *testing.T) { + runTestWithPhases(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + const dataAmount int64 = 50 + satellite := planet.Satellites[0] + ordersDB := satellite.Orders.DB + storagenode := planet.StorageNodes[0] + now := time.Now() + projectID := testrand.UUID() + bucketname := "testbucket" + bucketID := storj.JoinPaths(projectID.String(), bucketname) + + // stop any async flushes because we want to be sure when some values are + // written to avoid races + satellite.Orders.Chore.Loop.Pause() + satellite.Accounting.ReportedRollup.Loop.Pause() + + // confirm storagenode and bucket bandwidth tables start empty + snbw, err := ordersDB.GetStorageNodeBandwidth(ctx, satellite.ID(), time.Time{}, now) + require.NoError(t, err) + require.EqualValues(t, 0, snbw) + + bucketbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, now) + require.NoError(t, err) + require.EqualValues(t, 0, bucketbw) + + // create serial number to use in test + serialNumber := testrand.SerialNumber() + err = ordersDB.CreateSerialInfo(ctx, serialNumber, []byte(bucketID), now.AddDate(1, 0, 10)) + require.NoError(t, err) + + piecePublicKey, piecePrivateKey, err := storj.NewPieceKey() + require.NoError(t, err) + + // create signed orderlimit or order to test with + limit := &pb.OrderLimit{ + SerialNumber: serialNumber, + SatelliteId: satellite.ID(), + UplinkPublicKey: piecePublicKey, + StorageNodeId: storagenode.ID(), + PieceId: storj.NewPieceID(), + Action: pb.PieceAction_PUT, + Limit: 1000, + PieceExpiration: time.Time{}, + OrderCreation: now, + OrderExpiration: now.Add(24 * time.Hour), + } + orderLimit, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit) + require.NoError(t, err) + + order, err := signing.SignUplinkOrder(ctx, piecePrivateKey, &pb.Order{ + SerialNumber: serialNumber, + Amount: dataAmount, + }) + require.NoError(t, err) + + // create connection between storagenode and satellite + conn, err := storagenode.Dialer.DialNodeURL(ctx, storj.NodeURL{ID: satellite.ID(), Address: satellite.Addr()}) + require.NoError(t, err) + defer ctx.Check(conn.Close) + + stream, err := pb.NewDRPCOrdersClient(conn).Settlement(ctx) + require.NoError(t, err) + defer ctx.Check(stream.Close) + + // storagenode settles an order and orderlimit + var resp *pb.SettlementResponse + if satellite.Config.Orders.WindowEndpointRolloutPhase == orders.WindowEndpointRolloutPhase1 { + err = stream.Send(&pb.SettlementRequest{ + Limit: orderLimit, + Order: order, + }) + require.NoError(t, err) + require.NoError(t, stream.CloseSend()) + + resp, err = stream.Recv() + require.NoError(t, err) + } else { + // in phase2 and phase3, the endpoint is disabled. depending on how fast the + // server sends that error message, we may see an io.EOF on the Send call, or + // we may see no error at all. In either case, we have to call stream.Recv to + // see the actual error. gRPC semantics are funky. 
+ err = stream.Send(&pb.SettlementRequest{ + Limit: orderLimit, + Order: order, + }) + if err != io.EOF { + require.NoError(t, err) + } + require.NoError(t, stream.CloseSend()) + + _, err = stream.Recv() + require.Error(t, err) + require.Equal(t, rpcstatus.Unavailable, rpcstatus.Code(err)) + return + } + + require.Equal(t, &pb.SettlementResponse{ + SerialNumber: serialNumber, + Status: pb.SettlementResponse_ACCEPTED, + }, resp) + + // flush all the chores + assert.NoError(t, satellite.Accounting.ReportedRollup.RunOnce(ctx, now)) + satellite.Orders.Chore.Loop.TriggerWait() + + // assert all the right stuff is in the satellite storagenode and bucket bandwidth tables + snbw, err = ordersDB.GetStorageNodeBandwidth(ctx, storagenode.ID(), time.Time{}, now) + require.NoError(t, err) + require.Equal(t, dataAmount, snbw) + + newBbw, err := ordersDB.GetBucketBandwidth(ctx, projectID, []byte(bucketname), time.Time{}, now) + require.NoError(t, err) + require.Equal(t, dataAmount, newBbw) + }) +} diff --git a/satellite/orders/service.go b/satellite/orders/service.go index 875d6376d..b8546db10 100644 --- a/satellite/orders/service.go +++ b/satellite/orders/service.go @@ -29,12 +29,13 @@ var ErrDownloadFailedNotEnoughPieces = errs.Class("not enough pieces for downloa // Config is a configuration struct for orders Service. type Config struct { - Expiration time.Duration `help:"how long until an order expires" default:"48h"` // 2 days - SettlementBatchSize int `help:"how many orders to batch per transaction" default:"250"` - FlushBatchSize int `help:"how many items in the rollups write cache before they are flushed to the database" devDefault:"20" releaseDefault:"10000"` - FlushInterval time.Duration `help:"how often to flush the rollups write cache to the database" devDefault:"30s" releaseDefault:"1m"` - ReportedRollupsReadBatchSize int `help:"how many records to read in a single transaction when calculating billable bandwidth" default:"1000"` - NodeStatusLogging bool `hidden:"true" help:"deprecated, log the offline/disqualification status of nodes" default:"false"` + Expiration time.Duration `help:"how long until an order expires" default:"48h"` // 2 days + SettlementBatchSize int `help:"how many orders to batch per transaction" default:"250"` + FlushBatchSize int `help:"how many items in the rollups write cache before they are flushed to the database" devDefault:"20" releaseDefault:"10000"` + FlushInterval time.Duration `help:"how often to flush the rollups write cache to the database" devDefault:"30s" releaseDefault:"1m"` + ReportedRollupsReadBatchSize int `help:"how many records to read in a single transaction when calculating billable bandwidth" default:"1000"` + NodeStatusLogging bool `hidden:"true" help:"deprecated, log the offline/disqualification status of nodes" default:"false"` + WindowEndpointRolloutPhase WindowEndpointRolloutPhase `help:"rollout phase for the windowed endpoint" default:"phase1"` } // BucketsDB returns information about buckets. diff --git a/satellite/orders/window_endpoint_phase.go b/satellite/orders/window_endpoint_phase.go new file mode 100644 index 000000000..79579faef --- /dev/null +++ b/satellite/orders/window_endpoint_phase.go @@ -0,0 +1,60 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package orders + +import ( + "fmt" + "strings" + + "github.com/zeebo/errs" +) + +// WindowEndpointRolloutPhase controls the phase of the new orders endpoint rollout. 
+type WindowEndpointRolloutPhase int
+
+const (
+	// WindowEndpointRolloutPhase1 is when both the old and new endpoint are enabled and
+	// the new endpoint places orders in the queue just like the old endpoint.
+	WindowEndpointRolloutPhase1 WindowEndpointRolloutPhase = 1 + iota
+
+	// WindowEndpointRolloutPhase2 is when the old endpoint is disabled and the new endpoint
+	// places orders in the queue just like the old endpoint used to.
+	WindowEndpointRolloutPhase2
+
+	// WindowEndpointRolloutPhase3 is when the old endpoint is disabled and the new endpoint
+	// does not use a queue and just does direct insertion of rollup values.
+	WindowEndpointRolloutPhase3
+)
+
+// String provides a human-readable form of the rollout phase.
+func (phase WindowEndpointRolloutPhase) String() string {
+	switch phase {
+	case WindowEndpointRolloutPhase1:
+		return "phase1"
+	case WindowEndpointRolloutPhase2:
+		return "phase2"
+	case WindowEndpointRolloutPhase3:
+		return "phase3"
+	default:
+		return fmt.Sprintf("WindowEndpointRolloutPhase(%d)", int(phase))
+	}
+}
+
+// Set implements the flag.Value interface.
+func (phase *WindowEndpointRolloutPhase) Set(s string) error {
+	switch strings.ToLower(s) {
+	case "phase1":
+		*phase = WindowEndpointRolloutPhase1
+	case "phase2":
+		*phase = WindowEndpointRolloutPhase2
+	case "phase3":
+		*phase = WindowEndpointRolloutPhase3
+	default:
+		return errs.New("invalid window endpoint rollout phase: %q", s)
+	}
+	return nil
+}
+
+// Type implements pflag.Value.
+func (WindowEndpointRolloutPhase) Type() string { return "orders.WindowEndpointRolloutPhase" }
diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock
index d211ba197..f7baec106 100644
--- a/scripts/testdata/satellite-config.yaml.lock
+++ b/scripts/testdata/satellite-config.yaml.lock
@@ -451,6 +451,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
 # how many orders to batch per transaction
 # orders.settlement-batch-size: 250
 
+# rollout phase for the windowed endpoint
+# orders.window-endpoint-rollout-phase: phase1
+
 # disable node cache
 # overlay.node-selection-cache.disabled: false
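
Reviewer note (illustrative only, not part of the patch): the sketch below assumes the identifiers added in satellite/orders/window_endpoint_phase.go and shows how the orders.window-endpoint-rollout-phase value is parsed through the flag.Value implementation and how behavior branches across the three phases. The hour-window computation uses the standard library as a stand-in for the internal date helper the endpoints call, so treat it as an approximation of the real code path rather than the implementation itself.

package main

import (
	"fmt"
	"time"

	"storj.io/storj/satellite/orders"
)

func main() {
	// The satellite config system fills this in from the string form via Set.
	var phase orders.WindowEndpointRolloutPhase
	if err := phase.Set("phase2"); err != nil {
		panic(err)
	}

	// Endpoint.Settlement only serves requests in phase1; SettlementWithWindow
	// uses the migration backend in phase1/phase2 and the new backend in phase3.
	switch phase {
	case orders.WindowEndpointRolloutPhase1:
		fmt.Println("legacy endpoint enabled; windowed endpoint uses the queue-backed migration path")
	case orders.WindowEndpointRolloutPhase2:
		fmt.Println("legacy endpoint disabled; windowed endpoint still uses the queue-backed migration path")
	case orders.WindowEndpointRolloutPhase3:
		fmt.Println("legacy endpoint disabled; windowed endpoint writes rollups directly")
	}

	// Both windowed backends bucket every order in a settlement batch into the
	// hour its order limit was created in (the "window"). Stdlib truncation is
	// used here in place of the internal date helper.
	orderCreation := time.Now()
	window := orderCreation.Truncate(time.Hour).UnixNano()
	fmt.Printf("phase=%s window=%d\n", phase, window)
}

Operationally, the rollout described in the commit message is just this one config value moving from phase1 to phase2 and, after the existing orders have expired, to phase3.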
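
Reviewer note (illustrative only, not part of the patch): the runOnceNow change in the reportedrollup chore separates "which clock to use" from "how to run one pass", so tests can pin a fixed time through RunOnce while the chore loop keeps reading time.Now on every pass. A minimal standalone sketch of that pattern follows; the helper names here are hypothetical and only mirror the shape of runOnceNow, not the chore's actual API.

package main

import (
	"fmt"
	"time"
)

// runUntilDone calls step with a fresh timestamp from nowFn on every pass and
// stops when step reports it is done or fails, mirroring runOnceNow's shape.
func runUntilDone(nowFn func() time.Time, step func(now time.Time) (done bool, err error)) error {
	for {
		done, err := step(nowFn())
		if err != nil || done {
			return err
		}
	}
}

func main() {
	passes := 0
	step := func(now time.Time) (bool, error) {
		passes++
		fmt.Println("pass at", now.Format(time.RFC3339Nano))
		return passes >= 3, nil // pretend the queue drains after three passes
	}

	// The chore loop uses the live clock on every pass...
	_ = runUntilDone(time.Now, step)

	// ...while tests pin a single reproducible time, as RunOnce now does.
	passes = 0
	fixed := time.Date(2020, 7, 21, 12, 0, 0, 0, time.UTC)
	_ = runUntilDone(func() time.Time { return fixed }, step)
}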