satellite/metabase: adjust BeginSegment to use pending_objects table

Change is adjusting BeginSegment to check pending object existence in
`pending_objects` or `objects` table.

Satellite stream id is used to determine if we need to use
`pending_objects` or `objects` table.

General goal is to support both tables until `objects` table will be
free from pending objects. Whenever it will be needed code will be
supporting both tables at once.

Part of https://github.com/storj/storj/issues/6046

Change-Id: I08aaa605c23d82695fde352fdbd0a7fd11f46bb5
This commit is contained in:
Michal Niewrzal 2023-07-21 12:20:49 +02:00 committed by Storj Robot
parent cebf255d64
commit 7b2006a883
4 changed files with 177 additions and 2 deletions

View File

@ -257,6 +257,8 @@ type BeginSegment struct {
RootPieceID storj.PieceID
Pieces Pieces
UsePendingObjectsTable bool
}
// BeginSegment verifies, whether a new segment upload can be started.
@ -280,7 +282,17 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) {
// Verify that object exists and is partial.
var value int
err = db.db.QueryRowContext(ctx, `
if opts.UsePendingObjectsTable {
err = db.db.QueryRowContext(ctx, `
SELECT 1
FROM pending_objects WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
stream_id = $4
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID).Scan(&value)
} else {
err = db.db.QueryRowContext(ctx, `
SELECT 1
FROM objects WHERE
project_id = $1 AND
@ -289,7 +301,8 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) {
version = $4 AND
stream_id = $5 AND
status = `+pendingStatus,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID).Scan(&value)
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID).Scan(&value)
}
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrPendingObjectMissing.New("")

View File

@ -1136,6 +1136,156 @@ func TestBeginSegment(t *testing.T) {
},
}.Check(ctx, t, db)
})
t.Run("use pending objects table", func(t *testing.T) {
obj.Version = metabase.NextVersion
t.Run("pending object missing", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
metabasetest.BeginSegment{
Opts: metabase.BeginSegment{
ObjectStream: obj,
RootPieceID: storj.PieceID{1},
Pieces: []metabase.Piece{{
Number: 1,
StorageNode: testrand.NodeID(),
}},
UsePendingObjectsTable: true,
},
ErrClass: &metabase.ErrPendingObjectMissing,
}.Check(ctx, t, db)
metabasetest.Verify{}.Check(ctx, t, db)
})
t.Run("pending object missing when object committed", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now := time.Now()
obj := obj
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
},
Version: 1,
}.Check(ctx, t, db)
obj.Version++
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
},
}.Check(ctx, t, db)
metabasetest.BeginSegment{
Opts: metabase.BeginSegment{
ObjectStream: obj,
RootPieceID: storj.PieceID{1},
Pieces: []metabase.Piece{{
Number: 1,
StorageNode: testrand.NodeID(),
}},
UsePendingObjectsTable: true,
},
ErrClass: &metabase.ErrPendingObjectMissing,
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
Encryption: metabasetest.DefaultEncryption,
},
},
}.Check(ctx, t, db)
})
t.Run("begin segment successfully", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now := time.Now()
zombieDeadline := now.Add(24 * time.Hour)
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
metabasetest.BeginSegment{
Opts: metabase.BeginSegment{
ObjectStream: obj,
RootPieceID: storj.PieceID{1},
Pieces: []metabase.Piece{{
Number: 1,
StorageNode: testrand.NodeID(),
}},
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
metabasetest.Verify{
PendingObjects: []metabase.RawPendingObject{
{
PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
CreatedAt: now,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
}.Check(ctx, t, db)
})
t.Run("multiple begin segment successfully", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
now := time.Now()
zombieDeadline := now.Add(24 * time.Hour)
obj := metabasetest.RandObjectStream()
obj.Version = metabase.NextVersion
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: 1,
}.Check(ctx, t, db)
for i := 0; i < 5; i++ {
metabasetest.BeginSegment{
Opts: metabase.BeginSegment{
ObjectStream: obj,
RootPieceID: testrand.PieceID(),
Pieces: []metabase.Piece{{
Number: 1,
StorageNode: testrand.NodeID(),
}},
UsePendingObjectsTable: true,
},
}.Check(ctx, t, db)
}
metabasetest.Verify{
PendingObjects: []metabase.RawPendingObject{
{
PendingObjectStream: metabasetest.ObjectStreamToPending(obj),
CreatedAt: now,
Encryption: metabasetest.DefaultEncryption,
ZombieDeletionDeadline: &zombieDeadline,
},
},
}.Check(ctx, t, db)
})
})
})
}

View File

@ -400,3 +400,13 @@ func SegmentsToRaw(segments []metabase.Segment) []metabase.RawSegment {
return rawSegments
}
// ObjectStreamToPending converts ObjectStream to PendingObjectStream.
func ObjectStreamToPending(objectStream metabase.ObjectStream) metabase.PendingObjectStream {
return metabase.PendingObjectStream{
ProjectID: objectStream.ProjectID,
BucketName: objectStream.BucketName,
ObjectKey: objectStream.ObjectKey,
StreamID: objectStream.StreamID,
}
}

View File

@ -120,6 +120,8 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin
},
RootPieceID: rootPieceID,
Pieces: pieces,
UsePendingObjectsTable: streamID.UsePendingObjectsTable,
})
if err != nil {
return nil, endpoint.convertMetabaseErr(err)