satellite/orders: rollout phase3 of SettlementWithWindow endpoint

Change-Id: Id19fae4f444c83157ce58c933a18be1898430ad0
Jessica Grebenschikov authored 2020-10-20 11:54:17 -07:00, committed by Jess G
parent 9a29ec5b3e
commit f5880f6833
8 changed files with 74 additions and 17 deletions


@ -483,7 +483,7 @@ func (planet *Planet) newSatellites(count int, satelliteDatabases satellitedbtes
FlushBatchSize: 10,
FlushInterval: defaultInterval,
NodeStatusLogging: true,
-WindowEndpointRolloutPhase: orders.WindowEndpointRolloutPhase1,
+WindowEndpointRolloutPhase: orders.WindowEndpointRolloutPhase3,
},
Checker: checker.Config{
Interval: defaultInterval,


@ -281,7 +281,7 @@ func TestBilling_AuditRepairTraffic(t *testing.T) {
})
}
-func TestBilling_DownloadAndNoUploadTraffic(t *testing.T) {
+func TestBilling_UploadNoEgress(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
@ -320,15 +320,50 @@ func TestBilling_DownloadAndNoUploadTraffic(t *testing.T) {
usage = getProjectTotal(ctx, t, planet, 0, projectID, since)
require.Zero(t, usage.Egress, "billed usage")
})
}
func TestBilling_DownloadTraffic(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 3, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
const (
bucketName = "a-bucket"
objectKey = "object-filename"
)
satelliteSys := planet.Satellites[0]
// Make sure that we don't have interference with billed repair traffic
// in case of a bug. There is a specific test to verify that the repair
// traffic isn't billed.
satelliteSys.Audit.Chore.Loop.Stop()
satelliteSys.Repair.Repairer.Loop.Stop()
// stop any async flushes so we can be sure when values are written
// and avoid races
satelliteSys.Orders.Chore.Loop.Pause()
var (
uplnk = planet.Uplinks[0]
projectID = uplnk.Projects[0].ID
)
{
data := testrand.Bytes(10 * memory.KiB)
err := uplnk.Upload(ctx, satelliteSys, bucketName, objectKey, data)
require.NoError(t, err)
}
_, err := uplnk.Download(ctx, satelliteSys, bucketName, objectKey)
require.NoError(t, err)
-usage = getProjectTotal(ctx, t, planet, 0, projectID, since)
+since := time.Now().Add(-10 * time.Hour)
+usage := getProjectTotal(ctx, t, planet, 0, projectID, since)
require.NotZero(t, usage.Egress, "billed usage")
})
}
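The assertions above go through the package's getProjectTotal helper, whose definition is not part of this diff. A minimal sketch of what such a helper plausibly does, assuming the satellite's ProjectAccounting DB interface; the parameter names and the GetProjectTotal call are assumptions, not code from this commit:

// getProjectTotal returns the accumulated usage for a project as seen by
// the satellite at satelliteIdx, from `since` until now. Sketch only.
func getProjectTotal(
    ctx *testcontext.Context, t *testing.T, planet *testplanet.Planet,
    satelliteIdx int, projectID uuid.UUID, since time.Time,
) *accounting.ProjectUsage {
    t.Helper()

    // The billing tests only assert whether usage.Egress is zero or not.
    usage, err := planet.Satellites[satelliteIdx].DB.ProjectAccounting().
        GetProjectTotal(ctx, projectID, since, time.Now())
    require.NoError(t, err)
    return usage
}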
func TestBilling_ExpiredFiles(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,


@ -216,14 +216,17 @@ func TestQueryAttribution(t *testing.T) {
require.NoError(t, err)
}
// Wait for the storage nodes to be done processing the download
require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))
{ // Flush all the pending information through the system.
// Calculate the usage used for upload
for _, sn := range planet.StorageNodes {
sn.Storage2.Orders.SendOrders(ctx, tomorrow)
}
-rollout := planet.Satellites[0].Core.Accounting.ReportedRollupChore
-require.NoError(t, rollout.RunOnce(ctx, now))
+// The orders chore writes bucket bandwidth rollup changes to satellitedb
+planet.Satellites[0].Orders.Chore.Loop.TriggerWait()
// Trigger tally so it gets all set up and can return a storage usage
planet.Satellites[0].Accounting.Tally.Loop.TriggerWait()
@ -291,8 +294,8 @@ func TestAttributionReport(t *testing.T) {
sn.Storage2.Orders.SendOrders(ctx, tomorrow)
}
-rollout := planet.Satellites[0].Core.Accounting.ReportedRollupChore
-require.NoError(t, rollout.RunOnce(ctx, now))
+// The orders chore writes bucket bandwidth rollup changes to satellitedb
+planet.Satellites[0].Orders.Chore.Loop.TriggerWait()
// Trigger tally so it gets all set up and can return a storage usage
planet.Satellites[0].Accounting.Tally.Loop.TriggerWait()


@ -554,6 +554,17 @@ func (endpoint *Endpoint) SettlementWithWindowMigration(stream pb.DRPCOrders_Set
})
}
func trackFinalStatus(status pb.SettlementWithWindowResponse_Status) {
switch status {
case pb.SettlementWithWindowResponse_ACCEPTED:
mon.Event("settlement_response_accepted")
case pb.SettlementWithWindowResponse_REJECTED:
mon.Event("settlement_response_rejected")
default:
mon.Event("settlement_response_unknown")
}
}
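A note on the telemetry calls: mon is not declared in this diff. By the usual storj convention it is the package-level monkit scope, sketched below as an assumption rather than code from this commit:

import "github.com/spacemonkeygo/monkit/v3"

// mon is the package-level telemetry scope; mon.Event emits a named counter
// event, letting operators watch accept/reject rates during the rollout.
var mon = monkit.Package()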
// SettlementWithWindowFinal processes all orders that were created in a 1 hour window.
// Only one window is processed at a time.
// Batches are atomic, all orders are settled successfully or they all fail.
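The window identifier exchanged on this endpoint is a timestamp in nanoseconds, as the time.Unix(0, window) call below shows. A minimal sketch of the relationship, assuming orders are grouped by the hour of their creation time (the helper name is hypothetical):

// windowFor maps an order's creation time to its settlement window: the
// start of the containing hour, in nanoseconds since the Unix epoch.
func windowFor(createdAt time.Time) int64 {
    return createdAt.Truncate(time.Hour).UnixNano()
}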
@ -561,6 +572,10 @@ func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_Settlem
ctx := stream.Context()
defer mon.Task()(&ctx)(&err)
var alreadyProcessed bool
var status pb.SettlementWithWindowResponse_Status
// Use a closure so the final value of status is reported; a plain
// `defer trackFinalStatus(status)` would evaluate status immediately
// and always report the zero value.
defer func() { trackFinalStatus(status) }()
peer, err := identity.PeerIdentityFromContext(ctx)
if err != nil {
endpoint.log.Debug("err peer identity from context", zap.Error(err))
@ -640,12 +655,13 @@ func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_Settlem
}
if len(storagenodeSettled) == 0 {
log.Debug("no orders were successfully processed", zap.Int("received count", receivedCount))
status = pb.SettlementWithWindowResponse_REJECTED
return stream.SendAndClose(&pb.SettlementWithWindowResponse{
-Status: pb.SettlementWithWindowResponse_REJECTED,
+Status: status,
ActionSettled: storagenodeSettled,
})
}
-status, alreadyProcessed, err := endpoint.DB.UpdateStoragenodeBandwidthSettleWithWindow(
+status, alreadyProcessed, err = endpoint.DB.UpdateStoragenodeBandwidthSettleWithWindow(
ctx, peer.ID, storagenodeSettled, time.Unix(0, window),
)
if err != nil {


@ -184,6 +184,7 @@ func TestUploadDownloadBandwidth(t *testing.T) {
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Orders.SendOrders(ctx, tomorrow)
}
planet.Satellites[0].Orders.Chore.Loop.TriggerWait()
reportedRollupChore := planet.Satellites[0].Core.Accounting.ReportedRollupChore
require.NoError(t, reportedRollupChore.RunOnce(ctx, now))


@ -35,7 +35,7 @@ type Config struct {
FlushInterval time.Duration `help:"how often to flush the rollups write cache to the database" devDefault:"30s" releaseDefault:"1m"`
ReportedRollupsReadBatchSize int `help:"how many records to read in a single transaction when calculating billable bandwidth" default:"1000"`
NodeStatusLogging bool `hidden:"true" help:"deprecated, log the offline/disqualification status of nodes" default:"false"`
-WindowEndpointRolloutPhase WindowEndpointRolloutPhase `help:"rollout phase for the windowed endpoint" default:"phase2"`
+WindowEndpointRolloutPhase WindowEndpointRolloutPhase `help:"rollout phase for the windowed endpoint" default:"phase3"`
OrdersSemaphoreSize int `help:"how many concurrent orders to process at once. zero is unlimited" default:"2"`
}
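For context on the values this flag accepts: the diff references orders.WindowEndpointRolloutPhase1 and orders.WindowEndpointRolloutPhase3, which suggests a small enum in the orders package. A sketch consistent with those identifiers, with phase semantics inferred from the migration endpoint and test comments in this diff (the exact definition is not part of this commit):

// WindowEndpointRolloutPhase controls how the satellite serves the legacy
// and windowed orders settlement endpoints during the rollout.
type WindowEndpointRolloutPhase int

const (
    // Phase1: the windowed endpoint proxies to the legacy settlement logic.
    WindowEndpointRolloutPhase1 WindowEndpointRolloutPhase = 1 + iota
    // Phase2: the windowed endpoint settles directly; the legacy endpoint still works.
    WindowEndpointRolloutPhase2
    // Phase3: only the windowed endpoint is served; the legacy endpoint is disabled.
    WindowEndpointRolloutPhase3
)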


@ -473,7 +473,7 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# orders.settlement-batch-size: 250
# rollout phase for the windowed endpoint
-# orders.window-endpoint-rollout-phase: phase2
+# orders.window-endpoint-rollout-phase: phase3
# The length of time to give suspended SNOs to diagnose and fix issues causing downtime. Afterwards, they will have one tracking period to reach the minimum online score before disqualification
# overlay.audit-history.grace-period: 168h0m0s


@ -69,13 +69,15 @@ func TestOrderDBSettle(t *testing.T) {
// trigger order send
service.Sender.TriggerWait()
// in phase3 orders are only sent from the filestore,
// so we expect any orders in the orders DB to remain there
toSend, err = node.DB.Orders().ListUnsent(ctx, 10)
require.NoError(t, err)
-require.Len(t, toSend, 0)
+require.Len(t, toSend, 1)
archived, err := node.DB.Orders().ListArchived(ctx, 10)
require.NoError(t, err)
-require.Len(t, archived, 1)
+require.Len(t, archived, 0)
})
}
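The behavior this test pins down, DB-stored orders staying unsent in phase3, follows from the satellite refusing the legacy settlement path. A hypothetical sketch of that gate; the field and check shown here are assumptions about satellite/orders, not code from this commit:

// Settlement is the legacy (non-windowed) endpoint. In phase3 it refuses
// requests, so orders that exist only in the storagenode's orders DB stay
// unsent rather than being archived.
func (endpoint *Endpoint) Settlement(stream pb.DRPCOrders_SettlementStream) error {
    if endpoint.windowEndpointRolloutPhase == WindowEndpointRolloutPhase3 {
        return rpcstatus.Error(rpcstatus.Unavailable,
            "legacy settlement disabled, use SettlementWithWindow")
    }
    // ... legacy settlement logic ...
    return nil
}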
@ -178,14 +180,14 @@ func TestOrderFileStoreAndDBSettle(t *testing.T) {
// trigger order send
service.SendOrders(ctx, tomorrow)
-// DB and filestore orders should both be archived.
+// DB orders should not be archived in phase3, but filestore orders should be.
toSendDB, err = node.DB.Orders().ListUnsent(ctx, 10)
require.NoError(t, err)
-require.Len(t, toSendDB, 0)
+require.Len(t, toSendDB, 1)
archived, err := node.DB.Orders().ListArchived(ctx, 10)
require.NoError(t, err)
-require.Len(t, archived, 1)
+require.Len(t, archived, 0)
toSendFileStore, err = node.OrdersStore.ListUnsentBySatellite(ctx, tomorrow)
require.NoError(t, err)