diff --git a/cmd/metabase-orphaned-segments/main.go b/cmd/metabase-orphaned-segments/main.go index f2d9fe1d8..d1ab71a89 100644 --- a/cmd/metabase-orphaned-segments/main.go +++ b/cmd/metabase-orphaned-segments/main.go @@ -155,7 +155,7 @@ func Delete(ctx context.Context, log *zap.Logger, config Config) (err error) { func findOrphanedSegments(ctx context.Context, log *zap.Logger, config Config) (_ []uuid.UUID, err error) { defer mon.Task()(&ctx)(&err) - metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), config.MetabaseDB) + metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), config.MetabaseDB, metabase.Config{}) if err != nil { return nil, errs.New("unable to connect %q: %w", config.MetabaseDB, err) } diff --git a/cmd/metabase-orphaned-segments/main_test.go b/cmd/metabase-orphaned-segments/main_test.go index e3b04d4f2..767a719ed 100644 --- a/cmd/metabase-orphaned-segments/main_test.go +++ b/cmd/metabase-orphaned-segments/main_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" + "storj.io/common/memory" "storj.io/common/testcontext" "storj.io/private/dbutil" "storj.io/private/dbutil/tempdb" @@ -113,7 +114,10 @@ func test(t *testing.T, prepare func(t *testing.T, ctx *testcontext.Context, raw metabaseTempDB, err := tempdb.OpenUnique(ctx, satelliteDB.MetabaseDB.URL, schema) require.NoError(t, err) - metabaseDB, err := satellitedbtest.CreateMetabaseDBOnTopOf(ctx, log, metabaseTempDB) + metabaseDB, err := satellitedbtest.CreateMetabaseDBOnTopOf(ctx, log, metabaseTempDB, metabase.Config{ + MinPartSize: 5 * memory.MiB, + MaxNumberOfParts: 10000, + }) require.NoError(t, err) defer ctx.Check(metabaseDB.Close) diff --git a/cmd/metabase-verify/main.go b/cmd/metabase-verify/main.go index a5eea321d..3f9c27fd2 100644 --- a/cmd/metabase-verify/main.go +++ b/cmd/metabase-verify/main.go @@ -59,7 +59,7 @@ func VerifyCommand(log *zap.Logger) *cobra.Command { ctx, cancel := process.Ctx(cmd) defer cancel() - mdb, err := metabase.Open(ctx, log.Named("mdb"), metabaseDB) + mdb, err := metabase.Open(ctx, log.Named("mdb"), metabaseDB, metabase.Config{}) if err != nil { return Error.Wrap(err) } diff --git a/cmd/satellite/api.go b/cmd/satellite/api.go index 6d0333a2a..4d195331a 100644 --- a/cmd/satellite/api.go +++ b/cmd/satellite/api.go @@ -44,7 +44,10 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) { err = errs.Combine(err, db.Close()) }() - metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Config.Metainfo.DatabaseURL) + metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Config.Metainfo.DatabaseURL, metabase.Config{ + MinPartSize: runCfg.Config.Metainfo.MinPartSize, + MaxNumberOfParts: runCfg.Config.Metainfo.MaxNumberOfParts, + }) if err != nil { return errs.New("Error creating metabase connection on satellite api: %+v", err) } diff --git a/cmd/satellite/gc.go b/cmd/satellite/gc.go index 2142650f4..43207f169 100644 --- a/cmd/satellite/gc.go +++ b/cmd/satellite/gc.go @@ -36,7 +36,10 @@ func cmdGCRun(cmd *cobra.Command, args []string) (err error) { err = errs.Combine(err, db.Close()) }() - metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL) + metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL, metabase.Config{ + MinPartSize: runCfg.Config.Metainfo.MinPartSize, + MaxNumberOfParts: runCfg.Config.Metainfo.MaxNumberOfParts, + }) if err != nil { return errs.New("Error creating metabase connection: %+v", err) } diff --git a/cmd/satellite/main.go b/cmd/satellite/main.go index cb41bfe3e..00f7b81db 100644 --- a/cmd/satellite/main.go +++ b/cmd/satellite/main.go @@ -374,7 +374,10 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { err = errs.Combine(err, db.Close()) }() - metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL) + metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL, metabase.Config{ + MinPartSize: runCfg.Config.Metainfo.MinPartSize, + MaxNumberOfParts: runCfg.Config.Metainfo.MaxNumberOfParts, + }) if err != nil { return errs.New("Error creating metabase connection: %+v", err) } @@ -458,7 +461,10 @@ func cmdMigrationRun(cmd *cobra.Command, args []string) (err error) { return errs.New("Error creating tables for master database on satellite: %+v", err) } - metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL) + metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL, metabase.Config{ + MinPartSize: runCfg.Config.Metainfo.MinPartSize, + MaxNumberOfParts: runCfg.Config.Metainfo.MaxNumberOfParts, + }) if err != nil { return errs.New("Error creating metabase connection: %+v", err) } diff --git a/cmd/satellite/repairer.go b/cmd/satellite/repairer.go index 5aab167b9..d9051e655 100644 --- a/cmd/satellite/repairer.go +++ b/cmd/satellite/repairer.go @@ -39,7 +39,10 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) { err = errs.Combine(err, db.Close()) }() - metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL) + metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL, metabase.Config{ + MinPartSize: runCfg.Config.Metainfo.MinPartSize, + MaxNumberOfParts: runCfg.Config.Metainfo.MaxNumberOfParts, + }) if err != nil { return errs.New("Error creating metabase connection: %+v", err) } diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index a40b08b67..175e7c087 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -390,7 +390,17 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int } planet.databases = append(planet.databases, db) - metabaseDB, err := satellitedbtest.CreateMetabaseDB(context.TODO(), log.Named("metabase"), planet.config.Name, "M", index, databases.MetabaseDB) + var config satellite.Config + cfgstruct.Bind(pflag.NewFlagSet("", pflag.PanicOnError), &config, + cfgstruct.UseTestDefaults(), + cfgstruct.ConfDir(storageDir), + cfgstruct.IdentityDir(storageDir), + cfgstruct.ConfigVar("TESTINTERVAL", defaultInterval.String())) + + metabaseDB, err := satellitedbtest.CreateMetabaseDB(context.TODO(), log.Named("metabase"), planet.config.Name, "M", index, databases.MetabaseDB, metabase.Config{ + MinPartSize: config.Metainfo.MinPartSize, + MaxNumberOfParts: config.Metainfo.MaxNumberOfParts, + }) if err != nil { return nil, err } @@ -417,13 +427,6 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int return nil, err } - var config satellite.Config - cfgstruct.Bind(pflag.NewFlagSet("", pflag.PanicOnError), &config, - cfgstruct.UseTestDefaults(), - cfgstruct.ConfDir(storageDir), - cfgstruct.IdentityDir(storageDir), - cfgstruct.ConfigVar("TESTINTERVAL", defaultInterval.String())) - // TODO: these are almost certainly mistakenly set to the zero value // in tests due to a prior mismatch between testplanet config and // cfgstruct devDefaults. we need to make sure it's safe to remove diff --git a/satellite/metabase/commit.go b/satellite/metabase/commit.go index 71fbc1dc4..951cfe8fc 100644 --- a/satellite/metabase/commit.go +++ b/satellite/metabase/commit.go @@ -12,6 +12,7 @@ import ( pgxerrcode "github.com/jackc/pgerrcode" "github.com/zeebo/errs" + "storj.io/common/memory" "storj.io/common/storj" "storj.io/private/dbutil/pgutil/pgerrcode" "storj.io/private/dbutil/txutil" @@ -435,6 +436,10 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec return Error.New("failed to fetch segments: %w", err) } + if err = db.validateParts(segments); err != nil { + return err + } + finalSegments := convertToFinalSegments(segments) err = updateSegmentOffsets(ctx, tx, opts.StreamID, finalSegments) if err != nil { @@ -540,3 +545,32 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec return object, nil } + +func (db *DB) validateParts(segments []segmentInfoForCommit) error { + partSize := make(map[uint32]memory.Size) + + var lastPart uint32 + for _, segment := range segments { + partSize[segment.Position.Part] += memory.Size(segment.PlainSize) + if lastPart < segment.Position.Part { + lastPart = segment.Position.Part + } + } + + if len(partSize) > db.config.MaxNumberOfParts { + return Error.New("exceeded maximum number of parts: %d", db.config.MaxNumberOfParts) + } + + for part, size := range partSize { + // Last part has no minimum size. + if part == lastPart { + continue + } + + if size < db.config.MinPartSize { + return Error.New("size of part number %d is below minimum threshold, got: %s, min: %s", part, size, db.config.MinPartSize) + } + } + + return nil +} diff --git a/satellite/metabase/commit_test.go b/satellite/metabase/commit_test.go index fa109289a..85d78035d 100644 --- a/satellite/metabase/commit_test.go +++ b/satellite/metabase/commit_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "storj.io/common/memory" "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" @@ -2454,3 +2455,422 @@ func TestCommitObject(t *testing.T) { }) }) } + +func TestCommitObjectWithIncorrectPartSize(t *testing.T) { + metabasetest.RunWithConfig(t, metabase.Config{ + MinPartSize: 5 * memory.MiB, + MaxNumberOfParts: 1000, + }, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := metabasetest.RandObjectStream() + + t.Run("part size less then 5MB", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + }, + Version: 1, + }.Check(ctx, t, db) + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + + rootPieceID := testrand.PieceID() + pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Nonce() + + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce[:], + + EncryptedSize: 2 * memory.MiB.Int32(), + PlainSize: 2 * memory.MiB.Int32(), + Redundancy: metabasetest.DefaultRedundancy, + }, + }.Check(ctx, t, db) + + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 1, Index: 0}, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce[:], + + EncryptedSize: 2 * memory.MiB.Int32(), + PlainSize: 2 * memory.MiB.Int32(), + Redundancy: metabasetest.DefaultRedundancy, + }, + }.Check(ctx, t, db) + + metabasetest.CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: obj, + }, + ErrClass: &metabase.Error, + ErrText: "size of part number 0 is below minimum threshold, got: 2.0 MiB, min: 5.0 MiB", + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Pending, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + CreatedAt: now, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce[:], + + EncryptedSize: 2 * memory.MiB.Int32(), + PlainSize: 2 * memory.MiB.Int32(), + + Redundancy: metabasetest.DefaultRedundancy, + + Pieces: pieces, + }, + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 1, Index: 0}, + CreatedAt: now, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce[:], + + EncryptedSize: 2 * memory.MiB.Int32(), + PlainSize: 2 * memory.MiB.Int32(), + + Redundancy: metabasetest.DefaultRedundancy, + + Pieces: pieces, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("size validation with part with multiple segments", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + }, + Version: 1, + }.Check(ctx, t, db) + now := time.Now() + + rootPieceID := testrand.PieceID() + pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Nonce() + + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 1, Index: 0}, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce[:], + + EncryptedSize: memory.MiB.Int32(), + PlainSize: memory.MiB.Int32(), + Redundancy: metabasetest.DefaultRedundancy, + }, + }.Check(ctx, t, db) + + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 1, Index: 1}, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce[:], + + EncryptedSize: memory.MiB.Int32(), + PlainSize: memory.MiB.Int32(), + Redundancy: metabasetest.DefaultRedundancy, + }, + }.Check(ctx, t, db) + + metabasetest.CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: obj, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Committed, + + SegmentCount: 2, + FixedSegmentSize: -1, + TotalPlainSize: 2 * memory.MiB.Int64(), + TotalEncryptedSize: 2 * memory.MiB.Int64(), + + Encryption: metabasetest.DefaultEncryption, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 1, Index: 0}, + CreatedAt: now, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce[:], + + EncryptedSize: memory.MiB.Int32(), + PlainSize: memory.MiB.Int32(), + + Redundancy: metabasetest.DefaultRedundancy, + + Pieces: pieces, + }, + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 1, Index: 1}, + CreatedAt: now, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce[:], + + EncryptedSize: memory.MiB.Int32(), + PlainSize: memory.MiB.Int32(), + PlainOffset: memory.MiB.Int64(), + + Redundancy: metabasetest.DefaultRedundancy, + + Pieces: pieces, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("size validation with multiple parts", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + }, + Version: 1, + }.Check(ctx, t, db) + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + + rootPieceID := testrand.PieceID() + pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Nonce() + partsSizes := []memory.Size{6 * memory.MiB, 1 * memory.MiB, 1 * memory.MiB} + + for i, size := range partsSizes { + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: uint32(i + 1), Index: 1}, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce[:], + + EncryptedSize: size.Int32(), + PlainSize: size.Int32(), + Redundancy: metabasetest.DefaultRedundancy, + }, + }.Check(ctx, t, db) + } + + metabasetest.CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: obj, + }, + ErrClass: &metabase.Error, + ErrText: "size of part number 2 is below minimum threshold, got: 1.0 MiB, min: 5.0 MiB", + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Pending, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 1, Index: 1}, + CreatedAt: now, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce[:], + + EncryptedSize: 6 * memory.MiB.Int32(), + PlainSize: 6 * memory.MiB.Int32(), + + Redundancy: metabasetest.DefaultRedundancy, + + Pieces: pieces, + }, + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 2, Index: 1}, + CreatedAt: now, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce[:], + + EncryptedSize: memory.MiB.Int32(), + PlainSize: memory.MiB.Int32(), + + Redundancy: metabasetest.DefaultRedundancy, + + Pieces: pieces, + }, + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 3, Index: 1}, + CreatedAt: now, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce[:], + + EncryptedSize: memory.MiB.Int32(), + PlainSize: memory.MiB.Int32(), + + Redundancy: metabasetest.DefaultRedundancy, + + Pieces: pieces, + }, + }, + }.Check(ctx, t, db) + }) + }) +} + +func TestCommitObjectWithIncorrectAmountOfParts(t *testing.T) { + metabasetest.RunWithConfig(t, metabase.Config{ + MinPartSize: 5 * memory.MiB, + MaxNumberOfParts: 3, + }, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := metabasetest.RandObjectStream() + + t.Run("number of parts check", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + }, + Version: 1, + }.Check(ctx, t, db) + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + + rootPieceID := testrand.PieceID() + pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Nonce() + + var segments []metabase.RawSegment + + for i := 1; i < 5; i++ { + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: uint32(i), Index: 0}, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce[:], + + EncryptedSize: 6 * memory.MiB.Int32(), + PlainSize: 6 * memory.MiB.Int32(), + Redundancy: metabasetest.DefaultRedundancy, + }, + }.Check(ctx, t, db) + + segments = append(segments, metabase.RawSegment{ + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: uint32(i), Index: 0}, + CreatedAt: now, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce[:], + + EncryptedSize: 6 * memory.MiB.Int32(), + PlainSize: 6 * memory.MiB.Int32(), + + Redundancy: metabasetest.DefaultRedundancy, + + Pieces: pieces, + }) + } + + metabasetest.CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: obj, + }, + ErrClass: &metabase.Error, + ErrText: "exceeded maximum number of parts: 3", + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Pending, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: segments, + }.Check(ctx, t, db) + }) + }) +} diff --git a/satellite/metabase/db.go b/satellite/metabase/db.go index b3d1c3adc..c4ac0336d 100644 --- a/satellite/metabase/db.go +++ b/satellite/metabase/db.go @@ -16,6 +16,7 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" + "storj.io/common/memory" "storj.io/private/dbutil" "storj.io/private/dbutil/pgutil" "storj.io/private/tagsql" @@ -26,6 +27,12 @@ var ( mon = monkit.Package() ) +// Config is a configuration struct for part validation. +type Config struct { + MinPartSize memory.Size + MaxNumberOfParts int +} + // DB implements a database for storing objects and segments. type DB struct { log *zap.Logger @@ -36,10 +43,12 @@ type DB struct { aliasCache *NodeAliasCache testCleanup func() error + + config Config } // Open opens a connection to metabase. -func Open(ctx context.Context, log *zap.Logger, connstr string) (*DB, error) { +func Open(ctx context.Context, log *zap.Logger, connstr string, config Config) (*DB, error) { var driverName string _, _, impl, err := dbutil.SplitConnStr(connstr) if err != nil { @@ -66,6 +75,7 @@ func Open(ctx context.Context, log *zap.Logger, connstr string) (*DB, error) { connstr: connstr, impl: impl, testCleanup: func() error { return nil }, + config: config, } db.aliasCache = NewNodeAliasCache(db) diff --git a/satellite/metabase/metabasetest/run.go b/satellite/metabase/metabasetest/run.go index a46bfd3fc..3f358f107 100644 --- a/satellite/metabase/metabasetest/run.go +++ b/satellite/metabase/metabasetest/run.go @@ -6,15 +6,19 @@ package metabasetest import ( "testing" + "github.com/spf13/pflag" "go.uber.org/zap/zaptest" + "storj.io/common/memory" "storj.io/common/testcontext" + "storj.io/private/cfgstruct" "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/metainfo" "storj.io/storj/satellite/satellitedb/satellitedbtest" ) -// Run runs tests against all configured databases. -func Run(t *testing.T, fn func(ctx *testcontext.Context, t *testing.T, db *metabase.DB)) { +// RunWithConfig runs tests with specific metabase configuration. +func RunWithConfig(t *testing.T, config metabase.Config, fn func(ctx *testcontext.Context, t *testing.T, db *metabase.DB)) { for _, dbinfo := range satellitedbtest.Databases() { dbinfo := dbinfo t.Run(dbinfo.Name, func(t *testing.T) { @@ -23,7 +27,7 @@ func Run(t *testing.T, fn func(ctx *testcontext.Context, t *testing.T, db *metab ctx := testcontext.New(t) defer ctx.Cleanup() - db, err := satellitedbtest.CreateMetabaseDB(ctx, zaptest.NewLogger(t), t.Name(), "M", 0, dbinfo.MetabaseDB) + db, err := satellitedbtest.CreateMetabaseDB(ctx, zaptest.NewLogger(t), t.Name(), "M", 0, dbinfo.MetabaseDB, config) if err != nil { t.Fatal(err) } @@ -42,6 +46,19 @@ func Run(t *testing.T, fn func(ctx *testcontext.Context, t *testing.T, db *metab } } +// Run runs tests against all configured databases. +func Run(t *testing.T, fn func(ctx *testcontext.Context, t *testing.T, db *metabase.DB)) { + var config metainfo.Config + cfgstruct.Bind(pflag.NewFlagSet("", pflag.PanicOnError), &config, + cfgstruct.UseTestDefaults(), + ) + + RunWithConfig(t, metabase.Config{ + MinPartSize: config.MinPartSize, + MaxNumberOfParts: config.MaxNumberOfParts, + }, fn) +} + // Bench runs benchmark for all configured databases. func Bench(b *testing.B, fn func(ctx *testcontext.Context, b *testing.B, db *metabase.DB)) { for _, dbinfo := range satellitedbtest.Databases() { @@ -49,8 +66,10 @@ func Bench(b *testing.B, fn func(ctx *testcontext.Context, b *testing.B, db *met b.Run(dbinfo.Name, func(b *testing.B) { ctx := testcontext.New(b) defer ctx.Cleanup() - - db, err := satellitedbtest.CreateMetabaseDB(ctx, zaptest.NewLogger(b), b.Name(), "M", 0, dbinfo.MetabaseDB) + db, err := satellitedbtest.CreateMetabaseDB(ctx, zaptest.NewLogger(b), b.Name(), "M", 0, dbinfo.MetabaseDB, metabase.Config{ + MinPartSize: 5 * memory.MiB, + MaxNumberOfParts: 10000, + }) if err != nil { b.Fatal(err) } diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go index bc80f3a76..66c7f097a 100644 --- a/satellite/metainfo/config.go +++ b/satellite/metainfo/config.go @@ -114,6 +114,8 @@ type Config struct { MaxSegmentSize memory.Size `default:"64MiB" help:"maximum segment size"` MaxMetadataSize memory.Size `default:"2KiB" help:"maximum segment metadata size"` MaxCommitInterval time.Duration `default:"48h" testDefault:"1h" help:"maximum time allowed to pass between creating and committing a segment"` + MinPartSize memory.Size `default:"5MiB" testDefault:"0" help:"minimum allowed part size (last part has no minimum size limit)"` + MaxNumberOfParts int `default:"10000" help:"maximum number of parts object can contain"` Overlay bool `default:"true" help:"toggle flag if overlay is enabled"` RS RSConfig `releaseDefault:"29/35/80/110-256B" devDefault:"4/6/8/10-256B" help:"redundancy scheme configuration in the format k/m/o/n-sharesize"` SegmentLoop segmentloop.Config `help:"segment loop configuration"` diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go index 37ad0ad26..d43406397 100644 --- a/satellite/metainfo/metainfo.go +++ b/satellite/metainfo/metainfo.go @@ -88,7 +88,7 @@ type Endpoint struct { } // NewEndpoint creates new metainfo endpoint instance. -func NewEndpoint(log *zap.Logger, buckets BucketsDB, metabase *metabase.DB, +func NewEndpoint(log *zap.Logger, buckets BucketsDB, metabaseDB *metabase.DB, deletePieces *piecedeletion.Service, orders *orders.Service, cache *overlay.Service, attributions attribution.DB, partners *rewards.PartnersService, peerIdentities overlay.PeerIdentities, apiKeys APIKeys, projectUsage *accounting.Service, projects console.Projects, @@ -115,7 +115,7 @@ func NewEndpoint(log *zap.Logger, buckets BucketsDB, metabase *metabase.DB, return &Endpoint{ log: log, buckets: buckets, - metabase: metabase, + metabase: metabaseDB, deletePieces: deletePieces, orders: orders, overlay: cache, diff --git a/satellite/satellitedb/satellitedbtest/run.go b/satellite/satellitedb/satellitedbtest/run.go index 5ddabcb02..f3105582d 100644 --- a/satellite/satellitedb/satellitedbtest/run.go +++ b/satellite/satellitedb/satellitedbtest/run.go @@ -138,7 +138,7 @@ func CreateMasterDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil. } // CreateMetabaseDB creates a new satellite metabase for testing. -func CreateMetabaseDB(ctx context.Context, log *zap.Logger, name string, category string, index int, dbInfo Database) (db *metabase.DB, err error) { +func CreateMetabaseDB(ctx context.Context, log *zap.Logger, name string, category string, index int, dbInfo Database, config metabase.Config) (db *metabase.DB, err error) { if dbInfo.URL == "" { return nil, fmt.Errorf("Database %s connection string not provided. %s", dbInfo.Name, dbInfo.Message) } @@ -153,13 +153,13 @@ func CreateMetabaseDB(ctx context.Context, log *zap.Logger, name string, categor return nil, err } - return CreateMetabaseDBOnTopOf(ctx, log, tempDB) + return CreateMetabaseDBOnTopOf(ctx, log, tempDB, config) } // CreateMetabaseDBOnTopOf creates a new metabase on top of an already existing // temporary database. -func CreateMetabaseDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.TempDatabase) (*metabase.DB, error) { - db, err := metabase.Open(ctx, log.Named("metabase"), tempDB.ConnStr) +func CreateMetabaseDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.TempDatabase, config metabase.Config) (*metabase.DB, error) { + db, err := metabase.Open(ctx, log.Named("metabase"), tempDB.ConnStr, config) if err != nil { return nil, err } diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index c563bd586..b0b9cff61 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -388,9 +388,15 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key # maximum segment metadata size # metainfo.max-metadata-size: 2.0 KiB +# maximum number of parts object can contain +# metainfo.max-number-of-parts: 10000 + # maximum segment size # metainfo.max-segment-size: 64.0 MiB +# minimum allowed part size (last part has no minimum size limit) +# metainfo.min-part-size: 5.0 MiB + # minimum remote segment size # metainfo.min-remote-segment-size: 1.2 KiB diff --git a/scripts/tests/testversions/test-sim-versions.sh b/scripts/tests/testversions/test-sim-versions.sh index 95e2a06eb..7782f4d2a 100755 --- a/scripts/tests/testversions/test-sim-versions.sh +++ b/scripts/tests/testversions/test-sim-versions.sh @@ -34,7 +34,7 @@ major_release_tags=$( grep -v rc | # remove release candidates sort -n -k2,2 -t'.' --unique | # only keep the largest patch version sort -V | # resort based using "version sort" - tail -n 3 # kepep only last 3 releases + tail -n 3 # kepep only last 3 releases ) major_release_tags=$(echo $IMPORTANT_VERSIONS $major_release_tags ) current_release_version=$(echo $major_release_tags | xargs -n 1 | tail -1)