satellite/metainfo: enable UsePendingObjectsTable by project

This small feature will give us ability to test pending_objects table
without enabling it globally.

Change-Id: I802f45987ad329f94adfc0f02957c802b21d8251
This commit is contained in:
Michal Niewrzal 2023-08-04 15:25:36 +02:00
parent 780c0e0b35
commit 67371c43bd
4 changed files with 91 additions and 7 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/vivint/infectious"
"storj.io/common/memory"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/uplink/private/eestream"
)
@ -147,6 +148,8 @@ type Config struct {
ServerSideCopyDisabled bool `help:"disable already enabled server-side copy. this is because once server side copy is enabled, delete code should stay changed, even if you want to disable server side copy" default:"false"`
UsePendingObjectsTable bool `help:"enable new flow for upload which is using pending_objects table" default:"false"`
// flag to simplify testing by enabling feature only for specific projects
UsePendingObjectsTableProjects []string `help:"list of projects which will have UsePendingObjectsTable feature flag enabled" default:"" hidden:"true"`
// TODO remove when we benchmarking are done and decision is made.
TestListingQuery bool `default:"false" help:"test the new query for non-recursive listing"`
@ -161,3 +164,38 @@ func (c Config) Metabase(applicationName string) metabase.Config {
ServerSideCopy: c.ServerSideCopy,
}
}
// ExtendedConfig extended config keeps additional helper fields and methods around Config.
// TODO potentially can be removed when UsePendingObjectsTableProjects won't be used anymore.
type ExtendedConfig struct {
Config
usePendingObjectsTableProjects []uuid.UUID
}
// NewExtendedConfig creates new instance of extended config.
func NewExtendedConfig(config Config) (ExtendedConfig, error) {
extendedConfig := ExtendedConfig{Config: config}
for _, projectIDString := range config.UsePendingObjectsTableProjects {
projectID, err := uuid.FromString(projectIDString)
if err != nil {
return ExtendedConfig{}, err
}
extendedConfig.usePendingObjectsTableProjects = append(extendedConfig.usePendingObjectsTableProjects, projectID)
}
return extendedConfig, nil
}
// UsePendingObjectsTableByProject checks if UsePendingObjectsTable should be enabled for specific project.
func (ec ExtendedConfig) UsePendingObjectsTableByProject(projectID uuid.UUID) bool {
// if its globally enabled don't look at projects
if ec.UsePendingObjectsTable {
return true
}
for _, p := range ec.usePendingObjectsTableProjects {
if p == projectID {
return true
}
}
return false
}

View File

@ -9,6 +9,8 @@ import (
"github.com/stretchr/testify/require"
"storj.io/common/memory"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/satellite/metainfo"
)
@ -96,3 +98,42 @@ func TestRSConfigValidation(t *testing.T) {
}
}
}
func TestExtendedConfig_UsePendingObjectsTable(t *testing.T) {
projectA := testrand.UUID()
projectB := testrand.UUID()
projectC := testrand.UUID()
config, err := metainfo.NewExtendedConfig(metainfo.Config{
UsePendingObjectsTable: false,
UsePendingObjectsTableProjects: []string{
projectA.String(),
projectB.String(),
},
})
require.NoError(t, err)
require.True(t, config.UsePendingObjectsTableByProject(projectA))
require.True(t, config.UsePendingObjectsTableByProject(projectB))
require.False(t, config.UsePendingObjectsTableByProject(projectC))
config, err = metainfo.NewExtendedConfig(metainfo.Config{
UsePendingObjectsTable: true,
UsePendingObjectsTableProjects: []string{
projectA.String(),
},
})
require.NoError(t, err)
require.True(t, config.UsePendingObjectsTableByProject(projectA))
require.True(t, config.UsePendingObjectsTableByProject(projectB))
require.True(t, config.UsePendingObjectsTableByProject(projectC))
config, err = metainfo.NewExtendedConfig(metainfo.Config{
UsePendingObjectsTable: false,
UsePendingObjectsTableProjects: []string{
"01000000-0000-0000-0000-000000000000",
},
})
require.NoError(t, err)
require.True(t, config.UsePendingObjectsTableByProject(uuid.UUID{1}))
}

View File

@ -80,7 +80,7 @@ type Endpoint struct {
encInlineSegmentSize int64 // max inline segment size + encryption overhead
revocations revocation.DB
defaultRS *pb.RedundancyScheme
config Config
config ExtendedConfig
versionCollector *versionCollector
}
@ -91,6 +91,11 @@ func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase
satellite signing.Signer, revocations revocation.DB, config Config) (*Endpoint, error) {
// TODO do something with too many params
extendedConfig, err := NewExtendedConfig(config)
if err != nil {
return nil, err
}
encInlineSegmentSize, err := encryption.CalcEncryptedSize(config.MaxInlineSegmentSize.Int64(), storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 128, // intentionally low block size to allow maximum possible encryption overhead
@ -133,7 +138,7 @@ func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase
encInlineSegmentSize: encInlineSegmentSize,
revocations: revocations,
defaultRS: defaultRSScheme,
config: config,
config: extendedConfig,
versionCollector: newVersionCollector(log),
}, nil
}

View File

@ -126,6 +126,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
nonce = req.EncryptedMetadataNonce[:]
}
usePendingObjectsTable := endpoint.config.UsePendingObjectsTableByProject(keyInfo.ProjectID)
object, err := endpoint.metabase.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
ObjectStream: metabase.ObjectStream{
ProjectID: keyInfo.ProjectID,
@ -141,7 +142,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
EncryptedMetadataEncryptedKey: req.EncryptedMetadataEncryptedKey,
EncryptedMetadataNonce: nonce,
UsePendingObjectsTable: endpoint.config.UsePendingObjectsTable,
UsePendingObjectsTable: usePendingObjectsTable,
})
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
@ -158,7 +159,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
EncryptionParameters: req.EncryptionParameters,
Placement: int32(placement),
UsePendingObjectsTable: endpoint.config.UsePendingObjectsTable,
UsePendingObjectsTable: usePendingObjectsTable,
})
if err != nil {
endpoint.log.Error("internal", zap.Error(err))
@ -903,7 +904,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
}
resp.More = result.More
} else {
if status == metabase.Pending && endpoint.config.UsePendingObjectsTable {
if status == metabase.Pending && endpoint.config.UsePendingObjectsTableByProject(keyInfo.ProjectID) {
type ObjectListItem struct {
Item *pb.ObjectListItem
StreamID uuid.UUID
@ -1107,8 +1108,7 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb.
BatchSize: limit + 1,
Cursor: cursor,
}
if endpoint.config.UsePendingObjectsTable {
if endpoint.config.UsePendingObjectsTableByProject(keyInfo.ProjectID) {
err = endpoint.metabase.IteratePendingObjectsByKeyNew(ctx,
options, func(ctx context.Context, it metabase.PendingObjectsIterator) error {
entry := metabase.PendingObjectEntry{}