From bc79f01aaab54de31faec5761142d43a41eec7f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Niewrza=C5=82?= Date: Thu, 10 Jun 2021 12:08:21 +0200 Subject: [PATCH] satellite/metabase: set expires_at while committing segment We added expires_at column to segments table and now we need to populate this column while committing segment. We still need to migrate existing segments with separate tool. Change-Id: Ibac8c63d97201dd98cc2cb9db385f4cb73bc3f7e --- satellite/metabase/commit.go | 50 ++++---- satellite/metabase/commit_test.go | 135 ++++++++++++++++++++++ satellite/metabase/delete_objects_test.go | 1 + satellite/metabase/metabasetest/create.go | 1 + satellite/metainfo/metainfo.go | 12 ++ satellite/metainfo/metainfo_test.go | 30 +++++ 6 files changed, 206 insertions(+), 23 deletions(-) diff --git a/satellite/metabase/commit.go b/satellite/metabase/commit.go index a8a32ad72..c7e4923b2 100644 --- a/satellite/metabase/commit.go +++ b/satellite/metabase/commit.go @@ -237,6 +237,8 @@ type CommitSegment struct { Position SegmentPosition RootPieceID storj.PieceID + ExpiresAt *time.Time + EncryptedKeyNonce []byte EncryptedKey []byte @@ -292,7 +294,7 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error) // Verify that object exists and is partial. _, err = db.db.ExecContext(ctx, ` INSERT INTO segments ( - stream_id, position, + stream_id, position, expires_at, root_piece_id, encrypted_key_nonce, encrypted_key, encrypted_size, plain_offset, plain_size, encrypted_etag, redundancy, @@ -300,18 +302,18 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error) ) VALUES ( (SELECT stream_id FROM objects WHERE - project_id = $11 AND - bucket_name = $12 AND - object_key = $13 AND - version = $14 AND - stream_id = $15 AND + project_id = $12 AND + bucket_name = $13 AND + object_key = $14 AND + version = $15 AND + stream_id = $16 AND status = `+pendingStatus+ - ` ), $1, - $2, $3, $4, - $5, $6, $7, $8, - $9, - $10 - )`, opts.Position, + ` ), $1, $2, + $3, $4, $5, + $6, $7, $8, $9, + $10, + $11 + )`, opts.Position, opts.ExpiresAt, opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey, opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, opts.EncryptedETag, redundancyScheme{&opts.Redundancy}, @@ -340,6 +342,8 @@ type CommitInlineSegment struct { Position SegmentPosition + ExpiresAt *time.Time + EncryptedKeyNonce []byte EncryptedKey []byte @@ -375,24 +379,24 @@ func (db *DB) CommitInlineSegment(ctx context.Context, opts CommitInlineSegment) // Verify that object exists and is partial. _, err = db.db.ExecContext(ctx, ` INSERT INTO segments ( - stream_id, position, + stream_id, position, expires_at, root_piece_id, encrypted_key_nonce, encrypted_key, encrypted_size, plain_offset, plain_size, encrypted_etag, inline_data ) VALUES ( (SELECT stream_id FROM objects WHERE - project_id = $10 AND - bucket_name = $11 AND - object_key = $12 AND - version = $13 AND - stream_id = $14 AND + project_id = $11 AND + bucket_name = $12 AND + object_key = $13 AND + version = $14 AND + stream_id = $15 AND status = `+pendingStatus+ - ` ), $1, - $2, $3, $4, - $5, $6, $7, $8, - $9 - )`, opts.Position, + ` ), $1, $2, + $3, $4, $5, + $6, $7, $8, $9, + $10 + )`, opts.Position, opts.ExpiresAt, storj.PieceID{}, opts.EncryptedKeyNonce, opts.EncryptedKey, len(opts.InlineData), opts.PlainOffset, opts.PlainSize, opts.EncryptedETag, opts.InlineData, diff --git a/satellite/metabase/commit_test.go b/satellite/metabase/commit_test.go index 28a3d328c..516826a7c 100644 --- a/satellite/metabase/commit_test.go +++ b/satellite/metabase/commit_test.go @@ -1290,6 +1290,77 @@ func TestCommitSegment(t *testing.T) { }.Check(ctx, t, db) }) + t.Run("commit segment of object with expires at", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + rootPieceID := testrand.PieceID() + pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + + now := time.Now() + expectedExpiresAt := now.Add(33 * time.Hour) + zombieDeadline := now.Add(24 * time.Hour) + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + ExpiresAt: &expectedExpiresAt, + }, + Version: obj.Version, + }.Check(ctx, t, db) + + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + ExpiresAt: &expectedExpiresAt, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: metabasetest.DefaultRedundancy, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + ExpiresAt: &expectedExpiresAt, + Status: metabase.Pending, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + CreatedAt: &now, + ExpiresAt: &expectedExpiresAt, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainOffset: 0, + PlainSize: 512, + + Redundancy: metabasetest.DefaultRedundancy, + + Pieces: pieces, + }, + }, + }.Check(ctx, t, db) + }) + t.Run("commit segment of pending object", func(t *testing.T) { defer metabasetest.DeleteAll{}.Check(ctx, t, db) @@ -1705,6 +1776,70 @@ func TestCommitInlineSegment(t *testing.T) { }, }.Check(ctx, t, db) }) + + t.Run("commit segment of object with expires at", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + encryptedETag := testrand.Bytes(32) + + now := time.Now() + expectedExpiresAt := now.Add(33 * time.Hour) + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + ExpiresAt: &expectedExpiresAt, + }, + Version: obj.Version, + }.Check(ctx, t, db) + + metabasetest.CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + ExpiresAt: &expectedExpiresAt, + InlineData: []byte{1, 2, 3}, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainSize: 512, + PlainOffset: 0, + EncryptedETag: encryptedETag, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + ExpiresAt: &expectedExpiresAt, + Status: metabase.Pending, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + CreatedAt: &now, + ExpiresAt: &expectedExpiresAt, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainOffset: 0, + PlainSize: 512, + + InlineData: []byte{1, 2, 3}, + EncryptedSize: 3, + EncryptedETag: encryptedETag, + }, + }, + }.Check(ctx, t, db) + }) }) } diff --git a/satellite/metabase/delete_objects_test.go b/satellite/metabase/delete_objects_test.go index 118aa556c..9a61abb9c 100644 --- a/satellite/metabase/delete_objects_test.go +++ b/satellite/metabase/delete_objects_test.go @@ -146,6 +146,7 @@ func TestDeleteExpiredObjects(t *testing.T) { expectedObj3Segment := expectedObj1Segment expectedObj3Segment.StreamID = obj3.StreamID + expectedObj3Segment.ExpiresAt = &futureTime metabasetest.DeleteExpiredObjects{ Opts: metabase.DeleteExpiredObjects{ diff --git a/satellite/metabase/metabasetest/create.go b/satellite/metabase/metabasetest/create.go index f7496fef0..06453e3b1 100644 --- a/satellite/metabase/metabasetest/create.go +++ b/satellite/metabase/metabasetest/create.go @@ -226,6 +226,7 @@ func (co CreateTestObject) Run(ctx *testcontext.Context, t testing.TB, db *metab CommitSegment{ Opts: metabase.CommitSegment{ ObjectStream: obj, + ExpiresAt: boeOpts.ExpiresAt, Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)}, RootPieceID: storj.PieceID{1}, Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go index 0c9891911..914e3e80a 100644 --- a/satellite/metainfo/metainfo.go +++ b/satellite/metainfo/metainfo.go @@ -1775,6 +1775,11 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) } + var expiresAt *time.Time + if !streamID.ExpirationDate.IsZero() { + expiresAt = &streamID.ExpirationDate + } + mbCommitSegment := metabase.CommitSegment{ ObjectStream: metabase.ObjectStream{ ProjectID: keyInfo.ProjectID, @@ -1783,6 +1788,7 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm StreamID: id, Version: 1, }, + ExpiresAt: expiresAt, EncryptedKey: req.EncryptedKey, EncryptedKeyNonce: req.EncryptedKeyNonce[:], @@ -1910,6 +1916,11 @@ func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.Segment return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) } + var expiresAt *time.Time + if !streamID.ExpirationDate.IsZero() { + expiresAt = &streamID.ExpirationDate + } + err = endpoint.metainfo.metabaseDB.CommitInlineSegment(ctx, metabase.CommitInlineSegment{ ObjectStream: metabase.ObjectStream{ ProjectID: keyInfo.ProjectID, @@ -1918,6 +1929,7 @@ func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.Segment StreamID: id, Version: 1, }, + ExpiresAt: expiresAt, EncryptedKey: req.EncryptedKey, EncryptedKeyNonce: req.EncryptedKeyNonce.Bytes(), diff --git a/satellite/metainfo/metainfo_test.go b/satellite/metainfo/metainfo_test.go index 73f758c4c..607a9bab9 100644 --- a/satellite/metainfo/metainfo_test.go +++ b/satellite/metainfo/metainfo_test.go @@ -1934,3 +1934,33 @@ func TestStableUploadID(t *testing.T) { assert.Equal(t, listResp[0].StreamID, listResp4.Items[0].StreamID) }) } + +func TestObjectSegmentExpiresAt(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + inlineData := testrand.Bytes(1 * memory.KiB) + inlineExpiration := time.Now().Add(2 * time.Hour) + err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "hohoho", "inline_object", inlineData, inlineExpiration) + require.NoError(t, err) + + remoteData := testrand.Bytes(10 * memory.KiB) + remoteExpiration := time.Now().Add(4 * time.Hour) + err = planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "hohoho", "remote_object", remoteData, remoteExpiration) + require.NoError(t, err) + + segments, err := planet.Satellites[0].Metainfo.Metabase.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, 2) + + for _, segment := range segments { + if int(segment.PlainSize) == len(inlineData) { + require.Equal(t, inlineExpiration.Unix(), segment.ExpiresAt.Unix()) + } else if int(segment.PlainSize) == len(remoteData) { + require.Equal(t, remoteExpiration.Unix(), segment.ExpiresAt.Unix()) + } else { + t.Fail() + } + } + }) +}