From 591971b4dc4212aec078974b0148f2b28dbe6220 Mon Sep 17 00:00:00 2001 From: JT Olio Date: Fri, 1 Dec 2023 13:44:17 -0500 Subject: [PATCH] satellite/metainfo: reduce database hits for segment creation Change-Id: I48a748b48daefa95f1dfbf6a0d75e65a568ee36a --- satellite/metabase/commit.go | 27 +++++++++++++++----------- satellite/metainfo/batch.go | 4 +++- satellite/metainfo/endpoint_segment.go | 10 ++++++++-- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/satellite/metabase/commit.go b/satellite/metabase/commit.go index 64a89dc51..72a465b60 100644 --- a/satellite/metabase/commit.go +++ b/satellite/metabase/commit.go @@ -224,6 +224,8 @@ type BeginSegment struct { RootPieceID storj.PieceID Pieces Pieces + + ObjectExistsChecked bool } // BeginSegment verifies, whether a new segment upload can be started. @@ -242,24 +244,27 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) { return ErrInvalidRequest.New("RootPieceID missing") } - // NOTE: this isn't strictly necessary, since we can also fail this in CommitSegment. - // however, we should prevent creating segements for non-partial objects. + if !opts.ObjectExistsChecked { + // NOTE: Find a way to safely remove this. This isn't strictly necessary, + // since we can also fail this in CommitSegment. + // We should prevent creating segements for non-partial objects. - // Verify that object exists and is partial. - var exists bool - err = db.db.QueryRowContext(ctx, ` + // Verify that object exists and is partial. + var exists bool + err = db.db.QueryRowContext(ctx, ` SELECT EXISTS ( SELECT 1 FROM objects WHERE (project_id, bucket_name, object_key, version, stream_id) = ($1, $2, $3, $4, $5) AND status = `+statusPending+` )`, - opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID).Scan(&exists) - if err != nil { - return Error.New("unable to query object status: %w", err) - } - if !exists { - return ErrPendingObjectMissing.New("") + opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID).Scan(&exists) + if err != nil { + return Error.New("unable to query object status: %w", err) + } + if !exists { + return ErrPendingObjectMissing.New("") + } } mon.Meter("segment_begin").Mark(1) diff --git a/satellite/metainfo/batch.go b/satellite/metainfo/batch.go index 588c44d8f..7001eb4e4 100644 --- a/satellite/metainfo/batch.go +++ b/satellite/metainfo/batch.go @@ -226,11 +226,13 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp case *pb.BatchRequestItem_SegmentBegin: singleRequest.SegmentBegin.Header = req.Header + justCreatedObject := false if singleRequest.SegmentBegin.StreamId.IsZero() && !lastStreamID.IsZero() { singleRequest.SegmentBegin.StreamId = lastStreamID + justCreatedObject = true } - response, err := endpoint.BeginSegment(ctx, singleRequest.SegmentBegin) + response, err := endpoint.beginSegment(ctx, singleRequest.SegmentBegin, justCreatedObject) if err != nil { return resp, err } diff --git a/satellite/metainfo/endpoint_segment.go b/satellite/metainfo/endpoint_segment.go index 026c1a7e3..2109758be 100644 --- a/satellite/metainfo/endpoint_segment.go +++ b/satellite/metainfo/endpoint_segment.go @@ -31,6 +31,11 @@ func calculateSpaceUsed(segmentSize int64, numberOfPieces int, rs storj.Redundan // BeginSegment begins segment uploading. func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBeginRequest) (resp *pb.SegmentBeginResponse, err error) { defer mon.Task()(&ctx)(&err) + return endpoint.beginSegment(ctx, req, false) +} + +func (endpoint *Endpoint) beginSegment(ctx context.Context, req *pb.SegmentBeginRequest, objectJustCreated bool) (resp *pb.SegmentBeginResponse, err error) { + defer mon.Task()(&ctx)(&err) endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName()) @@ -121,8 +126,9 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin Part: uint32(req.Position.PartNumber), Index: uint32(req.Position.Index), }, - RootPieceID: rootPieceID, - Pieces: pieces, + RootPieceID: rootPieceID, + Pieces: pieces, + ObjectExistsChecked: objectJustCreated, }) if err != nil { return nil, endpoint.convertMetabaseErr(err)