diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go index 2558efa68..a4afc03f6 100644 --- a/satellite/metainfo/config.go +++ b/satellite/metainfo/config.go @@ -188,6 +188,8 @@ type MetabaseDB interface { GetLatestObjectLastSegment(ctx context.Context, opts metabase.GetLatestObjectLastSegment) (segment metabase.Segment, err error) // ListSegments lists specified stream segments. ListSegments(ctx context.Context, opts metabase.ListSegments) (result metabase.ListSegmentsResult, err error) + // IterateObjectsAllVersions iterates through all versions of all committed objects. + IterateObjectsAllVersions(ctx context.Context, opts metabase.IterateObjects, fn func(context.Context, metabase.ObjectsIterator) error) (err error) // InternalImplementation returns *metabase.DB. // TODO: remove. diff --git a/satellite/metainfo/metabase/common.go b/satellite/metainfo/metabase/common.go index c51119161..8c73562fd 100644 --- a/satellite/metainfo/metabase/common.go +++ b/satellite/metainfo/metabase/common.go @@ -23,7 +23,9 @@ const ( FirstSegmentIndex = 0 ) -const maxListLimit = 1000 +// MaxListLimit is the maximum number of items the client can request for listing. +const MaxListLimit = 1000 + const batchsizeLimit = 1000 // BucketPrefix consists of /. diff --git a/satellite/metainfo/metabase/list_segments.go b/satellite/metainfo/metabase/list_segments.go index df7534549..fd7c9f1a4 100644 --- a/satellite/metainfo/metabase/list_segments.go +++ b/satellite/metainfo/metabase/list_segments.go @@ -34,15 +34,15 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS } // TODO verify this limit - if opts.Limit > maxListLimit { - return ListSegmentsResult{}, ErrInvalidRequest.New("Maximum listing limit is %d", maxListLimit) + if opts.Limit > MaxListLimit { + return ListSegmentsResult{}, ErrInvalidRequest.New("Maximum listing limit is %d", MaxListLimit) } if opts.Limit < 0 { return ListSegmentsResult{}, ErrInvalidRequest.New("Invalid limit: %d", opts.Limit) } if opts.Limit == 0 { - opts.Limit = maxListLimit + opts.Limit = MaxListLimit } err = withRows(db.db.Query(ctx, ` diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go index 02b84b5e7..3f8053153 100644 --- a/satellite/metainfo/metainfo.go +++ b/satellite/metainfo/metainfo.go @@ -902,41 +902,62 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq if storj.ErrBucketNotFound.Has(err) { return nil, rpcstatus.Error(rpcstatus.NotFound, err.Error()) } - endpoint.log.Error("unable to check bucket", zap.Error(err)) return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) } - prefix, err := CreatePath(ctx, keyInfo.ProjectID, metabase.LastSegmentIndex, req.Bucket, req.EncryptedPrefix) - if err != nil { - return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + limit := int(req.Limit) + if limit < 0 { + return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "limit is negative") + } + if limit == 0 { + limit = metabase.MaxListLimit } - metaflags := meta.All - // TODO use flags - segments, more, err := endpoint.metainfo.List(ctx, prefix.Encode(), string(req.EncryptedCursor), req.Recursive, req.Limit, metaflags) + resp = &pb.ObjectListResponse{} + // TODO: Replace with IterateObjectsLatestVersion when ready + err = endpoint.metainfo.metabaseDB.IterateObjectsAllVersions(ctx, + metabase.IterateObjects{ + ProjectID: keyInfo.ProjectID, + BucketName: string(req.Bucket), + Prefix: metabase.ObjectKey(req.EncryptedPrefix), + Cursor: metabase.IterateCursor{Key: metabase.ObjectKey(req.EncryptedCursor)}, + Recursive: req.Recursive, + BatchSize: limit + 1, + }, func(ctx context.Context, it metabase.ObjectsIterator) error { + entry := metabase.ObjectEntry{} + for len(resp.Items) < limit && it.Next(ctx, &entry) { + item := &pb.ObjectListItem{ + EncryptedPath: []byte(entry.ObjectKey), + Version: int32(entry.Version), + CreatedAt: entry.CreatedAt, + EncryptedMetadata: entry.EncryptedMetadata, + } + if entry.ExpiresAt != nil { + item.ExpiresAt = *entry.ExpiresAt + } + item.EncryptedMetadataNonce, err = storj.NonceFromBytes(entry.EncryptedMetadataNonce) + if err != nil { + return err + } + resp.Items = append(resp.Items, item) + } + resp.More = it.Next(ctx, &entry) + return nil + }, + ) if err != nil { + if metabase.ErrInvalidRequest.Has(err) { + return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + } + endpoint.log.Error("unable to list objects", zap.Error(err)) return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) } - items := make([]*pb.ObjectListItem, len(segments)) - for i, segment := range segments { - items[i] = &pb.ObjectListItem{ - EncryptedPath: []byte(segment.Path), - } - if segment.Pointer != nil { - items[i].EncryptedMetadata = segment.Pointer.Metadata - items[i].CreatedAt = segment.Pointer.CreationDate - items[i].ExpiresAt = segment.Pointer.ExpirationDate - } - } endpoint.log.Info("Object List", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "list"), zap.String("type", "object")) mon.Meter("req_list_object").Mark(1) - return &pb.ObjectListResponse{ - Items: items, - More: more, - }, nil + return resp, nil } // BeginDeleteObject begins object deletion process.