satellite/metabase: add intLimitRange
We have quite a bit of code duplication in ensuring valid int ranges. Change-Id: Ib31db8c4b2be6a677625cab5bcdddefb60401fe3
This commit is contained in:
parent
bc2f81c2fa
commit
5a56021bca
@ -25,10 +25,10 @@ const (
|
||||
LastSegmentIndex = uint32(math.MaxUint32)
|
||||
)
|
||||
|
||||
// MaxListLimit is the maximum number of items the client can request for listing.
|
||||
const MaxListLimit = 1000
|
||||
// ListLimit is the maximum number of items the client can request for listing.
|
||||
const ListLimit = intLimitRange(1000)
|
||||
|
||||
const batchsizeLimit = 1000
|
||||
const batchsizeLimit = intLimitRange(1000)
|
||||
|
||||
// BucketPrefix consists of <project id>/<bucket name>.
|
||||
type BucketPrefix string
|
||||
|
@ -13,9 +13,10 @@ import (
|
||||
"storj.io/private/tagsql"
|
||||
)
|
||||
|
||||
const deleteBatchSizeLimit = 100
|
||||
|
||||
const deletePieceBatchLimit = 1000
|
||||
const (
|
||||
deleteBatchSizeLimit = intLimitRange(100)
|
||||
deletePieceBatchLimit = intLimitRange(1000)
|
||||
)
|
||||
|
||||
// DeleteBucketObjects contains arguments for deleting a whole bucket.
|
||||
type DeleteBucketObjects struct {
|
||||
@ -38,15 +39,8 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
batchSize := opts.BatchSize
|
||||
if batchSize <= 0 || batchSize > deleteBatchSizeLimit {
|
||||
batchSize = deleteBatchSizeLimit
|
||||
}
|
||||
|
||||
deletePiecesBatchSize := opts.DeletePiecesBatchSize
|
||||
if deletePiecesBatchSize <= 0 || deletePiecesBatchSize > deletePieceBatchLimit {
|
||||
deletePiecesBatchSize = deletePieceBatchLimit
|
||||
}
|
||||
deleteBatchSizeLimit.Ensure(&opts.BatchSize)
|
||||
deletePieceBatchLimit.Ensure(&opts.DeletePiecesBatchSize)
|
||||
|
||||
var query string
|
||||
switch db.impl {
|
||||
@ -81,13 +75,13 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
|
||||
}
|
||||
|
||||
// TODO: fix the count for objects without segments
|
||||
deletedSegmentsBatch := make([]DeletedSegmentInfo, 0, deletePiecesBatchSize)
|
||||
deletedSegmentsBatch := make([]DeletedSegmentInfo, 0, opts.DeletePiecesBatchSize)
|
||||
for {
|
||||
deletedSegmentsBatch = deletedSegmentsBatch[:0]
|
||||
batchDeletedObjects := 0
|
||||
deletedSegments := 0
|
||||
err = withRows(db.db.Query(ctx, query,
|
||||
opts.Bucket.ProjectID, []byte(opts.Bucket.BucketName), batchSize))(func(rows tagsql.Rows) error {
|
||||
opts.Bucket.ProjectID, []byte(opts.Bucket.BucketName), opts.BatchSize))(func(rows tagsql.Rows) error {
|
||||
ids := map[uuid.UUID]struct{}{} // TODO: avoid map here
|
||||
for rows.Next() {
|
||||
var streamID uuid.UUID
|
||||
@ -105,7 +99,7 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
|
||||
ids[streamID] = struct{}{}
|
||||
deletedSegmentsBatch = append(deletedSegmentsBatch, segment)
|
||||
|
||||
if len(deletedSegmentsBatch) == deletePiecesBatchSize {
|
||||
if len(deletedSegmentsBatch) >= opts.DeletePiecesBatchSize {
|
||||
if opts.DeletePieces != nil {
|
||||
err = opts.DeletePieces(ctx, deletedSegmentsBatch)
|
||||
if err != nil {
|
||||
|
@ -17,7 +17,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
deleteBatchsizeLimit = 1000
|
||||
deleteBatchsizeLimit = intLimitRange(1000)
|
||||
)
|
||||
|
||||
// DeleteExpiredObjects contains all the information necessary to delete expired objects and segments.
|
||||
@ -151,13 +151,11 @@ func (db *DB) DeleteZombieObjects(ctx context.Context, opts DeleteZombieObjects)
|
||||
func (db *DB) deleteObjectsAndSegmentsBatch(ctx context.Context, batchsize int, deleteBatch func(startAfter ObjectStream, batchsize int) (last ObjectStream, err error)) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
bs := batchsize
|
||||
if batchsize == 0 || batchsize > deleteBatchsizeLimit {
|
||||
bs = deleteBatchsizeLimit
|
||||
}
|
||||
deleteBatchsizeLimit.Ensure(&batchsize)
|
||||
|
||||
var startAfter ObjectStream
|
||||
for {
|
||||
lastDeleted, err := deleteBatch(startAfter, bs)
|
||||
lastDeleted, err := deleteBatch(startAfter, batchsize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -12,10 +12,6 @@ import (
|
||||
"storj.io/common/storj"
|
||||
)
|
||||
|
||||
// !!!! NB !!!!
|
||||
//
|
||||
// Should we use protobuf here?
|
||||
|
||||
type encryptionParameters struct {
|
||||
*storj.EncryptionParameters
|
||||
}
|
||||
|
@ -135,10 +135,7 @@ func iteratePendingObjectsByKey(ctx context.Context, db *DB, opts IteratePending
|
||||
}
|
||||
|
||||
func iterate(ctx context.Context, it *objectsIterator, fn func(context.Context, ObjectsIterator) error) (err error) {
|
||||
// ensure batch size is reasonable
|
||||
if it.batchSize <= 0 || it.batchSize > batchsizeLimit {
|
||||
it.batchSize = batchsizeLimit
|
||||
}
|
||||
batchsizeLimit.Ensure(&it.batchSize)
|
||||
|
||||
it.curRows, err = it.doNextQuery(ctx, it)
|
||||
if err != nil {
|
||||
|
@ -38,9 +38,7 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
|
||||
return ListSegmentsResult{}, ErrInvalidRequest.New("Invalid limit: %d", opts.Limit)
|
||||
}
|
||||
|
||||
if opts.Limit == 0 || opts.Limit > MaxListLimit {
|
||||
opts.Limit = MaxListLimit
|
||||
}
|
||||
ListLimit.Ensure(&opts.Limit)
|
||||
|
||||
err = withRows(db.db.Query(ctx, `
|
||||
SELECT
|
||||
@ -144,9 +142,8 @@ func (db *DB) ListStreamPositions(ctx context.Context, opts ListStreamPositions)
|
||||
if opts.Limit < 0 {
|
||||
return ListStreamPositionsResult{}, ErrInvalidRequest.New("Invalid limit: %d", opts.Limit)
|
||||
}
|
||||
if opts.Limit == 0 || opts.Limit > MaxListLimit {
|
||||
opts.Limit = MaxListLimit
|
||||
}
|
||||
|
||||
ListLimit.Ensure(&opts.Limit)
|
||||
|
||||
if opts.Range != nil {
|
||||
if opts.Range.PlainStart > opts.Range.PlainLimit {
|
||||
|
@ -18,3 +18,16 @@ func withRows(rows tagsql.Rows, err error) func(func(tagsql.Rows) error) error {
|
||||
return errs.Combine(rows.Err(), rows.Close(), err)
|
||||
}
|
||||
}
|
||||
|
||||
// intLimitRange defines the valid limit range [1, limit].
type intLimitRange int

// Ensure validates *v against the range [1, limit].
// Any out-of-range value — zero, negative, or greater than the limit —
// is replaced with the limit itself (note: values below 1 are not
// clamped to 1, they also become limit).
func (limit intLimitRange) Ensure(v *int) {
	if *v <= 0 || *v > int(limit) {
		*v = int(limit)
	}
}

// Max returns the maximum value allowed by the range.
func (limit intLimitRange) Max() int { return int(limit) }
|
||||
|
@ -1217,9 +1217,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
||||
if limit < 0 {
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "limit is negative")
|
||||
}
|
||||
if limit == 0 {
|
||||
limit = metabase.MaxListLimit
|
||||
}
|
||||
metabase.ListLimit.Ensure(&limit)
|
||||
|
||||
var prefix metabase.ObjectKey
|
||||
if len(req.EncryptedPrefix) != 0 {
|
||||
@ -1330,10 +1328,7 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb.
|
||||
if limit < 0 {
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "limit is negative")
|
||||
}
|
||||
|
||||
if limit == 0 {
|
||||
limit = metabase.MaxListLimit
|
||||
}
|
||||
metabase.ListLimit.Ensure(&limit)
|
||||
|
||||
resp = &pb.ObjectListPendingStreamsResponse{}
|
||||
resp.Items = []*pb.ObjectListItem{}
|
||||
|
Loading…
Reference in New Issue
Block a user