satellite/metabase: limit maximum number of parts and size
Multipart upload limits added. Last part has no size limit. Max number of parts: 10000, min part size: 5 MiB Change-Id: Ic2262ce25f989b34d92f662bde720d4c4d0dc93d
This commit is contained in:
parent
49492d28fc
commit
50baefa10e
@ -155,7 +155,7 @@ func Delete(ctx context.Context, log *zap.Logger, config Config) (err error) {
|
||||
func findOrphanedSegments(ctx context.Context, log *zap.Logger, config Config) (_ []uuid.UUID, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), config.MetabaseDB)
|
||||
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), config.MetabaseDB, metabase.Config{})
|
||||
if err != nil {
|
||||
return nil, errs.New("unable to connect %q: %w", config.MetabaseDB, err)
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/private/dbutil"
|
||||
"storj.io/private/dbutil/tempdb"
|
||||
@ -113,7 +114,10 @@ func test(t *testing.T, prepare func(t *testing.T, ctx *testcontext.Context, raw
|
||||
metabaseTempDB, err := tempdb.OpenUnique(ctx, satelliteDB.MetabaseDB.URL, schema)
|
||||
require.NoError(t, err)
|
||||
|
||||
metabaseDB, err := satellitedbtest.CreateMetabaseDBOnTopOf(ctx, log, metabaseTempDB)
|
||||
metabaseDB, err := satellitedbtest.CreateMetabaseDBOnTopOf(ctx, log, metabaseTempDB, metabase.Config{
|
||||
MinPartSize: 5 * memory.MiB,
|
||||
MaxNumberOfParts: 10000,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(metabaseDB.Close)
|
||||
|
||||
|
@ -59,7 +59,7 @@ func VerifyCommand(log *zap.Logger) *cobra.Command {
|
||||
ctx, cancel := process.Ctx(cmd)
|
||||
defer cancel()
|
||||
|
||||
mdb, err := metabase.Open(ctx, log.Named("mdb"), metabaseDB)
|
||||
mdb, err := metabase.Open(ctx, log.Named("mdb"), metabaseDB, metabase.Config{})
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
@ -44,7 +44,10 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
|
||||
err = errs.Combine(err, db.Close())
|
||||
}()
|
||||
|
||||
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Config.Metainfo.DatabaseURL)
|
||||
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Config.Metainfo.DatabaseURL, metabase.Config{
|
||||
MinPartSize: runCfg.Config.Metainfo.MinPartSize,
|
||||
MaxNumberOfParts: runCfg.Config.Metainfo.MaxNumberOfParts,
|
||||
})
|
||||
if err != nil {
|
||||
return errs.New("Error creating metabase connection on satellite api: %+v", err)
|
||||
}
|
||||
|
@ -36,7 +36,10 @@ func cmdGCRun(cmd *cobra.Command, args []string) (err error) {
|
||||
err = errs.Combine(err, db.Close())
|
||||
}()
|
||||
|
||||
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
|
||||
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL, metabase.Config{
|
||||
MinPartSize: runCfg.Config.Metainfo.MinPartSize,
|
||||
MaxNumberOfParts: runCfg.Config.Metainfo.MaxNumberOfParts,
|
||||
})
|
||||
if err != nil {
|
||||
return errs.New("Error creating metabase connection: %+v", err)
|
||||
}
|
||||
|
@ -374,7 +374,10 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
err = errs.Combine(err, db.Close())
|
||||
}()
|
||||
|
||||
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
|
||||
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL, metabase.Config{
|
||||
MinPartSize: runCfg.Config.Metainfo.MinPartSize,
|
||||
MaxNumberOfParts: runCfg.Config.Metainfo.MaxNumberOfParts,
|
||||
})
|
||||
if err != nil {
|
||||
return errs.New("Error creating metabase connection: %+v", err)
|
||||
}
|
||||
@ -458,7 +461,10 @@ func cmdMigrationRun(cmd *cobra.Command, args []string) (err error) {
|
||||
return errs.New("Error creating tables for master database on satellite: %+v", err)
|
||||
}
|
||||
|
||||
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
|
||||
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL, metabase.Config{
|
||||
MinPartSize: runCfg.Config.Metainfo.MinPartSize,
|
||||
MaxNumberOfParts: runCfg.Config.Metainfo.MaxNumberOfParts,
|
||||
})
|
||||
if err != nil {
|
||||
return errs.New("Error creating metabase connection: %+v", err)
|
||||
}
|
||||
|
@ -39,7 +39,10 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
|
||||
err = errs.Combine(err, db.Close())
|
||||
}()
|
||||
|
||||
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
|
||||
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL, metabase.Config{
|
||||
MinPartSize: runCfg.Config.Metainfo.MinPartSize,
|
||||
MaxNumberOfParts: runCfg.Config.Metainfo.MaxNumberOfParts,
|
||||
})
|
||||
if err != nil {
|
||||
return errs.New("Error creating metabase connection: %+v", err)
|
||||
}
|
||||
|
@ -390,7 +390,17 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
|
||||
}
|
||||
planet.databases = append(planet.databases, db)
|
||||
|
||||
metabaseDB, err := satellitedbtest.CreateMetabaseDB(context.TODO(), log.Named("metabase"), planet.config.Name, "M", index, databases.MetabaseDB)
|
||||
var config satellite.Config
|
||||
cfgstruct.Bind(pflag.NewFlagSet("", pflag.PanicOnError), &config,
|
||||
cfgstruct.UseTestDefaults(),
|
||||
cfgstruct.ConfDir(storageDir),
|
||||
cfgstruct.IdentityDir(storageDir),
|
||||
cfgstruct.ConfigVar("TESTINTERVAL", defaultInterval.String()))
|
||||
|
||||
metabaseDB, err := satellitedbtest.CreateMetabaseDB(context.TODO(), log.Named("metabase"), planet.config.Name, "M", index, databases.MetabaseDB, metabase.Config{
|
||||
MinPartSize: config.Metainfo.MinPartSize,
|
||||
MaxNumberOfParts: config.Metainfo.MaxNumberOfParts,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -417,13 +427,6 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var config satellite.Config
|
||||
cfgstruct.Bind(pflag.NewFlagSet("", pflag.PanicOnError), &config,
|
||||
cfgstruct.UseTestDefaults(),
|
||||
cfgstruct.ConfDir(storageDir),
|
||||
cfgstruct.IdentityDir(storageDir),
|
||||
cfgstruct.ConfigVar("TESTINTERVAL", defaultInterval.String()))
|
||||
|
||||
// TODO: these are almost certainly mistakenly set to the zero value
|
||||
// in tests due to a prior mismatch between testplanet config and
|
||||
// cfgstruct devDefaults. we need to make sure it's safe to remove
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
pgxerrcode "github.com/jackc/pgerrcode"
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/private/dbutil/pgutil/pgerrcode"
|
||||
"storj.io/private/dbutil/txutil"
|
||||
@ -435,6 +436,10 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
|
||||
return Error.New("failed to fetch segments: %w", err)
|
||||
}
|
||||
|
||||
if err = db.validateParts(segments); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
finalSegments := convertToFinalSegments(segments)
|
||||
err = updateSegmentOffsets(ctx, tx, opts.StreamID, finalSegments)
|
||||
if err != nil {
|
||||
@ -540,3 +545,32 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
|
||||
|
||||
return object, nil
|
||||
}
|
||||
|
||||
func (db *DB) validateParts(segments []segmentInfoForCommit) error {
|
||||
partSize := make(map[uint32]memory.Size)
|
||||
|
||||
var lastPart uint32
|
||||
for _, segment := range segments {
|
||||
partSize[segment.Position.Part] += memory.Size(segment.PlainSize)
|
||||
if lastPart < segment.Position.Part {
|
||||
lastPart = segment.Position.Part
|
||||
}
|
||||
}
|
||||
|
||||
if len(partSize) > db.config.MaxNumberOfParts {
|
||||
return Error.New("exceeded maximum number of parts: %d", db.config.MaxNumberOfParts)
|
||||
}
|
||||
|
||||
for part, size := range partSize {
|
||||
// Last part has no minimum size.
|
||||
if part == lastPart {
|
||||
continue
|
||||
}
|
||||
|
||||
if size < db.config.MinPartSize {
|
||||
return Error.New("size of part number %d is below minimum threshold, got: %s, min: %s", part, size, db.config.MinPartSize)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
@ -2454,3 +2455,422 @@ func TestCommitObject(t *testing.T) {
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestCommitObjectWithIncorrectPartSize(t *testing.T) {
|
||||
metabasetest.RunWithConfig(t, metabase.Config{
|
||||
MinPartSize: 5 * memory.MiB,
|
||||
MaxNumberOfParts: 1000,
|
||||
}, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := metabasetest.RandObjectStream()
|
||||
|
||||
t.Run("part size less then 5MB", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
now := time.Now()
|
||||
zombieDeadline := now.Add(24 * time.Hour)
|
||||
|
||||
rootPieceID := testrand.PieceID()
|
||||
pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
|
||||
encryptedKey := testrand.Bytes(32)
|
||||
encryptedKeyNonce := testrand.Nonce()
|
||||
|
||||
metabasetest.CommitSegment{
|
||||
Opts: metabase.CommitSegment{
|
||||
ObjectStream: obj,
|
||||
Position: metabase.SegmentPosition{Part: 0, Index: 0},
|
||||
RootPieceID: rootPieceID,
|
||||
Pieces: pieces,
|
||||
|
||||
EncryptedKey: encryptedKey,
|
||||
EncryptedKeyNonce: encryptedKeyNonce[:],
|
||||
|
||||
EncryptedSize: 2 * memory.MiB.Int32(),
|
||||
PlainSize: 2 * memory.MiB.Int32(),
|
||||
Redundancy: metabasetest.DefaultRedundancy,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CommitSegment{
|
||||
Opts: metabase.CommitSegment{
|
||||
ObjectStream: obj,
|
||||
Position: metabase.SegmentPosition{Part: 1, Index: 0},
|
||||
RootPieceID: rootPieceID,
|
||||
Pieces: pieces,
|
||||
|
||||
EncryptedKey: encryptedKey,
|
||||
EncryptedKeyNonce: encryptedKeyNonce[:],
|
||||
|
||||
EncryptedSize: 2 * memory.MiB.Int32(),
|
||||
PlainSize: 2 * memory.MiB.Int32(),
|
||||
Redundancy: metabasetest.DefaultRedundancy,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CommitObject{
|
||||
Opts: metabase.CommitObject{
|
||||
ObjectStream: obj,
|
||||
},
|
||||
ErrClass: &metabase.Error,
|
||||
ErrText: "size of part number 0 is below minimum threshold, got: 2.0 MiB, min: 5.0 MiB",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Pending,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
ZombieDeletionDeadline: &zombieDeadline,
|
||||
},
|
||||
},
|
||||
Segments: []metabase.RawSegment{
|
||||
{
|
||||
StreamID: obj.StreamID,
|
||||
Position: metabase.SegmentPosition{Part: 0, Index: 0},
|
||||
CreatedAt: now,
|
||||
|
||||
RootPieceID: rootPieceID,
|
||||
EncryptedKey: encryptedKey,
|
||||
EncryptedKeyNonce: encryptedKeyNonce[:],
|
||||
|
||||
EncryptedSize: 2 * memory.MiB.Int32(),
|
||||
PlainSize: 2 * memory.MiB.Int32(),
|
||||
|
||||
Redundancy: metabasetest.DefaultRedundancy,
|
||||
|
||||
Pieces: pieces,
|
||||
},
|
||||
{
|
||||
StreamID: obj.StreamID,
|
||||
Position: metabase.SegmentPosition{Part: 1, Index: 0},
|
||||
CreatedAt: now,
|
||||
|
||||
RootPieceID: rootPieceID,
|
||||
EncryptedKey: encryptedKey,
|
||||
EncryptedKeyNonce: encryptedKeyNonce[:],
|
||||
|
||||
EncryptedSize: 2 * memory.MiB.Int32(),
|
||||
PlainSize: 2 * memory.MiB.Int32(),
|
||||
|
||||
Redundancy: metabasetest.DefaultRedundancy,
|
||||
|
||||
Pieces: pieces,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("size validation with part with multiple segments", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
now := time.Now()
|
||||
|
||||
rootPieceID := testrand.PieceID()
|
||||
pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
|
||||
encryptedKey := testrand.Bytes(32)
|
||||
encryptedKeyNonce := testrand.Nonce()
|
||||
|
||||
metabasetest.CommitSegment{
|
||||
Opts: metabase.CommitSegment{
|
||||
ObjectStream: obj,
|
||||
Position: metabase.SegmentPosition{Part: 1, Index: 0},
|
||||
RootPieceID: rootPieceID,
|
||||
Pieces: pieces,
|
||||
|
||||
EncryptedKey: encryptedKey,
|
||||
EncryptedKeyNonce: encryptedKeyNonce[:],
|
||||
|
||||
EncryptedSize: memory.MiB.Int32(),
|
||||
PlainSize: memory.MiB.Int32(),
|
||||
Redundancy: metabasetest.DefaultRedundancy,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CommitSegment{
|
||||
Opts: metabase.CommitSegment{
|
||||
ObjectStream: obj,
|
||||
Position: metabase.SegmentPosition{Part: 1, Index: 1},
|
||||
RootPieceID: rootPieceID,
|
||||
Pieces: pieces,
|
||||
|
||||
EncryptedKey: encryptedKey,
|
||||
EncryptedKeyNonce: encryptedKeyNonce[:],
|
||||
|
||||
EncryptedSize: memory.MiB.Int32(),
|
||||
PlainSize: memory.MiB.Int32(),
|
||||
Redundancy: metabasetest.DefaultRedundancy,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CommitObject{
|
||||
Opts: metabase.CommitObject{
|
||||
ObjectStream: obj,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
|
||||
SegmentCount: 2,
|
||||
FixedSegmentSize: -1,
|
||||
TotalPlainSize: 2 * memory.MiB.Int64(),
|
||||
TotalEncryptedSize: 2 * memory.MiB.Int64(),
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
},
|
||||
Segments: []metabase.RawSegment{
|
||||
{
|
||||
StreamID: obj.StreamID,
|
||||
Position: metabase.SegmentPosition{Part: 1, Index: 0},
|
||||
CreatedAt: now,
|
||||
|
||||
RootPieceID: rootPieceID,
|
||||
EncryptedKey: encryptedKey,
|
||||
EncryptedKeyNonce: encryptedKeyNonce[:],
|
||||
|
||||
EncryptedSize: memory.MiB.Int32(),
|
||||
PlainSize: memory.MiB.Int32(),
|
||||
|
||||
Redundancy: metabasetest.DefaultRedundancy,
|
||||
|
||||
Pieces: pieces,
|
||||
},
|
||||
{
|
||||
StreamID: obj.StreamID,
|
||||
Position: metabase.SegmentPosition{Part: 1, Index: 1},
|
||||
CreatedAt: now,
|
||||
|
||||
RootPieceID: rootPieceID,
|
||||
EncryptedKey: encryptedKey,
|
||||
EncryptedKeyNonce: encryptedKeyNonce[:],
|
||||
|
||||
EncryptedSize: memory.MiB.Int32(),
|
||||
PlainSize: memory.MiB.Int32(),
|
||||
PlainOffset: memory.MiB.Int64(),
|
||||
|
||||
Redundancy: metabasetest.DefaultRedundancy,
|
||||
|
||||
Pieces: pieces,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("size validation with multiple parts", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
now := time.Now()
|
||||
zombieDeadline := now.Add(24 * time.Hour)
|
||||
|
||||
rootPieceID := testrand.PieceID()
|
||||
pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
|
||||
encryptedKey := testrand.Bytes(32)
|
||||
encryptedKeyNonce := testrand.Nonce()
|
||||
partsSizes := []memory.Size{6 * memory.MiB, 1 * memory.MiB, 1 * memory.MiB}
|
||||
|
||||
for i, size := range partsSizes {
|
||||
metabasetest.CommitSegment{
|
||||
Opts: metabase.CommitSegment{
|
||||
ObjectStream: obj,
|
||||
Position: metabase.SegmentPosition{Part: uint32(i + 1), Index: 1},
|
||||
RootPieceID: rootPieceID,
|
||||
Pieces: pieces,
|
||||
|
||||
EncryptedKey: encryptedKey,
|
||||
EncryptedKeyNonce: encryptedKeyNonce[:],
|
||||
|
||||
EncryptedSize: size.Int32(),
|
||||
PlainSize: size.Int32(),
|
||||
Redundancy: metabasetest.DefaultRedundancy,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
}
|
||||
|
||||
metabasetest.CommitObject{
|
||||
Opts: metabase.CommitObject{
|
||||
ObjectStream: obj,
|
||||
},
|
||||
ErrClass: &metabase.Error,
|
||||
ErrText: "size of part number 2 is below minimum threshold, got: 1.0 MiB, min: 5.0 MiB",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Pending,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
ZombieDeletionDeadline: &zombieDeadline,
|
||||
},
|
||||
},
|
||||
Segments: []metabase.RawSegment{
|
||||
{
|
||||
StreamID: obj.StreamID,
|
||||
Position: metabase.SegmentPosition{Part: 1, Index: 1},
|
||||
CreatedAt: now,
|
||||
|
||||
RootPieceID: rootPieceID,
|
||||
EncryptedKey: encryptedKey,
|
||||
EncryptedKeyNonce: encryptedKeyNonce[:],
|
||||
|
||||
EncryptedSize: 6 * memory.MiB.Int32(),
|
||||
PlainSize: 6 * memory.MiB.Int32(),
|
||||
|
||||
Redundancy: metabasetest.DefaultRedundancy,
|
||||
|
||||
Pieces: pieces,
|
||||
},
|
||||
{
|
||||
StreamID: obj.StreamID,
|
||||
Position: metabase.SegmentPosition{Part: 2, Index: 1},
|
||||
CreatedAt: now,
|
||||
|
||||
RootPieceID: rootPieceID,
|
||||
EncryptedKey: encryptedKey,
|
||||
EncryptedKeyNonce: encryptedKeyNonce[:],
|
||||
|
||||
EncryptedSize: memory.MiB.Int32(),
|
||||
PlainSize: memory.MiB.Int32(),
|
||||
|
||||
Redundancy: metabasetest.DefaultRedundancy,
|
||||
|
||||
Pieces: pieces,
|
||||
},
|
||||
{
|
||||
StreamID: obj.StreamID,
|
||||
Position: metabase.SegmentPosition{Part: 3, Index: 1},
|
||||
CreatedAt: now,
|
||||
|
||||
RootPieceID: rootPieceID,
|
||||
EncryptedKey: encryptedKey,
|
||||
EncryptedKeyNonce: encryptedKeyNonce[:],
|
||||
|
||||
EncryptedSize: memory.MiB.Int32(),
|
||||
PlainSize: memory.MiB.Int32(),
|
||||
|
||||
Redundancy: metabasetest.DefaultRedundancy,
|
||||
|
||||
Pieces: pieces,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestCommitObjectWithIncorrectAmountOfParts(t *testing.T) {
|
||||
metabasetest.RunWithConfig(t, metabase.Config{
|
||||
MinPartSize: 5 * memory.MiB,
|
||||
MaxNumberOfParts: 3,
|
||||
}, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := metabasetest.RandObjectStream()
|
||||
|
||||
t.Run("number of parts check", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
now := time.Now()
|
||||
zombieDeadline := now.Add(24 * time.Hour)
|
||||
|
||||
rootPieceID := testrand.PieceID()
|
||||
pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
|
||||
encryptedKey := testrand.Bytes(32)
|
||||
encryptedKeyNonce := testrand.Nonce()
|
||||
|
||||
var segments []metabase.RawSegment
|
||||
|
||||
for i := 1; i < 5; i++ {
|
||||
metabasetest.CommitSegment{
|
||||
Opts: metabase.CommitSegment{
|
||||
ObjectStream: obj,
|
||||
Position: metabase.SegmentPosition{Part: uint32(i), Index: 0},
|
||||
RootPieceID: rootPieceID,
|
||||
Pieces: pieces,
|
||||
|
||||
EncryptedKey: encryptedKey,
|
||||
EncryptedKeyNonce: encryptedKeyNonce[:],
|
||||
|
||||
EncryptedSize: 6 * memory.MiB.Int32(),
|
||||
PlainSize: 6 * memory.MiB.Int32(),
|
||||
Redundancy: metabasetest.DefaultRedundancy,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
segments = append(segments, metabase.RawSegment{
|
||||
StreamID: obj.StreamID,
|
||||
Position: metabase.SegmentPosition{Part: uint32(i), Index: 0},
|
||||
CreatedAt: now,
|
||||
|
||||
RootPieceID: rootPieceID,
|
||||
EncryptedKey: encryptedKey,
|
||||
EncryptedKeyNonce: encryptedKeyNonce[:],
|
||||
|
||||
EncryptedSize: 6 * memory.MiB.Int32(),
|
||||
PlainSize: 6 * memory.MiB.Int32(),
|
||||
|
||||
Redundancy: metabasetest.DefaultRedundancy,
|
||||
|
||||
Pieces: pieces,
|
||||
})
|
||||
}
|
||||
|
||||
metabasetest.CommitObject{
|
||||
Opts: metabase.CommitObject{
|
||||
ObjectStream: obj,
|
||||
},
|
||||
ErrClass: &metabase.Error,
|
||||
ErrText: "exceeded maximum number of parts: 3",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Pending,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
ZombieDeletionDeadline: &zombieDeadline,
|
||||
},
|
||||
},
|
||||
Segments: segments,
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/memory"
|
||||
"storj.io/private/dbutil"
|
||||
"storj.io/private/dbutil/pgutil"
|
||||
"storj.io/private/tagsql"
|
||||
@ -26,6 +27,12 @@ var (
|
||||
mon = monkit.Package()
|
||||
)
|
||||
|
||||
// Config is a configuration struct for part validation.
|
||||
type Config struct {
|
||||
MinPartSize memory.Size
|
||||
MaxNumberOfParts int
|
||||
}
|
||||
|
||||
// DB implements a database for storing objects and segments.
|
||||
type DB struct {
|
||||
log *zap.Logger
|
||||
@ -36,10 +43,12 @@ type DB struct {
|
||||
aliasCache *NodeAliasCache
|
||||
|
||||
testCleanup func() error
|
||||
|
||||
config Config
|
||||
}
|
||||
|
||||
// Open opens a connection to metabase.
|
||||
func Open(ctx context.Context, log *zap.Logger, connstr string) (*DB, error) {
|
||||
func Open(ctx context.Context, log *zap.Logger, connstr string, config Config) (*DB, error) {
|
||||
var driverName string
|
||||
_, _, impl, err := dbutil.SplitConnStr(connstr)
|
||||
if err != nil {
|
||||
@ -66,6 +75,7 @@ func Open(ctx context.Context, log *zap.Logger, connstr string) (*DB, error) {
|
||||
connstr: connstr,
|
||||
impl: impl,
|
||||
testCleanup: func() error { return nil },
|
||||
config: config,
|
||||
}
|
||||
db.aliasCache = NewNodeAliasCache(db)
|
||||
|
||||
|
@ -6,15 +6,19 @@ package metabasetest
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/private/cfgstruct"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
)
|
||||
|
||||
// Run runs tests against all configured databases.
|
||||
func Run(t *testing.T, fn func(ctx *testcontext.Context, t *testing.T, db *metabase.DB)) {
|
||||
// RunWithConfig runs tests with specific metabase configuration.
|
||||
func RunWithConfig(t *testing.T, config metabase.Config, fn func(ctx *testcontext.Context, t *testing.T, db *metabase.DB)) {
|
||||
for _, dbinfo := range satellitedbtest.Databases() {
|
||||
dbinfo := dbinfo
|
||||
t.Run(dbinfo.Name, func(t *testing.T) {
|
||||
@ -23,7 +27,7 @@ func Run(t *testing.T, fn func(ctx *testcontext.Context, t *testing.T, db *metab
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
db, err := satellitedbtest.CreateMetabaseDB(ctx, zaptest.NewLogger(t), t.Name(), "M", 0, dbinfo.MetabaseDB)
|
||||
db, err := satellitedbtest.CreateMetabaseDB(ctx, zaptest.NewLogger(t), t.Name(), "M", 0, dbinfo.MetabaseDB, config)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -42,6 +46,19 @@ func Run(t *testing.T, fn func(ctx *testcontext.Context, t *testing.T, db *metab
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs tests against all configured databases.
|
||||
func Run(t *testing.T, fn func(ctx *testcontext.Context, t *testing.T, db *metabase.DB)) {
|
||||
var config metainfo.Config
|
||||
cfgstruct.Bind(pflag.NewFlagSet("", pflag.PanicOnError), &config,
|
||||
cfgstruct.UseTestDefaults(),
|
||||
)
|
||||
|
||||
RunWithConfig(t, metabase.Config{
|
||||
MinPartSize: config.MinPartSize,
|
||||
MaxNumberOfParts: config.MaxNumberOfParts,
|
||||
}, fn)
|
||||
}
|
||||
|
||||
// Bench runs benchmark for all configured databases.
|
||||
func Bench(b *testing.B, fn func(ctx *testcontext.Context, b *testing.B, db *metabase.DB)) {
|
||||
for _, dbinfo := range satellitedbtest.Databases() {
|
||||
@ -49,8 +66,10 @@ func Bench(b *testing.B, fn func(ctx *testcontext.Context, b *testing.B, db *met
|
||||
b.Run(dbinfo.Name, func(b *testing.B) {
|
||||
ctx := testcontext.New(b)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
db, err := satellitedbtest.CreateMetabaseDB(ctx, zaptest.NewLogger(b), b.Name(), "M", 0, dbinfo.MetabaseDB)
|
||||
db, err := satellitedbtest.CreateMetabaseDB(ctx, zaptest.NewLogger(b), b.Name(), "M", 0, dbinfo.MetabaseDB, metabase.Config{
|
||||
MinPartSize: 5 * memory.MiB,
|
||||
MaxNumberOfParts: 10000,
|
||||
})
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
@ -114,6 +114,8 @@ type Config struct {
|
||||
MaxSegmentSize memory.Size `default:"64MiB" help:"maximum segment size"`
|
||||
MaxMetadataSize memory.Size `default:"2KiB" help:"maximum segment metadata size"`
|
||||
MaxCommitInterval time.Duration `default:"48h" testDefault:"1h" help:"maximum time allowed to pass between creating and committing a segment"`
|
||||
MinPartSize memory.Size `default:"5MiB" testDefault:"0" help:"minimum allowed part size (last part has no minimum size limit)"`
|
||||
MaxNumberOfParts int `default:"10000" help:"maximum number of parts object can contain"`
|
||||
Overlay bool `default:"true" help:"toggle flag if overlay is enabled"`
|
||||
RS RSConfig `releaseDefault:"29/35/80/110-256B" devDefault:"4/6/8/10-256B" help:"redundancy scheme configuration in the format k/m/o/n-sharesize"`
|
||||
SegmentLoop segmentloop.Config `help:"segment loop configuration"`
|
||||
|
@ -88,7 +88,7 @@ type Endpoint struct {
|
||||
}
|
||||
|
||||
// NewEndpoint creates new metainfo endpoint instance.
|
||||
func NewEndpoint(log *zap.Logger, buckets BucketsDB, metabase *metabase.DB,
|
||||
func NewEndpoint(log *zap.Logger, buckets BucketsDB, metabaseDB *metabase.DB,
|
||||
deletePieces *piecedeletion.Service, orders *orders.Service, cache *overlay.Service,
|
||||
attributions attribution.DB, partners *rewards.PartnersService, peerIdentities overlay.PeerIdentities,
|
||||
apiKeys APIKeys, projectUsage *accounting.Service, projects console.Projects,
|
||||
@ -115,7 +115,7 @@ func NewEndpoint(log *zap.Logger, buckets BucketsDB, metabase *metabase.DB,
|
||||
return &Endpoint{
|
||||
log: log,
|
||||
buckets: buckets,
|
||||
metabase: metabase,
|
||||
metabase: metabaseDB,
|
||||
deletePieces: deletePieces,
|
||||
orders: orders,
|
||||
overlay: cache,
|
||||
|
@ -138,7 +138,7 @@ func CreateMasterDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.
|
||||
}
|
||||
|
||||
// CreateMetabaseDB creates a new satellite metabase for testing.
|
||||
func CreateMetabaseDB(ctx context.Context, log *zap.Logger, name string, category string, index int, dbInfo Database) (db *metabase.DB, err error) {
|
||||
func CreateMetabaseDB(ctx context.Context, log *zap.Logger, name string, category string, index int, dbInfo Database, config metabase.Config) (db *metabase.DB, err error) {
|
||||
if dbInfo.URL == "" {
|
||||
return nil, fmt.Errorf("Database %s connection string not provided. %s", dbInfo.Name, dbInfo.Message)
|
||||
}
|
||||
@ -153,13 +153,13 @@ func CreateMetabaseDB(ctx context.Context, log *zap.Logger, name string, categor
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return CreateMetabaseDBOnTopOf(ctx, log, tempDB)
|
||||
return CreateMetabaseDBOnTopOf(ctx, log, tempDB, config)
|
||||
}
|
||||
|
||||
// CreateMetabaseDBOnTopOf creates a new metabase on top of an already existing
|
||||
// temporary database.
|
||||
func CreateMetabaseDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.TempDatabase) (*metabase.DB, error) {
|
||||
db, err := metabase.Open(ctx, log.Named("metabase"), tempDB.ConnStr)
|
||||
func CreateMetabaseDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.TempDatabase, config metabase.Config) (*metabase.DB, error) {
|
||||
db, err := metabase.Open(ctx, log.Named("metabase"), tempDB.ConnStr, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
6
scripts/testdata/satellite-config.yaml.lock
vendored
6
scripts/testdata/satellite-config.yaml.lock
vendored
@ -388,9 +388,15 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
||||
# maximum segment metadata size
|
||||
# metainfo.max-metadata-size: 2.0 KiB
|
||||
|
||||
# maximum number of parts object can contain
|
||||
# metainfo.max-number-of-parts: 10000
|
||||
|
||||
# maximum segment size
|
||||
# metainfo.max-segment-size: 64.0 MiB
|
||||
|
||||
# minimum allowed part size (last part has no minimum size limit)
|
||||
# metainfo.min-part-size: 5.0 MiB
|
||||
|
||||
# minimum remote segment size
|
||||
# metainfo.min-remote-segment-size: 1.2 KiB
|
||||
|
||||
|
@ -34,7 +34,7 @@ major_release_tags=$(
|
||||
grep -v rc | # remove release candidates
|
||||
sort -n -k2,2 -t'.' --unique | # only keep the largest patch version
|
||||
sort -V | # resort based using "version sort"
|
||||
tail -n 3 # kepep only last 3 releases
|
||||
tail -n 3 # kepep only last 3 releases
|
||||
)
|
||||
major_release_tags=$(echo $IMPORTANT_VERSIONS $major_release_tags )
|
||||
current_release_version=$(echo $major_release_tags | xargs -n 1 | tail -1)
|
||||
|
Loading…
Reference in New Issue
Block a user