From 97a5e6c814c934380b49ca676fb995d46b5f25f1 Mon Sep 17 00:00:00 2001 From: Jessica Grebenschikov Date: Thu, 17 Dec 2020 06:46:46 -0800 Subject: [PATCH] satellite/orders: stop inserting/reading from serial_numbers table This PR contains the minimum changes needed to stop inserting into the serial_numbers table. This is the first step in completely deprecating that table. The next step is to create another PR to remove the expiredSerial chore, fix more tests, and remove any other methods on the serial_number table. Change-Id: I5f12a56ebf3fa4d1a1976141d2911f25a98d2cc3 --- satellite/orders/endpoint.go | 64 +++++--------- satellite/orders/endpoint_test.go | 98 +++++++++++++-------- satellite/orders/service.go | 43 +-------- satellite/orders/service_test.go | 7 -- scripts/testdata/satellite-config.yaml.lock | 2 +- 5 files changed, 87 insertions(+), 127 deletions(-) diff --git a/satellite/orders/endpoint.go b/satellite/orders/endpoint.go index f8100a044..55fde3dee 100644 --- a/satellite/orders/endpoint.go +++ b/satellite/orders/endpoint.go @@ -647,52 +647,32 @@ func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_Settlem storagenodeSettled[int32(orderLimit.Action)] += order.Amount - var bucketName string - var projectID uuid.UUID - if len(orderLimit.EncryptedMetadata) > 0 { - metadata, err := endpoint.ordersService.DecryptOrderMetadata(ctx, orderLimit) - if err != nil { - log.Info("decrypt order metadata err:", zap.Error(err)) - mon.Event("bucketinfo_from_orders_metadata_error_1") - goto idFromSerialTable - } - bucketInfo, err := metabase.ParseBucketPrefix( - metabase.BucketPrefix(metadata.GetProjectBucketPrefix()), - ) - if err != nil { - log.Info("decrypt order: ParseBucketPrefix", zap.Error(err)) - mon.Event("bucketinfo_from_orders_metadata_error_2") - goto idFromSerialTable - } - bucketName = bucketInfo.BucketName - projectID = bucketInfo.ProjectID - mon.Event("bucketinfo_from_orders_metadata") + metadata, err := endpoint.ordersService.DecryptOrderMetadata(ctx, orderLimit) + if err != nil { + log.Debug("decrypt order metadata err:", zap.Error(err)) + mon.Event("bucketinfo_from_orders_metadata_error_1") + continue } - - // If we cannot get the bucket name and project ID from the orderLimit metadata, then fallback - // to the old method of getting it from the serial_numbers table. - // This is only temporary to make sure the orderLimit metadata is working correctly. - idFromSerialTable: - if bucketName == "" || projectID.IsZero() { - bucketPrefix, err := endpoint.DB.GetBucketIDFromSerialNumber(ctx, serialNum) - if err != nil { - log.Info("get bucketPrefix from serial number table err", zap.Error(err)) - continue - } - - bucket, err := metabase.ParseBucketPrefix(metabase.BucketPrefix(bucketPrefix)) - if err != nil { - log.Info("split bucket err", zap.Error(err), zap.String("bucketPrefix", string(bucketPrefix))) - continue - } - bucketName = bucket.BucketName - projectID = bucket.ProjectID - mon.Event("bucketinfo_from_serial_number") + bucketInfo, err := metabase.ParseBucketPrefix( + metabase.BucketPrefix(metadata.GetProjectBucketPrefix()), + ) + if err != nil { + log.Debug("decrypt order: ParseBucketPrefix", zap.Error(err)) + mon.Event("bucketinfo_from_orders_metadata_error_2") + continue + } + if bucketInfo.BucketName == "" || bucketInfo.ProjectID.IsZero() { + log.Info("decrypt order: bucketName or projectID not set", + zap.String("bucketName", bucketInfo.BucketName), + zap.String("projectID", bucketInfo.ProjectID.String()), + ) + mon.Event("bucketinfo_from_orders_metadata_error_3") + continue } bucketSettled[bucketIDAction{ - bucketname: bucketName, - projectID: projectID, + bucketname: bucketInfo.BucketName, + projectID: bucketInfo.ProjectID, action: orderLimit.Action, }] += order.Amount } diff --git a/satellite/orders/endpoint_test.go b/satellite/orders/endpoint_test.go index 9ca8ad622..55821d95d 100644 --- a/satellite/orders/endpoint_test.go +++ b/satellite/orders/endpoint_test.go @@ -43,7 +43,9 @@ func runTestWithPhases(t *testing.T, fn func(t *testing.T, ctx *testcontext.Cont } func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { - runTestWithPhases(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satellite := planet.Satellites[0] ordersDB := satellite.Orders.DB storagenode := planet.StorageNodes[0] @@ -51,6 +53,7 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { projectID := testrand.UUID() bucketname := "testbucket" bucketID := storj.JoinPaths(projectID.String(), bucketname) + key := satellite.Config.Orders.EncryptionKeys.Default // stop any async flushes because we want to be sure when some values are // written to avoid races @@ -79,11 +82,21 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { func() { // create serial number to use in test. must be unique for each run. serialNumber1 := testrand.SerialNumber() - err = ordersDB.CreateSerialInfo(ctx, serialNumber1, []byte(bucketID), now.AddDate(1, 0, 10)) + encrypted1, err := key.EncryptMetadata( + serialNumber1, + &pb.OrderLimitMetadata{ + ProjectBucketPrefix: []byte(bucketID), + }, + ) require.NoError(t, err) serialNumber2 := testrand.SerialNumber() - err = ordersDB.CreateSerialInfo(ctx, serialNumber2, []byte(bucketID), now.AddDate(1, 0, 10)) + encrypted2, err := key.EncryptMetadata( + serialNumber2, + &pb.OrderLimitMetadata{ + ProjectBucketPrefix: []byte(bucketID), + }, + ) require.NoError(t, err) piecePublicKey, piecePrivateKey, err := storj.NewPieceKey() @@ -91,16 +104,18 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { // create signed orderlimit or order to test with limit1 := &pb.OrderLimit{ - SerialNumber: serialNumber1, - SatelliteId: satellite.ID(), - UplinkPublicKey: piecePublicKey, - StorageNodeId: storagenode.ID(), - PieceId: storj.NewPieceID(), - Action: pb.PieceAction_PUT, - Limit: 1000, - PieceExpiration: time.Time{}, - OrderCreation: tt.orderCreation, - OrderExpiration: now.Add(24 * time.Hour), + SerialNumber: serialNumber1, + SatelliteId: satellite.ID(), + UplinkPublicKey: piecePublicKey, + StorageNodeId: storagenode.ID(), + PieceId: storj.NewPieceID(), + Action: pb.PieceAction_PUT, + Limit: 1000, + PieceExpiration: time.Time{}, + OrderCreation: tt.orderCreation, + OrderExpiration: now.Add(24 * time.Hour), + EncryptedMetadataKeyId: key.ID[:], + EncryptedMetadata: encrypted1, } orderLimit1, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit1) require.NoError(t, err) @@ -112,16 +127,18 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { require.NoError(t, err) limit2 := &pb.OrderLimit{ - SerialNumber: serialNumber2, - SatelliteId: satellite.ID(), - UplinkPublicKey: piecePublicKey, - StorageNodeId: storagenode.ID(), - PieceId: storj.NewPieceID(), - Action: pb.PieceAction_PUT, - Limit: 1000, - PieceExpiration: time.Time{}, - OrderCreation: now, - OrderExpiration: now.Add(24 * time.Hour), + SerialNumber: serialNumber2, + SatelliteId: satellite.ID(), + UplinkPublicKey: piecePublicKey, + StorageNodeId: storagenode.ID(), + PieceId: storj.NewPieceID(), + Action: pb.PieceAction_PUT, + Limit: 1000, + PieceExpiration: time.Time{}, + OrderCreation: now, + OrderExpiration: now.Add(24 * time.Hour), + EncryptedMetadataKeyId: key.ID[:], + EncryptedMetadata: encrypted2, } orderLimit2, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit2) require.NoError(t, err) @@ -182,7 +199,10 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) { }) } func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) { - runTestWithPhases(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + const dataAmount int64 = 50 satellite := planet.Satellites[0] ordersDB := satellite.Orders.DB @@ -191,6 +211,7 @@ func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) { projectID := testrand.UUID() bucketname := "testbucket" bucketID := storj.JoinPaths(projectID.String(), bucketname) + key := satellite.Config.Orders.EncryptionKeys.Default // stop any async flushes because we want to be sure when some values are // written to avoid races @@ -208,7 +229,12 @@ func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) { // create serial number to use in test serialNumber := testrand.SerialNumber() - err = ordersDB.CreateSerialInfo(ctx, serialNumber, []byte(bucketID), now.AddDate(1, 0, 10)) + encrypted, err := key.EncryptMetadata( + serialNumber, + &pb.OrderLimitMetadata{ + ProjectBucketPrefix: []byte(bucketID), + }, + ) require.NoError(t, err) piecePublicKey, piecePrivateKey, err := storj.NewPieceKey() @@ -228,16 +254,18 @@ func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) { func() { // create signed orderlimit or order to test with limit := &pb.OrderLimit{ - SerialNumber: serialNumber, - SatelliteId: satellite.ID(), - UplinkPublicKey: piecePublicKey, - StorageNodeId: storagenode.ID(), - PieceId: storj.NewPieceID(), - Action: pb.PieceAction_PUT, - Limit: 1000, - PieceExpiration: time.Time{}, - OrderCreation: now, - OrderExpiration: now.Add(24 * time.Hour), + SerialNumber: serialNumber, + SatelliteId: satellite.ID(), + UplinkPublicKey: piecePublicKey, + StorageNodeId: storagenode.ID(), + PieceId: storj.NewPieceID(), + Action: pb.PieceAction_PUT, + Limit: 1000, + PieceExpiration: time.Time{}, + OrderCreation: now, + OrderExpiration: now.Add(24 * time.Hour), + EncryptedMetadataKeyId: key.ID[:], + EncryptedMetadata: encrypted, } orderLimit, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit) require.NoError(t, err) diff --git a/satellite/orders/service.go b/satellite/orders/service.go index c78fe7f33..cea5a7759 100644 --- a/satellite/orders/service.go +++ b/satellite/orders/service.go @@ -32,7 +32,7 @@ var ( // Config is a configuration struct for orders Service. type Config struct { EncryptionKeys EncryptionKeys `help:"encryption keys to encrypt info in orders" default:""` - IncludeEncryptedMetadata bool `help:"include encrypted metadata in the order limit" default:"false"` + IncludeEncryptedMetadata bool `help:"include encrypted metadata in the order limit" default:"true"` Expiration time.Duration `help:"how long until an order expires" default:"48h"` // 2 days SettlementBatchSize int `help:"how many orders to batch per transaction" default:"250"` FlushBatchSize int `help:"how many items in the rollups write cache before they are flushed to the database" devDefault:"20" releaseDefault:"10000"` @@ -103,11 +103,6 @@ func (service *Service) VerifyOrderLimitSignature(ctx context.Context, signed *p return signing.VerifyOrderLimitSignature(ctx, service.satellite, signed) } -func (service *Service) saveSerial(ctx context.Context, serialNumber storj.SerialNumber, bucket metabase.BucketLocation, expiresAt time.Time) (err error) { - defer mon.Task()(&ctx)(&err) - return service.orders.CreateSerialInfo(ctx, serialNumber, []byte(bucket.Prefix()), expiresAt) -} - func (service *Service) updateBandwidth(ctx context.Context, bucket metabase.BucketLocation, addressedOrderLimits ...*pb.AddressedOrderLimit) (err error) { defer mon.Task()(&ctx)(&err) if len(addressedOrderLimits) == 0 { @@ -195,11 +190,6 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas return nil, storj.PiecePrivateKey{}, ErrDownloadFailedNotEnoughPieces.New("not enough orderlimits: got %d, required %d", len(signer.AddressedLimits), redundancy.RequiredCount()) } - err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration) - if err != nil { - return nil, storj.PiecePrivateKey{}, Error.Wrap(err) - } - if err := service.updateBandwidth(ctx, bucket, signer.AddressedLimits...); err != nil { return nil, storj.PiecePrivateKey{}, Error.Wrap(err) } @@ -233,11 +223,6 @@ func (service *Service) CreatePutOrderLimits(ctx context.Context, bucket metabas } } - err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration) - if err != nil { - return storj.PieceID{}, nil, storj.PiecePrivateKey{}, Error.Wrap(err) - } - if err := service.updateBandwidth(ctx, bucket, signer.AddressedLimits...); err != nil { return storj.PieceID{}, nil, storj.PiecePrivateKey{}, Error.Wrap(err) } @@ -287,11 +272,6 @@ func (service *Service) CreateDeleteOrderLimits(ctx context.Context, bucket meta return nil, storj.PiecePrivateKey{}, Error.New("failed creating order limits: %w", nodeErrors.Err()) } - err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration) - if err != nil { - return nil, storj.PiecePrivateKey{}, Error.Wrap(err) - } - return signer.AddressedLimits, signer.PrivateKey, nil } @@ -353,11 +333,6 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucket metab err = Error.New("not enough nodes available: got %d, required %d", limitsCount, redundancy.GetMinReq()) return nil, storj.PiecePrivateKey{}, nil, errs.Combine(err, nodeErrors.Err()) } - - err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration) - if err != nil { - return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err) - } if err := service.updateBandwidth(ctx, bucket, limits...); err != nil { return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err) } @@ -397,10 +372,6 @@ func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucket metaba return nil, storj.PiecePrivateKey{}, "", Error.Wrap(err) } - err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration) - if err != nil { - return nil, storj.PiecePrivateKey{}, "", Error.Wrap(err) - } if err := service.updateBandwidth(ctx, bucket, limit); err != nil { return nil, storj.PiecePrivateKey{}, "", Error.Wrap(err) } @@ -468,10 +439,6 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m return nil, storj.PiecePrivateKey{}, errs.Combine(err, nodeErrors.Err()) } - err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration) - if err != nil { - return nil, storj.PiecePrivateKey{}, Error.Wrap(err) - } if err := service.updateBandwidth(ctx, bucket, limits...); err != nil { return nil, storj.PiecePrivateKey{}, Error.Wrap(err) } @@ -538,10 +505,6 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket m } } - err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration) - if err != nil { - return nil, storj.PiecePrivateKey{}, Error.Wrap(err) - } if err := service.updateBandwidth(ctx, bucket, limits...); err != nil { return nil, storj.PiecePrivateKey{}, Error.Wrap(err) } @@ -580,10 +543,6 @@ func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, buc return nil, storj.PiecePrivateKey{}, Error.Wrap(err) } - err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration) - if err != nil { - return nil, storj.PiecePrivateKey{}, Error.Wrap(err) - } if err := service.updateBandwidth(ctx, bucket, limit); err != nil { return nil, storj.PiecePrivateKey{}, Error.Wrap(err) } diff --git a/satellite/orders/service_test.go b/satellite/orders/service_test.go index 3a557759a..442ecf5d5 100644 --- a/satellite/orders/service_test.go +++ b/satellite/orders/service_test.go @@ -59,12 +59,5 @@ func TestOrderLimitsEncryptedMetadata(t *testing.T) { require.NoError(t, err) require.Equal(t, bucketName, actualBucketInfo.BucketName) require.Equal(t, projectID, actualBucketInfo.ProjectID) - - bucketPrefix, err := satellitePeer.Orders.DB.GetBucketIDFromSerialNumber(ctx, orderLimit1.SerialNumber) - require.NoError(t, err) - bucket1, err := metabase.ParseBucketPrefix(metabase.BucketPrefix(bucketPrefix)) - require.NoError(t, err) - require.Equal(t, actualBucketInfo.BucketName, bucket1.BucketName) - require.Equal(t, actualBucketInfo.ProjectID, bucket1.ProjectID) }) } diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index c890e222c..716e1008f 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -434,7 +434,7 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key # orders.flush-interval: 1m0s # include encrypted metadata in the order limit -# orders.include-encrypted-metadata: false +# orders.include-encrypted-metadata: true # how many concurrent orders to process at once. zero is unlimited # orders.orders-semaphore-size: 2