satellite/orders: stop inserting/reading from serial_numbers table
This PR contains the minimum changes needed to stop inserting into the serial_numbers table. This is the first step in completely deprecating that table. The next step is to create another PR to remove the expiredSerial chore, fix more tests, and remove any other methods on the serial_number table. Change-Id: I5f12a56ebf3fa4d1a1976141d2911f25a98d2cc3
This commit is contained in:
parent
aaa4a9f31b
commit
97a5e6c814
@ -647,52 +647,32 @@ func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_Settlem
|
||||
|
||||
storagenodeSettled[int32(orderLimit.Action)] += order.Amount
|
||||
|
||||
var bucketName string
|
||||
var projectID uuid.UUID
|
||||
if len(orderLimit.EncryptedMetadata) > 0 {
|
||||
metadata, err := endpoint.ordersService.DecryptOrderMetadata(ctx, orderLimit)
|
||||
if err != nil {
|
||||
log.Info("decrypt order metadata err:", zap.Error(err))
|
||||
mon.Event("bucketinfo_from_orders_metadata_error_1")
|
||||
goto idFromSerialTable
|
||||
}
|
||||
bucketInfo, err := metabase.ParseBucketPrefix(
|
||||
metabase.BucketPrefix(metadata.GetProjectBucketPrefix()),
|
||||
)
|
||||
if err != nil {
|
||||
log.Info("decrypt order: ParseBucketPrefix", zap.Error(err))
|
||||
mon.Event("bucketinfo_from_orders_metadata_error_2")
|
||||
goto idFromSerialTable
|
||||
}
|
||||
bucketName = bucketInfo.BucketName
|
||||
projectID = bucketInfo.ProjectID
|
||||
mon.Event("bucketinfo_from_orders_metadata")
|
||||
metadata, err := endpoint.ordersService.DecryptOrderMetadata(ctx, orderLimit)
|
||||
if err != nil {
|
||||
log.Debug("decrypt order metadata err:", zap.Error(err))
|
||||
mon.Event("bucketinfo_from_orders_metadata_error_1")
|
||||
continue
|
||||
}
|
||||
|
||||
// If we cannot get the bucket name and project ID from the orderLimit metadata, then fallback
|
||||
// to the old method of getting it from the serial_numbers table.
|
||||
// This is only temporary to make sure the orderLimit metadata is working correctly.
|
||||
idFromSerialTable:
|
||||
if bucketName == "" || projectID.IsZero() {
|
||||
bucketPrefix, err := endpoint.DB.GetBucketIDFromSerialNumber(ctx, serialNum)
|
||||
if err != nil {
|
||||
log.Info("get bucketPrefix from serial number table err", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
bucket, err := metabase.ParseBucketPrefix(metabase.BucketPrefix(bucketPrefix))
|
||||
if err != nil {
|
||||
log.Info("split bucket err", zap.Error(err), zap.String("bucketPrefix", string(bucketPrefix)))
|
||||
continue
|
||||
}
|
||||
bucketName = bucket.BucketName
|
||||
projectID = bucket.ProjectID
|
||||
mon.Event("bucketinfo_from_serial_number")
|
||||
bucketInfo, err := metabase.ParseBucketPrefix(
|
||||
metabase.BucketPrefix(metadata.GetProjectBucketPrefix()),
|
||||
)
|
||||
if err != nil {
|
||||
log.Debug("decrypt order: ParseBucketPrefix", zap.Error(err))
|
||||
mon.Event("bucketinfo_from_orders_metadata_error_2")
|
||||
continue
|
||||
}
|
||||
if bucketInfo.BucketName == "" || bucketInfo.ProjectID.IsZero() {
|
||||
log.Info("decrypt order: bucketName or projectID not set",
|
||||
zap.String("bucketName", bucketInfo.BucketName),
|
||||
zap.String("projectID", bucketInfo.ProjectID.String()),
|
||||
)
|
||||
mon.Event("bucketinfo_from_orders_metadata_error_3")
|
||||
continue
|
||||
}
|
||||
|
||||
bucketSettled[bucketIDAction{
|
||||
bucketname: bucketName,
|
||||
projectID: projectID,
|
||||
bucketname: bucketInfo.BucketName,
|
||||
projectID: bucketInfo.ProjectID,
|
||||
action: orderLimit.Action,
|
||||
}] += order.Amount
|
||||
}
|
||||
|
@ -43,7 +43,9 @@ func runTestWithPhases(t *testing.T, fn func(t *testing.T, ctx *testcontext.Cont
|
||||
}
|
||||
|
||||
func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
|
||||
runTestWithPhases(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
satellite := planet.Satellites[0]
|
||||
ordersDB := satellite.Orders.DB
|
||||
storagenode := planet.StorageNodes[0]
|
||||
@ -51,6 +53,7 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
|
||||
projectID := testrand.UUID()
|
||||
bucketname := "testbucket"
|
||||
bucketID := storj.JoinPaths(projectID.String(), bucketname)
|
||||
key := satellite.Config.Orders.EncryptionKeys.Default
|
||||
|
||||
// stop any async flushes because we want to be sure when some values are
|
||||
// written to avoid races
|
||||
@ -79,11 +82,21 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
|
||||
func() {
|
||||
// create serial number to use in test. must be unique for each run.
|
||||
serialNumber1 := testrand.SerialNumber()
|
||||
err = ordersDB.CreateSerialInfo(ctx, serialNumber1, []byte(bucketID), now.AddDate(1, 0, 10))
|
||||
encrypted1, err := key.EncryptMetadata(
|
||||
serialNumber1,
|
||||
&pb.OrderLimitMetadata{
|
||||
ProjectBucketPrefix: []byte(bucketID),
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
serialNumber2 := testrand.SerialNumber()
|
||||
err = ordersDB.CreateSerialInfo(ctx, serialNumber2, []byte(bucketID), now.AddDate(1, 0, 10))
|
||||
encrypted2, err := key.EncryptMetadata(
|
||||
serialNumber2,
|
||||
&pb.OrderLimitMetadata{
|
||||
ProjectBucketPrefix: []byte(bucketID),
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
piecePublicKey, piecePrivateKey, err := storj.NewPieceKey()
|
||||
@ -91,16 +104,18 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
|
||||
|
||||
// create signed orderlimit or order to test with
|
||||
limit1 := &pb.OrderLimit{
|
||||
SerialNumber: serialNumber1,
|
||||
SatelliteId: satellite.ID(),
|
||||
UplinkPublicKey: piecePublicKey,
|
||||
StorageNodeId: storagenode.ID(),
|
||||
PieceId: storj.NewPieceID(),
|
||||
Action: pb.PieceAction_PUT,
|
||||
Limit: 1000,
|
||||
PieceExpiration: time.Time{},
|
||||
OrderCreation: tt.orderCreation,
|
||||
OrderExpiration: now.Add(24 * time.Hour),
|
||||
SerialNumber: serialNumber1,
|
||||
SatelliteId: satellite.ID(),
|
||||
UplinkPublicKey: piecePublicKey,
|
||||
StorageNodeId: storagenode.ID(),
|
||||
PieceId: storj.NewPieceID(),
|
||||
Action: pb.PieceAction_PUT,
|
||||
Limit: 1000,
|
||||
PieceExpiration: time.Time{},
|
||||
OrderCreation: tt.orderCreation,
|
||||
OrderExpiration: now.Add(24 * time.Hour),
|
||||
EncryptedMetadataKeyId: key.ID[:],
|
||||
EncryptedMetadata: encrypted1,
|
||||
}
|
||||
orderLimit1, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit1)
|
||||
require.NoError(t, err)
|
||||
@ -112,16 +127,18 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
limit2 := &pb.OrderLimit{
|
||||
SerialNumber: serialNumber2,
|
||||
SatelliteId: satellite.ID(),
|
||||
UplinkPublicKey: piecePublicKey,
|
||||
StorageNodeId: storagenode.ID(),
|
||||
PieceId: storj.NewPieceID(),
|
||||
Action: pb.PieceAction_PUT,
|
||||
Limit: 1000,
|
||||
PieceExpiration: time.Time{},
|
||||
OrderCreation: now,
|
||||
OrderExpiration: now.Add(24 * time.Hour),
|
||||
SerialNumber: serialNumber2,
|
||||
SatelliteId: satellite.ID(),
|
||||
UplinkPublicKey: piecePublicKey,
|
||||
StorageNodeId: storagenode.ID(),
|
||||
PieceId: storj.NewPieceID(),
|
||||
Action: pb.PieceAction_PUT,
|
||||
Limit: 1000,
|
||||
PieceExpiration: time.Time{},
|
||||
OrderCreation: now,
|
||||
OrderExpiration: now.Add(24 * time.Hour),
|
||||
EncryptedMetadataKeyId: key.ID[:],
|
||||
EncryptedMetadata: encrypted2,
|
||||
}
|
||||
orderLimit2, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit2)
|
||||
require.NoError(t, err)
|
||||
@ -182,7 +199,10 @@ func TestSettlementWithWindowEndpointManyOrders(t *testing.T) {
|
||||
})
|
||||
}
|
||||
func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) {
|
||||
runTestWithPhases(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
|
||||
const dataAmount int64 = 50
|
||||
satellite := planet.Satellites[0]
|
||||
ordersDB := satellite.Orders.DB
|
||||
@ -191,6 +211,7 @@ func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) {
|
||||
projectID := testrand.UUID()
|
||||
bucketname := "testbucket"
|
||||
bucketID := storj.JoinPaths(projectID.String(), bucketname)
|
||||
key := satellite.Config.Orders.EncryptionKeys.Default
|
||||
|
||||
// stop any async flushes because we want to be sure when some values are
|
||||
// written to avoid races
|
||||
@ -208,7 +229,12 @@ func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) {
|
||||
|
||||
// create serial number to use in test
|
||||
serialNumber := testrand.SerialNumber()
|
||||
err = ordersDB.CreateSerialInfo(ctx, serialNumber, []byte(bucketID), now.AddDate(1, 0, 10))
|
||||
encrypted, err := key.EncryptMetadata(
|
||||
serialNumber,
|
||||
&pb.OrderLimitMetadata{
|
||||
ProjectBucketPrefix: []byte(bucketID),
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
piecePublicKey, piecePrivateKey, err := storj.NewPieceKey()
|
||||
@ -228,16 +254,18 @@ func TestSettlementWithWindowEndpointSingleOrder(t *testing.T) {
|
||||
func() {
|
||||
// create signed orderlimit or order to test with
|
||||
limit := &pb.OrderLimit{
|
||||
SerialNumber: serialNumber,
|
||||
SatelliteId: satellite.ID(),
|
||||
UplinkPublicKey: piecePublicKey,
|
||||
StorageNodeId: storagenode.ID(),
|
||||
PieceId: storj.NewPieceID(),
|
||||
Action: pb.PieceAction_PUT,
|
||||
Limit: 1000,
|
||||
PieceExpiration: time.Time{},
|
||||
OrderCreation: now,
|
||||
OrderExpiration: now.Add(24 * time.Hour),
|
||||
SerialNumber: serialNumber,
|
||||
SatelliteId: satellite.ID(),
|
||||
UplinkPublicKey: piecePublicKey,
|
||||
StorageNodeId: storagenode.ID(),
|
||||
PieceId: storj.NewPieceID(),
|
||||
Action: pb.PieceAction_PUT,
|
||||
Limit: 1000,
|
||||
PieceExpiration: time.Time{},
|
||||
OrderCreation: now,
|
||||
OrderExpiration: now.Add(24 * time.Hour),
|
||||
EncryptedMetadataKeyId: key.ID[:],
|
||||
EncryptedMetadata: encrypted,
|
||||
}
|
||||
orderLimit, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit)
|
||||
require.NoError(t, err)
|
||||
|
@ -32,7 +32,7 @@ var (
|
||||
// Config is a configuration struct for orders Service.
|
||||
type Config struct {
|
||||
EncryptionKeys EncryptionKeys `help:"encryption keys to encrypt info in orders" default:""`
|
||||
IncludeEncryptedMetadata bool `help:"include encrypted metadata in the order limit" default:"false"`
|
||||
IncludeEncryptedMetadata bool `help:"include encrypted metadata in the order limit" default:"true"`
|
||||
Expiration time.Duration `help:"how long until an order expires" default:"48h"` // 2 days
|
||||
SettlementBatchSize int `help:"how many orders to batch per transaction" default:"250"`
|
||||
FlushBatchSize int `help:"how many items in the rollups write cache before they are flushed to the database" devDefault:"20" releaseDefault:"10000"`
|
||||
@ -103,11 +103,6 @@ func (service *Service) VerifyOrderLimitSignature(ctx context.Context, signed *p
|
||||
return signing.VerifyOrderLimitSignature(ctx, service.satellite, signed)
|
||||
}
|
||||
|
||||
func (service *Service) saveSerial(ctx context.Context, serialNumber storj.SerialNumber, bucket metabase.BucketLocation, expiresAt time.Time) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return service.orders.CreateSerialInfo(ctx, serialNumber, []byte(bucket.Prefix()), expiresAt)
|
||||
}
|
||||
|
||||
func (service *Service) updateBandwidth(ctx context.Context, bucket metabase.BucketLocation, addressedOrderLimits ...*pb.AddressedOrderLimit) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if len(addressedOrderLimits) == 0 {
|
||||
@ -195,11 +190,6 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas
|
||||
return nil, storj.PiecePrivateKey{}, ErrDownloadFailedNotEnoughPieces.New("not enough orderlimits: got %d, required %d", len(signer.AddressedLimits), redundancy.RequiredCount())
|
||||
}
|
||||
|
||||
err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration)
|
||||
if err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
if err := service.updateBandwidth(ctx, bucket, signer.AddressedLimits...); err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
@ -233,11 +223,6 @@ func (service *Service) CreatePutOrderLimits(ctx context.Context, bucket metabas
|
||||
}
|
||||
}
|
||||
|
||||
err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration)
|
||||
if err != nil {
|
||||
return storj.PieceID{}, nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
if err := service.updateBandwidth(ctx, bucket, signer.AddressedLimits...); err != nil {
|
||||
return storj.PieceID{}, nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
@ -287,11 +272,6 @@ func (service *Service) CreateDeleteOrderLimits(ctx context.Context, bucket meta
|
||||
return nil, storj.PiecePrivateKey{}, Error.New("failed creating order limits: %w", nodeErrors.Err())
|
||||
}
|
||||
|
||||
err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration)
|
||||
if err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return signer.AddressedLimits, signer.PrivateKey, nil
|
||||
}
|
||||
|
||||
@ -353,11 +333,6 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucket metab
|
||||
err = Error.New("not enough nodes available: got %d, required %d", limitsCount, redundancy.GetMinReq())
|
||||
return nil, storj.PiecePrivateKey{}, nil, errs.Combine(err, nodeErrors.Err())
|
||||
}
|
||||
|
||||
err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration)
|
||||
if err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
|
||||
}
|
||||
if err := service.updateBandwidth(ctx, bucket, limits...); err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
|
||||
}
|
||||
@ -397,10 +372,6 @@ func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucket metaba
|
||||
return nil, storj.PiecePrivateKey{}, "", Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration)
|
||||
if err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, "", Error.Wrap(err)
|
||||
}
|
||||
if err := service.updateBandwidth(ctx, bucket, limit); err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, "", Error.Wrap(err)
|
||||
}
|
||||
@ -468,10 +439,6 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m
|
||||
return nil, storj.PiecePrivateKey{}, errs.Combine(err, nodeErrors.Err())
|
||||
}
|
||||
|
||||
err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration)
|
||||
if err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
if err := service.updateBandwidth(ctx, bucket, limits...); err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
@ -538,10 +505,6 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket m
|
||||
}
|
||||
}
|
||||
|
||||
err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration)
|
||||
if err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
if err := service.updateBandwidth(ctx, bucket, limits...); err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
@ -580,10 +543,6 @@ func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, buc
|
||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = service.saveSerial(ctx, signer.Serial, bucket, signer.OrderExpiration)
|
||||
if err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
if err := service.updateBandwidth(ctx, bucket, limit); err != nil {
|
||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
}
|
||||
|
@ -59,12 +59,5 @@ func TestOrderLimitsEncryptedMetadata(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, bucketName, actualBucketInfo.BucketName)
|
||||
require.Equal(t, projectID, actualBucketInfo.ProjectID)
|
||||
|
||||
bucketPrefix, err := satellitePeer.Orders.DB.GetBucketIDFromSerialNumber(ctx, orderLimit1.SerialNumber)
|
||||
require.NoError(t, err)
|
||||
bucket1, err := metabase.ParseBucketPrefix(metabase.BucketPrefix(bucketPrefix))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, actualBucketInfo.BucketName, bucket1.BucketName)
|
||||
require.Equal(t, actualBucketInfo.ProjectID, bucket1.ProjectID)
|
||||
})
|
||||
}
|
||||
|
2
scripts/testdata/satellite-config.yaml.lock
vendored
2
scripts/testdata/satellite-config.yaml.lock
vendored
@ -434,7 +434,7 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
||||
# orders.flush-interval: 1m0s
|
||||
|
||||
# include encrypted metadata in the order limit
|
||||
# orders.include-encrypted-metadata: false
|
||||
# orders.include-encrypted-metadata: true
|
||||
|
||||
# how many concurrent orders to process at once. zero is unlimited
|
||||
# orders.orders-semaphore-size: 2
|
||||
|
Loading…
Reference in New Issue
Block a user