// Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. package metainfo_test import ( "errors" "fmt" "net" "sort" "strconv" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/zeebo/errs" "go.uber.org/zap" "storj.io/common/errs2" "storj.io/common/identity" "storj.io/common/macaroon" "storj.io/common/memory" "storj.io/common/pb" "storj.io/common/rpc/rpcstatus" "storj.io/common/signing" "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" "storj.io/storj/satellite" "storj.io/storj/satellite/internalpb" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metainfo" "storj.io/uplink" "storj.io/uplink/private/metaclient" "storj.io/uplink/private/object" "storj.io/uplink/private/testuplink" ) func TestMaxOutBuckets(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { limit := planet.Satellites[0].Config.Metainfo.ProjectLimits.MaxBuckets for i := 1; i <= limit; i++ { name := "test" + strconv.Itoa(i) err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], name) require.NoError(t, err) } err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], fmt.Sprintf("test%d", limit+1)) require.Error(t, err) require.Contains(t, err.Error(), fmt.Sprintf("number of allocated buckets (%d) exceeded", limit)) }) } func TestRevokeAccess(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { accessIssuer := planet.Uplinks[0].Access[planet.Satellites[0].ID()] accessUser1, err := accessIssuer.Share(uplink.Permission{ AllowDownload: true, AllowUpload: true, AllowList: true, AllowDelete: true, }) require.NoError(t, err) accessUser2, err := accessUser1.Share(uplink.Permission{ AllowDownload: true, AllowUpload: true, AllowList: true, AllowDelete: true, }) require.NoError(t, err) projectUser2, err := uplink.OpenProject(ctx, accessUser2) require.NoError(t, err) defer ctx.Check(projectUser2.Close) // confirm that we can create a bucket _, err = projectUser2.CreateBucket(ctx, "bob") require.NoError(t, err) // we shouldn't be allowed to revoke ourselves or our parent err = projectUser2.RevokeAccess(ctx, accessUser2) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) err = projectUser2.RevokeAccess(ctx, accessUser1) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) projectIssuer, err := uplink.OpenProject(ctx, accessIssuer) require.NoError(t, err) defer ctx.Check(projectIssuer.Close) projectUser1, err := uplink.OpenProject(ctx, accessUser1) require.NoError(t, err) defer ctx.Check(projectUser1.Close) // I should be able to revoke with accessIssuer err = projectIssuer.RevokeAccess(ctx, accessUser1) require.NoError(t, err) // should no longer be able to create bucket with access 2 or 3 _, err = projectUser2.CreateBucket(ctx, "bob1") assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) _, err = projectUser1.CreateBucket(ctx, "bob1") assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) }) } func TestRevokeMacaroon(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { // I want the api key for the single satellite in this test apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] client, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(client.Close) // Sanity check: it should work before revoke _, err = client.ListBuckets(ctx, metaclient.ListBucketsParams{ ListOpts: storj.BucketListOptions{ Cursor: "", Direction: storj.Forward, Limit: 10, }, }) require.NoError(t, err) err = planet.Satellites[0].API.DB.Revocation().Revoke(ctx, apiKey.Tail(), []byte("apikey")) require.NoError(t, err) _, err = client.ListBuckets(ctx, metaclient.ListBucketsParams{}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) _, err = client.BeginObject(ctx, metaclient.BeginObjectParams{}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) _, err = client.BeginDeleteObject(ctx, metaclient.BeginDeleteObjectParams{}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) _, err = client.ListBuckets(ctx, metaclient.ListBucketsParams{}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) _, _, err = client.ListObjects(ctx, metaclient.ListObjectsParams{}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) _, err = client.CreateBucket(ctx, metaclient.CreateBucketParams{}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) _, err = client.DeleteBucket(ctx, metaclient.DeleteBucketParams{}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) _, err = client.BeginDeleteObject(ctx, metaclient.BeginDeleteObjectParams{}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) _, err = client.GetBucket(ctx, metaclient.GetBucketParams{}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) _, err = client.GetObject(ctx, metaclient.GetObjectParams{}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) _, err = client.GetProjectInfo(ctx) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity) satStreamID := &internalpb.StreamID{ CreationDate: time.Now(), } signedStreamID, err := metainfo.SignStreamID(ctx, signer, satStreamID) require.NoError(t, err) encodedStreamID, err := pb.Marshal(signedStreamID) require.NoError(t, err) err = client.CommitObject(ctx, metaclient.CommitObjectParams{StreamID: encodedStreamID}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) _, err = client.BeginSegment(ctx, metaclient.BeginSegmentParams{StreamID: encodedStreamID}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) err = client.MakeInlineSegment(ctx, metaclient.MakeInlineSegmentParams{StreamID: encodedStreamID}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) _, _, err = client.DownloadSegment(ctx, metaclient.DownloadSegmentParams{StreamID: encodedStreamID}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) _, err = client.ListSegments(ctx, metaclient.ListSegmentsParams{StreamID: encodedStreamID}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) // these methods needs SegmentID signedSegmentID, err := metainfo.SignSegmentID(ctx, signer, &internalpb.SegmentID{ StreamId: satStreamID, CreationDate: time.Now(), }) require.NoError(t, err) encodedSegmentID, err := pb.Marshal(signedSegmentID) require.NoError(t, err) segmentID, err := storj.SegmentIDFromBytes(encodedSegmentID) require.NoError(t, err) err = client.CommitSegment(ctx, metaclient.CommitSegmentParams{SegmentID: segmentID}) assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) }) } func TestInvalidAPIKey(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { throwawayKey, err := macaroon.NewAPIKey([]byte("secret")) require.NoError(t, err) for _, invalidAPIKey := range []string{"", "invalid", "testKey"} { func() { client, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], throwawayKey) require.NoError(t, err) defer ctx.Check(client.Close) client.SetRawAPIKey([]byte(invalidAPIKey)) _, err = client.BeginObject(ctx, metaclient.BeginObjectParams{}) assertInvalidArgument(t, err, false) _, err = client.BeginDeleteObject(ctx, metaclient.BeginDeleteObjectParams{}) assertInvalidArgument(t, err, false) _, err = client.ListBuckets(ctx, metaclient.ListBucketsParams{}) assertInvalidArgument(t, err, false) _, _, err = client.ListObjects(ctx, metaclient.ListObjectsParams{}) assertInvalidArgument(t, err, false) _, err = client.CreateBucket(ctx, metaclient.CreateBucketParams{}) assertInvalidArgument(t, err, false) _, err = client.DeleteBucket(ctx, metaclient.DeleteBucketParams{}) assertInvalidArgument(t, err, false) _, err = client.BeginDeleteObject(ctx, metaclient.BeginDeleteObjectParams{}) assertInvalidArgument(t, err, false) _, err = client.GetBucket(ctx, metaclient.GetBucketParams{}) assertInvalidArgument(t, err, false) _, err = client.GetObject(ctx, metaclient.GetObjectParams{}) assertInvalidArgument(t, err, false) _, err = client.GetProjectInfo(ctx) assertInvalidArgument(t, err, false) // these methods needs StreamID to do authentication signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity) satStreamID := &internalpb.StreamID{ CreationDate: time.Now(), } signedStreamID, err := metainfo.SignStreamID(ctx, signer, satStreamID) require.NoError(t, err) encodedStreamID, err := pb.Marshal(signedStreamID) require.NoError(t, err) streamID, err := storj.StreamIDFromBytes(encodedStreamID) require.NoError(t, err) err = client.CommitObject(ctx, metaclient.CommitObjectParams{StreamID: streamID}) assertInvalidArgument(t, err, false) _, err = client.BeginSegment(ctx, metaclient.BeginSegmentParams{StreamID: streamID}) assertInvalidArgument(t, err, false) err = client.MakeInlineSegment(ctx, metaclient.MakeInlineSegmentParams{StreamID: streamID}) assertInvalidArgument(t, err, false) _, _, err = client.DownloadSegment(ctx, metaclient.DownloadSegmentParams{StreamID: streamID}) assertInvalidArgument(t, err, false) _, err = client.ListSegments(ctx, metaclient.ListSegmentsParams{StreamID: streamID}) assertInvalidArgument(t, err, false) // these methods needs SegmentID signedSegmentID, err := metainfo.SignSegmentID(ctx, signer, &internalpb.SegmentID{ StreamId: satStreamID, CreationDate: time.Now(), }) require.NoError(t, err) encodedSegmentID, err := pb.Marshal(signedSegmentID) require.NoError(t, err) segmentID, err := storj.SegmentIDFromBytes(encodedSegmentID) require.NoError(t, err) err = client.CommitSegment(ctx, metaclient.CommitSegmentParams{SegmentID: segmentID}) assertInvalidArgument(t, err, false) }() } }) } func assertInvalidArgument(t *testing.T, err error, allowed bool) { t.Helper() // If it's allowed, we allow any non-unauthenticated error because // some calls error after authentication checks. if !allowed { assert.True(t, errs2.IsRPC(err, rpcstatus.InvalidArgument)) } } func TestServiceList(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { items := []struct { Key string Value []byte }{ {Key: "sample.😶", Value: []byte{1}}, {Key: "müsic", Value: []byte{2}}, {Key: "müsic/söng1.mp3", Value: []byte{3}}, {Key: "müsic/söng2.mp3", Value: []byte{4}}, {Key: "müsic/album/söng3.mp3", Value: []byte{5}}, {Key: "müsic/söng4.mp3", Value: []byte{6}}, {Key: "ビデオ/movie.mkv", Value: []byte{7}}, } for _, item := range items { err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", item.Key, item.Value) assert.NoError(t, err) } project, err := planet.Uplinks[0].GetProject(ctx, planet.Satellites[0]) require.NoError(t, err) defer ctx.Check(project.Close) objects := project.ListObjects(ctx, "testbucket", &uplink.ListObjectsOptions{ Recursive: true, }) listItems := make([]*uplink.Object, 0) for objects.Next() { listItems = append(listItems, objects.Item()) } require.NoError(t, objects.Err()) expected := []storj.Object{ {Path: "müsic"}, {Path: "müsic/album/söng3.mp3"}, {Path: "müsic/söng1.mp3"}, {Path: "müsic/söng2.mp3"}, {Path: "müsic/söng4.mp3"}, {Path: "sample.😶"}, {Path: "ビデオ/movie.mkv"}, } require.Equal(t, len(expected), len(listItems)) sort.Slice(listItems, func(i, k int) bool { return listItems[i].Key < listItems[k].Key }) for i, item := range expected { require.Equal(t, item.Path, listItems[i].Key) require.Equal(t, item.IsPrefix, listItems[i].IsPrefix) } objects = project.ListObjects(ctx, "testbucket", &uplink.ListObjectsOptions{ Recursive: false, }) listItems = make([]*uplink.Object, 0) for objects.Next() { listItems = append(listItems, objects.Item()) } require.NoError(t, objects.Err()) expected = []storj.Object{ {Path: "müsic"}, {Path: "müsic/", IsPrefix: true}, {Path: "sample.😶"}, {Path: "ビデオ/", IsPrefix: true}, } require.Equal(t, len(expected), len(listItems)) sort.Slice(listItems, func(i, k int) bool { return listItems[i].Key < listItems[k].Key }) for i, item := range expected { t.Log(item.Path, listItems[i].Key) require.Equal(t, item.Path, listItems[i].Key) require.Equal(t, item.IsPrefix, listItems[i].IsPrefix) } }) } func TestExpirationTimeSegment(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "my-bucket-name") require.NoError(t, err) metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(metainfoClient.Close) for i, r := range []struct { expirationDate time.Time errFlag bool }{ { // expiration time not set time.Time{}, false, }, { // 10 days into future time.Now().AddDate(0, 0, 10), false, }, { // current time time.Now(), true, }, { // 10 days into past time.Now().AddDate(0, 0, -10), true, }, } { _, err := metainfoClient.BeginObject(ctx, metaclient.BeginObjectParams{ Bucket: []byte("my-bucket-name"), EncryptedPath: []byte("path" + strconv.Itoa(i)), ExpiresAt: r.expirationDate, EncryptionParameters: storj.EncryptionParameters{ CipherSuite: storj.EncAESGCM, BlockSize: 256, }, }) if r.errFlag { assert.Error(t, err) } else { assert.NoError(t, err) } } }) } func TestGetProjectInfo(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 2, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey0 := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] apiKey1 := planet.Uplinks[1].APIKey[planet.Satellites[0].ID()] metainfo0, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey0) require.NoError(t, err) metainfo1, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey1) require.NoError(t, err) info0, err := metainfo0.GetProjectInfo(ctx) require.NoError(t, err) require.NotNil(t, info0.ProjectSalt) info1, err := metainfo1.GetProjectInfo(ctx) require.NoError(t, err) require.NotNil(t, info1.ProjectSalt) // Different projects should have different salts require.NotEqual(t, info0.ProjectSalt, info1.ProjectSalt) }) } func TestBucketNameValidation(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(metainfoClient.Close) validNames := []string{ "tes", "testbucket", "test-bucket", "testbucket9", "9testbucket", "a.b", "test.bucket", "test-one.bucket-one", "test.bucket.one", "testbucket-63-0123456789012345678901234567890123456789012345abc", } for _, name := range validNames { _, err = metainfoClient.CreateBucket(ctx, metaclient.CreateBucketParams{ Name: []byte(name), }) require.NoError(t, err, "bucket name: %v", name) _, err = metainfoClient.BeginObject(ctx, metaclient.BeginObjectParams{ Bucket: []byte(name), EncryptedPath: []byte("123"), Version: 0, ExpiresAt: time.Now().Add(16 * 24 * time.Hour), EncryptionParameters: storj.EncryptionParameters{ CipherSuite: storj.EncAESGCM, BlockSize: 256, }, }) require.NoError(t, err, "bucket name: %v", name) } invalidNames := []string{ "", "t", "te", "-testbucket", "testbucket-", "-testbucket-", "a.b.", "test.bucket-.one", "test.-bucket.one", "1.2.3.4", "192.168.1.234", "testBUCKET", "test/bucket", "testbucket-64-0123456789012345678901234567890123456789012345abcd", } for _, name := range invalidNames { _, err = metainfoClient.BeginObject(ctx, metaclient.BeginObjectParams{ Bucket: []byte(name), EncryptedPath: []byte("123"), }) require.Error(t, err, "bucket name: %v", name) _, err = metainfoClient.CreateBucket(ctx, metaclient.CreateBucketParams{ Name: []byte(name), }) require.Error(t, err, "bucket name: %v", name) } }) } func TestListGetObjects(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] uplink := planet.Uplinks[0] files := make([]string, 10) data := testrand.Bytes(1 * memory.KiB) for i := 0; i < len(files); i++ { files[i] = "path" + strconv.Itoa(i) err := uplink.Upload(ctx, planet.Satellites[0], "testbucket", files[i], data) require.NoError(t, err) } metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(metainfoClient.Close) expectedBucketName := "testbucket" items, _, err := metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{ Bucket: []byte(expectedBucketName), IncludeSystemMetadata: true, }) require.NoError(t, err) require.Equal(t, len(files), len(items)) for _, item := range items { require.NotEmpty(t, item.EncryptedPath) require.True(t, item.CreatedAt.Before(time.Now())) object, err := metainfoClient.GetObject(ctx, metaclient.GetObjectParams{ Bucket: []byte(expectedBucketName), EncryptedPath: item.EncryptedPath, }) require.NoError(t, err) require.Equal(t, item.EncryptedPath, object.EncryptedPath) require.NotEmpty(t, object.StreamID) } items, _, err = metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{ Bucket: []byte(expectedBucketName), Limit: 3, }) require.NoError(t, err) require.Equal(t, 3, len(items)) }) } func TestBucketExistenceCheck(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(metainfoClient.Close) // test object methods for bucket existence check _, err = metainfoClient.BeginObject(ctx, metaclient.BeginObjectParams{ Bucket: []byte("non-existing-bucket"), EncryptedPath: []byte("encrypted-path"), }) require.Error(t, err) require.True(t, errs2.IsRPC(err, rpcstatus.NotFound)) require.Equal(t, storj.ErrBucketNotFound.New("%s", "non-existing-bucket").Error(), errs.Unwrap(err).Error()) _, _, err = metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{ Bucket: []byte("non-existing-bucket"), }) require.Error(t, err) require.True(t, errs2.IsRPC(err, rpcstatus.NotFound)) require.Equal(t, storj.ErrBucketNotFound.New("%s", "non-existing-bucket").Error(), errs.Unwrap(err).Error()) }) } func TestBeginCommit(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: func(logger *zap.Logger, index int, config *satellite.Config) { config.Overlay.GeoIP.MockCountries = []string{"DE"} }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] buckets := planet.Satellites[0].API.Buckets.Service bucket := storj.Bucket{ Name: "initial-bucket", ProjectID: planet.Uplinks[0].Projects[0].ID, Placement: storj.EU, } _, err := buckets.CreateBucket(ctx, bucket) require.NoError(t, err) metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(metainfoClient.Close) params := metaclient.BeginObjectParams{ Bucket: []byte(bucket.Name), EncryptedPath: []byte("encrypted-path"), Redundancy: storj.RedundancyScheme{ Algorithm: storj.ReedSolomon, ShareSize: 256, RequiredShares: 1, RepairShares: 1, OptimalShares: 3, TotalShares: 4, }, EncryptionParameters: storj.EncryptionParameters{ CipherSuite: storj.EncAESGCM, BlockSize: 256, }, ExpiresAt: time.Now().Add(24 * time.Hour), } beginObjectResponse, err := metainfoClient.BeginObject(ctx, params) require.NoError(t, err) streamID := internalpb.StreamID{} err = pb.Unmarshal(beginObjectResponse.StreamID.Bytes(), &streamID) require.NoError(t, err) require.Equal(t, int32(storj.EU), streamID.Placement) response, err := metainfoClient.BeginSegment(ctx, metaclient.BeginSegmentParams{ StreamID: beginObjectResponse.StreamID, Position: storj.SegmentPosition{ Index: 0, }, MaxOrderLimit: memory.MiB.Int64(), }) require.NoError(t, err) fullIDMap := make(map[storj.NodeID]*identity.FullIdentity) for _, node := range planet.StorageNodes { fullIDMap[node.ID()] = node.Identity } makeResult := func(num int32) *pb.SegmentPieceUploadResult { nodeID := response.Limits[num].Limit.StorageNodeId hash := &pb.PieceHash{ PieceId: response.Limits[num].Limit.PieceId, PieceSize: 1048832, Timestamp: time.Now(), } fullID := fullIDMap[nodeID] require.NotNil(t, fullID) signer := signing.SignerFromFullIdentity(fullID) signedHash, err := signing.SignPieceHash(ctx, signer, hash) require.NoError(t, err) return &pb.SegmentPieceUploadResult{ PieceNum: num, NodeId: nodeID, Hash: signedHash, } } err = metainfoClient.CommitSegment(ctx, metaclient.CommitSegmentParams{ SegmentID: response.SegmentID, Encryption: storj.SegmentEncryption{ EncryptedKey: testrand.Bytes(256), }, PlainSize: 5000, SizeEncryptedData: memory.MiB.Int64(), UploadResult: []*pb.SegmentPieceUploadResult{ makeResult(0), makeResult(1), makeResult(2), }, }) require.NoError(t, err) metadata, err := pb.Marshal(&pb.StreamMeta{ NumberOfSegments: 1, }) require.NoError(t, err) err = metainfoClient.CommitObject(ctx, metaclient.CommitObjectParams{ StreamID: beginObjectResponse.StreamID, EncryptedMetadata: metadata, EncryptedMetadataNonce: testrand.Nonce(), EncryptedMetadataEncryptedKey: testrand.Bytes(32), }) require.NoError(t, err) objects, _, err := metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{ Bucket: []byte(bucket.Name), IncludeSystemMetadata: true, }) require.NoError(t, err) require.Len(t, objects, 1) require.Equal(t, params.EncryptedPath, objects[0].EncryptedPath) // TODO find better way to compare (one ExpiresAt contains time zone informations) require.Equal(t, params.ExpiresAt.Unix(), objects[0].ExpiresAt.Unix()) object, err := metainfoClient.GetObject(ctx, metaclient.GetObjectParams{ Bucket: []byte(bucket.Name), EncryptedPath: objects[0].EncryptedPath, }) require.NoError(t, err) project := planet.Uplinks[0].Projects[0] allObjects, err := planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, project.ID, object.Bucket) require.NoError(t, err) require.Len(t, allObjects, 1) }) } func TestInlineSegment(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] buckets := planet.Satellites[0].API.Buckets.Service // TODO maybe split into separate cases // Test: // * create bucket // * begin object // * send several inline segments // * commit object // * list created object // * list object segments // * download segments // * delete segments and object bucket := storj.Bucket{ Name: "inline-segments-bucket", ProjectID: planet.Uplinks[0].Projects[0].ID, } _, err := buckets.CreateBucket(ctx, bucket) require.NoError(t, err) metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(metainfoClient.Close) params := metaclient.BeginObjectParams{ Bucket: []byte(bucket.Name), EncryptedPath: []byte("encrypted-path"), Redundancy: storj.RedundancyScheme{ Algorithm: storj.ReedSolomon, ShareSize: 256, RequiredShares: 1, RepairShares: 1, OptimalShares: 3, TotalShares: 4, }, EncryptionParameters: storj.EncryptionParameters{ CipherSuite: storj.EncAESGCM, BlockSize: 256, }, ExpiresAt: time.Now().Add(24 * time.Hour), } beginObjectResp, err := metainfoClient.BeginObject(ctx, params) require.NoError(t, err) segments := []int32{0, 1, 2, 3, 4, 5, 6} segmentsData := make([][]byte, len(segments)) for i, segment := range segments { segmentsData[i] = testrand.Bytes(memory.KiB) err = metainfoClient.MakeInlineSegment(ctx, metaclient.MakeInlineSegmentParams{ StreamID: beginObjectResp.StreamID, Position: storj.SegmentPosition{ Index: segment, }, PlainSize: 1024, EncryptedInlineData: segmentsData[i], Encryption: storj.SegmentEncryption{ EncryptedKey: testrand.Bytes(256), }, }) require.NoError(t, err) } metadata, err := pb.Marshal(&pb.StreamMeta{ NumberOfSegments: int64(len(segments)), }) require.NoError(t, err) err = metainfoClient.CommitObject(ctx, metaclient.CommitObjectParams{ StreamID: beginObjectResp.StreamID, EncryptedMetadata: metadata, EncryptedMetadataNonce: testrand.Nonce(), EncryptedMetadataEncryptedKey: testrand.Bytes(32), }) require.NoError(t, err) objects, _, err := metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{ Bucket: []byte(bucket.Name), IncludeSystemMetadata: true, }) require.NoError(t, err) require.Len(t, objects, 1) require.Equal(t, params.EncryptedPath, objects[0].EncryptedPath) // TODO find better way to compare (one ExpiresAt contains time zone informations) require.Equal(t, params.ExpiresAt.Unix(), objects[0].ExpiresAt.Unix()) object, err := metainfoClient.GetObject(ctx, metaclient.GetObjectParams{ Bucket: params.Bucket, EncryptedPath: params.EncryptedPath, }) require.NoError(t, err) { // Confirm data larger than our configured max inline segment size of 4 KiB cannot be inlined beginObjectResp, err := metainfoClient.BeginObject(ctx, metaclient.BeginObjectParams{ Bucket: []byte(bucket.Name), EncryptedPath: []byte("too-large-inline-segment"), EncryptionParameters: storj.EncryptionParameters{ CipherSuite: storj.EncAESGCM, BlockSize: 256, }, }) require.NoError(t, err) data := testrand.Bytes(10 * memory.KiB) err = metainfoClient.MakeInlineSegment(ctx, metaclient.MakeInlineSegmentParams{ StreamID: beginObjectResp.StreamID, Position: storj.SegmentPosition{ Index: 0, }, EncryptedInlineData: data, Encryption: storj.SegmentEncryption{ EncryptedKey: testrand.Bytes(256), }, }) require.Error(t, err) } { // test download inline segments existingSegments := []int32{0, 1, 2, 3, 4, 5, -1} for i, index := range existingSegments { info, limits, err := metainfoClient.DownloadSegment(ctx, metaclient.DownloadSegmentParams{ StreamID: object.StreamID, Position: storj.SegmentPosition{ Index: index, }, }) require.NoError(t, err) require.Nil(t, limits) require.Equal(t, segmentsData[i], info.EncryptedInlineData) } } { // test deleting segments _, err := metainfoClient.BeginDeleteObject(ctx, metaclient.BeginDeleteObjectParams{ Bucket: params.Bucket, EncryptedPath: params.EncryptedPath, }) require.NoError(t, err) _, err = metainfoClient.GetObject(ctx, metaclient.GetObjectParams{ Bucket: params.Bucket, EncryptedPath: params.EncryptedPath, }) require.Error(t, err) } }) } func TestUploadWithPlacement(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: func(logger *zap.Logger, index int, config *satellite.Config) { config.Overlay.GeoIP.MockCountries = []string{"DE"} }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { bucketName := "initial-bucket" objectName := "file1" apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] fmt.Println(apiKey) buckets := planet.Satellites[0].API.Buckets.Service bucket := storj.Bucket{ Name: bucketName, ProjectID: planet.Uplinks[0].Projects[0].ID, Placement: storj.EU, } _, err := buckets.CreateBucket(ctx, bucket) require.NoError(t, err) // this should be bigger than the max inline segment content := make([]byte, 5000) err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucket.Name, objectName, content) require.NoError(t, err) segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) require.NoError(t, err) require.Equal(t, 1, len(segments)) require.Equal(t, storj.EU, segments[0].Placement) }) } func TestRemoteSegment(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] uplink := planet.Uplinks[0] expectedBucketName := "remote-segments-bucket" err := uplink.Upload(ctx, planet.Satellites[0], expectedBucketName, "file-object", testrand.Bytes(50*memory.KiB)) require.NoError(t, err) metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(metainfoClient.Close) items, _, err := metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{ Bucket: []byte(expectedBucketName), }) require.NoError(t, err) require.Len(t, items, 1) { // Get object // Download segment object, err := metainfoClient.GetObject(ctx, metaclient.GetObjectParams{ Bucket: []byte(expectedBucketName), EncryptedPath: items[0].EncryptedPath, }) require.NoError(t, err) _, limits, err := metainfoClient.DownloadSegment(ctx, metaclient.DownloadSegmentParams{ StreamID: object.StreamID, Position: storj.SegmentPosition{ Index: -1, }, }) require.NoError(t, err) require.NotEmpty(t, limits) } { // Download Object download, err := metainfoClient.DownloadObject(ctx, metaclient.DownloadObjectParams{ Bucket: []byte(expectedBucketName), EncryptedObjectKey: items[0].EncryptedPath, Range: metaclient.StreamRange{ Mode: metaclient.StreamRangeStartLimit, Start: 1, Limit: 2, }, }) require.NoError(t, err) require.Len(t, download.DownloadedSegments, 1) require.NotEmpty(t, download.DownloadedSegments[0].Limits) for _, limit := range download.DownloadedSegments[0].Limits { if limit == nil { continue } // requested download size is // [1:2} // calculating encryption input block size (7408) indices gives us: // 0 and 1 // converting these into output block size (7424), gives us: // [0:7424} // this aligned to stripe size (256), gives us: // [0:7424} require.Equal(t, int64(7424), limit.Limit.Limit) } } { // Begin deleting object // List objects _, err := metainfoClient.BeginDeleteObject(ctx, metaclient.BeginDeleteObjectParams{ Bucket: []byte(expectedBucketName), EncryptedPath: items[0].EncryptedPath, }) require.NoError(t, err) items, _, err = metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{ Bucket: []byte(expectedBucketName), }) require.NoError(t, err) require.Len(t, items, 0) } }) } func TestIDs(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(metainfoClient.Close) { streamID := testrand.StreamID(256) err = metainfoClient.CommitObject(ctx, metaclient.CommitObjectParams{ StreamID: streamID, }) require.Error(t, err) // invalid streamID segmentID := testrand.SegmentID(512) err = metainfoClient.CommitSegment(ctx, metaclient.CommitSegmentParams{ SegmentID: segmentID, }) require.Error(t, err) // invalid segmentID } satellitePeer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity) { // streamID expired signedStreamID, err := metainfo.SignStreamID(ctx, satellitePeer, &internalpb.StreamID{ CreationDate: time.Now().Add(-36 * time.Hour), }) require.NoError(t, err) encodedStreamID, err := pb.Marshal(signedStreamID) require.NoError(t, err) streamID, err := storj.StreamIDFromBytes(encodedStreamID) require.NoError(t, err) err = metainfoClient.CommitObject(ctx, metaclient.CommitObjectParams{ StreamID: streamID, }) require.Error(t, err) } { // segment id missing stream id signedSegmentID, err := metainfo.SignSegmentID(ctx, satellitePeer, &internalpb.SegmentID{ CreationDate: time.Now().Add(-1 * time.Hour), }) require.NoError(t, err) encodedSegmentID, err := pb.Marshal(signedSegmentID) require.NoError(t, err) segmentID, err := storj.SegmentIDFromBytes(encodedSegmentID) require.NoError(t, err) err = metainfoClient.CommitSegment(ctx, metaclient.CommitSegmentParams{ SegmentID: segmentID, }) require.Error(t, err) } { // segmentID expired signedSegmentID, err := metainfo.SignSegmentID(ctx, satellitePeer, &internalpb.SegmentID{ CreationDate: time.Now().Add(-36 * time.Hour), StreamId: &internalpb.StreamID{ CreationDate: time.Now(), }, }) require.NoError(t, err) encodedSegmentID, err := pb.Marshal(signedSegmentID) require.NoError(t, err) segmentID, err := storj.SegmentIDFromBytes(encodedSegmentID) require.NoError(t, err) err = metainfoClient.CommitSegment(ctx, metaclient.CommitSegmentParams{ SegmentID: segmentID, }) require.Error(t, err) } }) } func TestBatch(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(metainfoClient.Close) { // create few buckets and list them in one batch requests := make([]metaclient.BatchItem, 0) numOfBuckets := 5 for i := 0; i < numOfBuckets; i++ { requests = append(requests, &metaclient.CreateBucketParams{ Name: []byte("test-bucket-" + strconv.Itoa(i)), }) } requests = append(requests, &metaclient.ListBucketsParams{ ListOpts: storj.BucketListOptions{ Cursor: "", Direction: storj.After, }, }) responses, err := metainfoClient.Batch(ctx, requests...) require.NoError(t, err) require.Equal(t, numOfBuckets+1, len(responses)) for i := 0; i < numOfBuckets; i++ { response, err := responses[i].CreateBucket() require.NoError(t, err) require.Equal(t, "test-bucket-"+strconv.Itoa(i), response.Bucket.Name) _, err = responses[i].GetBucket() require.Error(t, err) } bucketsListResp, err := responses[numOfBuckets].ListBuckets() require.NoError(t, err) require.Equal(t, numOfBuckets, len(bucketsListResp.BucketList.Items)) } { // create bucket, object, upload inline segments in batch, download inline segments in batch err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "second-test-bucket") require.NoError(t, err) requests := make([]metaclient.BatchItem, 0) requests = append(requests, &metaclient.BeginObjectParams{ Bucket: []byte("second-test-bucket"), EncryptedPath: []byte("encrypted-path"), EncryptionParameters: storj.EncryptionParameters{ CipherSuite: storj.EncAESGCM, BlockSize: 256, }, }) numOfSegments := 10 expectedData := make([][]byte, numOfSegments) for i := 0; i < numOfSegments; i++ { expectedData[i] = testrand.Bytes(memory.KiB) requests = append(requests, &metaclient.MakeInlineSegmentParams{ Position: storj.SegmentPosition{ Index: int32(i), }, PlainSize: int64(len(expectedData[i])), EncryptedInlineData: expectedData[i], Encryption: storj.SegmentEncryption{ EncryptedKey: testrand.Bytes(256), }, }) } metadata, err := pb.Marshal(&pb.StreamMeta{ NumberOfSegments: int64(numOfSegments), }) require.NoError(t, err) requests = append(requests, &metaclient.CommitObjectParams{ EncryptedMetadata: metadata, EncryptedMetadataNonce: testrand.Nonce(), EncryptedMetadataEncryptedKey: testrand.Bytes(32), }) responses, err := metainfoClient.Batch(ctx, requests...) require.NoError(t, err) require.Equal(t, numOfSegments+2, len(responses)) requests = make([]metaclient.BatchItem, 0) requests = append(requests, &metaclient.GetObjectParams{ Bucket: []byte("second-test-bucket"), EncryptedPath: []byte("encrypted-path"), }) for i := 0; i < numOfSegments-1; i++ { requests = append(requests, &metaclient.DownloadSegmentParams{ Position: storj.SegmentPosition{ Index: int32(i), }, }) } requests = append(requests, &metaclient.DownloadSegmentParams{ Position: storj.SegmentPosition{ Index: -1, }, }) responses, err = metainfoClient.Batch(ctx, requests...) require.NoError(t, err) require.Equal(t, numOfSegments+1, len(responses)) for i, response := range responses[1:] { downloadResponse, err := response.DownloadSegment() require.NoError(t, err) require.Equal(t, expectedData[i], downloadResponse.Info.EncryptedInlineData) } } { // test case when StreamID is not set automatically err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "third-test-bucket") require.NoError(t, err) beginObjectResp, err := metainfoClient.BeginObject(ctx, metaclient.BeginObjectParams{ Bucket: []byte("third-test-bucket"), EncryptedPath: []byte("encrypted-path"), EncryptionParameters: storj.EncryptionParameters{ CipherSuite: storj.EncAESGCM, BlockSize: 256, }, }) require.NoError(t, err) requests := make([]metaclient.BatchItem, 0) numOfSegments := 10 expectedData := make([][]byte, numOfSegments) for i := 0; i < numOfSegments; i++ { expectedData[i] = testrand.Bytes(memory.KiB) requests = append(requests, &metaclient.MakeInlineSegmentParams{ StreamID: beginObjectResp.StreamID, Position: storj.SegmentPosition{ Index: int32(i), }, PlainSize: int64(len(expectedData[i])), EncryptedInlineData: expectedData[i], Encryption: storj.SegmentEncryption{ EncryptedKey: testrand.Bytes(256), }, }) } metadata, err := pb.Marshal(&pb.StreamMeta{ NumberOfSegments: int64(numOfSegments), }) require.NoError(t, err) requests = append(requests, &metaclient.CommitObjectParams{ StreamID: beginObjectResp.StreamID, EncryptedMetadata: metadata, EncryptedMetadataNonce: testrand.Nonce(), EncryptedMetadataEncryptedKey: testrand.Bytes(32), }) responses, err := metainfoClient.Batch(ctx, requests...) require.NoError(t, err) require.Equal(t, numOfSegments+1, len(responses)) } }) } func TestRateLimit(t *testing.T) { rateLimit := 2 testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: func(log *zap.Logger, index int, config *satellite.Config) { config.Metainfo.RateLimiter.Rate = float64(rateLimit) config.Metainfo.RateLimiter.CacheExpiration = 500 * time.Millisecond }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { ul := planet.Uplinks[0] satellite := planet.Satellites[0] // TODO find a way to reset limiter before test is executed, currently // testplanet is doing one additional request to get access time.Sleep(1 * time.Second) var group errs2.Group for i := 0; i <= rateLimit; i++ { group.Go(func() error { return ul.CreateBucket(ctx, satellite, testrand.BucketName()) }) } groupErrs := group.Wait() require.Len(t, groupErrs, 1) }) } func TestRateLimit_Disabled(t *testing.T) { rateLimit := 2 testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: func(log *zap.Logger, index int, config *satellite.Config) { config.Metainfo.RateLimiter.Enabled = false config.Metainfo.RateLimiter.Rate = float64(rateLimit) }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { ul := planet.Uplinks[0] satellite := planet.Satellites[0] var group errs2.Group for i := 0; i <= rateLimit; i++ { group.Go(func() error { return ul.CreateBucket(ctx, satellite, testrand.BucketName()) }) } groupErrs := group.Wait() require.Len(t, groupErrs, 0) }) } func TestRateLimit_ProjectRateLimitOverride(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: func(log *zap.Logger, index int, config *satellite.Config) { config.Metainfo.RateLimiter.Rate = 2 config.Metainfo.RateLimiter.CacheExpiration = 500 * time.Millisecond }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { ul := planet.Uplinks[0] satellite := planet.Satellites[0] // TODO find a way to reset limiter before test is executed, currently // testplanet is doing one additional request to get access time.Sleep(1 * time.Second) projects, err := satellite.DB.Console().Projects().GetAll(ctx) require.NoError(t, err) require.Len(t, projects, 1) rateLimit := 3 projects[0].RateLimit = &rateLimit err = satellite.DB.Console().Projects().Update(ctx, &projects[0]) require.NoError(t, err) var group errs2.Group for i := 0; i <= rateLimit; i++ { group.Go(func() error { return ul.CreateBucket(ctx, satellite, testrand.BucketName()) }) } groupErrs := group.Wait() require.Len(t, groupErrs, 1) }) } func TestRateLimit_ProjectRateLimitOverrideCachedExpired(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: func(log *zap.Logger, index int, config *satellite.Config) { config.Metainfo.RateLimiter.Rate = 2 config.Metainfo.RateLimiter.CacheExpiration = time.Second }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { ul := planet.Uplinks[0] satellite := planet.Satellites[0] // TODO find a way to reset limiter before test is executed, currently // testplanet is doing one additional request to get access time.Sleep(2 * time.Second) projects, err := satellite.DB.Console().Projects().GetAll(ctx) require.NoError(t, err) require.Len(t, projects, 1) rateLimit := 3 projects[0].RateLimit = &rateLimit err = satellite.DB.Console().Projects().Update(ctx, &projects[0]) require.NoError(t, err) var group1 errs2.Group for i := 0; i <= rateLimit; i++ { group1.Go(func() error { return ul.CreateBucket(ctx, satellite, testrand.BucketName()) }) } group1Errs := group1.Wait() require.Len(t, group1Errs, 1) rateLimit = 1 projects[0].RateLimit = &rateLimit err = satellite.DB.Console().Projects().Update(ctx, &projects[0]) require.NoError(t, err) time.Sleep(2 * time.Second) var group2 errs2.Group for i := 0; i <= rateLimit; i++ { group2.Go(func() error { return ul.CreateBucket(ctx, satellite, testrand.BucketName()) }) } group2Errs := group2.Wait() require.Len(t, group2Errs, 1) }) } func TestRateLimit_ExceededBurstLimit(t *testing.T) { burstLimit := 2 testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: func(log *zap.Logger, index int, config *satellite.Config) { config.Metainfo.RateLimiter.Rate = float64(burstLimit) config.Metainfo.RateLimiter.CacheExpiration = 500 * time.Millisecond }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { ul := planet.Uplinks[0] satellite := planet.Satellites[0] // TODO find a way to reset limiter before test is executed, currently // testplanet is doing one additional request to get access time.Sleep(1 * time.Second) var group errs2.Group for i := 0; i <= burstLimit; i++ { group.Go(func() error { return ul.CreateBucket(ctx, satellite, testrand.BucketName()) }) } groupErrs := group.Wait() require.Len(t, groupErrs, 1) projects, err := satellite.DB.Console().Projects().GetAll(ctx) require.NoError(t, err) require.Len(t, projects, 1) zeroRateLimit := 0 err = satellite.DB.Console().Projects().UpdateBurstLimit(ctx, projects[0].ID, zeroRateLimit) require.NoError(t, err) time.Sleep(1 * time.Second) var group2 errs2.Group for i := 0; i <= burstLimit; i++ { group2.Go(func() error { return ul.CreateBucket(ctx, satellite, testrand.BucketName()) }) } group2Errs := group2.Wait() require.Len(t, group2Errs, burstLimit+1) }) } func TestBucketEmptinessBeforeDelete(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { for i := 0; i < 10; i++ { err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "test-bucket", "object-key"+strconv.Itoa(i), testrand.Bytes(memory.KiB)) require.NoError(t, err) } for i := 0; i < 10; i++ { err := planet.Uplinks[0].DeleteBucket(ctx, planet.Satellites[0], "test-bucket") require.Error(t, err) require.True(t, errors.Is(err, uplink.ErrBucketNotEmpty)) err = planet.Uplinks[0].DeleteObject(ctx, planet.Satellites[0], "test-bucket", "object-key"+strconv.Itoa(i)) require.NoError(t, err) } err := planet.Uplinks[0].DeleteBucket(ctx, planet.Satellites[0], "test-bucket") require.NoError(t, err) }) } func TestDeleteBatchWithoutPermission(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "test-bucket") require.NoError(t, err) apiKey, err = apiKey.Restrict(macaroon.WithNonce(macaroon.Caveat{ DisallowLists: true, DisallowReads: true, })) require.NoError(t, err) metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(metainfoClient.Close) responses, err := metainfoClient.Batch(ctx, // this request was causing panic becase for deleting object // its possible to return no error and empty response for // specific set of permissions, see `apiKey.Restrict` from above &metaclient.BeginDeleteObjectParams{ Bucket: []byte("test-bucket"), EncryptedPath: []byte("not-existing-object"), }, // TODO this code should be enabled then issue with read permissions in // DeleteBucket method currently user have always permission to read bucket // https://storjlabs.atlassian.net/browse/USR-603 // when it will be fixed commented code from bellow should replace existing DeleteBucketParams // the same situation like above // &metaclient.DeleteBucketParams{ // Name: []byte("not-existing-bucket"), // }, &metaclient.DeleteBucketParams{ Name: []byte("test-bucket"), }, ) require.NoError(t, err) require.Equal(t, 2, len(responses)) }) } func TestInlineSegmentThreshold(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { { // limit is max inline segment size + encryption overhead err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "test-bucket-inline", "inline-object", testrand.Bytes(4*memory.KiB)) require.NoError(t, err) // we don't know encrypted path segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) require.NoError(t, err) require.Len(t, segments, 1) require.Zero(t, segments[0].Redundancy) require.NotEmpty(t, segments[0].InlineData) // clean up - delete the uploaded object objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx) require.NoError(t, err) require.Len(t, objects, 1) _, err = planet.Satellites[0].Metabase.DB.DeleteObjectLatestVersion(ctx, metabase.DeleteObjectLatestVersion{ ObjectLocation: objects[0].Location(), }) require.NoError(t, err) } { // one more byte over limit should enough to create remote segment err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "test-bucket-remote", "remote-object", testrand.Bytes(4*memory.KiB+1)) require.NoError(t, err) // we don't know encrypted path segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) require.NoError(t, err) require.Len(t, segments, 1) require.NotZero(t, segments[0].Redundancy) require.Empty(t, segments[0].InlineData) // clean up - delete the uploaded object objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx) require.NoError(t, err) require.Len(t, objects, 1) _, err = planet.Satellites[0].Metabase.DB.DeleteObjectLatestVersion(ctx, metabase.DeleteObjectLatestVersion{ ObjectLocation: objects[0].Location(), }) require.NoError(t, err) } }) } // TestCommitObjectMetadataSize ensures that CommitObject returns an error when the metadata provided by the user is too large. func TestCommitObjectMetadataSize(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: testplanet.MaxMetadataSize(2 * memory.KiB), }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] buckets := planet.Satellites[0].API.Buckets.Service bucket := storj.Bucket{ Name: "initial-bucket", ProjectID: planet.Uplinks[0].Projects[0].ID, } _, err := buckets.CreateBucket(ctx, bucket) require.NoError(t, err) metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(metainfoClient.Close) params := metaclient.BeginObjectParams{ Bucket: []byte(bucket.Name), EncryptedPath: []byte("encrypted-path"), Redundancy: storj.RedundancyScheme{ Algorithm: storj.ReedSolomon, ShareSize: 256, RequiredShares: 1, RepairShares: 1, OptimalShares: 3, TotalShares: 4, }, EncryptionParameters: storj.EncryptionParameters{ BlockSize: 256, CipherSuite: storj.EncNull, }, ExpiresAt: time.Now().Add(24 * time.Hour), } beginObjectResponse, err := metainfoClient.BeginObject(ctx, params) require.NoError(t, err) response, err := metainfoClient.BeginSegment(ctx, metaclient.BeginSegmentParams{ StreamID: beginObjectResponse.StreamID, Position: storj.SegmentPosition{ Index: 0, }, MaxOrderLimit: memory.MiB.Int64(), }) require.NoError(t, err) fullIDMap := make(map[storj.NodeID]*identity.FullIdentity) for _, node := range planet.StorageNodes { fullIDMap[node.ID()] = node.Identity } makeResult := func(num int32) *pb.SegmentPieceUploadResult { nodeID := response.Limits[num].Limit.StorageNodeId hash := &pb.PieceHash{ PieceId: response.Limits[num].Limit.PieceId, PieceSize: 1048832, Timestamp: time.Now(), } fullID := fullIDMap[nodeID] require.NotNil(t, fullID) signer := signing.SignerFromFullIdentity(fullID) signedHash, err := signing.SignPieceHash(ctx, signer, hash) require.NoError(t, err) return &pb.SegmentPieceUploadResult{ PieceNum: num, NodeId: nodeID, Hash: signedHash, } } err = metainfoClient.CommitSegment(ctx, metaclient.CommitSegmentParams{ SegmentID: response.SegmentID, Encryption: storj.SegmentEncryption{ EncryptedKey: []byte{1}, }, PlainSize: memory.MiB.Int64(), SizeEncryptedData: memory.MiB.Int64(), UploadResult: []*pb.SegmentPieceUploadResult{ makeResult(0), makeResult(1), makeResult(2), }, }) require.NoError(t, err) // 5KiB metadata should fail because it is too large. metadata, err := pb.Marshal(&pb.StreamMeta{ EncryptedStreamInfo: testrand.Bytes(5 * memory.KiB), NumberOfSegments: 1, }) require.NoError(t, err) err = metainfoClient.CommitObject(ctx, metaclient.CommitObjectParams{ StreamID: beginObjectResponse.StreamID, EncryptedMetadata: metadata, EncryptedMetadataNonce: testrand.Nonce(), EncryptedMetadataEncryptedKey: testrand.Bytes(32), }) require.Error(t, err) assertInvalidArgument(t, err, true) // 1KiB metadata should not fail. metadata, err = pb.Marshal(&pb.StreamMeta{ EncryptedStreamInfo: testrand.Bytes(1 * memory.KiB), NumberOfSegments: 1, }) require.NoError(t, err) err = metainfoClient.CommitObject(ctx, metaclient.CommitObjectParams{ StreamID: beginObjectResponse.StreamID, EncryptedMetadata: metadata, EncryptedMetadataNonce: testrand.Nonce(), EncryptedMetadataEncryptedKey: testrand.Bytes(32), }) require.NoError(t, err) }) } // TestBeginObjectEncryptedObjectKeyLength ensures that BeginObject returns an error when the encrypted key provided by the user is too large. func TestBeginObjectEncryptedObjectKeyLength(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: testplanet.MaxObjectKeyLength(1024), }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "initial-bucket") require.NoError(t, err) metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(metainfoClient.Close) params := metaclient.BeginObjectParams{ Bucket: []byte("initial-bucket"), EncryptionParameters: storj.EncryptionParameters{ BlockSize: 256, CipherSuite: storj.EncNull, }, } params.EncryptedPath = testrand.Bytes(500) _, err = metainfoClient.BeginObject(ctx, params) require.NoError(t, err) params.EncryptedPath = testrand.Bytes(1024) _, err = metainfoClient.BeginObject(ctx, params) require.NoError(t, err) params.EncryptedPath = testrand.Bytes(2048) _, err = metainfoClient.BeginObject(ctx, params) require.Error(t, err) require.True(t, rpcstatus.Code(err) == rpcstatus.InvalidArgument) }) } func TestDeleteRightsOnUpload(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { up := planet.Uplinks[0] err := up.CreateBucket(ctx, planet.Satellites[0], "test-bucket") require.NoError(t, err) data := testrand.Bytes(1 * memory.KiB) err = up.Upload(ctx, planet.Satellites[0], "test-bucket", "test-key", data) require.NoError(t, err) access := up.Access[planet.Satellites[0].ID()] overwrite := func(allowDelete bool) error { permission := uplink.FullPermission() permission.AllowDelete = allowDelete sharedAccess, err := access.Share(permission) require.NoError(t, err) project, err := uplink.OpenProject(ctx, sharedAccess) require.NoError(t, err) defer ctx.Check(project.Close) upload, err := project.UploadObject(ctx, "test-bucket", "test-key", nil) require.NoError(t, err) _, err = upload.Write([]byte("new data")) require.NoError(t, err) return upload.Commit() } require.Error(t, overwrite(false)) require.NoError(t, overwrite(true)) }) } func TestImmutableUpload(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { access := planet.Uplinks[0].Access[planet.Satellites[0].ID()] permission := uplink.Permission{AllowUpload: true} // AllowDelete: false sharedAccess, err := access.Share(permission) require.NoError(t, err) project, err := uplink.OpenProject(ctx, sharedAccess) require.NoError(t, err) defer ctx.Check(project.Close) _, err = project.CreateBucket(ctx, "test-bucket") require.NoError(t, err) // Uploading the object for first time should be successful. upload, err := project.UploadObject(ctx, "test-bucket", "test-key", nil) require.NoError(t, err) _, err = upload.Write(testrand.Bytes(1 * memory.KiB)) require.NoError(t, err) err = upload.Commit() require.NoError(t, err) // Overwriting the object should fail on Commit. upload, err = project.UploadObject(ctx, "test-bucket", "test-key", nil) require.NoError(t, err) _, err = upload.Write(testrand.Bytes(1 * memory.KiB)) require.NoError(t, err) err = upload.Commit() require.Error(t, err) }) } func TestGetObjectIPs(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { access := planet.Uplinks[0].Access[planet.Satellites[0].ID()] uplnk := planet.Uplinks[0] uplinkCtx := testuplink.WithMaxSegmentSize(ctx, 5*memory.KB) sat := planet.Satellites[0] require.NoError(t, uplnk.CreateBucket(uplinkCtx, sat, "bob")) require.NoError(t, uplnk.Upload(uplinkCtx, sat, "bob", "jones", testrand.Bytes(20*memory.KB))) ips, err := object.GetObjectIPs(ctx, uplink.Config{}, access, "bob", "jones") require.NoError(t, err) require.True(t, len(ips) > 0) // verify it's a real IP with valid host and port for _, ip := range ips { host, port, err := net.SplitHostPort(string(ip)) require.NoError(t, err) netIP := net.ParseIP(host) require.NotNil(t, netIP) _, err = strconv.Atoi(port) require.NoError(t, err) } }) } func TestMultipartObjectDownloadRejection(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { data := testrand.Bytes(20 * memory.KB) err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "pip-first", "non-multipart-object", data) require.NoError(t, err) project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0]) require.NoError(t, err) _, err = project.EnsureBucket(ctx, "pip-second") require.NoError(t, err) info, err := project.BeginUpload(ctx, "pip-second", "multipart-object", nil) require.NoError(t, err) upload, err := project.UploadPart(ctx, "pip-second", "multipart-object", info.UploadID, 1) require.NoError(t, err) _, err = upload.Write(data) require.NoError(t, err) require.NoError(t, upload.Commit()) _, err = project.CommitUpload(ctx, "pip-second", "multipart-object", info.UploadID, nil) require.NoError(t, err) _, err = project.EnsureBucket(ctx, "pip-third") require.NoError(t, err) info, err = project.BeginUpload(ctx, "pip-third", "multipart-object-third", nil) require.NoError(t, err) for i := 0; i < 4; i++ { upload, err := project.UploadPart(ctx, "pip-third", "multipart-object-third", info.UploadID, uint32(i+1)) require.NoError(t, err) _, err = upload.Write(data) require.NoError(t, err) require.NoError(t, upload.Commit()) } _, err = project.CommitUpload(ctx, "pip-third", "multipart-object-third", info.UploadID, nil) require.NoError(t, err) apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(metainfoClient.Close) objects, err := planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, planet.Uplinks[0].Projects[0].ID, "pip-first") require.NoError(t, err) require.Len(t, objects, 1) // verify that standard objects can be downloaded in an old way (index = -1 as last segment) object, err := metainfoClient.GetObject(ctx, metaclient.GetObjectParams{ Bucket: []byte("pip-first"), EncryptedPath: []byte(objects[0].ObjectKey), }) require.NoError(t, err) _, _, err = metainfoClient.DownloadSegment(ctx, metaclient.DownloadSegmentParams{ StreamID: object.StreamID, Position: storj.SegmentPosition{ Index: -1, }, }) require.NoError(t, err) objects, err = planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, planet.Uplinks[0].Projects[0].ID, "pip-second") require.NoError(t, err) require.Len(t, objects, 1) // verify that multipart objects (single segment) CANNOT be downloaded in an old way (index = -1 as last segment) object, err = metainfoClient.GetObject(ctx, metaclient.GetObjectParams{ Bucket: []byte("pip-second"), EncryptedPath: []byte(objects[0].ObjectKey), }) require.NoError(t, err) _, _, err = metainfoClient.DownloadSegment(ctx, metaclient.DownloadSegmentParams{ StreamID: object.StreamID, Position: storj.SegmentPosition{ Index: -1, }, }) require.Error(t, err) require.Contains(t, err.Error(), "Used uplink version cannot download multipart objects.") objects, err = planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, planet.Uplinks[0].Projects[0].ID, "pip-third") require.NoError(t, err) require.Len(t, objects, 1) // verify that multipart objects (multiple segments) CANNOT be downloaded in an old way (index = -1 as last segment) object, err = metainfoClient.GetObject(ctx, metaclient.GetObjectParams{ Bucket: []byte("pip-third"), EncryptedPath: []byte(objects[0].ObjectKey), }) require.NoError(t, err) _, _, err = metainfoClient.DownloadSegment(ctx, metaclient.DownloadSegmentParams{ StreamID: object.StreamID, Position: storj.SegmentPosition{ Index: -1, }, }) require.Error(t, err) require.Contains(t, err.Error(), "Used uplink version cannot download multipart objects.") }) } func TestObjectOverrideOnUpload(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { initialData := testrand.Bytes(20 * memory.KB) overrideData := testrand.Bytes(25 * memory.KB) { // committed object // upload committed object err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "pip-first", "committed-object", initialData) require.NoError(t, err) // upload once again to override err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "pip-first", "committed-object", overrideData) require.NoError(t, err) data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "pip-first", "committed-object") require.NoError(t, err) require.Equal(t, overrideData, data) } { // pending object project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0]) require.NoError(t, err) defer ctx.Check(project.Close) // upload pending object info, err := project.BeginUpload(ctx, "pip-first", "pending-object", nil) require.NoError(t, err) upload, err := project.UploadPart(ctx, "pip-first", "pending-object", info.UploadID, 1) require.NoError(t, err) _, err = upload.Write(initialData) require.NoError(t, err) require.NoError(t, upload.Commit()) // upload once again to override err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "pip-first", "pending-object", overrideData) require.NoError(t, err) data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "pip-first", "pending-object") require.NoError(t, err) require.Equal(t, overrideData, data) } }) } func TestStableUploadID(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] client, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(client.Close) err = planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket") require.NoError(t, err) beginResp, err := client.BeginObject(ctx, metaclient.BeginObjectParams{ Bucket: []byte("testbucket"), EncryptedPath: []byte("a/b/testobject"), EncryptionParameters: storj.EncryptionParameters{ CipherSuite: storj.EncAESGCM, BlockSize: 256, }, }) require.NoError(t, err) // List the root of the bucket recursively listResp, _, err := client.ListObjects(ctx, metaclient.ListObjectsParams{ Bucket: []byte("testbucket"), Status: int32(metabase.Pending), Recursive: true, IncludeSystemMetadata: true, }) require.NoError(t, err) require.Len(t, listResp, 1) // check that BeginObject and ListObjects return the same StreamID. assert.Equal(t, beginResp.StreamID, listResp[0].StreamID) // List with prefix non-recursively listResp2, _, err := client.ListObjects(ctx, metaclient.ListObjectsParams{ Bucket: []byte("testbucket"), Status: int32(metabase.Pending), EncryptedPrefix: []byte("a/b/"), IncludeSystemMetadata: true, }) require.NoError(t, err) require.Len(t, listResp2, 1) // check that the StreamID is still the same. assert.Equal(t, listResp[0].StreamID, listResp2[0].StreamID) // List with prefix recursively listResp3, _, err := client.ListObjects(ctx, metaclient.ListObjectsParams{ Bucket: []byte("testbucket"), Status: int32(metabase.Pending), EncryptedPrefix: []byte("a/b/"), Recursive: true, IncludeSystemMetadata: true, }) require.NoError(t, err) require.Len(t, listResp3, 1) // check that the StreamID is still the same. assert.Equal(t, listResp[0].StreamID, listResp3[0].StreamID) // List the pending object directly listResp4, err := client.ListPendingObjectStreams(ctx, metaclient.ListPendingObjectStreamsParams{ Bucket: []byte("testbucket"), EncryptedPath: []byte("a/b/testobject"), }) require.NoError(t, err) require.Len(t, listResp4.Items, 1) // check that the StreamID is still the same. assert.Equal(t, listResp[0].StreamID, listResp4.Items[0].StreamID) }) } func TestObjectSegmentExpiresAt(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { inlineData := testrand.Bytes(1 * memory.KiB) inlineExpiration := time.Now().Add(2 * time.Hour) err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "hohoho", "inline_object", inlineData, inlineExpiration) require.NoError(t, err) remoteData := testrand.Bytes(10 * memory.KiB) remoteExpiration := time.Now().Add(4 * time.Hour) err = planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "hohoho", "remote_object", remoteData, remoteExpiration) require.NoError(t, err) segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) require.NoError(t, err) require.Len(t, segments, 2) for _, segment := range segments { if int(segment.PlainSize) == len(inlineData) { require.Equal(t, inlineExpiration.Unix(), segment.ExpiresAt.Unix()) } else if int(segment.PlainSize) == len(remoteData) { require.Equal(t, remoteExpiration.Unix(), segment.ExpiresAt.Unix()) } else { t.Fail() } } }) } func TestDeleteNonExistentObject(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] expectedBucketName := "delete-objects-bucket" metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) defer ctx.Check(metainfoClient.Close) // non-pending non-existent objects return no error _, err = metainfoClient.BeginDeleteObject(ctx, metaclient.BeginDeleteObjectParams{ Bucket: []byte(expectedBucketName), EncryptedPath: []byte("bad path"), }) require.NoError(t, err) // pending non-existent objects return an RPC error signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity) streamUUID := testrand.UUID() satStreamID := &internalpb.StreamID{CreationDate: time.Now(), StreamId: streamUUID[:]} signedStreamID, err := metainfo.SignStreamID(ctx, signer, satStreamID) require.NoError(t, err) encodedStreamID, err := pb.Marshal(signedStreamID) require.NoError(t, err) streamID, err := storj.StreamIDFromBytes(encodedStreamID) require.NoError(t, err) _, err = metainfoClient.BeginDeleteObject(ctx, metaclient.BeginDeleteObjectParams{ Bucket: []byte(expectedBucketName), EncryptedPath: []byte("bad path"), Status: int32(metabase.Pending), StreamID: streamID, }) require.True(t, errs2.IsRPC(err, rpcstatus.NotFound)) }) }