From cebf255d6457f95b58d4d4f241311d3c154667d1 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Fri, 21 Jul 2023 11:36:59 +0200 Subject: [PATCH] satellite/metabase: adjust BeginObjectNextVersion to use pending_objects Change is adjusting BeginObjectNextVersion to create pending object in `pending_objects` or `objects` table depends on configuration. This is first change to move pending objects from objects table. General goal is to support both tables until `objects` table will be free from pending objects. Whenever it will be needed code will be supporting both tables at once. To be able to decide if we need to use `pending_objects` table or `objects` table we extend satellite stream id to keep that information for later use. BeginObjectExactVersion will be not adjusted because at the moment it's used only in tests. Part of https://github.com/storj/storj/issues/6046 Change-Id: Ibf21965f63cca5e1775469994a29f1fd1261af4e --- satellite/internalpb/metainfo_sat.pb.go | 90 +++++----- satellite/internalpb/metainfo_sat.proto | 4 + satellite/metabase/commit.go | 84 ++++++--- satellite/metabase/commit_test.go | 178 ++++++++++++++++++++ satellite/metabase/metabasetest/common.go | 8 + satellite/metainfo/config.go | 2 + satellite/metainfo/endpoint_object.go | 4 + scripts/testdata/satellite-config.yaml.lock | 3 + 8 files changed, 309 insertions(+), 64 deletions(-) diff --git a/satellite/internalpb/metainfo_sat.pb.go b/satellite/internalpb/metainfo_sat.pb.go index d21384c3e..80156e837 100644 --- a/satellite/internalpb/metainfo_sat.pb.go +++ b/satellite/internalpb/metainfo_sat.pb.go @@ -36,9 +36,12 @@ type StreamID struct { SatelliteSignature []byte `protobuf:"bytes,9,opt,name=satellite_signature,json=satelliteSignature,proto3" json:"satellite_signature,omitempty"` StreamId []byte `protobuf:"bytes,10,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` Placement int32 `protobuf:"varint,13,opt,name=placement,proto3" json:"placement,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + // temporary field to determine if we should go with new pending_objects table or + // fallback to pending object in objects table. + UsePendingObjectsTable bool `protobuf:"varint,14,opt,name=use_pending_objects_table,json=usePendingObjectsTable,proto3" json:"use_pending_objects_table,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *StreamID) Reset() { *m = StreamID{} } @@ -135,6 +138,13 @@ func (m *StreamID) GetPlacement() int32 { return 0 } +func (m *StreamID) GetUsePendingObjectsTable() bool { + if m != nil { + return m.UsePendingObjectsTable + } + return false +} + type SegmentID struct { StreamId *StreamID `protobuf:"bytes,1,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` PartNumber int32 `protobuf:"varint,2,opt,name=part_number,json=partNumber,proto3" json:"part_number,omitempty"` @@ -222,40 +232,42 @@ func init() { func init() { proto.RegisterFile("metainfo_sat.proto", fileDescriptor_47c60bd892d94aaf) } var fileDescriptor_47c60bd892d94aaf = []byte{ - // 548 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x52, 0xcf, 0x6e, 0xd3, 0x4e, - 0x10, 0xfe, 0xf9, 0x17, 0xe5, 0xdf, 0x26, 0x69, 0xaa, 0x6d, 0x8a, 0xac, 0x50, 0x14, 0xab, 0x08, - 0xc9, 0x5c, 0x6c, 0xd4, 0x9e, 0x38, 0x12, 0x85, 0x43, 0xc4, 0x9f, 0x16, 0x07, 0x2e, 0x5c, 0xac, - 0xb5, 0x3d, 0xb5, 0xb6, 0xb5, 0x77, 0xad, 0xdd, 0x09, 0x6a, 0x8e, 0xbc, 0x01, 0x8f, 0xc5, 0x33, - 0x70, 0x28, 0x6f, 0xc1, 0x19, 0x79, 0x1d, 0x3b, 0x91, 0x68, 0x0f, 0x70, 0x9b, 0xf9, 0xe6, 0xdb, - 0x6f, 0x76, 0xbe, 0x19, 0x42, 0x73, 0x40, 0xc6, 0xc5, 0x95, 0x0c, 0x35, 0x43, 0xaf, 0x50, 0x12, - 0x25, 0xa5, 0x9a, 0x21, 0x64, 0x19, 0x47, 0xf0, 0xea, 0xea, 0xf4, 0x10, 0x44, 0xac, 0x36, 0x05, - 0x72, 0x29, 0x2a, 0xd6, 0x94, 0xa4, 0x32, 0x95, 0xdb, 0x78, 0x96, 0x4a, 0x99, 0x66, 0xe0, 0x9b, - 0x2c, 0x5a, 0x5f, 0xf9, 0xc8, 0x73, 0xd0, 0xc8, 0xf2, 0x62, 0x4b, 0x38, 0xa8, 0x85, 0xaa, 0xfc, - 0xf4, 0x57, 0x8b, 0xf4, 0x56, 0xa8, 0x80, 0xe5, 0xcb, 0x05, 0x7d, 0x44, 0x3a, 0xd1, 0x3a, 0xbe, - 0x01, 0xb4, 0x2d, 0xc7, 0x72, 0x87, 0xc1, 0x36, 0xa3, 0x2f, 0xc8, 0x64, 0xdb, 0x15, 0x92, 0x50, - 0x46, 0xd7, 0x10, 0x63, 0x78, 0x03, 0x1b, 0xfb, 0x7f, 0xc3, 0xa2, 0x4d, 0xed, 0xc2, 0x94, 0xde, - 0xc0, 0x86, 0xda, 0xa4, 0xfb, 0x05, 0x94, 0xe6, 0x52, 0xd8, 0x2d, 0xc7, 0x72, 0x5b, 0x41, 0x9d, - 0xd2, 0x4f, 0xe4, 0x78, 0x37, 0x41, 0x58, 0x30, 0xc5, 0x72, 0x40, 0x50, 0xda, 0x1e, 0x3a, 0x96, - 0x3b, 0x38, 0x73, 0xbc, 0xbd, 0xf9, 0x5e, 0x37, 0xe1, 0x65, 0xc3, 0x0b, 0x26, 0x70, 0x0f, 0x4a, - 0x97, 0x64, 0x14, 0x2b, 0x60, 0x46, 0x34, 0x61, 0x08, 0x76, 0xdb, 0xc8, 0x4d, 0xbd, 0xca, 0x10, - 0xaf, 0x36, 0xc4, 0xfb, 0x58, 0x1b, 0x32, 0xef, 0x7d, 0xbf, 0x9b, 0xfd, 0xf7, 0xed, 0xe7, 0xcc, - 0x0a, 0x86, 0xf5, 0xd3, 0x05, 0x43, 0xa0, 0xef, 0xc8, 0x18, 0x6e, 0x0b, 0xae, 0xf6, 0xc4, 0x3a, - 0x7f, 0x21, 0x76, 0xb0, 0x7b, 0x6c, 0xe4, 0x9e, 0x93, 0xc3, 0x7c, 0x9d, 0x21, 0x2f, 0x98, 0xc2, - 0xad, 0x79, 0xf6, 0xc0, 0xb1, 0xdc, 0x5e, 0x30, 0x6e, 0xf0, 0xca, 0x38, 0xea, 0x93, 0xa3, 0x66, - 0xe3, 0xa1, 0xe6, 0xa9, 0x60, 0xb8, 0x56, 0x60, 0xf7, 0x2b, 0x9b, 0x9b, 0xd2, 0xaa, 0xae, 0xd0, - 0xc7, 0xa4, 0xaf, 0xcd, 0xf2, 0x42, 0x9e, 0xd8, 0xc4, 0xd0, 0x7a, 0x15, 0xb0, 0x4c, 0xe8, 0x09, - 0xe9, 0x17, 0x19, 0x8b, 0x21, 0x07, 0x81, 0xf6, 0xc8, 0xb1, 0xdc, 0x76, 0xb0, 0x03, 0x4e, 0xbf, - 0xb6, 0x48, 0x7f, 0x05, 0x69, 0x19, 0x2f, 0x17, 0xf4, 0xe5, 0xbe, 0x90, 0x65, 0xa6, 0x3d, 0xf1, - 0xfe, 0xbc, 0x3e, 0xaf, 0x3e, 0x95, 0xbd, 0x36, 0x33, 0x32, 0x30, 0xa3, 0x89, 0x75, 0x1e, 0x81, - 0x32, 0x37, 0xd1, 0x0e, 0x48, 0x09, 0xbd, 0x37, 0x08, 0x9d, 0x90, 0x36, 0x17, 0x09, 0xdc, 0x9a, - 0x4b, 0x68, 0x07, 0x55, 0x42, 0xcf, 0xc9, 0x48, 0x49, 0x89, 0x61, 0xc1, 0x21, 0x86, 0xb2, 0x6b, - 0xb9, 0xb0, 0xe1, 0x7c, 0x5c, 0xfa, 0xf8, 0xe3, 0x6e, 0xd6, 0xbd, 0x2c, 0xf1, 0xe5, 0x22, 0x18, - 0x94, 0xac, 0x2a, 0x49, 0xe8, 0x07, 0x72, 0x2c, 0x15, 0x4f, 0xb9, 0x60, 0x59, 0x28, 0x55, 0x02, - 0x2a, 0xcc, 0x78, 0xce, 0x51, 0xdb, 0x1d, 0xa7, 0xe5, 0x0e, 0xce, 0x9e, 0xec, 0x3e, 0xfa, 0x2a, - 0x49, 0x14, 0x68, 0x0d, 0xc9, 0x45, 0x49, 0x7b, 0x5b, 0xb2, 0x82, 0xa3, 0xfa, 0xed, 0x0e, 0xbb, - 0xe7, 0x70, 0xba, 0xff, 0x7c, 0x38, 0x0f, 0xac, 0xaf, 0xf7, 0xd0, 0xfa, 0xe6, 0xcf, 0x3e, 0x3f, - 0xd5, 0x28, 0xd5, 0xb5, 0xc7, 0xa5, 0x6f, 0x02, 0xbf, 0x21, 0xf9, 0x5c, 0x20, 0x28, 0xc1, 0xb2, - 0x22, 0x8a, 0x3a, 0xe6, 0x0f, 0xe7, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x43, 0x2e, 0x40, 0x6c, - 0x23, 0x04, 0x00, 0x00, + // 580 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x52, 0xcb, 0x6e, 0xd3, 0x40, + 0x14, 0xc5, 0x84, 0xa4, 0xc9, 0x24, 0x7d, 0x68, 0xfa, 0xd0, 0x10, 0x8a, 0x62, 0x15, 0x21, 0x99, + 0x8d, 0x8d, 0xda, 0x55, 0x97, 0x44, 0x61, 0x11, 0xf1, 0x68, 0x70, 0xca, 0x86, 0x8d, 0x35, 0xb6, + 0x6f, 0xad, 0x69, 0xed, 0x19, 0x6b, 0x66, 0x8c, 0x9a, 0x25, 0x7f, 0xc0, 0xaf, 0xf0, 0x17, 0x7c, + 0x03, 0x8b, 0xf2, 0x2b, 0xc8, 0xe3, 0x47, 0x22, 0xd1, 0x2e, 0x60, 0x37, 0xf7, 0xdc, 0x33, 0x67, + 0xce, 0x9c, 0x7b, 0x11, 0xce, 0x40, 0x53, 0xc6, 0xaf, 0x44, 0xa0, 0xa8, 0x76, 0x73, 0x29, 0xb4, + 0xc0, 0x58, 0x51, 0x0d, 0x69, 0xca, 0x34, 0xb8, 0x4d, 0x77, 0xbc, 0x07, 0x3c, 0x92, 0xab, 0x5c, + 0x33, 0xc1, 0x2b, 0xd6, 0x18, 0x25, 0x22, 0x11, 0xf5, 0x79, 0x92, 0x08, 0x91, 0xa4, 0xe0, 0x99, + 0x2a, 0x2c, 0xae, 0x3c, 0xcd, 0x32, 0x50, 0x9a, 0x66, 0x79, 0x4d, 0xd8, 0x69, 0x84, 0xaa, 0xfa, + 0xe4, 0xc7, 0x13, 0xd4, 0x5f, 0x6a, 0x09, 0x34, 0x9b, 0xcf, 0xf0, 0x11, 0xea, 0x85, 0x45, 0x74, + 0x03, 0x9a, 0x58, 0xb6, 0xe5, 0x8c, 0xfc, 0xba, 0xc2, 0xaf, 0xd1, 0x41, 0xfd, 0x2a, 0xc4, 0x81, + 0x08, 0xaf, 0x21, 0xd2, 0xc1, 0x0d, 0xac, 0xc8, 0x63, 0xc3, 0xc2, 0x6d, 0xef, 0xc2, 0xb4, 0xde, + 0xc1, 0x0a, 0x13, 0xb4, 0xf5, 0x15, 0xa4, 0x62, 0x82, 0x93, 0x8e, 0x6d, 0x39, 0x1d, 0xbf, 0x29, + 0xf1, 0x67, 0x74, 0xb8, 0xfe, 0x41, 0x90, 0x53, 0x49, 0x33, 0xd0, 0x20, 0x15, 0x19, 0xd9, 0x96, + 0x33, 0x3c, 0xb5, 0xdd, 0x8d, 0xff, 0xbd, 0x6d, 0x8f, 0x8b, 0x96, 0xe7, 0x1f, 0xc0, 0x3d, 0x28, + 0x9e, 0xa3, 0xed, 0x48, 0x02, 0x35, 0xa2, 0x31, 0xd5, 0x40, 0xba, 0x46, 0x6e, 0xec, 0x56, 0x81, + 0xb8, 0x4d, 0x20, 0xee, 0x65, 0x13, 0xc8, 0xb4, 0xff, 0xf3, 0x6e, 0xf2, 0xe8, 0xfb, 0xef, 0x89, + 0xe5, 0x8f, 0x9a, 0xab, 0x33, 0xaa, 0x01, 0x7f, 0x40, 0xbb, 0x70, 0x9b, 0x33, 0xb9, 0x21, 0xd6, + 0xfb, 0x07, 0xb1, 0x9d, 0xf5, 0x65, 0x23, 0xf7, 0x0a, 0xed, 0x65, 0x45, 0xaa, 0x59, 0x4e, 0xa5, + 0xae, 0xc3, 0x23, 0x43, 0xdb, 0x72, 0xfa, 0xfe, 0x6e, 0x8b, 0x57, 0xc1, 0x61, 0x0f, 0xed, 0xb7, + 0x13, 0x0f, 0x14, 0x4b, 0x38, 0xd5, 0x85, 0x04, 0x32, 0xa8, 0x62, 0x6e, 0x5b, 0xcb, 0xa6, 0x83, + 0x9f, 0xa1, 0x81, 0x32, 0xc3, 0x0b, 0x58, 0x4c, 0x90, 0xa1, 0xf5, 0x2b, 0x60, 0x1e, 0xe3, 0x63, + 0x34, 0xc8, 0x53, 0x1a, 0x41, 0x06, 0x5c, 0x93, 0x6d, 0xdb, 0x72, 0xba, 0xfe, 0x1a, 0xc0, 0xe7, + 0xe8, 0x69, 0xa1, 0x20, 0xc8, 0x81, 0xc7, 0x8c, 0x27, 0xb5, 0x31, 0x15, 0x68, 0x1a, 0xa6, 0x40, + 0x76, 0x8c, 0xbf, 0xa3, 0x42, 0xc1, 0xa2, 0xea, 0x57, 0x06, 0xd5, 0x65, 0xd9, 0x3d, 0xf9, 0xd6, + 0x41, 0x83, 0x25, 0x24, 0xa5, 0xcc, 0x7c, 0x86, 0xcf, 0x37, 0x3d, 0x58, 0x26, 0xa8, 0x63, 0xf7, + 0xef, 0xc5, 0x75, 0x9b, 0x2d, 0xdb, 0x70, 0x38, 0x41, 0x43, 0x93, 0x0a, 0x2f, 0xb2, 0x10, 0xa4, + 0x59, 0xa7, 0xae, 0x8f, 0x4a, 0xe8, 0xa3, 0x41, 0xf0, 0x01, 0xea, 0x32, 0x1e, 0xc3, 0xad, 0x59, + 0xa2, 0xae, 0x5f, 0x15, 0xf8, 0x0c, 0x6d, 0x4b, 0x21, 0x74, 0x90, 0x33, 0x88, 0xa0, 0x7c, 0xb5, + 0x9c, 0xf5, 0x68, 0xba, 0x5b, 0x8e, 0xe0, 0xd7, 0xdd, 0x64, 0x6b, 0x51, 0xe2, 0xf3, 0x99, 0x3f, + 0x2c, 0x59, 0x55, 0x11, 0xe3, 0x4f, 0xe8, 0x50, 0x48, 0x96, 0x30, 0x4e, 0xd3, 0x40, 0xc8, 0x18, + 0x64, 0x90, 0xb2, 0x8c, 0x69, 0x45, 0x7a, 0x76, 0xc7, 0x19, 0x9e, 0x3e, 0x5f, 0x1b, 0x7d, 0x13, + 0xc7, 0x12, 0x94, 0x82, 0xf8, 0xa2, 0xa4, 0xbd, 0x2f, 0x59, 0xfe, 0x7e, 0x73, 0x77, 0x8d, 0xdd, + 0xb3, 0x73, 0x5b, 0xff, 0xbd, 0x73, 0x0f, 0x4c, 0xbe, 0xff, 0xd0, 0xe4, 0xa7, 0x2f, 0xbf, 0xbc, + 0x50, 0x5a, 0xc8, 0x6b, 0x97, 0x09, 0xcf, 0x1c, 0xbc, 0x96, 0xe4, 0x31, 0xae, 0x41, 0x72, 0x9a, + 0xe6, 0x61, 0xd8, 0x33, 0x1e, 0xce, 0xfe, 0x04, 0x00, 0x00, 0xff, 0xff, 0xec, 0x7c, 0xdf, 0x4d, + 0x5e, 0x04, 0x00, 0x00, } diff --git a/satellite/internalpb/metainfo_sat.proto b/satellite/internalpb/metainfo_sat.proto index da576d77c..36d06544a 100644 --- a/satellite/internalpb/metainfo_sat.proto +++ b/satellite/internalpb/metainfo_sat.proto @@ -28,6 +28,10 @@ message StreamID { bytes stream_id = 10; int32 placement = 13; + + // temporary field to determine if we should go with new pending_objects table or + // fallback to pending object in objects table. + bool use_pending_objects_table = 14; } message SegmentID { diff --git a/satellite/metabase/commit.go b/satellite/metabase/commit.go index 17f4ffd2f..0587f46d1 100644 --- a/satellite/metabase/commit.go +++ b/satellite/metabase/commit.go @@ -46,6 +46,11 @@ type BeginObjectNextVersion struct { EncryptedMetadataEncryptedKey []byte // optional Encryption storj.EncryptionParameters + + // UsePendingObjectsTable was added to options not metabase configuration + // to be able to test scenarios with pending object in pending_objects and + // objects table with the same test case. + UsePendingObjectsTable bool } // Verify verifies get object request fields. @@ -67,6 +72,7 @@ func (opts *BeginObjectNextVersion) Verify() error { } // BeginObjectNextVersion adds a pending object to the database, with automatically assigned version. +// TODO at the end of transition to pending_objects table we can rename this metod to just BeginObject. func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVersion) (object Object, err error) { defer mon.Task()(&ctx)(&err) @@ -91,31 +97,59 @@ func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVe ZombieDeletionDeadline: opts.ZombieDeletionDeadline, } - if err := db.db.QueryRowContext(ctx, ` - INSERT INTO objects ( - project_id, bucket_name, object_key, version, stream_id, - expires_at, encryption, - zombie_deletion_deadline, - encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key - ) VALUES ( - $1, $2, $3, - coalesce(( - SELECT version + 1 - FROM objects - WHERE project_id = $1 AND bucket_name = $2 AND object_key = $3 - ORDER BY version DESC - LIMIT 1 - ), 1), - $4, $5, $6, - $7, - $8, $9, $10) - RETURNING status, version, created_at - `, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID, - opts.ExpiresAt, encryptionParameters{&opts.Encryption}, - opts.ZombieDeletionDeadline, - opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey, - ).Scan(&object.Status, &object.Version, &object.CreatedAt); err != nil { - return Object{}, Error.New("unable to insert object: %w", err) + if opts.UsePendingObjectsTable { + object.Status = Pending + object.Version = DefaultVersion + + if err := db.db.QueryRowContext(ctx, ` + INSERT INTO pending_objects ( + project_id, bucket_name, object_key, stream_id, + expires_at, encryption, + zombie_deletion_deadline, + encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key + ) VALUES ( + $1, $2, $3, $4, + $5, $6, + $7, + $8, $9, $10) + RETURNING created_at + `, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID, + opts.ExpiresAt, encryptionParameters{&opts.Encryption}, + opts.ZombieDeletionDeadline, + opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey, + ).Scan(&object.CreatedAt); err != nil { + if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation { + return Object{}, Error.Wrap(ErrObjectAlreadyExists.New("")) + } + return Object{}, Error.New("unable to insert object: %w", err) + } + } else { + if err := db.db.QueryRowContext(ctx, ` + INSERT INTO objects ( + project_id, bucket_name, object_key, version, stream_id, + expires_at, encryption, + zombie_deletion_deadline, + encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key + ) VALUES ( + $1, $2, $3, + coalesce(( + SELECT version + 1 + FROM objects + WHERE project_id = $1 AND bucket_name = $2 AND object_key = $3 + ORDER BY version DESC + LIMIT 1 + ), 1), + $4, $5, $6, + $7, + $8, $9, $10) + RETURNING status, version, created_at + `, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID, + opts.ExpiresAt, encryptionParameters{&opts.Encryption}, + opts.ZombieDeletionDeadline, + opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey, + ).Scan(&object.Status, &object.Version, &object.CreatedAt); err != nil { + return Object{}, Error.New("unable to insert object: %w", err) + } } mon.Meter("object_begin").Mark(1) diff --git a/satellite/metabase/commit_test.go b/satellite/metabase/commit_test.go index 5ea419915..6b73af37d 100644 --- a/satellite/metabase/commit_test.go +++ b/satellite/metabase/commit_test.go @@ -338,6 +338,184 @@ func TestBeginObjectNextVersion(t *testing.T) { }) } +func TestBeginObjectNextVersion_PendingObjects(t *testing.T) { + // TODO when we stop storing pending objects in objects tabe we will be able + // to merge this tests with TestBeginObjectNextVersion + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := metabasetest.RandObjectStream() + + objectStream := metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + StreamID: obj.StreamID, + } + + t.Run("object already exists", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + now1 := time.Now() + zombieDeadline := now1.Add(24 * time.Hour) + futureTime := now1.Add(10 * 24 * time.Hour) + + objectStream.Version = metabase.NextVersion + + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: objectStream, + Encryption: metabasetest.DefaultEncryption, + + UsePendingObjectsTable: true, + }, + Version: metabase.DefaultVersion, + }.Check(ctx, t, db) + + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: objectStream, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &futureTime, + + UsePendingObjectsTable: true, + }, + Version: metabase.DefaultVersion, + ErrClass: &metabase.ErrObjectAlreadyExists, + }.Check(ctx, t, db) + + metabasetest.Verify{ + PendingObjects: []metabase.RawPendingObject{ + { + PendingObjectStream: metabase.PendingObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + StreamID: obj.StreamID, + }, + CreatedAt: now1, + ExpiresAt: nil, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("multiple versions", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + now1 := time.Now() + zombieDeadline := now1.Add(24 * time.Hour) + futureTime := now1.Add(10 * 24 * time.Hour) + + objectStream.Version = metabase.NextVersion + + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: objectStream, + Encryption: metabasetest.DefaultEncryption, + + UsePendingObjectsTable: true, + }, + Version: metabase.DefaultVersion, + }.Check(ctx, t, db) + + now2 := time.Now() + + secondObjectStream := objectStream + secondObjectStream.StreamID = testrand.UUID() + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: secondObjectStream, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &futureTime, + + UsePendingObjectsTable: true, + }, + Version: metabase.DefaultVersion, + }.Check(ctx, t, db) + + metabasetest.Verify{ + PendingObjects: []metabase.RawPendingObject{ + { + PendingObjectStream: metabase.PendingObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + StreamID: obj.StreamID, + }, + CreatedAt: now1, + ExpiresAt: nil, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + { + PendingObjectStream: metabase.PendingObjectStream{ + ProjectID: secondObjectStream.ProjectID, + BucketName: secondObjectStream.BucketName, + ObjectKey: secondObjectStream.ObjectKey, + StreamID: secondObjectStream.StreamID, + }, + CreatedAt: now2, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &futureTime, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("begin object next version with metadata", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + + objectStream.Version = metabase.NextVersion + + encryptedMetadata := testrand.BytesInt(64) + encryptedMetadataNonce := testrand.Nonce() + encryptedMetadataEncryptedKey := testrand.BytesInt(32) + + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: objectStream, + Encryption: metabasetest.DefaultEncryption, + + EncryptedMetadata: encryptedMetadata, + EncryptedMetadataNonce: encryptedMetadataNonce[:], + EncryptedMetadataEncryptedKey: encryptedMetadataEncryptedKey, + + UsePendingObjectsTable: true, + }, + Version: metabase.DefaultVersion, + }.Check(ctx, t, db) + + metabasetest.Verify{ + PendingObjects: []metabase.RawPendingObject{ + { + PendingObjectStream: metabase.PendingObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + StreamID: obj.StreamID, + }, + CreatedAt: now, + + EncryptedMetadata: encryptedMetadata, + EncryptedMetadataNonce: encryptedMetadataNonce[:], + EncryptedMetadataEncryptedKey: encryptedMetadataEncryptedKey, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + }.Check(ctx, t, db) + }) + }) +} + func TestBeginObjectExactVersion(t *testing.T) { metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { obj := metabasetest.RandObjectStream() diff --git a/satellite/metabase/metabasetest/common.go b/satellite/metabase/metabasetest/common.go index 1c18f756b..064eb410f 100644 --- a/satellite/metabase/metabasetest/common.go +++ b/satellite/metabase/metabasetest/common.go @@ -37,6 +37,8 @@ func (step Verify) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB sortRawObjects(state.Objects) sortRawObjects(step.Objects) + sortRawPendingObjects(state.PendingObjects) + sortRawPendingObjects(step.PendingObjects) sortRawSegments(state.Segments) sortRawSegments(step.Segments) sortRawCopies(state.Copies) @@ -69,6 +71,12 @@ func sortRawObjects(objects []metabase.RawObject) { }) } +func sortRawPendingObjects(objects []metabase.RawPendingObject) { + sort.Slice(objects, func(i, j int) bool { + return objects[i].StreamID.Less(objects[j].StreamID) + }) +} + func sortRawSegments(segments []metabase.RawSegment) { sort.Slice(segments, func(i, j int) bool { if segments[i].StreamID == segments[j].StreamID { diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go index 6ac341dd2..4e5aef5f3 100644 --- a/satellite/metainfo/config.go +++ b/satellite/metainfo/config.go @@ -147,6 +147,8 @@ type Config struct { ServerSideCopyDisabled bool `help:"disable already enabled server-side copy. this is because once server side copy is enabled, delete code should stay changed, even if you want to disable server side copy" default:"false"` ServerSideCopyDuplicateMetadata bool `help:"perform server-side copy by duplicating metadata, instead of using segment_copies" default:"false"` + UsePendingObjectsTable bool `help:"enable new flow for upload which is using pending_objects table" default:"false"` + // TODO remove when we benchmarking are done and decision is made. TestListingQuery bool `default:"false" help:"test the new query for non-recursive listing"` } diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go index 186f3bc44..b624cfc97 100644 --- a/satellite/metainfo/endpoint_object.go +++ b/satellite/metainfo/endpoint_object.go @@ -139,6 +139,8 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe EncryptedMetadata: req.EncryptedMetadata, EncryptedMetadataEncryptedKey: req.EncryptedMetadataEncryptedKey, EncryptedMetadataNonce: nonce, + + UsePendingObjectsTable: endpoint.config.UsePendingObjectsTable, }) if err != nil { return nil, endpoint.convertMetabaseErr(err) @@ -154,6 +156,8 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe MultipartObject: object.FixedSegmentSize <= 0, EncryptionParameters: req.EncryptionParameters, Placement: int32(placement), + + UsePendingObjectsTable: endpoint.config.UsePendingObjectsTable, }) if err != nil { endpoint.log.Error("internal", zap.Error(err)) diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 06c750447..9b3152295 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -706,6 +706,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key # how often we can upload to the single object (the same location) per API instance # metainfo.upload-limiter.single-object-limit: 1s +# enable new flow for upload which is using pending_objects table +# metainfo.use-pending-objects-table: false + # address(es) to send telemetry to (comma-separated) # metrics.addr: collectora.storj.io:9000