diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go index 4cab876d3..8ceceb943 100644 --- a/satellite/metainfo/config.go +++ b/satellite/metainfo/config.go @@ -12,6 +12,7 @@ import ( "github.com/vivint/infectious" "storj.io/common/memory" + "storj.io/common/uuid" "storj.io/storj/satellite/metabase" "storj.io/uplink/private/eestream" ) @@ -147,6 +148,8 @@ type Config struct { ServerSideCopyDisabled bool `help:"disable already enabled server-side copy. this is because once server side copy is enabled, delete code should stay changed, even if you want to disable server side copy" default:"false"` UsePendingObjectsTable bool `help:"enable new flow for upload which is using pending_objects table" default:"false"` + // flag to simplify testing by enabling feature only for specific projects + UsePendingObjectsTableProjects []string `help:"list of projects which will have UsePendingObjectsTable feature flag enabled" default:"" hidden:"true"` // TODO remove when we benchmarking are done and decision is made. TestListingQuery bool `default:"false" help:"test the new query for non-recursive listing"` @@ -161,3 +164,38 @@ func (c Config) Metabase(applicationName string) metabase.Config { ServerSideCopy: c.ServerSideCopy, } } + +// ExtendedConfig extended config keeps additional helper fields and methods around Config. +// TODO potentially can be removed when UsePendingObjectsTableProjects won't be used anymore. +type ExtendedConfig struct { + Config + + usePendingObjectsTableProjects []uuid.UUID +} + +// NewExtendedConfig creates new instance of extended config. +func NewExtendedConfig(config Config) (ExtendedConfig, error) { + extendedConfig := ExtendedConfig{Config: config} + for _, projectIDString := range config.UsePendingObjectsTableProjects { + projectID, err := uuid.FromString(projectIDString) + if err != nil { + return ExtendedConfig{}, err + } + extendedConfig.usePendingObjectsTableProjects = append(extendedConfig.usePendingObjectsTableProjects, projectID) + } + return extendedConfig, nil +} + +// UsePendingObjectsTableByProject checks if UsePendingObjectsTable should be enabled for specific project. +func (ec ExtendedConfig) UsePendingObjectsTableByProject(projectID uuid.UUID) bool { + // if its globally enabled don't look at projects + if ec.UsePendingObjectsTable { + return true + } + for _, p := range ec.usePendingObjectsTableProjects { + if p == projectID { + return true + } + } + return false +} diff --git a/satellite/metainfo/config_test.go b/satellite/metainfo/config_test.go index 8209f2af8..7e5665e9e 100644 --- a/satellite/metainfo/config_test.go +++ b/satellite/metainfo/config_test.go @@ -9,6 +9,8 @@ import ( "github.com/stretchr/testify/require" "storj.io/common/memory" + "storj.io/common/testrand" + "storj.io/common/uuid" "storj.io/storj/satellite/metainfo" ) @@ -96,3 +98,42 @@ func TestRSConfigValidation(t *testing.T) { } } } + +func TestExtendedConfig_UsePendingObjectsTable(t *testing.T) { + projectA := testrand.UUID() + projectB := testrand.UUID() + projectC := testrand.UUID() + config, err := metainfo.NewExtendedConfig(metainfo.Config{ + UsePendingObjectsTable: false, + UsePendingObjectsTableProjects: []string{ + projectA.String(), + projectB.String(), + }, + }) + require.NoError(t, err) + + require.True(t, config.UsePendingObjectsTableByProject(projectA)) + require.True(t, config.UsePendingObjectsTableByProject(projectB)) + require.False(t, config.UsePendingObjectsTableByProject(projectC)) + + config, err = metainfo.NewExtendedConfig(metainfo.Config{ + UsePendingObjectsTable: true, + UsePendingObjectsTableProjects: []string{ + projectA.String(), + }, + }) + require.NoError(t, err) + + require.True(t, config.UsePendingObjectsTableByProject(projectA)) + require.True(t, config.UsePendingObjectsTableByProject(projectB)) + require.True(t, config.UsePendingObjectsTableByProject(projectC)) + + config, err = metainfo.NewExtendedConfig(metainfo.Config{ + UsePendingObjectsTable: false, + UsePendingObjectsTableProjects: []string{ + "01000000-0000-0000-0000-000000000000", + }, + }) + require.NoError(t, err) + require.True(t, config.UsePendingObjectsTableByProject(uuid.UUID{1})) +} diff --git a/satellite/metainfo/endpoint.go b/satellite/metainfo/endpoint.go index 6c0c31a4d..33a599dbd 100644 --- a/satellite/metainfo/endpoint.go +++ b/satellite/metainfo/endpoint.go @@ -80,7 +80,7 @@ type Endpoint struct { encInlineSegmentSize int64 // max inline segment size + encryption overhead revocations revocation.DB defaultRS *pb.RedundancyScheme - config Config + config ExtendedConfig versionCollector *versionCollector } @@ -91,6 +91,11 @@ func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase satellite signing.Signer, revocations revocation.DB, config Config) (*Endpoint, error) { // TODO do something with too many params + extendedConfig, err := NewExtendedConfig(config) + if err != nil { + return nil, err + } + encInlineSegmentSize, err := encryption.CalcEncryptedSize(config.MaxInlineSegmentSize.Int64(), storj.EncryptionParameters{ CipherSuite: storj.EncAESGCM, BlockSize: 128, // intentionally low block size to allow maximum possible encryption overhead @@ -133,7 +138,7 @@ func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase encInlineSegmentSize: encInlineSegmentSize, revocations: revocations, defaultRS: defaultRSScheme, - config: config, + config: extendedConfig, versionCollector: newVersionCollector(log), }, nil } diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go index d6be94dcc..0afe46f5b 100644 --- a/satellite/metainfo/endpoint_object.go +++ b/satellite/metainfo/endpoint_object.go @@ -126,6 +126,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe nonce = req.EncryptedMetadataNonce[:] } + usePendingObjectsTable := endpoint.config.UsePendingObjectsTableByProject(keyInfo.ProjectID) object, err := endpoint.metabase.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{ ObjectStream: metabase.ObjectStream{ ProjectID: keyInfo.ProjectID, @@ -141,7 +142,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe EncryptedMetadataEncryptedKey: req.EncryptedMetadataEncryptedKey, EncryptedMetadataNonce: nonce, - UsePendingObjectsTable: endpoint.config.UsePendingObjectsTable, + UsePendingObjectsTable: usePendingObjectsTable, }) if err != nil { return nil, endpoint.convertMetabaseErr(err) @@ -158,7 +159,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe EncryptionParameters: req.EncryptionParameters, Placement: int32(placement), - UsePendingObjectsTable: endpoint.config.UsePendingObjectsTable, + UsePendingObjectsTable: usePendingObjectsTable, }) if err != nil { endpoint.log.Error("internal", zap.Error(err)) @@ -903,7 +904,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq } resp.More = result.More } else { - if status == metabase.Pending && endpoint.config.UsePendingObjectsTable { + if status == metabase.Pending && endpoint.config.UsePendingObjectsTableByProject(keyInfo.ProjectID) { type ObjectListItem struct { Item *pb.ObjectListItem StreamID uuid.UUID @@ -1107,8 +1108,7 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb. BatchSize: limit + 1, Cursor: cursor, } - - if endpoint.config.UsePendingObjectsTable { + if endpoint.config.UsePendingObjectsTableByProject(keyInfo.ProjectID) { err = endpoint.metabase.IteratePendingObjectsByKeyNew(ctx, options, func(ctx context.Context, it metabase.PendingObjectsIterator) error { entry := metabase.PendingObjectEntry{}