satellite/metabase: adjust BeginObjectNextVersion to use pending_objects

Change is adjusting BeginObjectNextVersion to create pending object in
`pending_objects` or `objects` table depends on configuration. This is
first change to move pending objects from objects table.

General goal is to support both tables until `objects` table will be
free from pending objects. Whenever it will be needed code will be
supporting both tables at once.

To be able to decide if we need to use `pending_objects` table or
`objects` table we extend satellite stream id to keep that information
for later use.

BeginObjectExactVersion will be not adjusted because at the moment it's
used only in tests.

Part of https://github.com/storj/storj/issues/6046

Change-Id: Ibf21965f63cca5e1775469994a29f1fd1261af4e
This commit is contained in:
Michal Niewrzal 2023-07-21 11:36:59 +02:00 committed by Storj Robot
parent 391a63f9fa
commit cebf255d64
8 changed files with 309 additions and 64 deletions

View File

@ -36,9 +36,12 @@ type StreamID struct {
SatelliteSignature []byte `protobuf:"bytes,9,opt,name=satellite_signature,json=satelliteSignature,proto3" json:"satellite_signature,omitempty"`
StreamId []byte `protobuf:"bytes,10,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"`
Placement int32 `protobuf:"varint,13,opt,name=placement,proto3" json:"placement,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
// temporary field to determine if we should go with new pending_objects table or
// fallback to pending object in objects table.
UsePendingObjectsTable bool `protobuf:"varint,14,opt,name=use_pending_objects_table,json=usePendingObjectsTable,proto3" json:"use_pending_objects_table,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *StreamID) Reset() { *m = StreamID{} }
@ -135,6 +138,13 @@ func (m *StreamID) GetPlacement() int32 {
return 0
}
func (m *StreamID) GetUsePendingObjectsTable() bool {
if m != nil {
return m.UsePendingObjectsTable
}
return false
}
type SegmentID struct {
StreamId *StreamID `protobuf:"bytes,1,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"`
PartNumber int32 `protobuf:"varint,2,opt,name=part_number,json=partNumber,proto3" json:"part_number,omitempty"`
@ -222,40 +232,42 @@ func init() {
func init() { proto.RegisterFile("metainfo_sat.proto", fileDescriptor_47c60bd892d94aaf) }
var fileDescriptor_47c60bd892d94aaf = []byte{
// 548 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x52, 0xcf, 0x6e, 0xd3, 0x4e,
0x10, 0xfe, 0xf9, 0x17, 0xe5, 0xdf, 0x26, 0x69, 0xaa, 0x6d, 0x8a, 0xac, 0x50, 0x14, 0xab, 0x08,
0xc9, 0x5c, 0x6c, 0xd4, 0x9e, 0x38, 0x12, 0x85, 0x43, 0xc4, 0x9f, 0x16, 0x07, 0x2e, 0x5c, 0xac,
0xb5, 0x3d, 0xb5, 0xb6, 0xb5, 0x77, 0xad, 0xdd, 0x09, 0x6a, 0x8e, 0xbc, 0x01, 0x8f, 0xc5, 0x33,
0x70, 0x28, 0x6f, 0xc1, 0x19, 0x79, 0x1d, 0x3b, 0x91, 0x68, 0x0f, 0x70, 0x9b, 0xf9, 0xe6, 0xdb,
0x6f, 0x76, 0xbe, 0x19, 0x42, 0x73, 0x40, 0xc6, 0xc5, 0x95, 0x0c, 0x35, 0x43, 0xaf, 0x50, 0x12,
0x25, 0xa5, 0x9a, 0x21, 0x64, 0x19, 0x47, 0xf0, 0xea, 0xea, 0xf4, 0x10, 0x44, 0xac, 0x36, 0x05,
0x72, 0x29, 0x2a, 0xd6, 0x94, 0xa4, 0x32, 0x95, 0xdb, 0x78, 0x96, 0x4a, 0x99, 0x66, 0xe0, 0x9b,
0x2c, 0x5a, 0x5f, 0xf9, 0xc8, 0x73, 0xd0, 0xc8, 0xf2, 0x62, 0x4b, 0x38, 0xa8, 0x85, 0xaa, 0xfc,
0xf4, 0x57, 0x8b, 0xf4, 0x56, 0xa8, 0x80, 0xe5, 0xcb, 0x05, 0x7d, 0x44, 0x3a, 0xd1, 0x3a, 0xbe,
0x01, 0xb4, 0x2d, 0xc7, 0x72, 0x87, 0xc1, 0x36, 0xa3, 0x2f, 0xc8, 0x64, 0xdb, 0x15, 0x92, 0x50,
0x46, 0xd7, 0x10, 0x63, 0x78, 0x03, 0x1b, 0xfb, 0x7f, 0xc3, 0xa2, 0x4d, 0xed, 0xc2, 0x94, 0xde,
0xc0, 0x86, 0xda, 0xa4, 0xfb, 0x05, 0x94, 0xe6, 0x52, 0xd8, 0x2d, 0xc7, 0x72, 0x5b, 0x41, 0x9d,
0xd2, 0x4f, 0xe4, 0x78, 0x37, 0x41, 0x58, 0x30, 0xc5, 0x72, 0x40, 0x50, 0xda, 0x1e, 0x3a, 0x96,
0x3b, 0x38, 0x73, 0xbc, 0xbd, 0xf9, 0x5e, 0x37, 0xe1, 0x65, 0xc3, 0x0b, 0x26, 0x70, 0x0f, 0x4a,
0x97, 0x64, 0x14, 0x2b, 0x60, 0x46, 0x34, 0x61, 0x08, 0x76, 0xdb, 0xc8, 0x4d, 0xbd, 0xca, 0x10,
0xaf, 0x36, 0xc4, 0xfb, 0x58, 0x1b, 0x32, 0xef, 0x7d, 0xbf, 0x9b, 0xfd, 0xf7, 0xed, 0xe7, 0xcc,
0x0a, 0x86, 0xf5, 0xd3, 0x05, 0x43, 0xa0, 0xef, 0xc8, 0x18, 0x6e, 0x0b, 0xae, 0xf6, 0xc4, 0x3a,
0x7f, 0x21, 0x76, 0xb0, 0x7b, 0x6c, 0xe4, 0x9e, 0x93, 0xc3, 0x7c, 0x9d, 0x21, 0x2f, 0x98, 0xc2,
0xad, 0x79, 0xf6, 0xc0, 0xb1, 0xdc, 0x5e, 0x30, 0x6e, 0xf0, 0xca, 0x38, 0xea, 0x93, 0xa3, 0x66,
0xe3, 0xa1, 0xe6, 0xa9, 0x60, 0xb8, 0x56, 0x60, 0xf7, 0x2b, 0x9b, 0x9b, 0xd2, 0xaa, 0xae, 0xd0,
0xc7, 0xa4, 0xaf, 0xcd, 0xf2, 0x42, 0x9e, 0xd8, 0xc4, 0xd0, 0x7a, 0x15, 0xb0, 0x4c, 0xe8, 0x09,
0xe9, 0x17, 0x19, 0x8b, 0x21, 0x07, 0x81, 0xf6, 0xc8, 0xb1, 0xdc, 0x76, 0xb0, 0x03, 0x4e, 0xbf,
0xb6, 0x48, 0x7f, 0x05, 0x69, 0x19, 0x2f, 0x17, 0xf4, 0xe5, 0xbe, 0x90, 0x65, 0xa6, 0x3d, 0xf1,
0xfe, 0xbc, 0x3e, 0xaf, 0x3e, 0x95, 0xbd, 0x36, 0x33, 0x32, 0x30, 0xa3, 0x89, 0x75, 0x1e, 0x81,
0x32, 0x37, 0xd1, 0x0e, 0x48, 0x09, 0xbd, 0x37, 0x08, 0x9d, 0x90, 0x36, 0x17, 0x09, 0xdc, 0x9a,
0x4b, 0x68, 0x07, 0x55, 0x42, 0xcf, 0xc9, 0x48, 0x49, 0x89, 0x61, 0xc1, 0x21, 0x86, 0xb2, 0x6b,
0xb9, 0xb0, 0xe1, 0x7c, 0x5c, 0xfa, 0xf8, 0xe3, 0x6e, 0xd6, 0xbd, 0x2c, 0xf1, 0xe5, 0x22, 0x18,
0x94, 0xac, 0x2a, 0x49, 0xe8, 0x07, 0x72, 0x2c, 0x15, 0x4f, 0xb9, 0x60, 0x59, 0x28, 0x55, 0x02,
0x2a, 0xcc, 0x78, 0xce, 0x51, 0xdb, 0x1d, 0xa7, 0xe5, 0x0e, 0xce, 0x9e, 0xec, 0x3e, 0xfa, 0x2a,
0x49, 0x14, 0x68, 0x0d, 0xc9, 0x45, 0x49, 0x7b, 0x5b, 0xb2, 0x82, 0xa3, 0xfa, 0xed, 0x0e, 0xbb,
0xe7, 0x70, 0xba, 0xff, 0x7c, 0x38, 0x0f, 0xac, 0xaf, 0xf7, 0xd0, 0xfa, 0xe6, 0xcf, 0x3e, 0x3f,
0xd5, 0x28, 0xd5, 0xb5, 0xc7, 0xa5, 0x6f, 0x02, 0xbf, 0x21, 0xf9, 0x5c, 0x20, 0x28, 0xc1, 0xb2,
0x22, 0x8a, 0x3a, 0xe6, 0x0f, 0xe7, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x43, 0x2e, 0x40, 0x6c,
0x23, 0x04, 0x00, 0x00,
// 580 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x52, 0xcb, 0x6e, 0xd3, 0x40,
0x14, 0xc5, 0x84, 0xa4, 0xc9, 0x24, 0x7d, 0x68, 0xfa, 0xd0, 0x10, 0x8a, 0x62, 0x15, 0x21, 0x99,
0x8d, 0x8d, 0xda, 0x55, 0x97, 0x44, 0x61, 0x11, 0xf1, 0x68, 0x70, 0xca, 0x86, 0x8d, 0x35, 0xb6,
0x6f, 0xad, 0x69, 0xed, 0x19, 0x6b, 0x66, 0x8c, 0x9a, 0x25, 0x7f, 0xc0, 0xaf, 0xf0, 0x17, 0x7c,
0x03, 0x8b, 0xf2, 0x2b, 0xc8, 0xe3, 0x47, 0x22, 0xd1, 0x2e, 0x60, 0x37, 0xf7, 0xdc, 0x33, 0x67,
0xce, 0x9c, 0x7b, 0x11, 0xce, 0x40, 0x53, 0xc6, 0xaf, 0x44, 0xa0, 0xa8, 0x76, 0x73, 0x29, 0xb4,
0xc0, 0x58, 0x51, 0x0d, 0x69, 0xca, 0x34, 0xb8, 0x4d, 0x77, 0xbc, 0x07, 0x3c, 0x92, 0xab, 0x5c,
0x33, 0xc1, 0x2b, 0xd6, 0x18, 0x25, 0x22, 0x11, 0xf5, 0x79, 0x92, 0x08, 0x91, 0xa4, 0xe0, 0x99,
0x2a, 0x2c, 0xae, 0x3c, 0xcd, 0x32, 0x50, 0x9a, 0x66, 0x79, 0x4d, 0xd8, 0x69, 0x84, 0xaa, 0xfa,
0xe4, 0xc7, 0x13, 0xd4, 0x5f, 0x6a, 0x09, 0x34, 0x9b, 0xcf, 0xf0, 0x11, 0xea, 0x85, 0x45, 0x74,
0x03, 0x9a, 0x58, 0xb6, 0xe5, 0x8c, 0xfc, 0xba, 0xc2, 0xaf, 0xd1, 0x41, 0xfd, 0x2a, 0xc4, 0x81,
0x08, 0xaf, 0x21, 0xd2, 0xc1, 0x0d, 0xac, 0xc8, 0x63, 0xc3, 0xc2, 0x6d, 0xef, 0xc2, 0xb4, 0xde,
0xc1, 0x0a, 0x13, 0xb4, 0xf5, 0x15, 0xa4, 0x62, 0x82, 0x93, 0x8e, 0x6d, 0x39, 0x1d, 0xbf, 0x29,
0xf1, 0x67, 0x74, 0xb8, 0xfe, 0x41, 0x90, 0x53, 0x49, 0x33, 0xd0, 0x20, 0x15, 0x19, 0xd9, 0x96,
0x33, 0x3c, 0xb5, 0xdd, 0x8d, 0xff, 0xbd, 0x6d, 0x8f, 0x8b, 0x96, 0xe7, 0x1f, 0xc0, 0x3d, 0x28,
0x9e, 0xa3, 0xed, 0x48, 0x02, 0x35, 0xa2, 0x31, 0xd5, 0x40, 0xba, 0x46, 0x6e, 0xec, 0x56, 0x81,
0xb8, 0x4d, 0x20, 0xee, 0x65, 0x13, 0xc8, 0xb4, 0xff, 0xf3, 0x6e, 0xf2, 0xe8, 0xfb, 0xef, 0x89,
0xe5, 0x8f, 0x9a, 0xab, 0x33, 0xaa, 0x01, 0x7f, 0x40, 0xbb, 0x70, 0x9b, 0x33, 0xb9, 0x21, 0xd6,
0xfb, 0x07, 0xb1, 0x9d, 0xf5, 0x65, 0x23, 0xf7, 0x0a, 0xed, 0x65, 0x45, 0xaa, 0x59, 0x4e, 0xa5,
0xae, 0xc3, 0x23, 0x43, 0xdb, 0x72, 0xfa, 0xfe, 0x6e, 0x8b, 0x57, 0xc1, 0x61, 0x0f, 0xed, 0xb7,
0x13, 0x0f, 0x14, 0x4b, 0x38, 0xd5, 0x85, 0x04, 0x32, 0xa8, 0x62, 0x6e, 0x5b, 0xcb, 0xa6, 0x83,
0x9f, 0xa1, 0x81, 0x32, 0xc3, 0x0b, 0x58, 0x4c, 0x90, 0xa1, 0xf5, 0x2b, 0x60, 0x1e, 0xe3, 0x63,
0x34, 0xc8, 0x53, 0x1a, 0x41, 0x06, 0x5c, 0x93, 0x6d, 0xdb, 0x72, 0xba, 0xfe, 0x1a, 0xc0, 0xe7,
0xe8, 0x69, 0xa1, 0x20, 0xc8, 0x81, 0xc7, 0x8c, 0x27, 0xb5, 0x31, 0x15, 0x68, 0x1a, 0xa6, 0x40,
0x76, 0x8c, 0xbf, 0xa3, 0x42, 0xc1, 0xa2, 0xea, 0x57, 0x06, 0xd5, 0x65, 0xd9, 0x3d, 0xf9, 0xd6,
0x41, 0x83, 0x25, 0x24, 0xa5, 0xcc, 0x7c, 0x86, 0xcf, 0x37, 0x3d, 0x58, 0x26, 0xa8, 0x63, 0xf7,
0xef, 0xc5, 0x75, 0x9b, 0x2d, 0xdb, 0x70, 0x38, 0x41, 0x43, 0x93, 0x0a, 0x2f, 0xb2, 0x10, 0xa4,
0x59, 0xa7, 0xae, 0x8f, 0x4a, 0xe8, 0xa3, 0x41, 0xf0, 0x01, 0xea, 0x32, 0x1e, 0xc3, 0xad, 0x59,
0xa2, 0xae, 0x5f, 0x15, 0xf8, 0x0c, 0x6d, 0x4b, 0x21, 0x74, 0x90, 0x33, 0x88, 0xa0, 0x7c, 0xb5,
0x9c, 0xf5, 0x68, 0xba, 0x5b, 0x8e, 0xe0, 0xd7, 0xdd, 0x64, 0x6b, 0x51, 0xe2, 0xf3, 0x99, 0x3f,
0x2c, 0x59, 0x55, 0x11, 0xe3, 0x4f, 0xe8, 0x50, 0x48, 0x96, 0x30, 0x4e, 0xd3, 0x40, 0xc8, 0x18,
0x64, 0x90, 0xb2, 0x8c, 0x69, 0x45, 0x7a, 0x76, 0xc7, 0x19, 0x9e, 0x3e, 0x5f, 0x1b, 0x7d, 0x13,
0xc7, 0x12, 0x94, 0x82, 0xf8, 0xa2, 0xa4, 0xbd, 0x2f, 0x59, 0xfe, 0x7e, 0x73, 0x77, 0x8d, 0xdd,
0xb3, 0x73, 0x5b, 0xff, 0xbd, 0x73, 0x0f, 0x4c, 0xbe, 0xff, 0xd0, 0xe4, 0xa7, 0x2f, 0xbf, 0xbc,
0x50, 0x5a, 0xc8, 0x6b, 0x97, 0x09, 0xcf, 0x1c, 0xbc, 0x96, 0xe4, 0x31, 0xae, 0x41, 0x72, 0x9a,
0xe6, 0x61, 0xd8, 0x33, 0x1e, 0xce, 0xfe, 0x04, 0x00, 0x00, 0xff, 0xff, 0xec, 0x7c, 0xdf, 0x4d,
0x5e, 0x04, 0x00, 0x00,
}

View File

@ -28,6 +28,10 @@ message StreamID {
bytes stream_id = 10;
int32 placement = 13;
// temporary field to determine if we should go with new pending_objects table or
// fallback to pending object in objects table.
bool use_pending_objects_table = 14;
}
message SegmentID {

View File

@ -46,6 +46,11 @@ type BeginObjectNextVersion struct {
EncryptedMetadataEncryptedKey []byte // optional
Encryption storj.EncryptionParameters
// UsePendingObjectsTable was added to options not metabase configuration
// to be able to test scenarios with pending object in pending_objects and
// objects table with the same test case.
UsePendingObjectsTable bool
}
// Verify verifies get object request fields.
@ -67,6 +72,7 @@ func (opts *BeginObjectNextVersion) Verify() error {
}
// BeginObjectNextVersion adds a pending object to the database, with automatically assigned version.
// TODO at the end of transition to pending_objects table we can rename this metod to just BeginObject.
func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVersion) (object Object, err error) {
defer mon.Task()(&ctx)(&err)
@ -91,31 +97,59 @@ func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVe
ZombieDeletionDeadline: opts.ZombieDeletionDeadline,
}
if err := db.db.QueryRowContext(ctx, `
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
expires_at, encryption,
zombie_deletion_deadline,
encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key
) VALUES (
$1, $2, $3,
coalesce((
SELECT version + 1
FROM objects
WHERE project_id = $1 AND bucket_name = $2 AND object_key = $3
ORDER BY version DESC
LIMIT 1
), 1),
$4, $5, $6,
$7,
$8, $9, $10)
RETURNING status, version, created_at
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID,
opts.ExpiresAt, encryptionParameters{&opts.Encryption},
opts.ZombieDeletionDeadline,
opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey,
).Scan(&object.Status, &object.Version, &object.CreatedAt); err != nil {
return Object{}, Error.New("unable to insert object: %w", err)
if opts.UsePendingObjectsTable {
object.Status = Pending
object.Version = DefaultVersion
if err := db.db.QueryRowContext(ctx, `
INSERT INTO pending_objects (
project_id, bucket_name, object_key, stream_id,
expires_at, encryption,
zombie_deletion_deadline,
encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key
) VALUES (
$1, $2, $3, $4,
$5, $6,
$7,
$8, $9, $10)
RETURNING created_at
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID,
opts.ExpiresAt, encryptionParameters{&opts.Encryption},
opts.ZombieDeletionDeadline,
opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey,
).Scan(&object.CreatedAt); err != nil {
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
return Object{}, Error.Wrap(ErrObjectAlreadyExists.New(""))
}
return Object{}, Error.New("unable to insert object: %w", err)
}
} else {
if err := db.db.QueryRowContext(ctx, `
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
expires_at, encryption,
zombie_deletion_deadline,
encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key
) VALUES (
$1, $2, $3,
coalesce((
SELECT version + 1
FROM objects
WHERE project_id = $1 AND bucket_name = $2 AND object_key = $3
ORDER BY version DESC
LIMIT 1
), 1),
$4, $5, $6,
$7,
$8, $9, $10)
RETURNING status, version, created_at
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID,
opts.ExpiresAt, encryptionParameters{&opts.Encryption},
opts.ZombieDeletionDeadline,
opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey,
).Scan(&object.Status, &object.Version, &object.CreatedAt); err != nil {
return Object{}, Error.New("unable to insert object: %w", err)
}
}
mon.Meter("object_begin").Mark(1)

View File

@ -338,6 +338,184 @@ func TestBeginObjectNextVersion(t *testing.T) {
})
}
func TestBeginObjectNextVersion_PendingObjects(t *testing.T) {
// TODO when we stop storing pending objects in objects tabe we will be able
// to merge this tests with TestBeginObjectNextVersion
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := metabasetest.RandObjectStream()
objectStream := metabase.ObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
StreamID: obj.StreamID,
}
t.Run("object already exists", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now1 := time.Now()
zombieDeadline := now1.Add(24 * time.Hour)
futureTime := now1.Add(10 * 24 * time.Hour)
objectStream.Version = metabase.NextVersion
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: objectStream,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: metabase.DefaultVersion,
}.Check(ctx, t, db)
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: objectStream,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &futureTime,
UsePendingObjectsTable: true,
},
Version: metabase.DefaultVersion,
ErrClass: &metabase.ErrObjectAlreadyExists,
}.Check(ctx, t, db)
metabasetest.Verify{
PendingObjects: []metabase.RawPendingObject{
{
PendingObjectStream: metabase.PendingObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
StreamID: obj.StreamID,
},
CreatedAt: now1,
ExpiresAt: nil,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
}.Check(ctx, t, db)
})
t.Run("multiple versions", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now1 := time.Now()
zombieDeadline := now1.Add(24 * time.Hour)
futureTime := now1.Add(10 * 24 * time.Hour)
objectStream.Version = metabase.NextVersion
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: objectStream,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: metabase.DefaultVersion,
}.Check(ctx, t, db)
now2 := time.Now()
secondObjectStream := objectStream
secondObjectStream.StreamID = testrand.UUID()
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: secondObjectStream,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &futureTime,
UsePendingObjectsTable: true,
},
Version: metabase.DefaultVersion,
}.Check(ctx, t, db)
metabasetest.Verify{
PendingObjects: []metabase.RawPendingObject{
{
PendingObjectStream: metabase.PendingObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
StreamID: obj.StreamID,
},
CreatedAt: now1,
ExpiresAt: nil,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
{
PendingObjectStream: metabase.PendingObjectStream{
ProjectID: secondObjectStream.ProjectID,
BucketName: secondObjectStream.BucketName,
ObjectKey: secondObjectStream.ObjectKey,
StreamID: secondObjectStream.StreamID,
},
CreatedAt: now2,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &futureTime,
},
},
}.Check(ctx, t, db)
})
t.Run("begin object next version with metadata", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now := time.Now()
zombieDeadline := now.Add(24 * time.Hour)
objectStream.Version = metabase.NextVersion
encryptedMetadata := testrand.BytesInt(64)
encryptedMetadataNonce := testrand.Nonce()
encryptedMetadataEncryptedKey := testrand.BytesInt(32)
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: objectStream,
Encryption: metabasetest.DefaultEncryption,
EncryptedMetadata: encryptedMetadata,
EncryptedMetadataNonce: encryptedMetadataNonce[:],
EncryptedMetadataEncryptedKey: encryptedMetadataEncryptedKey,
UsePendingObjectsTable: true,
},
Version: metabase.DefaultVersion,
}.Check(ctx, t, db)
metabasetest.Verify{
PendingObjects: []metabase.RawPendingObject{
{
PendingObjectStream: metabase.PendingObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
StreamID: obj.StreamID,
},
CreatedAt: now,
EncryptedMetadata: encryptedMetadata,
EncryptedMetadataNonce: encryptedMetadataNonce[:],
EncryptedMetadataEncryptedKey: encryptedMetadataEncryptedKey,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
}.Check(ctx, t, db)
})
})
}
func TestBeginObjectExactVersion(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := metabasetest.RandObjectStream()

View File

@ -37,6 +37,8 @@ func (step Verify) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB
sortRawObjects(state.Objects)
sortRawObjects(step.Objects)
sortRawPendingObjects(state.PendingObjects)
sortRawPendingObjects(step.PendingObjects)
sortRawSegments(state.Segments)
sortRawSegments(step.Segments)
sortRawCopies(state.Copies)
@ -69,6 +71,12 @@ func sortRawObjects(objects []metabase.RawObject) {
})
}
func sortRawPendingObjects(objects []metabase.RawPendingObject) {
sort.Slice(objects, func(i, j int) bool {
return objects[i].StreamID.Less(objects[j].StreamID)
})
}
func sortRawSegments(segments []metabase.RawSegment) {
sort.Slice(segments, func(i, j int) bool {
if segments[i].StreamID == segments[j].StreamID {

View File

@ -147,6 +147,8 @@ type Config struct {
ServerSideCopyDisabled bool `help:"disable already enabled server-side copy. this is because once server side copy is enabled, delete code should stay changed, even if you want to disable server side copy" default:"false"`
ServerSideCopyDuplicateMetadata bool `help:"perform server-side copy by duplicating metadata, instead of using segment_copies" default:"false"`
UsePendingObjectsTable bool `help:"enable new flow for upload which is using pending_objects table" default:"false"`
// TODO remove when we benchmarking are done and decision is made.
TestListingQuery bool `default:"false" help:"test the new query for non-recursive listing"`
}

View File

@ -139,6 +139,8 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
EncryptedMetadata: req.EncryptedMetadata,
EncryptedMetadataEncryptedKey: req.EncryptedMetadataEncryptedKey,
EncryptedMetadataNonce: nonce,
UsePendingObjectsTable: endpoint.config.UsePendingObjectsTable,
})
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
@ -154,6 +156,8 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
MultipartObject: object.FixedSegmentSize <= 0,
EncryptionParameters: req.EncryptionParameters,
Placement: int32(placement),
UsePendingObjectsTable: endpoint.config.UsePendingObjectsTable,
})
if err != nil {
endpoint.log.Error("internal", zap.Error(err))

View File

@ -706,6 +706,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# how often we can upload to the single object (the same location) per API instance
# metainfo.upload-limiter.single-object-limit: 1s
# enable new flow for upload which is using pending_objects table
# metainfo.use-pending-objects-table: false
# address(es) to send telemetry to (comma-separated)
# metrics.addr: collectora.storj.io:9000