cmd/uplink: add --pending for listing pending objects

Change-Id: I19c1965117c386f63e7b1c71ba70402d312329f0
This commit is contained in:
Fadila Khadar 2021-02-17 20:27:38 +01:00 committed by Fadila
parent a0cc7e8c5e
commit e96ed289e5
3 changed files with 281 additions and 12 deletions

View File

@ -49,7 +49,7 @@ func init() {
inspectCmd := &cobra.Command{
Use: "inspect [ACCESS]",
Short: "Inspect allows you to explode a serialized access into it's constituent parts.",
Short: "Inspect allows you to explode a serialized access into its constituent parts.",
RunE: accessInspect,
Args: cobra.MaximumNArgs(1),
}

View File

@ -5,6 +5,7 @@ package cmd
import (
"context"
"errors"
"fmt"
"strings"
"time"
@ -13,11 +14,13 @@ import (
"storj.io/common/fpath"
"storj.io/uplink"
"storj.io/uplink/private/multipart"
)
var (
lsRecursiveFlag *bool
lsEncryptedFlag *bool
lsPendingFlag *bool
)
func init() {
@ -29,8 +32,9 @@ func init() {
}, RootCmd)
lsRecursiveFlag = lsCmd.Flags().Bool("recursive", false, "if true, list recursively")
lsEncryptedFlag = lsCmd.Flags().Bool("encrypted", false, "if true, show paths as base64-encoded encrypted paths")
lsPendingFlag = lsCmd.Flags().Bool("pending", false, "if true, list pending objects")
setBasicFlags(lsCmd.Flags(), "recursive", "encrypted")
setBasicFlags(lsCmd.Flags(), "recursive", "encrypted", "pending")
}
func list(cmd *cobra.Command, args []string) error {
@ -53,19 +57,26 @@ func list(cmd *cobra.Command, args []string) error {
return fmt.Errorf("no bucket specified, use format sj://bucket/")
}
err = listFiles(ctx, project, src.Bucket(), src.Path(), false)
if !strings.HasSuffix(args[0], "/") && src.Path() != "" {
err = listObject(ctx, project, src.Bucket(), src.Path())
if err != nil && !errors.Is(err, uplink.ErrObjectNotFound) {
return convertError(err, src)
}
}
err = listObjects(ctx, project, src.Bucket(), src.Path(), false)
return convertError(err, src)
}
noBuckets := true
buckets := project.ListBuckets(ctx, nil)
for buckets.Next() {
bucket := buckets.Item()
fmt.Println("BKT", formatTime(bucket.Created), bucket.Name)
if !*lsPendingFlag {
fmt.Println("BKT", formatTime(bucket.Created), bucket.Name)
}
if *lsRecursiveFlag {
if err := listFilesFromBucket(ctx, project, bucket.Name); err != nil {
if err := listObjectsFromBucket(ctx, project, bucket.Name); err != nil {
return err
}
}
@ -78,27 +89,42 @@ func list(cmd *cobra.Command, args []string) error {
if noBuckets {
fmt.Println("No buckets")
}
return nil
}
func listFilesFromBucket(ctx context.Context, project *uplink.Project, bucket string) error {
return listFiles(ctx, project, bucket, "", true)
func listObjectsFromBucket(ctx context.Context, project *uplink.Project, bucket string) error {
return listObjects(ctx, project, bucket, "", true)
}
func listFiles(ctx context.Context, project *uplink.Project, bucket, prefix string, prependBucket bool) error {
func listObject(ctx context.Context, project *uplink.Project, bucket, path string) error {
if *lsPendingFlag {
return listPendingObject(ctx, project, bucket, path)
}
object, err := project.StatObject(ctx, bucket, path)
if err != nil {
return err
}
fmt.Printf("%v %v %12v %v\n", "OBJ", formatTime(object.System.Created), object.System.ContentLength, path)
return nil
}
func listObjects(ctx context.Context, project *uplink.Project, bucket, prefix string, prependBucket bool) error {
// TODO force adding slash at the end because fpath is removing it,
// most probably should be fixed in storj/common
if prefix != "" && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
objects := project.ListObjects(ctx, bucket, &uplink.ListObjectsOptions{
var objects *uplink.ObjectIterator
if *lsPendingFlag {
return listPendingObjects(ctx, project, bucket, prefix, prependBucket)
}
objects = project.ListObjects(ctx, bucket, &uplink.ListObjectsOptions{
Prefix: prefix,
Recursive: *lsRecursiveFlag,
System: true,
})
for objects.Next() {
object := objects.Item()
path := object.Key
@ -118,6 +144,52 @@ func listFiles(ctx context.Context, project *uplink.Project, bucket, prefix stri
return nil
}
func listPendingObject(ctx context.Context, project *uplink.Project, bucket, path string) error {
objects := multipart.ListPendingObjectStreams(ctx, project, bucket, path, &multipart.ListMultipartUploadsOptions{
System: true,
Custom: true,
})
for objects.Next() {
object := objects.Item()
path := object.Key
fmt.Printf("%v %v %12v %v\n", "OBJ", formatTime(object.System.Created), object.System.ContentLength, path)
}
return objects.Err()
}
func listPendingObjects(ctx context.Context, project *uplink.Project, bucket, prefix string, prependBucket bool) error {
// TODO force adding slash at the end because fpath is removing it,
// most probably should be fixed in storj/common
if prefix != "" && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
objects := multipart.ListMultipartUploads(ctx, project, bucket, &multipart.ListMultipartUploadsOptions{
Prefix: prefix,
Cursor: "",
Recursive: *lsRecursiveFlag,
System: true,
Custom: true,
})
for objects.Next() {
object := objects.Item()
path := object.Key
if prependBucket {
path = fmt.Sprintf("%s/%s", bucket, path)
}
if object.IsPrefix {
fmt.Println("PRE", path)
} else {
fmt.Printf("%v %v %12v %v\n", "OBJ", formatTime(object.System.Created), object.System.ContentLength, path)
}
}
return objects.Err()
}
func formatTime(t time.Time) string {
return t.Local().Format("2006-01-02 15:04:05")
}

197
cmd/uplink/cmd/ls_test.go Normal file
View File

@ -0,0 +1,197 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package cmd_test
import (
"os/exec"
"strings"
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/memory"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/uplink/private/multipart"
)
func TestLsPending(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 4,
UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplinkExe := ctx.Compile("storj.io/storj/cmd/uplink")
// Configure uplink.
{
access := planet.Uplinks[0].Access[planet.Satellites[0].ID()]
accessString, err := access.Serialize()
require.NoError(t, err)
output, err := exec.Command(uplinkExe,
"--config-dir", ctx.Dir("uplink"),
"import",
accessString,
).CombinedOutput()
t.Log(string(output))
require.NoError(t, err)
}
// Create bucket.
bucketName := "testbucket"
err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket")
require.NoError(t, err)
// Create pending objects and committed objects.
{
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
project, err := uplinkPeer.GetProject(ctx, satellite)
require.NoError(t, err)
defer ctx.Check(project.Close)
_, err = multipart.NewMultipartUpload(ctx, project, bucketName, "pending-object", nil)
require.NoError(t, err)
_, err = multipart.NewMultipartUpload(ctx, project, bucketName, "prefixed/pending-object", nil)
require.NoError(t, err)
err = uplinkPeer.Upload(ctx, satellite, "testbucket", "committed-object", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
err = uplinkPeer.Upload(ctx, satellite, "testbucket", "prefixed/committed-object", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
}
// List pending objects non-recursively.
{
cmd := exec.Command(uplinkExe,
"--config-dir", ctx.Dir("uplink"),
"ls",
"--pending",
)
t.Log(cmd)
output, err := cmd.Output()
require.NoError(t, err)
checkOutput(t, output)
}
// List pending objects recursively.
{
cmd := exec.Command(uplinkExe,
"--config-dir", ctx.Dir("uplink"),
"ls",
"--pending",
"--recursive",
)
t.Log(cmd)
output, err := cmd.Output()
require.NoError(t, err)
checkOutput(t, output,
bucketName,
"prefixed/pending-object",
"pending-object",
)
}
// List pending objects from bucket non-recursively.
{
cmd := exec.Command(uplinkExe,
"--config-dir", ctx.Dir("uplink"),
"ls",
"--pending",
"sj://"+bucketName,
)
t.Log(cmd)
output, err := cmd.Output()
require.NoError(t, err)
checkOutput(t, output,
"prefixed",
"pending-object",
)
}
// List pending object from bucket recursively.
{
cmd := exec.Command(uplinkExe,
"--config-dir", ctx.Dir("uplink"),
"ls",
"--pending",
"--recursive",
"sj://"+bucketName,
)
t.Log(cmd)
output, err := cmd.Output()
require.NoError(t, err)
checkOutput(t, output,
"prefixed/pending-object",
"pending-object",
)
}
// List pending objects with prefix.
{
cmd := exec.Command(uplinkExe,
"--config-dir", ctx.Dir("uplink"),
"ls",
"--pending",
"sj://"+bucketName+"/prefixed",
)
t.Log(cmd)
output, err := cmd.Output()
require.NoError(t, err)
checkOutput(t, output,
"prefixed/pending-object",
)
}
// List pending object by specifying object key.
{
cmd := exec.Command(uplinkExe,
"--config-dir", ctx.Dir("uplink"),
"ls",
"--pending",
"sj://"+bucketName+"/prefixed/pending-object",
)
t.Log(cmd)
output, err := cmd.Output()
require.NoError(t, err)
checkOutput(t, output,
"prefixed/pending-object",
)
}
})
}
func checkOutput(t *testing.T, output []byte, objectKeys ...string) {
lines := strings.Split(string(output), "\n")
objectKeyFound := false
foundObjectKeys := make(map[string]bool, len(objectKeys))
for _, line := range lines {
if line != "" {
for _, objectKey := range objectKeys {
if strings.Contains(line, objectKey) {
objectKeyFound = true
foundObjectKeys[objectKey] = true
}
}
require.True(t, objectKeyFound, line, " Object should not be listed.")
}
}
require.Len(t, foundObjectKeys, len(objectKeys))
}