b1d4a159a6
ListUploads returns incorrect UploadID if Expires was set in BeginUpload. DB is truncating expiration date to microseconds precision so we need to do this also in code. Change-Id: Iee0cf45cb705342f6bb9a2f745acca91cce6ff52
2343 lines
83 KiB
Go
2343 lines
83 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package metainfo_test
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"sort"
|
|
"strconv"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/common/errs2"
|
|
"storj.io/common/identity"
|
|
"storj.io/common/memory"
|
|
"storj.io/common/pb"
|
|
"storj.io/common/rpc/rpcstatus"
|
|
"storj.io/common/signing"
|
|
"storj.io/common/storj"
|
|
"storj.io/common/testcontext"
|
|
"storj.io/common/testrand"
|
|
"storj.io/common/uuid"
|
|
"storj.io/storj/private/testplanet"
|
|
"storj.io/storj/satellite"
|
|
"storj.io/storj/satellite/buckets"
|
|
"storj.io/storj/satellite/internalpb"
|
|
"storj.io/storj/satellite/metabase"
|
|
"storj.io/storj/satellite/metainfo"
|
|
"storj.io/storj/storage"
|
|
"storj.io/uplink"
|
|
"storj.io/uplink/private/metaclient"
|
|
"storj.io/uplink/private/object"
|
|
"storj.io/uplink/private/testuplink"
|
|
)
|
|
|
|
func assertRPCStatusCode(t *testing.T, actualError error, expectedStatusCode rpcstatus.StatusCode) {
|
|
statusCode := rpcstatus.Code(actualError)
|
|
require.NotEqual(t, rpcstatus.Unknown, statusCode, "expected rpcstatus error, got \"%v\"", actualError)
|
|
require.Equal(t, expectedStatusCode, statusCode, "wrong %T, got %v", statusCode, actualError)
|
|
}
|
|
|
|
func TestEndpoint_Object_No_StorageNodes(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
Satellite: testplanet.MaxObjectKeyLength(1024),
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
|
|
satellite := planet.Satellites[0]
|
|
|
|
metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
|
|
require.NoError(t, err)
|
|
defer ctx.Check(metainfoClient.Close)
|
|
|
|
bucketName := "testbucket"
|
|
deleteBucket := func() error {
|
|
_, err := metainfoClient.DeleteBucket(ctx, metaclient.DeleteBucketParams{
|
|
Name: []byte(bucketName),
|
|
DeleteAll: true,
|
|
})
|
|
return err
|
|
}
|
|
|
|
t.Run("get objects", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket)
|
|
|
|
files := make([]string, 10)
|
|
data := testrand.Bytes(1 * memory.KiB)
|
|
for i := 0; i < len(files); i++ {
|
|
files[i] = "path" + strconv.Itoa(i)
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, files[i], data)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
expectedBucketName := bucketName
|
|
items, _, err := metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{
|
|
Bucket: []byte(expectedBucketName),
|
|
IncludeSystemMetadata: true,
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, len(files), len(items))
|
|
for _, item := range items {
|
|
require.NotEmpty(t, item.EncryptedObjectKey)
|
|
require.True(t, item.CreatedAt.Before(time.Now()))
|
|
|
|
object, err := metainfoClient.GetObject(ctx, metaclient.GetObjectParams{
|
|
Bucket: []byte(expectedBucketName),
|
|
EncryptedObjectKey: item.EncryptedObjectKey,
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, item.EncryptedObjectKey, object.EncryptedObjectKey)
|
|
|
|
require.NotEmpty(t, object.StreamID)
|
|
}
|
|
|
|
items, _, err = metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{
|
|
Bucket: []byte(expectedBucketName),
|
|
Limit: 3,
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, 3, len(items))
|
|
|
|
})
|
|
|
|
t.Run("list service", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket)
|
|
|
|
items := []struct {
|
|
Key string
|
|
Value []byte
|
|
}{
|
|
{Key: "sample.😶", Value: []byte{1}},
|
|
{Key: "müsic", Value: []byte{2}},
|
|
{Key: "müsic/söng1.mp3", Value: []byte{3}},
|
|
{Key: "müsic/söng2.mp3", Value: []byte{4}},
|
|
{Key: "müsic/album/söng3.mp3", Value: []byte{5}},
|
|
{Key: "müsic/söng4.mp3", Value: []byte{6}},
|
|
{Key: "ビデオ/movie.mkv", Value: []byte{7}},
|
|
}
|
|
|
|
for _, item := range items {
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, item.Key, item.Value)
|
|
assert.NoError(t, err)
|
|
}
|
|
|
|
project, err := planet.Uplinks[0].GetProject(ctx, planet.Satellites[0])
|
|
require.NoError(t, err)
|
|
defer ctx.Check(project.Close)
|
|
|
|
objects := project.ListObjects(ctx, "testbucket", &uplink.ListObjectsOptions{
|
|
Recursive: true,
|
|
})
|
|
|
|
listItems := make([]*uplink.Object, 0)
|
|
for objects.Next() {
|
|
listItems = append(listItems, objects.Item())
|
|
}
|
|
require.NoError(t, objects.Err())
|
|
|
|
expected := []metaclient.Object{
|
|
{Path: "müsic"},
|
|
{Path: "müsic/album/söng3.mp3"},
|
|
{Path: "müsic/söng1.mp3"},
|
|
{Path: "müsic/söng2.mp3"},
|
|
{Path: "müsic/söng4.mp3"},
|
|
{Path: "sample.😶"},
|
|
{Path: "ビデオ/movie.mkv"},
|
|
}
|
|
|
|
require.Equal(t, len(expected), len(listItems))
|
|
sort.Slice(listItems, func(i, k int) bool {
|
|
return listItems[i].Key < listItems[k].Key
|
|
})
|
|
for i, item := range expected {
|
|
require.Equal(t, item.Path, listItems[i].Key)
|
|
require.Equal(t, item.IsPrefix, listItems[i].IsPrefix)
|
|
}
|
|
|
|
objects = project.ListObjects(ctx, bucketName, &uplink.ListObjectsOptions{
|
|
Recursive: false,
|
|
})
|
|
|
|
listItems = make([]*uplink.Object, 0)
|
|
for objects.Next() {
|
|
listItems = append(listItems, objects.Item())
|
|
}
|
|
require.NoError(t, objects.Err())
|
|
|
|
expected = []metaclient.Object{
|
|
{Path: "müsic"},
|
|
{Path: "müsic/", IsPrefix: true},
|
|
{Path: "sample.😶"},
|
|
{Path: "ビデオ/", IsPrefix: true},
|
|
}
|
|
|
|
require.Equal(t, len(expected), len(listItems))
|
|
sort.Slice(listItems, func(i, k int) bool {
|
|
return listItems[i].Key < listItems[k].Key
|
|
})
|
|
for i, item := range expected {
|
|
t.Log(item.Path, listItems[i].Key)
|
|
require.Equal(t, item.Path, listItems[i].Key)
|
|
require.Equal(t, item.IsPrefix, listItems[i].IsPrefix)
|
|
}
|
|
})
|
|
|
|
// ensures that CommitObject returns an error when the metadata provided by the user is too large.
|
|
t.Run("validate metadata size", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket)
|
|
|
|
err = planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], bucketName)
|
|
require.NoError(t, err)
|
|
|
|
params := metaclient.BeginObjectParams{
|
|
Bucket: []byte(bucketName),
|
|
EncryptedObjectKey: []byte("encrypted-path"),
|
|
Redundancy: storj.RedundancyScheme{
|
|
Algorithm: storj.ReedSolomon,
|
|
ShareSize: 256,
|
|
RequiredShares: 1,
|
|
RepairShares: 1,
|
|
OptimalShares: 3,
|
|
TotalShares: 4,
|
|
},
|
|
EncryptionParameters: storj.EncryptionParameters{
|
|
BlockSize: 256,
|
|
CipherSuite: storj.EncNull,
|
|
},
|
|
ExpiresAt: time.Now().Add(24 * time.Hour),
|
|
}
|
|
beginObjectResponse, err := metainfoClient.BeginObject(ctx, params)
|
|
require.NoError(t, err)
|
|
|
|
// 5KiB metadata should fail because it is too large.
|
|
metadata, err := pb.Marshal(&pb.StreamMeta{
|
|
EncryptedStreamInfo: testrand.Bytes(5 * memory.KiB),
|
|
NumberOfSegments: 1,
|
|
})
|
|
require.NoError(t, err)
|
|
err = metainfoClient.CommitObject(ctx, metaclient.CommitObjectParams{
|
|
StreamID: beginObjectResponse.StreamID,
|
|
EncryptedMetadata: metadata,
|
|
EncryptedMetadataNonce: testrand.Nonce(),
|
|
EncryptedMetadataEncryptedKey: randomEncryptedKey,
|
|
})
|
|
require.Error(t, err)
|
|
assertInvalidArgument(t, err, true)
|
|
|
|
// 1KiB metadata should not fail.
|
|
metadata, err = pb.Marshal(&pb.StreamMeta{
|
|
EncryptedStreamInfo: testrand.Bytes(1 * memory.KiB),
|
|
NumberOfSegments: 1,
|
|
})
|
|
require.NoError(t, err)
|
|
err = metainfoClient.CommitObject(ctx, metaclient.CommitObjectParams{
|
|
StreamID: beginObjectResponse.StreamID,
|
|
EncryptedMetadata: metadata,
|
|
EncryptedMetadataNonce: testrand.Nonce(),
|
|
EncryptedMetadataEncryptedKey: randomEncryptedKey,
|
|
})
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("update metadata", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket)
|
|
|
|
satelliteSys := planet.Satellites[0]
|
|
|
|
// upload a small inline object
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "testobject", testrand.Bytes(1*memory.KiB))
|
|
require.NoError(t, err)
|
|
|
|
objects, err := satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
require.Len(t, objects, 1)
|
|
|
|
getResp, err := satelliteSys.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{
|
|
Header: &pb.RequestHeader{
|
|
ApiKey: apiKey.SerializeRaw(),
|
|
},
|
|
Bucket: []byte("testbucket"),
|
|
EncryptedObjectKey: []byte(objects[0].ObjectKey),
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
testEncryptedMetadata := testrand.BytesInt(64)
|
|
testEncryptedMetadataEncryptedKey := randomEncryptedKey
|
|
testEncryptedMetadataNonce := testrand.Nonce()
|
|
|
|
// update the object metadata
|
|
_, err = satelliteSys.API.Metainfo.Endpoint.UpdateObjectMetadata(ctx, &pb.ObjectUpdateMetadataRequest{
|
|
Header: &pb.RequestHeader{
|
|
ApiKey: apiKey.SerializeRaw(),
|
|
},
|
|
Bucket: getResp.Object.Bucket,
|
|
EncryptedObjectKey: getResp.Object.EncryptedObjectKey,
|
|
Version: getResp.Object.Version,
|
|
StreamId: getResp.Object.StreamId,
|
|
EncryptedMetadataNonce: testEncryptedMetadataNonce,
|
|
EncryptedMetadata: testEncryptedMetadata,
|
|
EncryptedMetadataEncryptedKey: testEncryptedMetadataEncryptedKey,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// assert the metadata has been updated
|
|
objects, err = satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
require.Len(t, objects, 1)
|
|
assert.Equal(t, testEncryptedMetadata, objects[0].EncryptedMetadata)
|
|
assert.Equal(t, testEncryptedMetadataEncryptedKey, objects[0].EncryptedMetadataEncryptedKey)
|
|
assert.Equal(t, testEncryptedMetadataNonce[:], objects[0].EncryptedMetadataNonce)
|
|
})
|
|
|
|
t.Run("check delete rights on upload", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket)
|
|
|
|
up := planet.Uplinks[0]
|
|
|
|
err := up.CreateBucket(ctx, planet.Satellites[0], bucketName)
|
|
require.NoError(t, err)
|
|
|
|
data := testrand.Bytes(1 * memory.KiB)
|
|
err = up.Upload(ctx, planet.Satellites[0], bucketName, "test-key", data)
|
|
require.NoError(t, err)
|
|
|
|
access := up.Access[planet.Satellites[0].ID()]
|
|
|
|
overwrite := func(allowDelete bool) error {
|
|
permission := uplink.FullPermission()
|
|
permission.AllowDelete = allowDelete
|
|
|
|
sharedAccess, err := access.Share(permission)
|
|
require.NoError(t, err)
|
|
|
|
project, err := uplink.OpenProject(ctx, sharedAccess)
|
|
require.NoError(t, err)
|
|
defer ctx.Check(project.Close)
|
|
|
|
upload, err := project.UploadObject(ctx, bucketName, "test-key", nil)
|
|
require.NoError(t, err)
|
|
|
|
_, err = upload.Write([]byte("new data"))
|
|
require.NoError(t, err)
|
|
|
|
return upload.Commit()
|
|
}
|
|
|
|
require.Error(t, overwrite(false))
|
|
require.NoError(t, overwrite(true))
|
|
})
|
|
|
|
t.Run("immutable upload", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket)
|
|
|
|
access := planet.Uplinks[0].Access[planet.Satellites[0].ID()]
|
|
|
|
permission := uplink.Permission{AllowUpload: true} // AllowDelete: false
|
|
sharedAccess, err := access.Share(permission)
|
|
require.NoError(t, err)
|
|
|
|
project, err := uplink.OpenProject(ctx, sharedAccess)
|
|
require.NoError(t, err)
|
|
defer ctx.Check(project.Close)
|
|
|
|
_, err = project.CreateBucket(ctx, bucketName)
|
|
require.NoError(t, err)
|
|
|
|
// Uploading the object for first time should be successful.
|
|
upload, err := project.UploadObject(ctx, bucketName, "test-key", nil)
|
|
require.NoError(t, err)
|
|
|
|
_, err = upload.Write(testrand.Bytes(1 * memory.KiB))
|
|
require.NoError(t, err)
|
|
|
|
err = upload.Commit()
|
|
require.NoError(t, err)
|
|
|
|
// Overwriting the object should fail on Commit.
|
|
upload, err = project.UploadObject(ctx, bucketName, "test-key", nil)
|
|
require.NoError(t, err)
|
|
|
|
_, err = upload.Write(testrand.Bytes(1 * memory.KiB))
|
|
require.NoError(t, err)
|
|
|
|
err = upload.Commit()
|
|
require.Error(t, err)
|
|
})
|
|
|
|
t.Run("stable upload id", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket)
|
|
|
|
err = planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], bucketName)
|
|
require.NoError(t, err)
|
|
|
|
beginResp, err := metainfoClient.BeginObject(ctx, metaclient.BeginObjectParams{
|
|
Bucket: []byte(bucketName),
|
|
EncryptedObjectKey: []byte("a/b/testobject"),
|
|
EncryptionParameters: storj.EncryptionParameters{
|
|
CipherSuite: storj.EncAESGCM,
|
|
BlockSize: 256,
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// List the root of the bucket recursively
|
|
listResp, _, err := metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{
|
|
Bucket: []byte(bucketName),
|
|
Status: int32(metabase.Pending),
|
|
Recursive: true,
|
|
IncludeSystemMetadata: true,
|
|
})
|
|
require.NoError(t, err)
|
|
require.Len(t, listResp, 1)
|
|
// check that BeginObject and ListObjects return the same StreamID.
|
|
assert.Equal(t, beginResp.StreamID, listResp[0].StreamID)
|
|
|
|
// List with prefix non-recursively
|
|
listResp2, _, err := metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{
|
|
Bucket: []byte(bucketName),
|
|
Status: int32(metabase.Pending),
|
|
EncryptedPrefix: []byte("a/b/"),
|
|
IncludeSystemMetadata: true,
|
|
})
|
|
require.NoError(t, err)
|
|
require.Len(t, listResp2, 1)
|
|
// check that the StreamID is still the same.
|
|
assert.Equal(t, listResp[0].StreamID, listResp2[0].StreamID)
|
|
|
|
// List with prefix recursively
|
|
listResp3, _, err := metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{
|
|
Bucket: []byte(bucketName),
|
|
Status: int32(metabase.Pending),
|
|
EncryptedPrefix: []byte("a/b/"),
|
|
Recursive: true,
|
|
IncludeSystemMetadata: true,
|
|
})
|
|
require.NoError(t, err)
|
|
require.Len(t, listResp3, 1)
|
|
// check that the StreamID is still the same.
|
|
assert.Equal(t, listResp[0].StreamID, listResp3[0].StreamID)
|
|
|
|
// List the pending object directly
|
|
listResp4, err := metainfoClient.ListPendingObjectStreams(ctx, metaclient.ListPendingObjectStreamsParams{
|
|
Bucket: []byte(bucketName),
|
|
EncryptedObjectKey: []byte("a/b/testobject"),
|
|
})
|
|
require.NoError(t, err)
|
|
require.Len(t, listResp4.Items, 1)
|
|
// check that the StreamID is still the same.
|
|
assert.Equal(t, listResp[0].StreamID, listResp4.Items[0].StreamID)
|
|
})
|
|
|
|
// ensures that BeginObject returns an error when the encrypted key provided by the user is too large.
|
|
t.Run("validate encrypted object key length", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket)
|
|
|
|
err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], bucketName)
|
|
require.NoError(t, err)
|
|
|
|
params := metaclient.BeginObjectParams{
|
|
Bucket: []byte(bucketName),
|
|
EncryptionParameters: storj.EncryptionParameters{
|
|
BlockSize: 256,
|
|
CipherSuite: storj.EncNull,
|
|
},
|
|
}
|
|
|
|
params.EncryptedObjectKey = testrand.Bytes(500)
|
|
_, err = metainfoClient.BeginObject(ctx, params)
|
|
require.NoError(t, err)
|
|
|
|
params.EncryptedObjectKey = testrand.Bytes(1024)
|
|
_, err = metainfoClient.BeginObject(ctx, params)
|
|
require.NoError(t, err)
|
|
|
|
params.EncryptedObjectKey = testrand.Bytes(2048)
|
|
_, err = metainfoClient.BeginObject(ctx, params)
|
|
require.Error(t, err)
|
|
require.True(t, rpcstatus.Code(err) == rpcstatus.InvalidArgument)
|
|
})
|
|
|
|
t.Run("delete not existing object", func(t *testing.T) {
|
|
expectedBucketName := bucketName
|
|
|
|
// non-pending non-existent objects return no error
|
|
_, err = metainfoClient.BeginDeleteObject(ctx, metaclient.BeginDeleteObjectParams{
|
|
Bucket: []byte(expectedBucketName),
|
|
EncryptedObjectKey: []byte("bad path"),
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// pending non-existent objects return an RPC error
|
|
signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
|
|
streamUUID := testrand.UUID()
|
|
satStreamID := &internalpb.StreamID{
|
|
Bucket: []byte(expectedBucketName),
|
|
EncryptedObjectKey: []byte("bad path"),
|
|
StreamId: streamUUID[:],
|
|
CreationDate: time.Now(),
|
|
}
|
|
signedStreamID, err := metainfo.SignStreamID(ctx, signer, satStreamID)
|
|
require.NoError(t, err)
|
|
encodedStreamID, err := pb.Marshal(signedStreamID)
|
|
require.NoError(t, err)
|
|
streamID, err := storj.StreamIDFromBytes(encodedStreamID)
|
|
require.NoError(t, err)
|
|
_, err = metainfoClient.BeginDeleteObject(ctx, metaclient.BeginDeleteObjectParams{
|
|
Bucket: []byte(expectedBucketName),
|
|
EncryptedObjectKey: []byte("bad path"),
|
|
Status: int32(metabase.Pending),
|
|
StreamID: streamID,
|
|
})
|
|
require.True(t, errs2.IsRPC(err, rpcstatus.NotFound))
|
|
})
|
|
|
|
t.Run("get object", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket)
|
|
|
|
err := planet.Uplinks[0].Upload(ctx, satellite, "testbucket", "object", testrand.Bytes(256))
|
|
require.NoError(t, err)
|
|
|
|
objects, err := satellite.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
require.Len(t, objects, 1)
|
|
|
|
committedObject := objects[0]
|
|
|
|
pendingObject, err := satellite.API.Metainfo.Metabase.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
|
|
ObjectStream: metabase.ObjectStream{
|
|
ProjectID: committedObject.ProjectID,
|
|
BucketName: committedObject.BucketName,
|
|
ObjectKey: committedObject.ObjectKey,
|
|
StreamID: committedObject.StreamID,
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, committedObject.Version+1, pendingObject.Version)
|
|
|
|
getObjectResponse, err := satellite.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{
|
|
Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()},
|
|
Bucket: []byte("testbucket"),
|
|
EncryptedObjectKey: []byte(committedObject.ObjectKey),
|
|
Version: int32(committedObject.Version),
|
|
})
|
|
require.NoError(t, err)
|
|
require.EqualValues(t, committedObject.BucketName, getObjectResponse.Object.Bucket)
|
|
require.EqualValues(t, committedObject.ObjectKey, getObjectResponse.Object.EncryptedObjectKey)
|
|
require.EqualValues(t, committedObject.Version, getObjectResponse.Object.Version)
|
|
})
|
|
|
|
t.Run("download object", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket)
|
|
|
|
err := planet.Uplinks[0].Upload(ctx, satellite, "testbucket", "object", testrand.Bytes(256))
|
|
require.NoError(t, err)
|
|
|
|
objects, err := satellite.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
require.Len(t, objects, 1)
|
|
|
|
committedObject := objects[0]
|
|
|
|
pendingObject, err := satellite.API.Metainfo.Metabase.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
|
|
ObjectStream: metabase.ObjectStream{
|
|
ProjectID: committedObject.ProjectID,
|
|
BucketName: committedObject.BucketName,
|
|
ObjectKey: committedObject.ObjectKey,
|
|
StreamID: committedObject.StreamID,
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, committedObject.Version+1, pendingObject.Version)
|
|
|
|
downloadObjectResponse, err := satellite.API.Metainfo.Endpoint.DownloadObject(ctx, &pb.ObjectDownloadRequest{
|
|
Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()},
|
|
Bucket: []byte("testbucket"),
|
|
EncryptedObjectKey: []byte(committedObject.ObjectKey),
|
|
})
|
|
require.NoError(t, err)
|
|
require.EqualValues(t, committedObject.BucketName, downloadObjectResponse.Object.Bucket)
|
|
require.EqualValues(t, committedObject.ObjectKey, downloadObjectResponse.Object.EncryptedObjectKey)
|
|
require.EqualValues(t, committedObject.Version, downloadObjectResponse.Object.Version)
|
|
})
|
|
|
|
t.Run("begin expired object", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket)
|
|
|
|
err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], bucketName)
|
|
require.NoError(t, err)
|
|
|
|
params := metaclient.BeginObjectParams{
|
|
Bucket: []byte(bucketName),
|
|
EncryptedObjectKey: []byte("encrypted-path"),
|
|
Redundancy: storj.RedundancyScheme{
|
|
Algorithm: storj.ReedSolomon,
|
|
ShareSize: 256,
|
|
RequiredShares: 1,
|
|
RepairShares: 1,
|
|
OptimalShares: 3,
|
|
TotalShares: 4,
|
|
},
|
|
EncryptionParameters: storj.EncryptionParameters{
|
|
BlockSize: 256,
|
|
CipherSuite: storj.EncNull,
|
|
},
|
|
ExpiresAt: time.Now().Add(-24 * time.Hour),
|
|
}
|
|
|
|
_, err = metainfoClient.BeginObject(ctx, params)
|
|
require.Error(t, err)
|
|
require.Contains(t, err.Error(), "Invalid expiration time")
|
|
require.True(t, errs2.IsRPC(err, rpcstatus.InvalidArgument))
|
|
})
|
|
|
|
t.Run("UploadID check", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket)
|
|
|
|
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
|
require.NoError(t, err)
|
|
defer ctx.Check(project.Close)
|
|
|
|
_, err = project.CreateBucket(ctx, bucketName)
|
|
require.NoError(t, err)
|
|
|
|
for _, tt := range []struct {
|
|
expires time.Time
|
|
options uplink.ListUploadsOptions
|
|
}{
|
|
{
|
|
options: uplink.ListUploadsOptions{System: false, Custom: false},
|
|
},
|
|
{
|
|
options: uplink.ListUploadsOptions{System: true, Custom: false},
|
|
},
|
|
{
|
|
options: uplink.ListUploadsOptions{System: true, Custom: true},
|
|
},
|
|
{
|
|
options: uplink.ListUploadsOptions{System: false, Custom: true},
|
|
},
|
|
{
|
|
expires: time.Now().Add(24 * time.Hour),
|
|
options: uplink.ListUploadsOptions{System: false, Custom: false},
|
|
},
|
|
{
|
|
expires: time.Now().Add(24 * time.Hour),
|
|
options: uplink.ListUploadsOptions{System: true, Custom: false},
|
|
},
|
|
{
|
|
expires: time.Now().Add(24 * time.Hour),
|
|
options: uplink.ListUploadsOptions{System: true, Custom: true},
|
|
},
|
|
{
|
|
expires: time.Now().Add(24 * time.Hour),
|
|
options: uplink.ListUploadsOptions{System: false, Custom: true},
|
|
},
|
|
} {
|
|
t.Run(fmt.Sprintf("expires:%v;system:%v;custom:%v", !tt.expires.IsZero(), tt.options.System, tt.options.Custom), func(t *testing.T) {
|
|
uploadInfo, err := project.BeginUpload(ctx, bucketName, "multipart-object", &uplink.UploadOptions{
|
|
Expires: tt.expires,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
iterator := project.ListUploads(ctx, bucketName, &tt.options)
|
|
require.True(t, iterator.Next())
|
|
require.Equal(t, uploadInfo.UploadID, iterator.Item().UploadID)
|
|
|
|
err = project.AbortUpload(ctx, bucketName, "multipart-object", iterator.Item().UploadID)
|
|
require.NoError(t, err)
|
|
})
|
|
}
|
|
})
|
|
})
|
|
}
|
|
|
|
// TODO remove when listing query tests feature flag is removed.
|
|
func TestEndpoint_Object_No_StorageNodes_TestListingQuery(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
Satellite: testplanet.Combine(testplanet.MaxObjectKeyLength(1024), func(log *zap.Logger, index int, config *satellite.Config) {
|
|
config.Metainfo.TestListingQuery = true
|
|
}),
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
|
|
|
|
metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
|
|
require.NoError(t, err)
|
|
defer ctx.Check(metainfoClient.Close)
|
|
|
|
bucketName := "testbucket"
|
|
deleteBucket := func() error {
|
|
_, err := metainfoClient.DeleteBucket(ctx, metaclient.DeleteBucketParams{
|
|
Name: []byte(bucketName),
|
|
DeleteAll: true,
|
|
})
|
|
return err
|
|
}
|
|
|
|
t.Run("list service with listing query test", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket)
|
|
|
|
items := []struct {
|
|
Key string
|
|
Value []byte
|
|
}{
|
|
{Key: "sample.😶", Value: []byte{1}},
|
|
{Key: "müsic", Value: []byte{2}},
|
|
{Key: "müsic/söng1.mp3", Value: []byte{3}},
|
|
{Key: "müsic/söng2.mp3", Value: []byte{4}},
|
|
{Key: "müsic/album/söng3.mp3", Value: []byte{5}},
|
|
{Key: "müsic/söng4.mp3", Value: []byte{6}},
|
|
{Key: "ビデオ/movie.mkv", Value: []byte{7}},
|
|
}
|
|
|
|
for _, item := range items {
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, item.Key, item.Value)
|
|
assert.NoError(t, err)
|
|
}
|
|
|
|
project, err := planet.Uplinks[0].GetProject(ctx, planet.Satellites[0])
|
|
require.NoError(t, err)
|
|
defer ctx.Check(project.Close)
|
|
|
|
objects := project.ListObjects(ctx, "testbucket", &uplink.ListObjectsOptions{
|
|
Recursive: true,
|
|
})
|
|
|
|
listItems := make([]*uplink.Object, 0)
|
|
for objects.Next() {
|
|
listItems = append(listItems, objects.Item())
|
|
}
|
|
require.NoError(t, objects.Err())
|
|
|
|
expected := []metaclient.Object{
|
|
{Path: "müsic"},
|
|
{Path: "müsic/album/söng3.mp3"},
|
|
{Path: "müsic/söng1.mp3"},
|
|
{Path: "müsic/söng2.mp3"},
|
|
{Path: "müsic/söng4.mp3"},
|
|
{Path: "sample.😶"},
|
|
{Path: "ビデオ/movie.mkv"},
|
|
}
|
|
|
|
require.Equal(t, len(expected), len(listItems))
|
|
sort.Slice(listItems, func(i, k int) bool {
|
|
return listItems[i].Key < listItems[k].Key
|
|
})
|
|
for i, item := range expected {
|
|
require.Equal(t, item.Path, listItems[i].Key)
|
|
require.Equal(t, item.IsPrefix, listItems[i].IsPrefix)
|
|
}
|
|
|
|
objects = project.ListObjects(ctx, bucketName, &uplink.ListObjectsOptions{
|
|
Recursive: false,
|
|
})
|
|
|
|
listItems = make([]*uplink.Object, 0)
|
|
for objects.Next() {
|
|
listItems = append(listItems, objects.Item())
|
|
}
|
|
require.NoError(t, objects.Err())
|
|
|
|
expected = []metaclient.Object{
|
|
{Path: "müsic"},
|
|
{Path: "müsic/", IsPrefix: true},
|
|
{Path: "sample.😶"},
|
|
{Path: "ビデオ/", IsPrefix: true},
|
|
}
|
|
|
|
require.Equal(t, len(expected), len(listItems))
|
|
sort.Slice(listItems, func(i, k int) bool {
|
|
return listItems[i].Key < listItems[k].Key
|
|
})
|
|
for i, item := range expected {
|
|
t.Log(item.Path, listItems[i].Key)
|
|
require.Equal(t, item.Path, listItems[i].Key)
|
|
require.Equal(t, item.IsPrefix, listItems[i].IsPrefix)
|
|
}
|
|
})
|
|
|
|
})
|
|
}
|
|
func TestEndpoint_Object_With_StorageNodes(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
Satellite: func(logger *zap.Logger, index int, config *satellite.Config) {
|
|
config.Overlay.GeoIP.MockCountries = []string{"DE"}
|
|
},
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
|
|
metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
|
|
require.NoError(t, err)
|
|
defer ctx.Check(metainfoClient.Close)
|
|
|
|
bucketName := "testbucket"
|
|
deleteBucket := func(bucketName string) func() error {
|
|
return func() error {
|
|
_, err := metainfoClient.DeleteBucket(ctx, metaclient.DeleteBucketParams{
|
|
Name: []byte(bucketName),
|
|
DeleteAll: true,
|
|
})
|
|
return err
|
|
}
|
|
}
|
|
|
|
t.Run("begin commit", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket(bucketName))
|
|
|
|
buckets := planet.Satellites[0].API.Buckets.Service
|
|
|
|
bucket := storj.Bucket{
|
|
Name: bucketName,
|
|
ProjectID: planet.Uplinks[0].Projects[0].ID,
|
|
Placement: storj.EU,
|
|
}
|
|
|
|
_, err := buckets.CreateBucket(ctx, bucket)
|
|
require.NoError(t, err)
|
|
|
|
params := metaclient.BeginObjectParams{
|
|
Bucket: []byte(bucket.Name),
|
|
EncryptedObjectKey: []byte("encrypted-path"),
|
|
Redundancy: storj.RedundancyScheme{
|
|
Algorithm: storj.ReedSolomon,
|
|
ShareSize: 256,
|
|
RequiredShares: 1,
|
|
RepairShares: 1,
|
|
OptimalShares: 3,
|
|
TotalShares: 4,
|
|
},
|
|
EncryptionParameters: storj.EncryptionParameters{
|
|
CipherSuite: storj.EncAESGCM,
|
|
BlockSize: 256,
|
|
},
|
|
ExpiresAt: time.Now().Add(24 * time.Hour),
|
|
}
|
|
beginObjectResponse, err := metainfoClient.BeginObject(ctx, params)
|
|
require.NoError(t, err)
|
|
|
|
streamID := internalpb.StreamID{}
|
|
err = pb.Unmarshal(beginObjectResponse.StreamID.Bytes(), &streamID)
|
|
require.NoError(t, err)
|
|
require.Equal(t, int32(storj.EU), streamID.Placement)
|
|
|
|
response, err := metainfoClient.BeginSegment(ctx, metaclient.BeginSegmentParams{
|
|
StreamID: beginObjectResponse.StreamID,
|
|
Position: metaclient.SegmentPosition{
|
|
Index: 0,
|
|
},
|
|
MaxOrderLimit: memory.MiB.Int64(),
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
fullIDMap := make(map[storj.NodeID]*identity.FullIdentity)
|
|
for _, node := range planet.StorageNodes {
|
|
fullIDMap[node.ID()] = node.Identity
|
|
}
|
|
|
|
makeResult := func(num int32) *pb.SegmentPieceUploadResult {
|
|
nodeID := response.Limits[num].Limit.StorageNodeId
|
|
hash := &pb.PieceHash{
|
|
PieceId: response.Limits[num].Limit.PieceId,
|
|
PieceSize: 1048832,
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
fullID := fullIDMap[nodeID]
|
|
require.NotNil(t, fullID)
|
|
signer := signing.SignerFromFullIdentity(fullID)
|
|
signedHash, err := signing.SignPieceHash(ctx, signer, hash)
|
|
require.NoError(t, err)
|
|
|
|
return &pb.SegmentPieceUploadResult{
|
|
PieceNum: num,
|
|
NodeId: nodeID,
|
|
Hash: signedHash,
|
|
}
|
|
}
|
|
err = metainfoClient.CommitSegment(ctx, metaclient.CommitSegmentParams{
|
|
SegmentID: response.SegmentID,
|
|
Encryption: metaclient.SegmentEncryption{
|
|
EncryptedKey: testrand.Bytes(256),
|
|
},
|
|
PlainSize: 5000,
|
|
SizeEncryptedData: memory.MiB.Int64(),
|
|
UploadResult: []*pb.SegmentPieceUploadResult{
|
|
makeResult(0),
|
|
makeResult(1),
|
|
makeResult(2),
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
metadata, err := pb.Marshal(&pb.StreamMeta{
|
|
NumberOfSegments: 1,
|
|
})
|
|
require.NoError(t, err)
|
|
err = metainfoClient.CommitObject(ctx, metaclient.CommitObjectParams{
|
|
StreamID: beginObjectResponse.StreamID,
|
|
EncryptedMetadata: metadata,
|
|
EncryptedMetadataNonce: testrand.Nonce(),
|
|
EncryptedMetadataEncryptedKey: randomEncryptedKey,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
objects, _, err := metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{
|
|
Bucket: []byte(bucket.Name),
|
|
IncludeSystemMetadata: true,
|
|
})
|
|
require.NoError(t, err)
|
|
require.Len(t, objects, 1)
|
|
require.Equal(t, params.EncryptedObjectKey, objects[0].EncryptedObjectKey)
|
|
// TODO find better way to compare (one ExpiresAt contains time zone informations)
|
|
require.Equal(t, params.ExpiresAt.Unix(), objects[0].ExpiresAt.Unix())
|
|
|
|
object, err := metainfoClient.GetObject(ctx, metaclient.GetObjectParams{
|
|
Bucket: []byte(bucket.Name),
|
|
EncryptedObjectKey: objects[0].EncryptedObjectKey,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
project := planet.Uplinks[0].Projects[0]
|
|
allObjects, err := planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, project.ID, object.Bucket)
|
|
require.NoError(t, err)
|
|
require.Len(t, allObjects, 1)
|
|
})
|
|
|
|
t.Run("get object IP", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket(bucketName))
|
|
|
|
access := planet.Uplinks[0].Access[planet.Satellites[0].ID()]
|
|
uplnk := planet.Uplinks[0]
|
|
uplinkCtx := testuplink.WithMaxSegmentSize(ctx, 5*memory.KB)
|
|
sat := planet.Satellites[0]
|
|
|
|
require.NoError(t, uplnk.CreateBucket(uplinkCtx, sat, bucketName))
|
|
require.NoError(t, uplnk.Upload(uplinkCtx, sat, bucketName, "jones", testrand.Bytes(20*memory.KB)))
|
|
|
|
project, err := uplnk.OpenProject(ctx, planet.Satellites[0])
|
|
require.NoError(t, err)
|
|
defer ctx.Check(project.Close)
|
|
|
|
// make a copy
|
|
_, err = project.CopyObject(ctx, bucketName, "jones", bucketName, "jones_copy", nil)
|
|
require.NoError(t, err)
|
|
|
|
ips, err := object.GetObjectIPs(ctx, uplink.Config{}, access, bucketName, "jones")
|
|
require.NoError(t, err)
|
|
require.True(t, len(ips) > 0)
|
|
|
|
copyIPs, err := object.GetObjectIPs(ctx, uplink.Config{}, access, bucketName, "jones_copy")
|
|
require.NoError(t, err)
|
|
|
|
sort.Slice(ips, func(i, j int) bool {
|
|
return bytes.Compare(ips[i], ips[j]) < 0
|
|
})
|
|
sort.Slice(copyIPs, func(i, j int) bool {
|
|
return bytes.Compare(copyIPs[i], copyIPs[j]) < 0
|
|
})
|
|
|
|
// verify that orignal and copy has the same results
|
|
require.Equal(t, ips, copyIPs)
|
|
|
|
// verify it's a real IP with valid host and port
|
|
for _, ip := range ips {
|
|
host, port, err := net.SplitHostPort(string(ip))
|
|
require.NoError(t, err)
|
|
netIP := net.ParseIP(host)
|
|
require.NotNil(t, netIP)
|
|
_, err = strconv.Atoi(port)
|
|
require.NoError(t, err)
|
|
}
|
|
})
|
|
|
|
t.Run("get object IP with same location committed and pending status", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket(bucketName))
|
|
|
|
access := planet.Uplinks[0].Access[planet.Satellites[0].ID()]
|
|
uplnk := planet.Uplinks[0]
|
|
sat := planet.Satellites[0]
|
|
|
|
require.NoError(t, uplnk.Upload(ctx, sat, bucketName, "jones", testrand.Bytes(20*memory.KB)))
|
|
|
|
ips, err := object.GetObjectIPs(ctx, uplink.Config{}, access, bucketName, "jones")
|
|
require.NoError(t, err)
|
|
require.True(t, len(ips) > 0)
|
|
|
|
// verify it's a real IP with valid host and port
|
|
for _, ip := range ips {
|
|
host, port, err := net.SplitHostPort(string(ip))
|
|
require.NoError(t, err)
|
|
netIP := net.ParseIP(host)
|
|
require.NotNil(t, netIP)
|
|
_, err = strconv.Atoi(port)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
objects, err := sat.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
require.Len(t, objects, 1)
|
|
|
|
committedObject := objects[0]
|
|
|
|
pendingObject, err := sat.Metabase.DB.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
|
|
ObjectStream: metabase.ObjectStream{
|
|
ProjectID: committedObject.ProjectID,
|
|
BucketName: committedObject.BucketName,
|
|
ObjectKey: committedObject.ObjectKey,
|
|
StreamID: committedObject.StreamID,
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, committedObject.Version+1, pendingObject.Version)
|
|
|
|
newIps, err := object.GetObjectIPs(ctx, uplink.Config{}, access, bucketName, "jones")
|
|
require.NoError(t, err)
|
|
|
|
sort.Slice(ips, func(i, j int) bool {
|
|
return bytes.Compare(ips[i], ips[j]) < 0
|
|
})
|
|
sort.Slice(newIps, func(i, j int) bool {
|
|
return bytes.Compare(newIps[i], newIps[j]) < 0
|
|
})
|
|
require.Equal(t, ips, newIps)
|
|
})
|
|
|
|
t.Run("get object IP with version != 1", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket(bucketName))
|
|
|
|
access := planet.Uplinks[0].Access[planet.Satellites[0].ID()]
|
|
uplnk := planet.Uplinks[0]
|
|
sat := planet.Satellites[0]
|
|
|
|
require.NoError(t, uplnk.Upload(ctx, sat, bucketName, "jones", testrand.Bytes(20*memory.KB)))
|
|
|
|
objects, err := sat.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
|
|
committedObject := objects[0]
|
|
randomVersion := metabase.Version(2 + testrand.Intn(9))
|
|
|
|
// atm there's no better way to change object's version
|
|
res, err := planet.Satellites[0].Metabase.DB.UnderlyingTagSQL().Exec(ctx,
|
|
"UPDATE objects SET version = $1 WHERE project_id = $2 AND bucket_name = $3 AND object_key = $4 AND stream_id = $5",
|
|
randomVersion, committedObject.ProjectID, committedObject.BucketName, committedObject.ObjectKey, committedObject.StreamID,
|
|
)
|
|
require.NoError(t, err)
|
|
|
|
affected, err := res.RowsAffected()
|
|
require.NoError(t, err)
|
|
require.EqualValues(t, 1, affected)
|
|
|
|
ips, err := object.GetObjectIPs(ctx, uplink.Config{}, access, bucketName, "jones")
|
|
require.NoError(t, err)
|
|
require.True(t, len(ips) > 0)
|
|
|
|
// verify it's a real IP with valid host and port
|
|
for _, ip := range ips {
|
|
host, port, err := net.SplitHostPort(string(ip))
|
|
require.NoError(t, err)
|
|
netIP := net.ParseIP(host)
|
|
require.NotNil(t, netIP)
|
|
_, err = strconv.Atoi(port)
|
|
require.NoError(t, err)
|
|
}
|
|
})
|
|
|
|
t.Run("multipart object download rejection", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket("pip-first"))
|
|
defer ctx.Check(deleteBucket("pip-second"))
|
|
defer ctx.Check(deleteBucket("pip-third"))
|
|
|
|
data := testrand.Bytes(20 * memory.KB)
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "pip-first", "non-multipart-object", data)
|
|
require.NoError(t, err)
|
|
|
|
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
|
require.NoError(t, err)
|
|
defer ctx.Check(project.Close)
|
|
|
|
_, err = project.EnsureBucket(ctx, "pip-second")
|
|
require.NoError(t, err)
|
|
info, err := project.BeginUpload(ctx, "pip-second", "multipart-object", nil)
|
|
require.NoError(t, err)
|
|
upload, err := project.UploadPart(ctx, "pip-second", "multipart-object", info.UploadID, 1)
|
|
require.NoError(t, err)
|
|
_, err = upload.Write(data)
|
|
require.NoError(t, err)
|
|
require.NoError(t, upload.Commit())
|
|
_, err = project.CommitUpload(ctx, "pip-second", "multipart-object", info.UploadID, nil)
|
|
require.NoError(t, err)
|
|
|
|
_, err = project.EnsureBucket(ctx, "pip-third")
|
|
require.NoError(t, err)
|
|
info, err = project.BeginUpload(ctx, "pip-third", "multipart-object-third", nil)
|
|
require.NoError(t, err)
|
|
for i := 0; i < 4; i++ {
|
|
upload, err := project.UploadPart(ctx, "pip-third", "multipart-object-third", info.UploadID, uint32(i+1))
|
|
require.NoError(t, err)
|
|
_, err = upload.Write(data)
|
|
require.NoError(t, err)
|
|
require.NoError(t, upload.Commit())
|
|
}
|
|
_, err = project.CommitUpload(ctx, "pip-third", "multipart-object-third", info.UploadID, nil)
|
|
require.NoError(t, err)
|
|
|
|
objects, err := planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, planet.Uplinks[0].Projects[0].ID, "pip-first")
|
|
require.NoError(t, err)
|
|
require.Len(t, objects, 1)
|
|
|
|
// verify that standard objects can be downloaded in an old way (index = -1 as last segment)
|
|
object, err := metainfoClient.GetObject(ctx, metaclient.GetObjectParams{
|
|
Bucket: []byte("pip-first"),
|
|
EncryptedObjectKey: []byte(objects[0].ObjectKey),
|
|
})
|
|
require.NoError(t, err)
|
|
_, err = metainfoClient.DownloadSegmentWithRS(ctx, metaclient.DownloadSegmentParams{
|
|
StreamID: object.StreamID,
|
|
Position: metaclient.SegmentPosition{
|
|
Index: -1,
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
objects, err = planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, planet.Uplinks[0].Projects[0].ID, "pip-second")
|
|
require.NoError(t, err)
|
|
require.Len(t, objects, 1)
|
|
|
|
// verify that multipart objects (single segment) CANNOT be downloaded in an old way (index = -1 as last segment)
|
|
object, err = metainfoClient.GetObject(ctx, metaclient.GetObjectParams{
|
|
Bucket: []byte("pip-second"),
|
|
EncryptedObjectKey: []byte(objects[0].ObjectKey),
|
|
})
|
|
require.NoError(t, err)
|
|
_, err = metainfoClient.DownloadSegmentWithRS(ctx, metaclient.DownloadSegmentParams{
|
|
StreamID: object.StreamID,
|
|
Position: metaclient.SegmentPosition{
|
|
Index: -1,
|
|
},
|
|
})
|
|
require.Error(t, err)
|
|
require.Contains(t, err.Error(), "Used uplink version cannot download multipart objects.")
|
|
|
|
objects, err = planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, planet.Uplinks[0].Projects[0].ID, "pip-third")
|
|
require.NoError(t, err)
|
|
require.Len(t, objects, 1)
|
|
|
|
// verify that multipart objects (multiple segments) CANNOT be downloaded in an old way (index = -1 as last segment)
|
|
object, err = metainfoClient.GetObject(ctx, metaclient.GetObjectParams{
|
|
Bucket: []byte("pip-third"),
|
|
EncryptedObjectKey: []byte(objects[0].ObjectKey),
|
|
})
|
|
require.NoError(t, err)
|
|
_, err = metainfoClient.DownloadSegmentWithRS(ctx, metaclient.DownloadSegmentParams{
|
|
StreamID: object.StreamID,
|
|
Position: metaclient.SegmentPosition{
|
|
Index: -1,
|
|
},
|
|
})
|
|
require.Error(t, err)
|
|
require.Contains(t, err.Error(), "Used uplink version cannot download multipart objects.")
|
|
})
|
|
|
|
t.Run("object override on upload", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket("pip-first"))
|
|
|
|
initialData := testrand.Bytes(20 * memory.KB)
|
|
overrideData := testrand.Bytes(25 * memory.KB)
|
|
|
|
{ // committed object
|
|
|
|
// upload committed object
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "pip-first", "committed-object", initialData)
|
|
require.NoError(t, err)
|
|
|
|
// upload once again to override
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "pip-first", "committed-object", overrideData)
|
|
require.NoError(t, err)
|
|
|
|
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "pip-first", "committed-object")
|
|
require.NoError(t, err)
|
|
require.Equal(t, overrideData, data)
|
|
}
|
|
|
|
{ // pending object
|
|
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
|
require.NoError(t, err)
|
|
defer ctx.Check(project.Close)
|
|
|
|
// upload pending object
|
|
info, err := project.BeginUpload(ctx, "pip-first", "pending-object", nil)
|
|
require.NoError(t, err)
|
|
upload, err := project.UploadPart(ctx, "pip-first", "pending-object", info.UploadID, 1)
|
|
require.NoError(t, err)
|
|
_, err = upload.Write(initialData)
|
|
require.NoError(t, err)
|
|
require.NoError(t, upload.Commit())
|
|
|
|
// upload once again to override
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "pip-first", "pending-object", overrideData)
|
|
require.NoError(t, err)
|
|
|
|
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "pip-first", "pending-object")
|
|
require.NoError(t, err)
|
|
require.Equal(t, overrideData, data)
|
|
}
|
|
})
|
|
|
|
t.Run("upload with placement", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket("initial-bucket"))
|
|
|
|
bucketName := "initial-bucket"
|
|
objectName := "file1"
|
|
|
|
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
|
|
fmt.Println(apiKey)
|
|
buckets := planet.Satellites[0].API.Buckets.Service
|
|
|
|
bucket := storj.Bucket{
|
|
Name: bucketName,
|
|
ProjectID: planet.Uplinks[0].Projects[0].ID,
|
|
Placement: storj.EU,
|
|
}
|
|
_, err := buckets.CreateBucket(ctx, bucket)
|
|
require.NoError(t, err)
|
|
|
|
// this should be bigger than the max inline segment
|
|
content := make([]byte, 5000)
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucket.Name, objectName, content)
|
|
require.NoError(t, err)
|
|
|
|
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, len(segments))
|
|
require.Equal(t, storj.EU, segments[0].Placement)
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestMoveObject_Geofencing(t *testing.T) {
|
|
testplanet.Run(t,
|
|
testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
|
|
},
|
|
func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
satellite := planet.Satellites[0]
|
|
buckets := satellite.API.Buckets.Service
|
|
uplink := planet.Uplinks[0]
|
|
projectID := uplink.Projects[0].ID
|
|
|
|
// create buckets with different placement
|
|
createGeofencedBucket(t, ctx, buckets, projectID, "global1", storj.EveryCountry)
|
|
createGeofencedBucket(t, ctx, buckets, projectID, "global2", storj.EveryCountry)
|
|
createGeofencedBucket(t, ctx, buckets, projectID, "us1", storj.US)
|
|
createGeofencedBucket(t, ctx, buckets, projectID, "us2", storj.US)
|
|
createGeofencedBucket(t, ctx, buckets, projectID, "eu1", storj.EU)
|
|
|
|
// upload an object to one of the global buckets
|
|
err := uplink.Upload(ctx, satellite, "global1", "testobject", []byte{})
|
|
require.NoError(t, err)
|
|
|
|
project, err := uplink.GetProject(ctx, satellite)
|
|
require.NoError(t, err)
|
|
|
|
// move the object to a new key within the same bucket
|
|
err = project.MoveObject(ctx, "global1", "testobject", "global1", "movedobject", nil)
|
|
require.NoError(t, err)
|
|
|
|
// move the object to the other global bucket
|
|
err = project.MoveObject(ctx, "global1", "movedobject", "global2", "movedobject", nil)
|
|
require.NoError(t, err)
|
|
|
|
// move the object to a geofenced bucket - should fail
|
|
err = project.MoveObject(ctx, "global2", "movedobject", "us1", "movedobject", nil)
|
|
require.Error(t, err)
|
|
|
|
// upload an object to one of the US-geofenced buckets
|
|
err = uplink.Upload(ctx, satellite, "us1", "testobject", []byte{})
|
|
require.NoError(t, err)
|
|
|
|
// move the object to a new key within the same bucket
|
|
err = project.MoveObject(ctx, "us1", "testobject", "us1", "movedobject", nil)
|
|
require.NoError(t, err)
|
|
|
|
// move the object to the other US-geofenced bucket
|
|
err = project.MoveObject(ctx, "us1", "movedobject", "us2", "movedobject", nil)
|
|
require.NoError(t, err)
|
|
|
|
// move the object to the EU-geofenced bucket - should fail
|
|
err = project.MoveObject(ctx, "us2", "movedobject", "eu1", "movedobject", nil)
|
|
require.Error(t, err)
|
|
|
|
// move the object to a non-geofenced bucket - should fail
|
|
err = project.MoveObject(ctx, "us2", "movedobject", "global1", "movedobject", nil)
|
|
require.Error(t, err)
|
|
},
|
|
)
|
|
}
|
|
|
|
func createGeofencedBucket(t *testing.T, ctx *testcontext.Context, buckets *buckets.Service, projectID uuid.UUID, bucketName string, placement storj.PlacementConstraint) {
|
|
// generate the bucket id
|
|
bucketID, err := uuid.New()
|
|
require.NoError(t, err)
|
|
|
|
// create the bucket
|
|
_, err = buckets.CreateBucket(ctx, storj.Bucket{
|
|
ID: bucketID,
|
|
Name: bucketName,
|
|
ProjectID: projectID,
|
|
Placement: placement,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// check that the bucket placement is correct
|
|
bucket, err := buckets.GetBucket(ctx, []byte(bucketName), projectID)
|
|
require.NoError(t, err)
|
|
require.Equal(t, placement, bucket.Placement)
|
|
}
|
|
|
|
func TestEndpoint_DeleteCommittedObject(t *testing.T) {
|
|
createObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte) {
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucket, key, data)
|
|
require.NoError(t, err)
|
|
}
|
|
deleteObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID) {
|
|
projectID := planet.Uplinks[0].Projects[0].ID
|
|
|
|
_, err := planet.Satellites[0].Metainfo.Endpoint.DeleteCommittedObject(ctx, projectID, bucket, metabase.ObjectKey(encryptedKey))
|
|
require.NoError(t, err)
|
|
}
|
|
testDeleteObject(t, createObject, deleteObject)
|
|
}
|
|
|
|
func TestEndpoint_DeletePendingObject(t *testing.T) {
|
|
createPendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte) {
|
|
// TODO This should be replaced by a call to testplanet.Uplink.MultipartUpload when available.
|
|
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
|
require.NoError(t, err, "failed to retrieve project")
|
|
defer func() { require.NoError(t, project.Close()) }()
|
|
|
|
_, err = project.EnsureBucket(ctx, bucket)
|
|
require.NoError(t, err, "failed to create bucket")
|
|
|
|
info, err := project.BeginUpload(ctx, bucket, key, &uplink.UploadOptions{})
|
|
require.NoError(t, err, "failed to start multipart upload")
|
|
|
|
upload, err := project.UploadPart(ctx, bucket, key, info.UploadID, 1)
|
|
require.NoError(t, err, "failed to put object part")
|
|
_, err = upload.Write(data)
|
|
require.NoError(t, err, "failed to put object part")
|
|
require.NoError(t, upload.Commit(), "failed to put object part")
|
|
}
|
|
deletePendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID) {
|
|
projectID := planet.Uplinks[0].Projects[0].ID
|
|
|
|
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint.DeletePendingObject(ctx,
|
|
metabase.ObjectStream{
|
|
ProjectID: projectID,
|
|
BucketName: bucket,
|
|
ObjectKey: metabase.ObjectKey(encryptedKey),
|
|
Version: metabase.DefaultVersion,
|
|
StreamID: streamID,
|
|
})
|
|
require.NoError(t, err)
|
|
require.Len(t, deletedObjects, 1)
|
|
}
|
|
testDeleteObject(t, createPendingObject, deletePendingObject)
|
|
}
|
|
|
|
func TestEndpoint_DeleteObjectAnyStatus(t *testing.T) {
|
|
createCommittedObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte) {
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucket, key, data)
|
|
require.NoError(t, err)
|
|
}
|
|
deleteCommittedObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID) {
|
|
projectID := planet.Uplinks[0].Projects[0].ID
|
|
|
|
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint.DeleteObjectAnyStatus(ctx, metabase.ObjectLocation{
|
|
ProjectID: projectID,
|
|
BucketName: bucket,
|
|
ObjectKey: metabase.ObjectKey(encryptedKey),
|
|
})
|
|
require.NoError(t, err)
|
|
require.Len(t, deletedObjects, 1)
|
|
|
|
}
|
|
testDeleteObject(t, createCommittedObject, deleteCommittedObject)
|
|
|
|
createPendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte) {
|
|
// TODO This should be replaced by a call to testplanet.Uplink.MultipartUpload when available.
|
|
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
|
require.NoError(t, err, "failed to retrieve project")
|
|
defer func() { require.NoError(t, project.Close()) }()
|
|
|
|
_, err = project.EnsureBucket(ctx, bucket)
|
|
require.NoError(t, err, "failed to create bucket")
|
|
|
|
info, err := project.BeginUpload(ctx, bucket, key, &uplink.UploadOptions{})
|
|
require.NoError(t, err, "failed to start multipart upload")
|
|
|
|
upload, err := project.UploadPart(ctx, bucket, key, info.UploadID, 1)
|
|
require.NoError(t, err, "failed to put object part")
|
|
_, err = upload.Write(data)
|
|
require.NoError(t, err, "failed to start multipart upload")
|
|
require.NoError(t, upload.Commit(), "failed to start multipart upload")
|
|
}
|
|
|
|
deletePendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID) {
|
|
projectID := planet.Uplinks[0].Projects[0].ID
|
|
|
|
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint.DeleteObjectAnyStatus(ctx, metabase.ObjectLocation{
|
|
ProjectID: projectID,
|
|
BucketName: bucket,
|
|
ObjectKey: metabase.ObjectKey(encryptedKey),
|
|
})
|
|
require.NoError(t, err)
|
|
require.Len(t, deletedObjects, 1)
|
|
}
|
|
|
|
testDeleteObject(t, createPendingObject, deletePendingObject)
|
|
}
|
|
|
|
func testDeleteObject(t *testing.T,
|
|
createObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte),
|
|
deleteObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID),
|
|
) {
|
|
bucketName := "deleteobjects"
|
|
t.Run("all nodes up", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
var testCases = []struct {
|
|
caseDescription string
|
|
objData []byte
|
|
hasRemote bool
|
|
}{
|
|
{caseDescription: "one remote segment", objData: testrand.Bytes(10 * memory.KiB)},
|
|
{caseDescription: "one inline segment", objData: testrand.Bytes(3 * memory.KiB)},
|
|
{caseDescription: "several segments (all remote)", objData: testrand.Bytes(50 * memory.KiB)},
|
|
{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
|
|
}
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
// Reconfigure RS for ensuring that we don't have long-tail cancellations
|
|
// and the upload doesn't leave garbage in the SNs
|
|
Satellite: testplanet.Combine(
|
|
testplanet.ReconfigureRS(2, 2, 4, 4),
|
|
testplanet.MaxSegmentSize(13*memory.KiB),
|
|
),
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
var (
|
|
percentExp = 0.75
|
|
)
|
|
for _, tc := range testCases {
|
|
tc := tc
|
|
t.Run(tc.caseDescription, func(t *testing.T) {
|
|
|
|
createObject(ctx, t, planet, bucketName, tc.caseDescription, tc.objData)
|
|
|
|
// calculate the SNs total used space after data upload
|
|
var totalUsedSpace int64
|
|
for _, sn := range planet.StorageNodes {
|
|
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
|
|
require.NoError(t, err)
|
|
totalUsedSpace += piecesTotal
|
|
}
|
|
|
|
objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
for _, object := range objects {
|
|
deleteObject(ctx, t, planet, bucketName, string(object.ObjectKey), object.StreamID)
|
|
}
|
|
|
|
planet.WaitForStorageNodeDeleters(ctx)
|
|
|
|
// calculate the SNs used space after delete the pieces
|
|
var totalUsedSpaceAfterDelete int64
|
|
for _, sn := range planet.StorageNodes {
|
|
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
|
|
require.NoError(t, err)
|
|
totalUsedSpaceAfterDelete += piecesTotal
|
|
}
|
|
|
|
// At this point we can only guarantee that the 75% of the SNs pieces
|
|
// are delete due to the success threshold
|
|
deletedUsedSpace := float64(totalUsedSpace-totalUsedSpaceAfterDelete) / float64(totalUsedSpace)
|
|
if deletedUsedSpace < percentExp {
|
|
t.Fatalf("deleted used space is less than %f%%. Got %f", percentExp, deletedUsedSpace)
|
|
}
|
|
})
|
|
}
|
|
})
|
|
|
|
})
|
|
|
|
t.Run("some nodes down", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
var testCases = []struct {
|
|
caseDescription string
|
|
objData []byte
|
|
}{
|
|
{caseDescription: "one remote segment", objData: testrand.Bytes(10 * memory.KiB)},
|
|
{caseDescription: "several segments (all remote)", objData: testrand.Bytes(50 * memory.KiB)},
|
|
{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
|
|
}
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
// Reconfigure RS for ensuring that we don't have long-tail cancellations
|
|
// and the upload doesn't leave garbage in the SNs
|
|
Satellite: testplanet.Combine(
|
|
testplanet.ReconfigureRS(2, 2, 4, 4),
|
|
testplanet.MaxSegmentSize(13*memory.KiB),
|
|
),
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
numToShutdown := 2
|
|
|
|
for _, tc := range testCases {
|
|
createObject(ctx, t, planet, bucketName, tc.caseDescription, tc.objData)
|
|
}
|
|
|
|
require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))
|
|
|
|
// Shutdown the first numToShutdown storage nodes before we delete the pieces
|
|
// and collect used space values for those nodes
|
|
snUsedSpace := make([]int64, len(planet.StorageNodes))
|
|
for i := 0; i < numToShutdown; i++ {
|
|
var err error
|
|
snUsedSpace[i], _, err = planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx)
|
|
require.NoError(t, err)
|
|
|
|
require.NoError(t, planet.StopPeer(planet.StorageNodes[i]))
|
|
}
|
|
|
|
objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
for _, object := range objects {
|
|
deleteObject(ctx, t, planet, bucketName, string(object.ObjectKey), object.StreamID)
|
|
}
|
|
|
|
planet.WaitForStorageNodeDeleters(ctx)
|
|
|
|
// Check that storage nodes that were offline when deleting the pieces
|
|
// they are still holding data
|
|
// Check that storage nodes which are online when deleting pieces don't
|
|
// hold any piece
|
|
// We are comparing used space from before deletion for nodes that were
|
|
// offline, values for available nodes are 0
|
|
for i, sn := range planet.StorageNodes {
|
|
usedSpace, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, snUsedSpace[i], usedSpace, "StorageNode #%d", i)
|
|
}
|
|
})
|
|
})
|
|
|
|
t.Run("all nodes down", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
var testCases = []struct {
|
|
caseDescription string
|
|
objData []byte
|
|
}{
|
|
{caseDescription: "one remote segment", objData: testrand.Bytes(10 * memory.KiB)},
|
|
{caseDescription: "several segments (all remote)", objData: testrand.Bytes(50 * memory.KiB)},
|
|
{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
|
|
}
|
|
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
// Reconfigure RS for ensuring that we don't have long-tail cancellations
|
|
// and the upload doesn't leave garbage in the SNs
|
|
Satellite: testplanet.Combine(
|
|
testplanet.ReconfigureRS(2, 2, 4, 4),
|
|
testplanet.MaxSegmentSize(13*memory.KiB),
|
|
),
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
for _, tc := range testCases {
|
|
createObject(ctx, t, planet, bucketName, tc.caseDescription, tc.objData)
|
|
}
|
|
|
|
// calculate the SNs total used space after data upload
|
|
var usedSpaceBeforeDelete int64
|
|
for _, sn := range planet.StorageNodes {
|
|
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
|
|
require.NoError(t, err)
|
|
usedSpaceBeforeDelete += piecesTotal
|
|
}
|
|
|
|
// Shutdown all the storage nodes before we delete the pieces
|
|
for _, sn := range planet.StorageNodes {
|
|
require.NoError(t, planet.StopPeer(sn))
|
|
}
|
|
|
|
objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
for _, object := range objects {
|
|
deleteObject(ctx, t, planet, bucketName, string(object.ObjectKey), object.StreamID)
|
|
}
|
|
|
|
// Check that storage nodes that were offline when deleting the pieces
|
|
// they are still holding data
|
|
var totalUsedSpace int64
|
|
for _, sn := range planet.StorageNodes {
|
|
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
|
|
require.NoError(t, err)
|
|
totalUsedSpace += piecesTotal
|
|
}
|
|
|
|
require.Equal(t, usedSpaceBeforeDelete, totalUsedSpace, "totalUsedSpace")
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestEndpoint_CopyObject(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 4,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
|
|
satelliteSys := planet.Satellites[0]
|
|
uplnk := planet.Uplinks[0]
|
|
|
|
// upload a small inline object
|
|
err := uplnk.Upload(ctx, planet.Satellites[0], "testbucket", "testobject", testrand.Bytes(1*memory.KiB))
|
|
require.NoError(t, err)
|
|
objects, err := satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
require.Len(t, objects, 1)
|
|
|
|
getResp, err := satelliteSys.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{
|
|
Header: &pb.RequestHeader{
|
|
ApiKey: apiKey.SerializeRaw(),
|
|
},
|
|
Bucket: []byte("testbucket"),
|
|
EncryptedObjectKey: []byte(objects[0].ObjectKey),
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
testEncryptedMetadataNonce := testrand.Nonce()
|
|
// update the object metadata
|
|
beginResp, err := satelliteSys.API.Metainfo.Endpoint.BeginCopyObject(ctx, &pb.ObjectBeginCopyRequest{
|
|
Header: &pb.RequestHeader{
|
|
ApiKey: apiKey.SerializeRaw(),
|
|
},
|
|
Bucket: getResp.Object.Bucket,
|
|
EncryptedObjectKey: getResp.Object.EncryptedObjectKey,
|
|
NewBucket: []byte("testbucket"),
|
|
NewEncryptedObjectKey: []byte("newencryptedkey"),
|
|
})
|
|
require.NoError(t, err)
|
|
assert.Len(t, beginResp.SegmentKeys, 1)
|
|
assert.Equal(t, beginResp.EncryptedMetadataKey, objects[0].EncryptedMetadataEncryptedKey)
|
|
assert.Equal(t, beginResp.EncryptedMetadataKeyNonce.Bytes(), objects[0].EncryptedMetadataNonce)
|
|
|
|
segmentKeys := pb.EncryptedKeyAndNonce{
|
|
Position: beginResp.SegmentKeys[0].Position,
|
|
EncryptedKeyNonce: testrand.Nonce(),
|
|
EncryptedKey: []byte("newencryptedkey"),
|
|
}
|
|
|
|
{
|
|
// metadata too large
|
|
_, err = satelliteSys.API.Metainfo.Endpoint.FinishCopyObject(ctx, &pb.ObjectFinishCopyRequest{
|
|
Header: &pb.RequestHeader{
|
|
ApiKey: apiKey.SerializeRaw(),
|
|
},
|
|
StreamId: getResp.Object.StreamId,
|
|
NewBucket: []byte("testbucket"),
|
|
NewEncryptedObjectKey: []byte("newobjectkey"),
|
|
NewEncryptedMetadata: testrand.Bytes(satelliteSys.Config.Metainfo.MaxMetadataSize + 1),
|
|
NewEncryptedMetadataKeyNonce: testEncryptedMetadataNonce,
|
|
NewEncryptedMetadataKey: []byte("encryptedmetadatakey"),
|
|
NewSegmentKeys: []*pb.EncryptedKeyAndNonce{&segmentKeys},
|
|
})
|
|
require.True(t, errs2.IsRPC(err, rpcstatus.InvalidArgument))
|
|
|
|
// invalid encrypted metadata key
|
|
_, err = satelliteSys.API.Metainfo.Endpoint.FinishCopyObject(ctx, &pb.ObjectFinishCopyRequest{
|
|
Header: &pb.RequestHeader{
|
|
ApiKey: apiKey.SerializeRaw(),
|
|
},
|
|
StreamId: getResp.Object.StreamId,
|
|
NewBucket: []byte("testbucket"),
|
|
NewEncryptedObjectKey: []byte("newobjectkey"),
|
|
NewEncryptedMetadata: testrand.Bytes(satelliteSys.Config.Metainfo.MaxMetadataSize),
|
|
NewEncryptedMetadataKeyNonce: testEncryptedMetadataNonce,
|
|
NewEncryptedMetadataKey: []byte("encryptedmetadatakey"),
|
|
NewSegmentKeys: []*pb.EncryptedKeyAndNonce{&segmentKeys},
|
|
})
|
|
require.True(t, errs2.IsRPC(err, rpcstatus.InvalidArgument))
|
|
}
|
|
|
|
_, err = satelliteSys.API.Metainfo.Endpoint.FinishCopyObject(ctx, &pb.ObjectFinishCopyRequest{
|
|
Header: &pb.RequestHeader{
|
|
ApiKey: apiKey.SerializeRaw(),
|
|
},
|
|
StreamId: getResp.Object.StreamId,
|
|
NewBucket: []byte("testbucket"),
|
|
NewEncryptedObjectKey: []byte("newobjectkey"),
|
|
NewEncryptedMetadataKeyNonce: testEncryptedMetadataNonce,
|
|
NewEncryptedMetadataKey: []byte("encryptedmetadatakey"),
|
|
NewSegmentKeys: []*pb.EncryptedKeyAndNonce{&segmentKeys},
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
objectsAfterCopy, err := satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
require.Len(t, objectsAfterCopy, 2)
|
|
|
|
getCopyResp, err := satelliteSys.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{
|
|
Header: &pb.RequestHeader{
|
|
ApiKey: apiKey.SerializeRaw(),
|
|
},
|
|
Bucket: []byte("testbucket"),
|
|
EncryptedObjectKey: []byte("newobjectkey"),
|
|
})
|
|
require.NoError(t, err, objectsAfterCopy[1])
|
|
require.NotEqual(t, getResp.Object.StreamId, getCopyResp.Object.StreamId)
|
|
require.NotZero(t, getCopyResp.Object.StreamId)
|
|
require.Equal(t, getResp.Object.InlineSize, getCopyResp.Object.InlineSize)
|
|
|
|
// compare segments
|
|
originalSegment, err := satelliteSys.API.Metainfo.Endpoint.DownloadSegment(ctx, &pb.SegmentDownloadRequest{
|
|
Header: &pb.RequestHeader{
|
|
ApiKey: apiKey.SerializeRaw(),
|
|
},
|
|
StreamId: getResp.Object.StreamId,
|
|
CursorPosition: segmentKeys.Position,
|
|
})
|
|
require.NoError(t, err)
|
|
copiedSegment, err := satelliteSys.API.Metainfo.Endpoint.DownloadSegment(ctx, &pb.SegmentDownloadRequest{
|
|
Header: &pb.RequestHeader{
|
|
ApiKey: apiKey.SerializeRaw(),
|
|
},
|
|
StreamId: getCopyResp.Object.StreamId,
|
|
CursorPosition: segmentKeys.Position,
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, originalSegment.EncryptedInlineData, copiedSegment.EncryptedInlineData)
|
|
|
|
{ // test copy respects project storage size limit
|
|
// set storage limit
|
|
err = planet.Satellites[0].DB.ProjectAccounting().UpdateProjectUsageLimit(ctx, planet.Uplinks[1].Projects[0].ID, 1000)
|
|
require.NoError(t, err)
|
|
|
|
// test object below the limit when copied
|
|
err = planet.Uplinks[1].Upload(ctx, planet.Satellites[0], "testbucket", "testobject", testrand.Bytes(100))
|
|
require.NoError(t, err)
|
|
objects, err = satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
|
|
_, err = satelliteSys.API.Metainfo.Endpoint.BeginCopyObject(ctx, &pb.ObjectBeginCopyRequest{
|
|
Header: &pb.RequestHeader{
|
|
ApiKey: planet.Uplinks[1].APIKey[planet.Satellites[0].ID()].SerializeRaw(),
|
|
},
|
|
Bucket: []byte("testbucket"),
|
|
EncryptedObjectKey: []byte(objects[0].ObjectKey),
|
|
NewBucket: []byte("testbucket"),
|
|
NewEncryptedObjectKey: []byte("newencryptedobjectkey"),
|
|
})
|
|
require.NoError(t, err)
|
|
err = satelliteSys.API.Metainfo.Metabase.TestingDeleteAll(ctx)
|
|
require.NoError(t, err)
|
|
|
|
// set storage limit
|
|
err = planet.Satellites[0].DB.ProjectAccounting().UpdateProjectUsageLimit(ctx, planet.Uplinks[2].Projects[0].ID, 1000)
|
|
require.NoError(t, err)
|
|
|
|
// test object exceeding the limit when copied
|
|
err = planet.Uplinks[2].Upload(ctx, planet.Satellites[0], "testbucket", "testobject", testrand.Bytes(400))
|
|
require.NoError(t, err)
|
|
objects, err = satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
|
|
err = planet.Uplinks[2].CopyObject(ctx, planet.Satellites[0], "testbucket", "testobject", "testbucket", "testobject1")
|
|
require.NoError(t, err)
|
|
|
|
_, err = satelliteSys.API.Metainfo.Endpoint.BeginCopyObject(ctx, &pb.ObjectBeginCopyRequest{
|
|
Header: &pb.RequestHeader{
|
|
ApiKey: planet.Uplinks[2].APIKey[planet.Satellites[0].ID()].SerializeRaw(),
|
|
},
|
|
Bucket: []byte("testbucket"),
|
|
EncryptedObjectKey: []byte(objects[0].ObjectKey),
|
|
NewBucket: []byte("testbucket"),
|
|
NewEncryptedObjectKey: []byte("newencryptedobjectkey"),
|
|
})
|
|
assertRPCStatusCode(t, err, rpcstatus.ResourceExhausted)
|
|
assert.EqualError(t, err, "Exceeded Storage Limit")
|
|
|
|
// metabaseObjects, err := satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
|
// require.NoError(t, err)
|
|
// metabaseObj := metabaseObjects[0]
|
|
|
|
// randomEncKey := testrand.Key()
|
|
|
|
// somehow triggers error "proto: can't skip unknown wire type 7" in endpoint.unmarshalSatStreamID
|
|
|
|
// _, err = satelliteSys.API.Metainfo.Endpoint.FinishCopyObject(ctx, &pb.ObjectFinishCopyRequest{
|
|
// Header: &pb.RequestHeader{
|
|
// ApiKey: planet.Uplinks[2].APIKey[planet.Satellites[0].ID()].SerializeRaw(),
|
|
// },
|
|
// StreamId: metabaseObj.ObjectStream.StreamID.Bytes(),
|
|
// NewBucket: []byte("testbucket"),
|
|
// NewEncryptedObjectKey: []byte("newencryptedobjectkey"),
|
|
// NewEncryptedMetadata: testrand.Bytes(10),
|
|
// NewEncryptedMetadataKey: randomEncKey.Raw()[:],
|
|
// NewEncryptedMetadataKeyNonce: testrand.Nonce(),
|
|
// NewSegmentKeys: []*pb.EncryptedKeyAndNonce{
|
|
// {
|
|
// Position: &pb.SegmentPosition{
|
|
// PartNumber: 0,
|
|
// Index: 0,
|
|
// },
|
|
// EncryptedKeyNonce: testrand.Nonce(),
|
|
// EncryptedKey: randomEncKey.Raw()[:],
|
|
// },
|
|
// },
|
|
// })
|
|
// assertRPCStatusCode(t, err, rpcstatus.ResourceExhausted)
|
|
// assert.EqualError(t, err, "Exceeded Storage Limit")
|
|
|
|
// test that a smaller object can still be uploaded and copied
|
|
err = planet.Uplinks[2].Upload(ctx, planet.Satellites[0], "testbucket", "testobject2", testrand.Bytes(10))
|
|
require.NoError(t, err)
|
|
|
|
err = planet.Uplinks[2].CopyObject(ctx, planet.Satellites[0], "testbucket", "testobject2", "testbucket", "testobject2copy")
|
|
require.NoError(t, err)
|
|
|
|
err = satelliteSys.API.Metainfo.Metabase.TestingDeleteAll(ctx)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
{ // test copy respects project segment limit
|
|
// set segment limit
|
|
err = planet.Satellites[0].DB.ProjectAccounting().UpdateProjectSegmentLimit(ctx, planet.Uplinks[3].Projects[0].ID, 2)
|
|
require.NoError(t, err)
|
|
|
|
err = planet.Uplinks[3].Upload(ctx, planet.Satellites[0], "testbucket", "testobject", testrand.Bytes(100))
|
|
require.NoError(t, err)
|
|
objects, err = satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
|
|
err = planet.Uplinks[3].CopyObject(ctx, planet.Satellites[0], "testbucket", "testobject", "testbucket", "testobject1")
|
|
require.NoError(t, err)
|
|
|
|
_, err = satelliteSys.API.Metainfo.Endpoint.BeginCopyObject(ctx, &pb.ObjectBeginCopyRequest{
|
|
Header: &pb.RequestHeader{
|
|
ApiKey: planet.Uplinks[3].APIKey[planet.Satellites[0].ID()].SerializeRaw(),
|
|
},
|
|
Bucket: []byte("testbucket"),
|
|
EncryptedObjectKey: []byte(objects[0].ObjectKey),
|
|
NewBucket: []byte("testbucket"),
|
|
NewEncryptedObjectKey: []byte("newencryptedobjectkey1"),
|
|
})
|
|
assertRPCStatusCode(t, err, rpcstatus.ResourceExhausted)
|
|
assert.EqualError(t, err, "Exceeded Segments Limit")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestEndpoint_ParallelDeletes(t *testing.T) {
|
|
t.Skip("to be fixed - creating deadlocks")
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1,
|
|
StorageNodeCount: 4,
|
|
UplinkCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
|
require.NoError(t, err)
|
|
defer ctx.Check(project.Close)
|
|
testData := testrand.Bytes(5 * memory.KiB)
|
|
for i := 0; i < 50; i++ {
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "object"+strconv.Itoa(i), testData)
|
|
require.NoError(t, err)
|
|
_, err = project.CopyObject(ctx, "bucket", "object"+strconv.Itoa(i), "bucket", "object"+strconv.Itoa(i)+"copy", nil)
|
|
require.NoError(t, err)
|
|
}
|
|
list := project.ListObjects(ctx, "bucket", nil)
|
|
keys := []string{}
|
|
for list.Next() {
|
|
item := list.Item()
|
|
keys = append(keys, item.Key)
|
|
}
|
|
require.NoError(t, list.Err())
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(keys))
|
|
var errlist errs.Group
|
|
|
|
for i, name := range keys {
|
|
name := name
|
|
go func(toDelete string, index int) {
|
|
_, err := project.DeleteObject(ctx, "bucket", toDelete)
|
|
errlist.Add(err)
|
|
wg.Done()
|
|
}(name, i)
|
|
}
|
|
wg.Wait()
|
|
|
|
require.NoError(t, errlist.Err())
|
|
|
|
// check all objects have been deleted
|
|
listAfterDelete := project.ListObjects(ctx, "bucket", nil)
|
|
require.False(t, listAfterDelete.Next())
|
|
require.NoError(t, listAfterDelete.Err())
|
|
|
|
_, err = project.DeleteBucket(ctx, "bucket")
|
|
require.NoError(t, err)
|
|
})
|
|
}
|
|
|
|
func TestEndpoint_ParallelDeletesSameAncestor(t *testing.T) {
|
|
t.Skip("to be fixed - creating deadlocks")
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1,
|
|
StorageNodeCount: 4,
|
|
UplinkCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
|
require.NoError(t, err)
|
|
defer ctx.Check(project.Close)
|
|
testData := testrand.Bytes(5 * memory.KiB)
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "original-object", testData)
|
|
require.NoError(t, err)
|
|
for i := 0; i < 50; i++ {
|
|
_, err = project.CopyObject(ctx, "bucket", "original-object", "bucket", "copy"+strconv.Itoa(i), nil)
|
|
require.NoError(t, err)
|
|
}
|
|
list := project.ListObjects(ctx, "bucket", nil)
|
|
keys := []string{}
|
|
for list.Next() {
|
|
item := list.Item()
|
|
keys = append(keys, item.Key)
|
|
}
|
|
require.NoError(t, list.Err())
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(keys))
|
|
var errlist errs.Group
|
|
|
|
for i, name := range keys {
|
|
name := name
|
|
go func(toDelete string, index int) {
|
|
_, err := project.DeleteObject(ctx, "bucket", toDelete)
|
|
errlist.Add(err)
|
|
wg.Done()
|
|
}(name, i)
|
|
}
|
|
wg.Wait()
|
|
|
|
require.NoError(t, errlist.Err())
|
|
|
|
// check all objects have been deleted
|
|
listAfterDelete := project.ListObjects(ctx, "bucket", nil)
|
|
require.False(t, listAfterDelete.Next())
|
|
require.NoError(t, listAfterDelete.Err())
|
|
|
|
_, err = project.DeleteBucket(ctx, "bucket")
|
|
require.NoError(t, err)
|
|
})
|
|
}
|
|
|
|
func TestEndpoint_UpdateObjectMetadata(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
satellite := planet.Satellites[0]
|
|
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()].SerializeRaw()
|
|
err := planet.Uplinks[0].Upload(ctx, satellite, "testbucket", "object", testrand.Bytes(256))
|
|
require.NoError(t, err)
|
|
|
|
objects, err := satellite.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
|
|
validMetadata := testrand.Bytes(satellite.Config.Metainfo.MaxMetadataSize)
|
|
validKey := randomEncryptedKey
|
|
|
|
getObjectResponse, err := satellite.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{
|
|
Header: &pb.RequestHeader{ApiKey: apiKey},
|
|
Bucket: []byte("testbucket"),
|
|
EncryptedObjectKey: []byte(objects[0].ObjectKey),
|
|
Version: int32(objects[0].Version),
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
_, err = satellite.API.Metainfo.Endpoint.UpdateObjectMetadata(ctx, &pb.ObjectUpdateMetadataRequest{
|
|
Header: &pb.RequestHeader{ApiKey: apiKey},
|
|
Bucket: []byte("testbucket"),
|
|
EncryptedObjectKey: []byte(objects[0].ObjectKey),
|
|
Version: int32(objects[0].Version),
|
|
StreamId: getObjectResponse.Object.StreamId,
|
|
EncryptedMetadata: validMetadata,
|
|
EncryptedMetadataEncryptedKey: validKey,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// too large metadata
|
|
_, err = satellite.API.Metainfo.Endpoint.UpdateObjectMetadata(ctx, &pb.ObjectUpdateMetadataRequest{
|
|
Header: &pb.RequestHeader{ApiKey: apiKey},
|
|
Bucket: []byte("testbucket"),
|
|
EncryptedObjectKey: []byte(objects[0].ObjectKey),
|
|
Version: int32(objects[0].Version),
|
|
|
|
EncryptedMetadata: testrand.Bytes(satellite.Config.Metainfo.MaxMetadataSize + 1),
|
|
EncryptedMetadataEncryptedKey: validKey,
|
|
})
|
|
require.True(t, errs2.IsRPC(err, rpcstatus.InvalidArgument))
|
|
|
|
// invalid encrypted metadata key
|
|
_, err = satellite.API.Metainfo.Endpoint.UpdateObjectMetadata(ctx, &pb.ObjectUpdateMetadataRequest{
|
|
Header: &pb.RequestHeader{ApiKey: apiKey},
|
|
Bucket: []byte("testbucket"),
|
|
EncryptedObjectKey: []byte(objects[0].ObjectKey),
|
|
Version: int32(objects[0].Version),
|
|
|
|
EncryptedMetadata: validMetadata,
|
|
EncryptedMetadataEncryptedKey: testrand.Bytes(16),
|
|
})
|
|
require.True(t, errs2.IsRPC(err, rpcstatus.InvalidArgument))
|
|
|
|
// verify that metadata didn't change with rejected requests
|
|
objects, err = satellite.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
require.Equal(t, validMetadata, objects[0].EncryptedMetadata)
|
|
require.Equal(t, validKey, objects[0].EncryptedMetadataEncryptedKey)
|
|
})
|
|
}
|
|
|
|
func TestEndpoint_Object_MultipleVersions(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
|
config.Metainfo.MultipleVersions = true
|
|
config.Metainfo.PieceDeletion.DeleteSuccessThreshold = 1
|
|
|
|
testplanet.ReconfigureRS(2, 3, 4, 4)(log, index, config)
|
|
},
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
|
require.NoError(t, err)
|
|
defer ctx.Check(project.Close)
|
|
|
|
deleteBucket := func(bucketName string) func() error {
|
|
return func() error {
|
|
_, err := project.DeleteBucketWithObjects(ctx, bucketName)
|
|
return err
|
|
}
|
|
}
|
|
|
|
t.Run("multiple versions", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket("multipleversions"))
|
|
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", testrand.Bytes(10*memory.MiB))
|
|
require.NoError(t, err)
|
|
|
|
// override object to have it with version 2
|
|
expectedData := testrand.Bytes(11 * memory.KiB)
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", expectedData)
|
|
require.NoError(t, err)
|
|
|
|
objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
require.Len(t, objects, 1)
|
|
require.EqualValues(t, 2, objects[0].Version)
|
|
|
|
// add some pending uploads, each will have version higher then 2
|
|
uploadIDs := []string{}
|
|
for i := 0; i < 10; i++ {
|
|
info, err := project.BeginUpload(ctx, "multipleversions", "object", nil)
|
|
require.NoError(t, err)
|
|
uploadIDs = append(uploadIDs, info.UploadID)
|
|
}
|
|
|
|
checkDownload := func(objectKey string, expectedData []byte) {
|
|
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "multipleversions", objectKey)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedData, data)
|
|
}
|
|
|
|
checkDownload("object", expectedData)
|
|
|
|
err = project.MoveObject(ctx, "multipleversions", "object", "multipleversions", "object_moved", nil)
|
|
require.NoError(t, err)
|
|
|
|
checkDownload("object_moved", expectedData)
|
|
|
|
err = project.MoveObject(ctx, "multipleversions", "object_moved", "multipleversions", "object", nil)
|
|
require.NoError(t, err)
|
|
|
|
checkDownload("object", expectedData)
|
|
|
|
iterator := project.ListObjects(ctx, "multipleversions", nil)
|
|
require.True(t, iterator.Next())
|
|
require.Equal(t, "object", iterator.Item().Key)
|
|
require.NoError(t, iterator.Err())
|
|
|
|
// upload multipleversions/object once again as we just moved it
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", expectedData)
|
|
require.NoError(t, err)
|
|
|
|
checkDownload("object", expectedData)
|
|
|
|
{ // server side copy
|
|
_, err = project.CopyObject(ctx, "multipleversions", "object", "multipleversions", "object_copy", nil)
|
|
require.NoError(t, err)
|
|
|
|
checkDownload("object_copy", expectedData)
|
|
|
|
_, err = project.DeleteObject(ctx, "multipleversions", "object")
|
|
require.NoError(t, err)
|
|
|
|
_, err = project.CopyObject(ctx, "multipleversions", "object_copy", "multipleversions", "object", nil)
|
|
require.NoError(t, err)
|
|
|
|
checkDownload("object", expectedData)
|
|
|
|
_, err = project.DeleteObject(ctx, "multipleversions", "object_copy")
|
|
require.NoError(t, err)
|
|
|
|
checkDownload("object", expectedData)
|
|
}
|
|
|
|
err = project.AbortUpload(ctx, "multipleversions", "object", uploadIDs[0])
|
|
require.NoError(t, err)
|
|
checkDownload("object", expectedData)
|
|
|
|
expectedData = testrand.Bytes(12 * memory.KiB)
|
|
upload, err := project.UploadPart(ctx, "multipleversions", "object", uploadIDs[1], 1)
|
|
require.NoError(t, err)
|
|
_, err = upload.Write(expectedData)
|
|
require.NoError(t, err)
|
|
require.NoError(t, upload.Commit())
|
|
_, err = project.CommitUpload(ctx, "multipleversions", "object", uploadIDs[1], nil)
|
|
require.NoError(t, err)
|
|
|
|
checkDownload("object", expectedData)
|
|
|
|
_, err = project.DeleteObject(ctx, "multipleversions", "object")
|
|
require.NoError(t, err)
|
|
|
|
_, err = project.DeleteObject(ctx, "multipleversions", "object_moved")
|
|
require.NoError(t, err)
|
|
|
|
iterator = project.ListObjects(ctx, "multipleversions", nil)
|
|
require.False(t, iterator.Next())
|
|
require.NoError(t, iterator.Err())
|
|
|
|
// use next available pending upload
|
|
upload, err = project.UploadPart(ctx, "multipleversions", "object", uploadIDs[2], 1)
|
|
require.NoError(t, err)
|
|
_, err = upload.Write(expectedData)
|
|
require.NoError(t, err)
|
|
require.NoError(t, upload.Commit())
|
|
_, err = project.CommitUpload(ctx, "multipleversions", "object", uploadIDs[2], nil)
|
|
require.NoError(t, err)
|
|
|
|
checkDownload("object", expectedData)
|
|
|
|
uploads := project.ListUploads(ctx, "multipleversions", nil)
|
|
count := 0
|
|
for uploads.Next() {
|
|
require.Equal(t, "object", uploads.Item().Key)
|
|
count++
|
|
}
|
|
// we started with 10 pending object and during test we abort/commit 3 objects
|
|
pendingUploadsLeft := 7
|
|
require.Equal(t, pendingUploadsLeft, count)
|
|
})
|
|
|
|
t.Run("override object", func(t *testing.T) {
|
|
defer ctx.Check(deleteBucket("bucket"))
|
|
|
|
bucketName := "bucket"
|
|
objectName := "file1"
|
|
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, testrand.Bytes(5*memory.KiB))
|
|
require.NoError(t, err)
|
|
|
|
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
|
|
require.NoError(t, err)
|
|
require.Len(t, segments, 1)
|
|
|
|
pieceIDs := map[storj.NodeID]storj.PieceID{}
|
|
for _, piece := range segments[0].Pieces {
|
|
pieceIDs[piece.StorageNode] = segments[0].RootPieceID.Derive(piece.StorageNode, int32(piece.Number))
|
|
}
|
|
|
|
for _, node := range planet.StorageNodes {
|
|
pieceID, ok := pieceIDs[node.ID()]
|
|
require.True(t, ok)
|
|
piece, err := node.DB.Pieces().Stat(ctx, storage.BlobRef{
|
|
Namespace: planet.Satellites[0].ID().Bytes(),
|
|
Key: pieceID.Bytes(),
|
|
})
|
|
require.NoError(t, err)
|
|
require.NotNil(t, piece)
|
|
}
|
|
|
|
expectedData := testrand.Bytes(5 * memory.KiB)
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, expectedData)
|
|
require.NoError(t, err)
|
|
|
|
planet.WaitForStorageNodeDeleters(ctx)
|
|
|
|
// verify that old object pieces are not stored on storage nodes anymore
|
|
for _, node := range planet.StorageNodes {
|
|
pieceID, ok := pieceIDs[node.ID()]
|
|
require.True(t, ok)
|
|
|
|
piece, err := node.DB.Pieces().Stat(ctx, storage.BlobRef{
|
|
Namespace: planet.Satellites[0].ID().Bytes(),
|
|
Key: pieceID.Bytes(),
|
|
})
|
|
require.Error(t, err)
|
|
require.Nil(t, piece)
|
|
}
|
|
|
|
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], bucketName, objectName)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedData, data)
|
|
})
|
|
})
|
|
|
|
}
|
|
|
|
func TestEndpoint_Object_CopyObject_MultipleVersions(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
|
config.Metainfo.MultipleVersions = true
|
|
},
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
checkDownload := func(objectKey string, expectedData []byte) {
|
|
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "multipleversions", objectKey)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedData, data)
|
|
}
|
|
|
|
expectedDataA := testrand.Bytes(7 * memory.KiB)
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "objectA", expectedDataA)
|
|
require.NoError(t, err)
|
|
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "objectInline", testrand.Bytes(1*memory.KiB))
|
|
require.NoError(t, err)
|
|
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "objectRemote", testrand.Bytes(10*memory.KiB))
|
|
require.NoError(t, err)
|
|
|
|
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
|
require.NoError(t, err)
|
|
defer ctx.Check(project.Close)
|
|
|
|
_, err = project.CopyObject(ctx, "multipleversions", "objectA", "multipleversions", "objectInline", nil)
|
|
require.NoError(t, err)
|
|
|
|
_, err = project.CopyObject(ctx, "multipleversions", "objectA", "multipleversions", "objectRemote", nil)
|
|
require.NoError(t, err)
|
|
|
|
checkDownload("objectInline", expectedDataA)
|
|
checkDownload("objectRemote", expectedDataA)
|
|
|
|
expectedDataB := testrand.Bytes(8 * memory.KiB)
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "objectInline", expectedDataB)
|
|
require.NoError(t, err)
|
|
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "objectRemote", expectedDataB)
|
|
require.NoError(t, err)
|
|
|
|
checkDownload("objectInline", expectedDataB)
|
|
checkDownload("objectRemote", expectedDataB)
|
|
checkDownload("objectA", expectedDataA)
|
|
|
|
expectedDataD := testrand.Bytes(6 * memory.KiB)
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "objectA", expectedDataD)
|
|
require.NoError(t, err)
|
|
|
|
checkDownload("objectInline", expectedDataB)
|
|
checkDownload("objectRemote", expectedDataB)
|
|
checkDownload("objectA", expectedDataD)
|
|
|
|
objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
|
|
require.NoError(t, err)
|
|
require.Len(t, objects, 3)
|
|
|
|
for _, object := range objects {
|
|
require.Greater(t, int64(object.Version), int64(1))
|
|
}
|
|
|
|
_, err = project.CopyObject(ctx, "multipleversions", "objectInline", "multipleversions", "objectInlineCopy", nil)
|
|
require.NoError(t, err)
|
|
|
|
checkDownload("objectInlineCopy", expectedDataB)
|
|
|
|
iterator := project.ListObjects(ctx, "multipleversions", nil)
|
|
|
|
items := []string{}
|
|
for iterator.Next() {
|
|
items = append(items, iterator.Item().Key)
|
|
}
|
|
require.NoError(t, iterator.Err())
|
|
|
|
sort.Strings(items)
|
|
require.Equal(t, []string{
|
|
"objectA", "objectInline", "objectInlineCopy", "objectRemote",
|
|
}, items)
|
|
})
|
|
}
|
|
|
|
func TestEndpoint_Object_MoveObject_MultipleVersions(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
|
config.Metainfo.MultipleVersions = true
|
|
},
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
expectedDataA := testrand.Bytes(7 * memory.KiB)
|
|
|
|
// upload objectA twice to have to have version different than 1
|
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "objectA", expectedDataA)
|
|
require.NoError(t, err)
|
|
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "objectA", expectedDataA)
|
|
require.NoError(t, err)
|
|
|
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "objectB", testrand.Bytes(1*memory.KiB))
|
|
require.NoError(t, err)
|
|
|
|
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
|
require.NoError(t, err)
|
|
defer ctx.Check(project.Close)
|
|
|
|
// move is not possible because we have committed object under target location
|
|
err = project.MoveObject(ctx, "multipleversions", "objectA", "multipleversions", "objectB", nil)
|
|
require.Error(t, err)
|
|
})
|
|
}
|