satellite/meta{base,info}: reduce db round trips for download

This replaces the separate ListStreamPositions and GetSegmentByPosition
calls with a single ListSegments call, which now knows how to return
only the segments within a Range, just as ListStreamPositions does.
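
Concretely, the new call site in DownloadObject looks like this
(taken from the diff below); the Range is applied in SQL with the
half-open interval overlap test
$4 < plain_offset + plain_size AND plain_offset < $5:

    segments, err := endpoint.metabase.ListSegments(ctx, metabase.ListSegments{
        StreamID:                object.StreamID,
        Range:                   streamRange, // only segments overlapping [PlainStart, PlainLimit)
        Limit:                   int(req.Limit),
        UpdateFirstWithAncestor: true,
    })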

It would theoretically be possible to also fold in the
GetObjectLastCommitted call by having it do one of three
queries based on the incoming request range, but that would
mean duplicating the object's data in every row returned
for each segment in the range.
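
For illustration, that combined query would have to look something
like this hypothetical sketch (column lists abbreviated; not part
of this change), repeating the object columns on every segment row:

    // hypothetical sketch only: fold GetObjectLastCommitted into ListSegments
    rows, rowsErr = db.db.QueryContext(ctx, `
        SELECT
            objects.status, objects.encryption, ...,  -- repeated on every row
            segments.position, segments.plain_offset, ...
        FROM objects
        JOIN segments ON segments.stream_id = objects.stream_id
        WHERE objects.stream_id = $1
        ORDER BY segments.position
    `, opts.StreamID)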

One gross thing that ListSegments now has to do is update the
first returned segment with the information from any ancestor
segment, because GetSegmentByPosition used to do that. It updates
only the first segment so that it doesn't issue O(N) database
queries, and it seems difficult to update all of the segments
with a single query. I'm not certain this change should be
merged, given that wart alone.
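
The rejected alternative, for contrast, would be one extra query
per returned segment (updateWithAncestorSegment is the existing
helper; this loop is hypothetical):

    // hypothetical O(N) variant that this change avoids: one database
    // round trip for every returned segment.
    for i := range result.Segments {
        if err := db.updateWithAncestorSegment(ctx, &result.Segments[i]); err != nil {
            return ListSegmentsResult{}, err
        }
    }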

This change has made me think a few things should happen:

1. Server side copy with ancestor segments strikes again,
   making the code less clear and potentially more buggy or
   inefficient, all for a rare case (empirically <0.1%).

2. The download code requests individual segments from the
   satellite lazily as part of its download. That requires the
   satellite to tell it the locations of all of the segments,
   which in turn requires the satellite to query all of those
   locations up front. Instead, the download RPC could return
   the orders for all of the segments in a range, and the
   download code could issue N download calls rather than 1
   download call plus N get-segment calls. I believe both sides
   of the code path would be simpler and more efficient that
   way (see the sketch after this list).

3. Looking at the timing information for downloads while
   testing this, we really need to focus on getting the auth
   key and bandwidth limit verification times down. Here's the
   timing I saw:

       - 42ms: validate auth
       - 52ms: bandwidth usage checking
       - 14ms: get object info
       - 26ms: get segment position info
       - 26ms: getting the first segment full info
       - 20ms: unaccounted for by spans
       -  6ms: creating the orders

   This change will remove the 26ms of getting the first
   segment's full info, but there's a good 90ms in validation
   alone. With improved semantics hitting the database only
   once and improved validation, a download rpc taking ~30ms
   seems doable compared to our current ~200ms (the spans
   above sum to ~186ms).
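
Here's the sketch mentioned in item 2; the names below are
illustrative, not our real uplink or RPC API:

    // hypothetical uplink-side flow: the download RPC returns orders for
    // every segment in the requested range up front, so the client issues
    // N segment downloads and zero per-segment satellite round trips.
    resp, err := client.DownloadObject(ctx, downloadRequest) // 1 satellite call
    if err != nil {
        return err
    }
    for _, seg := range resp.SegmentDownloads {
        if err := fetchPieces(ctx, seg.Orders); err != nil { // illustrative helper
            return err
        }
    }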

Change-Id: I4109dba082eaedb79e634c61dbf86efa93ab1222
Jeff Wendling 2023-03-23 16:04:58 -04:00 committed by Storj Robot
parent 54107264ca
commit 7b06575f6f
3 changed files with 125 additions and 28 deletions


@@ -19,6 +19,12 @@ type ListSegments struct {
 	StreamID uuid.UUID
 	Cursor   SegmentPosition
 	Limit    int
+
+	Range *StreamRange
+
+	// This causes ListSegments to update the first Segment in the response
+	// with the ancestor info if it exists and server side copy is enabled.
+	UpdateFirstWithAncestor bool
 }

 // ListSegmentsResult result of listing segments.
@@ -41,23 +47,46 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
 	ListLimit.Ensure(&opts.Limit)

-	err = withRows(db.db.QueryContext(ctx, `
-		SELECT
-			position,
-			created_at,
-			expires_at,
-			root_piece_id, encrypted_key_nonce, encrypted_key,
-			encrypted_size, plain_offset, plain_size,
-			encrypted_etag,
-			redundancy,
-			inline_data, remote_alias_pieces
-		FROM segments
-		WHERE
-			stream_id = $1 AND
-			($2 = 0::INT8 OR position > $2)
-		ORDER BY stream_id, position ASC
-		LIMIT $3
-	`, opts.StreamID, opts.Cursor, opts.Limit+1))(func(rows tagsql.Rows) error {
+	if opts.Range != nil {
+		if opts.Range.PlainStart > opts.Range.PlainLimit {
+			return ListSegmentsResult{}, ErrInvalidRequest.New("invalid range: %d:%d", opts.Range.PlainStart, opts.Range.PlainLimit)
+		}
+	}
+
+	var rows tagsql.Rows
+	var rowsErr error
+	if opts.Range == nil {
+		rows, rowsErr = db.db.QueryContext(ctx, `
+			SELECT
+				position, created_at, expires_at, root_piece_id,
+				encrypted_key_nonce, encrypted_key, encrypted_size,
+				plain_offset, plain_size, encrypted_etag, redundancy,
+				inline_data, remote_alias_pieces
+			FROM segments
+			WHERE
+				stream_id = $1 AND
+				($2 = 0::INT8 OR position > $2)
+			ORDER BY stream_id, position ASC
+			LIMIT $3
+		`, opts.StreamID, opts.Cursor, opts.Limit+1)
+	} else {
+		rows, rowsErr = db.db.QueryContext(ctx, `
+			SELECT
+				position, created_at, expires_at, root_piece_id,
+				encrypted_key_nonce, encrypted_key, encrypted_size,
+				plain_offset, plain_size, encrypted_etag, redundancy,
+				inline_data, remote_alias_pieces
+			FROM segments
+			WHERE
+				stream_id = $1 AND
+				($2 = 0::INT8 OR position > $2) AND
+				$4 < plain_offset + plain_size AND plain_offset < $5
+			ORDER BY stream_id, position ASC
+			LIMIT $3
+		`, opts.StreamID, opts.Cursor, opts.Limit+1, opts.Range.PlainStart, opts.Range.PlainLimit)
+	}
+
+	err = withRows(rows, rowsErr)(func(rows tagsql.Rows) error {
 		for rows.Next() {
 			var segment Segment
 			var aliasPieces AliasPieces
@@ -142,6 +171,16 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
 					len(copies), index)
 			}
 		}
+
+		// we have to update the first segment because DownloadObject uses
+		// this call, and we only need to do the first segment because
+		// DownloadObject only uses the extra information for that one.
+		if len(result.Segments) > 0 && opts.UpdateFirstWithAncestor {
+			err = db.updateWithAncestorSegment(ctx, &result.Segments[0])
+			if err != nil {
+				return ListSegmentsResult{}, err
+			}
+		}
 	}

 	if len(result.Segments) > opts.Limit {


@@ -275,7 +275,7 @@ func TestListSegments(t *testing.T) {
 		for _, numberOfSegments := range []byte{0, 1, 2, 10} {
 			originalObjectStream := metabasetest.RandObjectStream()

-			originalObject, _ := metabasetest.CreateTestObject{}.
+			originalObject, originalSegments := metabasetest.CreateTestObject{}.
 				Run(ctx, t, db, originalObjectStream, numberOfSegments)

 			copyStream := metabasetest.RandObjectStream()
@@ -297,6 +297,43 @@ func TestListSegments(t *testing.T) {
 					Segments: expectedSegments,
 				},
 			}.Check(ctx, t, db)
+
+			if numberOfSegments > 0 {
+				expectedSegments[0].Pieces = originalSegments[0].Pieces
+			}
+
+			metabasetest.ListSegments{
+				Opts: metabase.ListSegments{
+					StreamID:                copyStream.StreamID,
+					UpdateFirstWithAncestor: true,
+				},
+				Result: metabase.ListSegmentsResult{
+					Segments: expectedSegments,
+				},
+			}.Check(ctx, t, db)
 		}
 	})
+
+	t.Run("range", func(t *testing.T) {
+		defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+
+		stream := metabasetest.RandObjectStream()
+		obj, segments := metabasetest.CreateTestObject{}.
+			Run(ctx, t, db, stream, 10)
+
+		for i := 0; i < 9; i++ {
+			metabasetest.ListSegments{
+				Opts: metabase.ListSegments{
+					StreamID: obj.StreamID,
+					Range: &metabase.StreamRange{
+						PlainStart: segments[i].PlainOffset + 1,
+						PlainLimit: segments[i+1].PlainOffset + 1,
+					},
+				},
+				Result: metabase.ListSegmentsResult{
+					Segments: segments[i : i+2],
+				},
+			}.Check(ctx, t, db)
+		}
+	})
 })


@@ -418,10 +418,12 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
 		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
 	}

-	segments, err := endpoint.metabase.ListStreamPositions(ctx, metabase.ListStreamPositions{
+	segments, err := endpoint.metabase.ListSegments(ctx, metabase.ListSegments{
 		StreamID:                object.StreamID,
 		Range:                   streamRange,
 		Limit:                   int(req.Limit),
+		UpdateFirstWithAncestor: true,
 	})
 	if err != nil {
 		return nil, endpoint.convertMetabaseErr(err)
@@ -436,14 +438,7 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
 		return nil, nil
 	}

-	segment, err := endpoint.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
-		StreamID: object.StreamID,
-		Position: segments.Segments[0].Position,
-	})
-	if err != nil {
-		return nil, endpoint.convertMetabaseErr(err)
-	}
-
+	segment := segments.Segments[0]
 	downloadSizes := endpoint.calculateDownloadSizes(streamRange, segment, object.Encryption)

 	// Update the current bandwidth cache value incrementing the SegmentSize.
@@ -563,7 +558,7 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
 		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
 	}

-	segmentList, err := convertStreamListResults(segments)
+	segmentList, err := convertSegmentListResults(segments)
 	if err != nil {
 		endpoint.log.Error("unable to convert stream list", zap.Error(err))
 		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
@@ -587,6 +582,32 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
 	}, nil
 }

+func convertSegmentListResults(segments metabase.ListSegmentsResult) (*pb.SegmentListResponse, error) {
+	items := make([]*pb.SegmentListItem, len(segments.Segments))
+	for i, item := range segments.Segments {
+		items[i] = &pb.SegmentListItem{
+			Position: &pb.SegmentPosition{
+				PartNumber: int32(item.Position.Part),
+				Index:      int32(item.Position.Index),
+			},
+			PlainSize:     int64(item.PlainSize),
+			PlainOffset:   item.PlainOffset,
+			CreatedAt:     item.CreatedAt,
+			EncryptedETag: item.EncryptedETag,
+			EncryptedKey:  item.EncryptedKey,
+		}
+
+		var err error
+		items[i].EncryptedKeyNonce, err = storj.NonceFromBytes(item.EncryptedKeyNonce)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return &pb.SegmentListResponse{
+		Items: items,
+		More:  segments.More,
+	}, nil
+}
+
 type downloadSizes struct {
 	// amount of data that uplink eventually gets
 	plainSize int64