storj/satellite/metainfo/metabase/list_segments.go
Kaloyan Raev a7685f50c9 satellite/metainfo/metabase: set maxParts to MaxListLimit if greater
We should set the client requested maxParts to MaxListLimit if it is
greater than that value instead of returning an error.

MinIO's default value for maxParts is 10,000, while the satellite's
MaxListLimit is 1,000. If we returned an error, every ListParts call made
with the default maxParts would fail.

Change-Id: I06739e1d8d8f96803eba491585395da0443aec04
2020-12-03 11:32:07 +00:00

90 lines
2.2 KiB
Go

// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase
import (
"context"
"database/sql"
"errors"
"storj.io/common/uuid"
"storj.io/storj/private/tagsql"
)
// ListSegments holds the request parameters accepted by DB.ListSegments.
type ListSegments struct {
	// StreamID selects the stream whose segments are listed. The zero
	// value is rejected as an invalid request.
	StreamID uuid.UUID
	// Cursor is the position to resume after: only segments with a greater
	// position are returned. A cursor the database sees as 0 applies no
	// lower bound.
	Cursor SegmentPosition
	// Limit caps how many segments are returned. Zero, or any value above
	// MaxListLimit, is treated as MaxListLimit; negative values are rejected.
	Limit int
}
// ListSegmentsResult is the page of segments produced by DB.ListSegments.
type ListSegmentsResult struct {
	// Segments is the returned page, ordered by ascending position.
	Segments []Segment
	// More reports that the stream has further segments beyond this page.
	More bool
}
// ListSegments returns one page of segments that belong to opts.StreamID.
//
// The request fails with ErrInvalidRequest when StreamID is zero or Limit is
// negative. A Limit of zero, or one above MaxListLimit, is replaced by
// MaxListLimit. Segments come back in ascending position order, starting
// after opts.Cursor (no lower bound when the cursor is 0), and result.More is
// set when the stream holds more segments than fit in the page.
func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListSegmentsResult, err error) {
	defer mon.Task()(&ctx)(&err)

	// Validate the request and normalise the page size in one place.
	switch {
	case opts.StreamID.IsZero():
		return ListSegmentsResult{}, ErrInvalidRequest.New("StreamID missing")
	case opts.Limit < 0:
		return ListSegmentsResult{}, ErrInvalidRequest.New("Invalid limit: %d", opts.Limit)
	case opts.Limit == 0, opts.Limit > MaxListLimit:
		opts.Limit = MaxListLimit
	}

	const query = `
SELECT
position,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
redundancy,
inline_data, remote_pieces
FROM segments
WHERE
stream_id = $1 AND
($2 = 0::INT8 OR position > $2)
ORDER BY position ASC
LIMIT $3
`

	// collect appends every returned row to result.Segments.
	collect := func(rows tagsql.Rows) error {
		for rows.Next() {
			// The stream ID is not selected; it is known from the request.
			segment := Segment{StreamID: opts.StreamID}
			scanErr := rows.Scan(
				&segment.Position,
				&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
				&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
				redundancyScheme{&segment.Redundancy},
				&segment.InlineData, &segment.Pieces,
			)
			if scanErr != nil {
				return Error.New("failed to scan segments: %w", scanErr)
			}
			result.Segments = append(result.Segments, segment)
		}
		return nil
	}

	// One row beyond the limit is requested so that the presence of a
	// following page can be detected without a second query.
	err = withRows(db.db.Query(ctx, query, opts.StreamID, opts.Cursor, opts.Limit+1))(collect)
	if errors.Is(err, sql.ErrNoRows) {
		// No rows is reported as an empty page, not as a failure.
		return ListSegmentsResult{}, nil
	}
	if err != nil {
		return ListSegmentsResult{}, Error.New("unable to fetch object segments: %w", err)
	}

	// Drop the look-ahead row, if it was returned, and flag the next page.
	if last := len(result.Segments) - 1; last >= opts.Limit {
		result.More = true
		result.Segments = result.Segments[:last]
	}
	return result, nil
}