storj: use multipart API

Change-Id: I10b401434e3e77468d12ecd225b41689568fd197
Michał Niewrzał authored 2021-04-20 10:06:56 +02:00; committed by Michal Niewrzal
parent aa27a8f1dd
commit 7944df20d6
13 changed files with 88 additions and 85 deletions
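
Before the per-file changes, here is a minimal sketch, not part of the diff, of the public multipart flow this commit migrates to. It uses only calls that appear in the changes below (BeginUpload, UploadPart with Write/Commit on the part, CommitUpload); the package and function names in the sketch are illustrative, not the repo's.

// Minimal sketch (not part of this commit) of the public multipart upload flow.
// Only calls that appear in the diff below are used; names here are illustrative.
package example

import (
	"context"

	"storj.io/uplink"
)

// uploadViaParts uploads data as a single part and then commits the object.
func uploadViaParts(ctx context.Context, project *uplink.Project, bucket, key string, data []byte) error {
	// Start a pending multipart upload; its UploadID replaces the old StreamID
	// from storj.io/uplink/private/multipart throughout this diff.
	info, err := project.BeginUpload(ctx, bucket, key, nil)
	if err != nil {
		return err
	}

	// Upload the data as part 1 and commit the part.
	part, err := project.UploadPart(ctx, bucket, key, info.UploadID, 1)
	if err != nil {
		return err
	}
	if _, err := part.Write(data); err != nil {
		return err
	}
	if err := part.Commit(); err != nil {
		return err
	}

	// Commit the whole object; until this call it stays pending, is visible via
	// project.ListUploads, and can be removed with project.AbortUpload.
	_, err = project.CommitUpload(ctx, bucket, key, info.UploadID, nil)
	return err
}

Pending (uncommitted) uploads created this way are what the ls/rm pending code paths in the first files below list via ListUploads and remove via AbortUpload.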

View File

@@ -14,7 +14,6 @@ import (
"storj.io/common/fpath"
"storj.io/uplink"
"storj.io/uplink/private/multipart"
)
var (
@@ -145,17 +144,18 @@ func listObjects(ctx context.Context, project *uplink.Project, bucket, prefix st
}
func listPendingObject(ctx context.Context, project *uplink.Project, bucket, path string) error {
objects := multipart.ListPendingObjectStreams(ctx, project, bucket, path, &multipart.ListMultipartUploadsOptions{
uploads := project.ListUploads(ctx, bucket, &uplink.ListUploadsOptions{
Prefix: path,
System: true,
Custom: true,
})
for objects.Next() {
object := objects.Item()
for uploads.Next() {
object := uploads.Item()
path := object.Key
fmt.Printf("%v %v %12v %v\n", "OBJ", formatTime(object.System.Created), object.System.ContentLength, path)
}
return objects.Err()
return uploads.Err()
}
func listPendingObjects(ctx context.Context, project *uplink.Project, bucket, prefix string, prependBucket bool) error {
@@ -165,7 +165,7 @@ func listPendingObjects(ctx context.Context, project *uplink.Project, bucket, pr
prefix += "/"
}
objects := multipart.ListMultipartUploads(ctx, project, bucket, &multipart.ListMultipartUploadsOptions{
objects := project.ListUploads(ctx, bucket, &uplink.ListUploadsOptions{
Prefix: prefix,
Cursor: "",
Recursive: *lsRecursiveFlag,

View File

@@ -14,7 +14,6 @@ import (
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/uplink/private/multipart"
)
func TestLsPending(t *testing.T) {
@@ -55,10 +54,10 @@ func TestLsPending(t *testing.T) {
require.NoError(t, err)
defer ctx.Check(project.Close)
_, err = multipart.NewMultipartUpload(ctx, project, bucketName, "pending-object", nil)
_, err = project.BeginUpload(ctx, bucketName, "pending-object", nil)
require.NoError(t, err)
_, err = multipart.NewMultipartUpload(ctx, project, bucketName, "prefixed/pending-object", nil)
_, err = project.BeginUpload(ctx, bucketName, "prefixed/pending-object", nil)
require.NoError(t, err)
err = uplinkPeer.Upload(ctx, satellite, "testbucket", "committed-object", testrand.Bytes(5*memory.KiB))

View File

@@ -9,7 +9,7 @@ import (
"github.com/spf13/cobra"
"storj.io/common/fpath"
"storj.io/uplink/private/multipart"
"storj.io/uplink"
)
var (
@@ -55,10 +55,12 @@ func deleteObject(cmd *cobra.Command, args []string) error {
if *rmPendingFlag {
// TODO we may need a dedicated endpoint for deleting pending object streams
list := multipart.ListPendingObjectStreams(ctx, project, dst.Bucket(), dst.Path(), nil)
list := project.ListUploads(ctx, dst.Bucket(), &uplink.ListUploadsOptions{
Prefix: dst.Path(),
})
// TODO modify when we can have several pending objects for the same object key
if list.Next() {
err = multipart.AbortMultipartUpload(ctx, project, dst.Bucket(), dst.Path(), list.Item().StreamID)
err = project.AbortUpload(ctx, dst.Bucket(), dst.Path(), list.Item().UploadID)
if err != nil {
return convertError(err, dst)
}
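
The two TODOs in this hunk note that deletion currently lists uploads by key and aborts only the first pending one. Purely as a hedged illustration of where that TODO could go once several pending uploads per object key are possible, and not something this commit adds, the same ListUploads/AbortUpload calls extend to a loop (helper name hypothetical; imports as in the sketch near the top):

// Hypothetical helper, not part of this commit: abort every pending upload
// listed under the key, anticipating the TODO about multiple pending objects.
func abortAllPendingUploads(ctx context.Context, project *uplink.Project, bucket, key string) error {
	list := project.ListUploads(ctx, bucket, &uplink.ListUploadsOptions{Prefix: key})
	for list.Next() {
		if err := project.AbortUpload(ctx, bucket, key, list.Item().UploadID); err != nil {
			return err
		}
	}
	return list.Err()
}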

View File

@@ -16,7 +16,6 @@ import (
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/uplink"
"storj.io/uplink/private/multipart"
)
func TestRmPending(t *testing.T) {
@@ -58,10 +57,10 @@ func TestRmPending(t *testing.T) {
// Create pending objects and one committed object.
{
_, err = multipart.NewMultipartUpload(ctx, project, bucketName, "pending-object", nil)
_, err = project.BeginUpload(ctx, bucketName, "pending-object", nil)
require.NoError(t, err)
_, err = multipart.NewMultipartUpload(ctx, project, bucketName, "prefixed/pending-object", nil)
_, err = project.BeginUpload(ctx, bucketName, "prefixed/pending-object", nil)
require.NoError(t, err)
err = uplinkPeer.Upload(ctx, satellite, "testbucket", "committed-object", testrand.Bytes(5*memory.KiB))
@@ -153,7 +152,9 @@ func TestRmPending(t *testing.T) {
}
func pendingObjectExists(ctx context.Context, satellite *testplanet.Satellite, project *uplink.Project, bucketName string, objectKey string) bool {
iterator := multipart.ListPendingObjectStreams(ctx, project, bucketName, objectKey, nil)
iterator := project.ListUploads(ctx, bucketName, &uplink.ListUploadsOptions{
Prefix: objectKey,
})
return iterator.Next()
}

go.mod (2 changed lines)
View File

@@ -52,5 +52,5 @@ require (
storj.io/drpc v0.0.20
storj.io/monkit-jaeger v0.0.0-20210225162224-66fb37637bf6
storj.io/private v0.0.0-20210423085237-5af81f2a2b21
storj.io/uplink v1.4.7-0.20210421171443-53fab7d9387c
storj.io/uplink v1.4.7-0.20210422134834-21140a50fee2
)

go.sum (4 changed lines)
View File

@@ -852,5 +852,5 @@ storj.io/monkit-jaeger v0.0.0-20210225162224-66fb37637bf6 h1:LTDmeZDrFWD9byqNOf/
storj.io/monkit-jaeger v0.0.0-20210225162224-66fb37637bf6/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80=
storj.io/private v0.0.0-20210423085237-5af81f2a2b21 h1:0ZX6agMxxGMj9jcBl9SYcDPPZqpG+cbi56DV/1Btg6s=
storj.io/private v0.0.0-20210423085237-5af81f2a2b21/go.mod h1:iAc+LGwXYCe+YRRTlkfkg95ZBEL8pWHLVZ508/KQjOs=
storj.io/uplink v1.4.7-0.20210421171443-53fab7d9387c h1:LhZcZ2+UXd6rwhDO05RplEVdkAkqig8tA3xm2DfZclY=
storj.io/uplink v1.4.7-0.20210421171443-53fab7d9387c/go.mod h1:CroFLtFtcKj9B0AigacRHuxjNd+jOm9DG45257fTJo0=
storj.io/uplink v1.4.7-0.20210422134834-21140a50fee2 h1:X5us0nK+oG6gMpmbIH+a/voVEVBgD5NiNGLk6hyx3K0=
storj.io/uplink v1.4.7-0.20210422134834-21140a50fee2/go.mod h1:CroFLtFtcKj9B0AigacRHuxjNd+jOm9DG45257fTJo0=

View File

@@ -4,9 +4,7 @@
package gc_test
import (
"bytes"
"context"
"crypto/sha256"
"errors"
"testing"
"time"
@@ -28,8 +26,6 @@ import (
"storj.io/storj/satellite/metabase"
"storj.io/storj/storage"
"storj.io/storj/storagenode"
"storj.io/uplink/private/etag"
"storj.io/uplink/private/multipart"
"storj.io/uplink/private/testuplink"
)
@@ -252,14 +248,16 @@ func startMultipartUpload(ctx context.Context, t *testing.T, uplink *testplanet.
_, err = project.EnsureBucket(ctx, bucketName)
require.NoError(t, err)
info, err := multipart.NewMultipartUpload(ctx, project, bucketName, path, nil)
info, err := project.BeginUpload(ctx, bucketName, path, nil)
require.NoError(t, err)
_, err = multipart.PutObjectPart(ctx, project, bucketName, path, info.StreamID, 1,
etag.NewHashReader(bytes.NewReader(data), sha256.New()))
upload, err := project.UploadPart(ctx, bucketName, path, info.UploadID, 1)
require.NoError(t, err)
_, err = upload.Write(data)
require.NoError(t, err)
require.NoError(t, upload.Commit())
return info.StreamID
return info.UploadID
}
func completeMultipartUpload(ctx context.Context, t *testing.T, uplink *testplanet.Uplink, satellite *testplanet.Satellite, bucketName string, path storj.Path, streamID string) {
@@ -272,6 +270,6 @@ func completeMultipartUpload(ctx context.Context, t *testing.T, uplink *testplan
require.NoError(t, err)
defer func() { require.NoError(t, project.Close()) }()
_, err = multipart.CompleteMultipartUpload(ctx, project, bucketName, path, streamID, nil)
_, err = project.CommitUpload(ctx, bucketName, path, streamID, nil)
require.NoError(t, err)
}

View File

@@ -4,9 +4,7 @@
package gracefulexit_test
import (
"bytes"
"context"
"crypto/sha256"
"testing"
"time"
@@ -23,8 +21,6 @@ import (
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/uplink/private/etag"
"storj.io/uplink/private/multipart"
)
func TestChore(t *testing.T) {
@@ -58,13 +54,16 @@ func TestChore(t *testing.T) {
err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path2", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
info, err := multipart.NewMultipartUpload(ctx, project, "testbucket", "test/path3", nil)
info, err := project.BeginUpload(ctx, "testbucket", "test/path3", nil)
require.NoError(t, err)
_, err = multipart.PutObjectPart(ctx, project, "testbucket", "test/path3", info.StreamID, 1,
etag.NewHashReader(bytes.NewReader(testrand.Bytes(5*memory.KiB)), sha256.New()))
upload, err := project.UploadPart(ctx, "testbucket", "test/path3", info.UploadID, 1)
require.NoError(t, err)
_, err = upload.Write(testrand.Bytes(5 * memory.KiB))
require.NoError(t, err)
require.NoError(t, upload.Commit())
exitStatusRequest := overlay.ExitStatusRequest{
NodeID: exitingNode.ID(),
ExitInitiatedAt: time.Now(),
@@ -168,13 +167,16 @@ func TestDurabilityRatio(t *testing.T) {
err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
info, err := multipart.NewMultipartUpload(ctx, project, "testbucket", "test/path2", nil)
info, err := project.BeginUpload(ctx, "testbucket", "test/path2", nil)
require.NoError(t, err)
_, err = multipart.PutObjectPart(ctx, project, "testbucket", "test/path2", info.StreamID, 1,
etag.NewHashReader(bytes.NewReader(testrand.Bytes(5*memory.KiB)), sha256.New()))
upload, err := project.UploadPart(ctx, "testbucket", "test/path2", info.UploadID, 1)
require.NoError(t, err)
_, err = upload.Write(testrand.Bytes(5 * memory.KiB))
require.NoError(t, err)
require.NoError(t, upload.Commit())
exitStatusRequest := overlay.ExitStatusRequest{
NodeID: exitingNode.ID(),
ExitInitiatedAt: time.Now(),

View File

@@ -4,9 +4,7 @@
package gracefulexit_test
import (
"bytes"
"context"
"crypto/sha256"
"io"
"strconv"
"testing"
@@ -36,8 +34,6 @@ import (
"storj.io/storj/satellite/overlay"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/gracefulexit"
"storj.io/uplink/private/etag"
"storj.io/uplink/private/multipart"
)
const numObjects = 6
@@ -1110,11 +1106,14 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) {
// etag.NewHashReader(bytes.NewReader(testrand.Bytes(5*memory.KiB)), sha256.New()))
// require.NoError(t, err)
info1, err := multipart.NewMultipartUpload(ctx, project, "testbucket", "test/path1", nil)
info1, err := project.BeginUpload(ctx, "testbucket", "test/path1", nil)
require.NoError(t, err)
_, err = multipart.PutObjectPart(ctx, project, "testbucket", "test/path1", info1.StreamID, 1,
etag.NewHashReader(bytes.NewReader(testrand.Bytes(5*memory.KiB)), sha256.New()))
upload, err := project.UploadPart(ctx, "testbucket", "test/path1", info1.UploadID, 1)
require.NoError(t, err)
_, err = upload.Write(testrand.Bytes(5 * memory.KiB))
require.NoError(t, err)
require.NoError(t, upload.Commit())
// check that there are no exiting nodes.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
@@ -1154,7 +1153,7 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) {
// TODO: activate when an object part can be overwritten
// _, err = multipart.PutObjectPart(ctx, project, "testbucket", "test/path0", info0.StreamID, 1, bytes.NewReader(testrand.Bytes(5*memory.KiB)))
// require.NoError(t, err)
err = multipart.AbortMultipartUpload(ctx, project, "testbucket", "test/path1", info1.StreamID)
err = project.AbortUpload(ctx, "testbucket", "test/path1", info1.UploadID)
require.NoError(t, err)
// reconnect to the satellite.
@@ -1476,12 +1475,14 @@ func testTransfers(t *testing.T, objects int, multipartObjects int, verifier fun
for i := 0; i < multipartObjects; i++ {
objectName := "test/multipart" + strconv.Itoa(i)
info, err := multipart.NewMultipartUpload(ctx, project, "testbucket", objectName, nil)
info, err := project.BeginUpload(ctx, "testbucket", objectName, nil)
require.NoError(t, err)
_, err = multipart.PutObjectPart(ctx, project, "testbucket", objectName, info.StreamID, 1,
etag.NewHashReader(bytes.NewReader(testrand.Bytes(5*memory.KiB)), sha256.New()))
upload, err := project.UploadPart(ctx, "testbucket", objectName, info.UploadID, 1)
require.NoError(t, err)
_, err = upload.Write(testrand.Bytes(5 * memory.KiB))
require.NoError(t, err)
require.NoError(t, upload.Commit())
}
// check that there are no exiting nodes.

View File

@@ -25,7 +25,6 @@ import (
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metaloop"
"storj.io/uplink/private/multipart"
)
// TestLoop does the following
@@ -167,10 +166,10 @@ func TestLoop_ObjectNoSegments(t *testing.T) {
expectedNumberOfObjects := 5
for i := 0; i < expectedNumberOfObjects; i++ {
info, err := multipart.NewMultipartUpload(ctx, project, "abcd", "t"+strconv.Itoa(i), nil)
info, err := project.BeginUpload(ctx, "abcd", "t"+strconv.Itoa(i), nil)
require.NoError(t, err)
_, err = multipart.CompleteMultipartUpload(ctx, project, "abcd", "t"+strconv.Itoa(i), info.StreamID, nil)
_, err = project.CommitUpload(ctx, "abcd", "t"+strconv.Itoa(i), info.UploadID, nil)
require.NoError(t, err)
}

View File

@@ -4,9 +4,7 @@
package metainfo_test
import (
"bytes"
"context"
"crypto/sha256"
"testing"
"time"
@@ -22,9 +20,8 @@ import (
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/metabase"
"storj.io/uplink/private/etag"
"storj.io/uplink"
"storj.io/uplink/private/metainfo"
"storj.io/uplink/private/multipart"
)
func TestEndpoint_DeleteCommittedObject(t *testing.T) {
@@ -59,12 +56,14 @@ func TestEndpoint_DeletePendingObject(t *testing.T) {
_, err = project.CreateBucket(ctx, bucketName)
require.NoError(t, err, "failed to create bucket")
info, err := multipart.NewMultipartUpload(ctx, project, bucketName, "object-filename", &multipart.UploadOptions{})
info, err := project.BeginUpload(ctx, bucketName, "object-filename", &uplink.UploadOptions{})
require.NoError(t, err, "failed to start multipart upload")
_, err = multipart.PutObjectPart(ctx, project, bucketName, bucketName, info.StreamID, 1,
etag.NewHashReader(bytes.NewReader(data), sha256.New()))
upload, err := project.UploadPart(ctx, bucketName, bucketName, info.UploadID, 1)
require.NoError(t, err, "failed to put object part")
_, err = upload.Write(data)
require.NoError(t, err, "failed to put object part")
require.NoError(t, upload.Commit(), "failed to put object part")
}
deleteObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet) {
projectID := planet.Uplinks[0].Projects[0].ID
@@ -117,12 +116,14 @@ func TestEndpoint_DeleteObjectAnyStatus(t *testing.T) {
_, err = project.CreateBucket(ctx, bucketName)
require.NoError(t, err, "failed to create bucket")
info, err := multipart.NewMultipartUpload(ctx, project, bucketName, "object-filename", &multipart.UploadOptions{})
info, err := project.BeginUpload(ctx, bucketName, "object-filename", &uplink.UploadOptions{})
require.NoError(t, err, "failed to start multipart upload")
_, err = multipart.PutObjectPart(ctx, project, bucketName, bucketName, info.StreamID, 1,
etag.NewHashReader(bytes.NewReader(data), sha256.New()))
upload, err := project.UploadPart(ctx, bucketName, bucketName, info.UploadID, 1)
require.NoError(t, err, "failed to put object part")
_, err = upload.Write(data)
require.NoError(t, err, "failed to start multipart upload")
require.NoError(t, upload.Commit(), "failed to start multipart upload")
}
deletePendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet) {

View File

@@ -4,8 +4,6 @@
package metainfo_test
import (
"bytes"
"crypto/sha256"
"errors"
"fmt"
"net"
@@ -35,9 +33,7 @@ import (
"storj.io/storj/satellite/metabase"
satMetainfo "storj.io/storj/satellite/metainfo"
"storj.io/uplink"
"storj.io/uplink/private/etag"
"storj.io/uplink/private/metainfo"
"storj.io/uplink/private/multipart"
"storj.io/uplink/private/object"
"storj.io/uplink/private/testuplink"
)
@@ -1705,24 +1701,28 @@ func TestMultipartObjectDownloadRejection(t *testing.T) {
_, err = project.EnsureBucket(ctx, "pip-second")
require.NoError(t, err)
info, err := multipart.NewMultipartUpload(ctx, project, "pip-second", "multipart-object", nil)
info, err := project.BeginUpload(ctx, "pip-second", "multipart-object", nil)
require.NoError(t, err)
_, err = multipart.PutObjectPart(ctx, project, "pip-second", "multipart-object", info.StreamID, 1,
etag.NewHashReader(bytes.NewReader(data), sha256.New()))
upload, err := project.UploadPart(ctx, "pip-second", "multipart-object", info.UploadID, 1)
require.NoError(t, err)
_, err = multipart.CompleteMultipartUpload(ctx, project, "pip-second", "multipart-object", info.StreamID, nil)
_, err = upload.Write(data)
require.NoError(t, err)
require.NoError(t, upload.Commit())
_, err = project.CommitUpload(ctx, "pip-second", "multipart-object", info.UploadID, nil)
require.NoError(t, err)
_, err = project.EnsureBucket(ctx, "pip-third")
require.NoError(t, err)
info, err = multipart.NewMultipartUpload(ctx, project, "pip-third", "multipart-object-third", nil)
info, err = project.BeginUpload(ctx, "pip-third", "multipart-object-third", nil)
require.NoError(t, err)
for i := 0; i < 4; i++ {
_, err = multipart.PutObjectPart(ctx, project, "pip-third", "multipart-object-third", info.StreamID, i+1,
etag.NewHashReader(bytes.NewReader(data), sha256.New()))
upload, err := project.UploadPart(ctx, "pip-third", "multipart-object-third", info.UploadID, uint32(i+1))
require.NoError(t, err)
_, err = upload.Write(data)
require.NoError(t, err)
require.NoError(t, upload.Commit())
}
_, err = multipart.CompleteMultipartUpload(ctx, project, "pip-third", "multipart-object-third", info.StreamID, nil)
_, err = project.CommitUpload(ctx, "pip-third", "multipart-object-third", info.UploadID, nil)
require.NoError(t, err)
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
@@ -1815,11 +1815,13 @@ func TestObjectOverrideOnUpload(t *testing.T) {
defer ctx.Check(project.Close)
// upload pending object
info, err := multipart.NewMultipartUpload(ctx, project, "pip-first", "pending-object", nil)
info, err := project.BeginUpload(ctx, "pip-first", "pending-object", nil)
require.NoError(t, err)
_, err = multipart.PutObjectPart(ctx, project, "pip-first", "pending-object", info.StreamID, 1,
etag.NewHashReader(bytes.NewReader(initialData), sha256.New()))
upload, err := project.UploadPart(ctx, "pip-first", "pending-object", info.UploadID, 1)
require.NoError(t, err)
_, err = upload.Write(initialData)
require.NoError(t, err)
require.NoError(t, upload.Commit())
// upload once again to override
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "pip-first", "pending-object", overrideData)

View File

@@ -4,9 +4,7 @@
package repair_test
import (
"bytes"
"context"
"crypto/sha256"
"io"
"math"
"testing"
@@ -27,8 +25,6 @@ import (
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/checker"
"storj.io/storj/storage"
"storj.io/uplink/private/etag"
"storj.io/uplink/private/multipart"
)
// TestDataRepair does the following:
@@ -222,11 +218,13 @@ func testDataRepairPendingObject(t *testing.T, inMemoryRepair bool) {
require.NoError(t, err)
// upload pending object
info, err := multipart.NewMultipartUpload(ctx, project, "testbucket", "test/path", nil)
info, err := project.BeginUpload(ctx, "testbucket", "test/path", nil)
require.NoError(t, err)
_, err = multipart.PutObjectPart(ctx, project, "testbucket", "test/path", info.StreamID, 7,
etag.NewHashReader(bytes.NewReader(testData), sha256.New()))
upload, err := project.UploadPart(ctx, "testbucket", "test/path", info.UploadID, 7)
require.NoError(t, err)
_, err = upload.Write(testData)
require.NoError(t, err)
require.NoError(t, upload.Commit())
segment, _ := getRemoteSegment(t, ctx, satellite, planet.Uplinks[0].Projects[0].ID, "testbucket")
@@ -307,7 +305,7 @@ func testDataRepairPendingObject(t *testing.T, inMemoryRepair bool) {
}
// complete the pending multipart upload
_, err = multipart.CompleteMultipartUpload(ctx, project, "testbucket", "test/path", info.StreamID, nil)
_, err = project.CommitUpload(ctx, "testbucket", "test/path", info.UploadID, nil)
require.NoError(t, err)
// we should be able to download data without any of the original nodes