// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information

package piecestore_test

import (
	"bytes"
	"io"
	"os"
	"strconv"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/errs2"
	"storj.io/common/memory"
	"storj.io/common/pb"
	"storj.io/common/pkcrypto"
	"storj.io/common/rpc/rpcstatus"
	"storj.io/common/signing"
	"storj.io/common/storj"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/storj/private/date"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/storagenode"
	"storj.io/storj/storagenode/bandwidth"
	"storj.io/storj/storagenode/blobstore/testblobs"
	"storj.io/uplink/private/piecestore"
)

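// TestUploadAndPartialDownload uploads a single object, downloads several
// byte ranges of it, and then sanity-checks the bandwidth usage recorded by
// the storage nodes against rough upper and lower bounds.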
func TestUploadAndPartialDownload(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		expectedData := testrand.Bytes(100 * memory.KiB)

		err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
		assert.NoError(t, err)

		var totalDownload int64
		for _, tt := range []struct {
			offset, size int64
		}{
			{0, 1510},
			{1513, 1584},
			{13581, 4783},
		} {
			func() {
				if piecestore.DefaultConfig.InitialStep < tt.size {
					t.Fatal("test expects initial step to be larger than size to download")
				}
				totalDownload += piecestore.DefaultConfig.InitialStep

				download, cleanup, err := planet.Uplinks[0].DownloadStreamRange(ctx, planet.Satellites[0], "testbucket", "test/path", tt.offset, -1)
				require.NoError(t, err)
				defer ctx.Check(cleanup)

				data := make([]byte, tt.size)
				n, err := io.ReadFull(download, data)
				require.NoError(t, err)
				assert.Equal(t, int(tt.size), n)

				assert.Equal(t, expectedData[tt.offset:tt.offset+tt.size], data)

				require.NoError(t, download.Close())
			}()
		}

		require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))

		var totalBandwidthUsage bandwidth.Usage
		for _, storagenode := range planet.StorageNodes {
			usage, err := storagenode.DB.Bandwidth().Summary(ctx, time.Now().Add(-10*time.Hour), time.Now().Add(10*time.Hour))
			require.NoError(t, err)
			totalBandwidthUsage.Add(usage)
		}

		err = planet.Uplinks[0].DeleteObject(ctx, planet.Satellites[0], "testbucket", "test/path")
		require.NoError(t, err)
		_, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path")
		require.Error(t, err)

		// check rough limits for the upload and download
		totalUpload := int64(len(expectedData))
		totalUploadOrderMax := calcUploadOrderMax(totalUpload)
		t.Log(totalUpload, totalBandwidthUsage.Put, totalUploadOrderMax, int64(len(planet.StorageNodes))*totalUploadOrderMax)
		assert.True(t, totalUpload < totalBandwidthUsage.Put && totalBandwidthUsage.Put < int64(len(planet.StorageNodes))*totalUploadOrderMax)
		t.Log(totalDownload, totalBandwidthUsage.Get, int64(len(planet.StorageNodes))*totalDownload)
		assert.True(t, totalBandwidthUsage.Get < int64(len(planet.StorageNodes))*totalDownload)
	})
}

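// calcUploadOrderMax estimates an upper bound on the bandwidth a client
// orders while uploading size bytes: the order step starts at the configured
// InitialStep and grows by a factor of 1.5 each round (s, 1.5s, 2.25s, ...)
// until it is capped at MaximumStep, so the total ordered amount can
// overshoot size by at most one step.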
func calcUploadOrderMax(size int64) (ordered int64) {
	initialStep := piecestore.DefaultConfig.InitialStep * memory.KiB.Int64()
	maxStep := piecestore.DefaultConfig.MaximumStep * memory.KiB.Int64()
	currentStep := initialStep
	ordered = 0
	for ordered < size {
		ordered += currentStep
		currentStep = currentStep * 3 / 2
		if currentStep > maxStep {
			currentStep = maxStep
		}
	}
	return ordered
}

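// TestUpload exercises the piecestore upload endpoint with valid and invalid
// order limits, verifies the signed piece hash returned for each hash
// algorithm, and checks the ingress bandwidth recorded by the storage node.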
func TestUpload(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
		require.NoError(t, err)
		defer ctx.Check(client.Close)

		var expectedIngressAmount int64

		for _, tt := range []struct {
			pieceID       storj.PieceID
			contentLength memory.Size
			action        pb.PieceAction
			err           string
			hashAlgo      pb.PieceHashAlgorithm
		}{
			{ // should successfully store data
				pieceID:       storj.PieceID{1},
				contentLength: 50 * memory.KiB,
				action:        pb.PieceAction_PUT,
				err:           "",
			},
			{ // should err with piece ID not specified
				pieceID:       storj.PieceID{},
				contentLength: 1 * memory.KiB,
				action:        pb.PieceAction_PUT,
				err:           "missing piece id",
			},
			{ // should err because invalid action
				pieceID:       storj.PieceID{1},
				contentLength: 1 * memory.KiB,
				action:        pb.PieceAction_GET,
				err:           "expected put or put repair action got GET",
			},
			{ // different piece hash
				pieceID:       storj.PieceID{2},
				contentLength: 1 * memory.KiB,
				action:        pb.PieceAction_PUT,
				hashAlgo:      pb.PieceHashAlgorithm_BLAKE3,
			},
		} {
			client.UploadHashAlgo = tt.hashAlgo

			data := testrand.Bytes(tt.contentLength)
			serialNumber := testrand.SerialNumber()

			orderLimit, piecePrivateKey := GenerateOrderLimit(
				t,
				planet.Satellites[0].ID(),
				planet.StorageNodes[0].ID(),
				tt.pieceID,
				tt.action,
				serialNumber,
				24*time.Hour,
				24*time.Hour,
				int64(len(data)),
			)
			signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
			orderLimit, err = signing.SignOrderLimit(ctx, signer, orderLimit)
			require.NoError(t, err)

			pieceHash, err := client.UploadReader(ctx, orderLimit, piecePrivateKey, bytes.NewReader(data))
			if tt.err != "" {
				require.Error(t, err)
				require.Contains(t, err.Error(), tt.err)
			} else {
				require.NoError(t, err)

				hasher := pb.NewHashFromAlgorithm(tt.hashAlgo)
				_, err = hasher.Write(data)
				require.NoError(t, err)
				expectedHash := hasher.Sum([]byte{})

				assert.Equal(t, expectedHash, pieceHash.Hash)

				signee := signing.SignerFromFullIdentity(planet.StorageNodes[0].Identity)
				require.NoError(t, signing.VerifyPieceHashSignature(ctx, signee, pieceHash))

				expectedIngressAmount += int64(len(data)) // assuming all data is uploaded
			}
		}

		require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))

		from, to := date.MonthBoundary(time.Now().UTC())
		summary, err := planet.StorageNodes[0].DB.Bandwidth().SatelliteIngressSummary(ctx, planet.Satellites[0].ID(), from, to)
		require.NoError(t, err)
		require.Equal(t, expectedIngressAmount, summary.Put)
	})
}

// TestSlowUpload tries to mock a Slowloris attack.
func TestSlowUpload(t *testing.T) {
	t.Skip()
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			StorageNode: func(index int, config *storagenode.Config) {
				// Set MinUploadSpeed extremely high to indicate that the
				// client upload rate is slow (relative to the node's standards).
				config.Storage2.MinUploadSpeed = 10000000 * memory.MB

				// The storage node waits only a few microseconds before starting to
				// measure the upload rate and flag unusually slow connections.
				config.Storage2.MinUploadSpeedGraceDuration = 500 * time.Microsecond

				config.Storage2.MinUploadSpeedCongestionThreshold = 0.8
			},
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
		require.NoError(t, err)
		defer ctx.Check(client.Close)

		for _, tt := range []struct {
			pieceID       storj.PieceID
			contentLength memory.Size
			action        pb.PieceAction
			err           string
		}{
			{ // connection should be aborted
				pieceID: storj.PieceID{1},
				// As the server node only starts flagging unusually slow connections
				// after 500 microseconds, the file should be big enough to ensure the connection is still open.
				contentLength: 50 * memory.MB,
				action:        pb.PieceAction_PUT,
				err:           "speed too low",
			},
		} {
			data := testrand.Bytes(tt.contentLength)

			serialNumber := testrand.SerialNumber()

			orderLimit, piecePrivateKey := GenerateOrderLimit(
				t,
				planet.Satellites[0].ID(),
				planet.StorageNodes[0].ID(),
				tt.pieceID,
				tt.action,
				serialNumber,
				24*time.Hour,
				24*time.Hour,
				int64(len(data)),
			)
			signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
			orderLimit, err = signing.SignOrderLimit(ctx, signer, orderLimit)
			require.NoError(t, err)

			pieceHash, err := client.UploadReader(ctx, orderLimit, piecePrivateKey, bytes.NewReader(data))

			if tt.err != "" {
				require.Error(t, err)
				require.Contains(t, err.Error(), tt.err)
			} else {
				require.NoError(t, err)

				expectedHash := pkcrypto.SHA256Hash(data)
				assert.Equal(t, expectedHash, pieceHash.Hash)

				signee := signing.SignerFromFullIdentity(planet.StorageNodes[0].Identity)
				require.NoError(t, signing.VerifyPieceHashSignature(ctx, signee, pieceHash))
			}
		}
	})
}

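// TestUploadOverAvailable verifies that an upload larger than the storage
// node's remaining disk space is rejected with a descriptive error.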
func TestUploadOverAvailable(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
				return testblobs.NewLimitedSpaceDB(log.Named("overload"), db, 3000000), nil
			},
			StorageNode: func(index int, config *storagenode.Config) {
				config.Storage2.Monitor.MinimumDiskSpace = 3 * memory.MB
			},
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
		require.NoError(t, err)
		defer ctx.Check(client.Close)

		var tt struct {
			pieceID       storj.PieceID
			contentLength memory.Size
			action        pb.PieceAction
			err           string
		}

		tt.pieceID = storj.PieceID{1}
		tt.contentLength = 5 * memory.MB
		tt.action = pb.PieceAction_PUT
		tt.err = "not enough available disk space, have: 3000000, need: 5000000"

		data := testrand.Bytes(tt.contentLength)
		serialNumber := testrand.SerialNumber()

		orderLimit, piecePrivateKey := GenerateOrderLimit(
			t,
			planet.Satellites[0].ID(),
			planet.StorageNodes[0].ID(),
			tt.pieceID,
			tt.action,
			serialNumber,
			24*time.Hour,
			24*time.Hour,
			int64(len(data)),
		)
		signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
		orderLimit, err = signing.SignOrderLimit(ctx, signer, orderLimit)
		require.NoError(t, err)

		pieceHash, err := client.UploadReader(ctx, orderLimit, piecePrivateKey, bytes.NewReader(data))
		if tt.err != "" {
			require.Error(t, err)
			require.Contains(t, err.Error(), tt.err)
		} else {
			require.NoError(t, err)

			expectedHash := pkcrypto.SHA256Hash(data)
			assert.Equal(t, expectedHash, pieceHash.Hash)

			signee := signing.SignerFromFullIdentity(planet.StorageNodes[0].Identity)
			require.NoError(t, signing.VerifyPieceHashSignature(ctx, signee, pieceHash))
		}
	})
}

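// TestDownload covers the piecestore download endpoint: a regular download,
// a download that restores a piece from the trash, and the error cases for a
// missing piece ID, a non-existent piece, and an invalid action.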
func TestDownload(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		pieceID := storj.PieceID{1}
		data, _, _ := uploadPiece(t, ctx, pieceID, planet.StorageNodes[0], planet.Uplinks[0], planet.Satellites[0])

		// upload another piece that we will trash
		trashPieceID := storj.PieceID{3}
		trashPieceData, _, _ := uploadPiece(t, ctx, trashPieceID, planet.StorageNodes[0], planet.Uplinks[0], planet.Satellites[0])
		err := planet.StorageNodes[0].Storage2.Store.Trash(ctx, planet.Satellites[0].ID(), trashPieceID)
		require.NoError(t, err)
		_, err = planet.StorageNodes[0].Storage2.Store.Stat(ctx, planet.Satellites[0].ID(), trashPieceID)
		require.Equal(t, true, errs.Is(err, os.ErrNotExist))

		client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
		require.NoError(t, err)

		for _, tt := range []struct {
			name    string
			pieceID storj.PieceID
			action  pb.PieceAction
			// downloadData is data we are trying to download
			downloadData []byte
			errs         []string
			finalChecks  func()
		}{
			{ // should successfully download data
				name:         "download successful",
				pieceID:      pieceID,
				action:       pb.PieceAction_GET,
				downloadData: data,
			},
			{ // should restore from trash and successfully download data
				name:         "restore trash and successfully download",
				pieceID:      trashPieceID,
				action:       pb.PieceAction_GET,
				downloadData: trashPieceData,
				finalChecks: func() {
					blobInfo, err := planet.StorageNodes[0].Storage2.Store.Stat(ctx, planet.Satellites[0].ID(), trashPieceID)
					require.NoError(t, err)
					require.Equal(t, trashPieceID.Bytes(), blobInfo.BlobRef().Key)
				},
			},
			{ // should err with piece ID not specified
				name:         "piece id not specified",
				pieceID:      storj.PieceID{},
				action:       pb.PieceAction_GET,
				downloadData: data,
				errs:         []string{"missing piece id"},
			},
			{ // should err when the requested piece does not exist
				name:         "file does not exist",
				pieceID:      storj.PieceID{2},
				action:       pb.PieceAction_GET,
				downloadData: data,
				errs:         []string{"file does not exist", "The system cannot find the path specified"},
			},
			{ // should err with invalid action
				name:         "invalid action",
				pieceID:      pieceID,
				downloadData: data,
				action:       pb.PieceAction_PUT,
				errs:         []string{"expected get or get repair or audit action got PUT"},
			},
		} {
			tt := tt
			t.Run(tt.name, func(t *testing.T) {
				serialNumber := testrand.SerialNumber()

				orderLimit, piecePrivateKey := GenerateOrderLimit(
					t,
					planet.Satellites[0].ID(),
					planet.StorageNodes[0].ID(),
					tt.pieceID,
					tt.action,
					serialNumber,
					24*time.Hour,
					24*time.Hour,
					int64(len(tt.downloadData)),
				)
				signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
				orderLimit, err = signing.SignOrderLimit(ctx, signer, orderLimit)
				require.NoError(t, err)

				downloader, err := client.Download(ctx, orderLimit, piecePrivateKey, 0, int64(len(tt.downloadData)))
				require.NoError(t, err)

				buffer := make([]byte, len(data))
				n, readErr := downloader.Read(buffer)

				if len(tt.errs) == 0 {
					require.NoError(t, readErr)
					require.Equal(t, tt.downloadData, buffer[:n])
				}

				closeErr := downloader.Close()
				err = errs.Combine(readErr, closeErr)

				switch len(tt.errs) {
				case 0:
					require.NoError(t, err)
				case 1:
					require.Error(t, err)
					require.Contains(t, err.Error(), tt.errs[0])
				case 2:
					require.Error(t, err)
					require.Conditionf(t, func() bool {
						return strings.Contains(err.Error(), tt.errs[0]) ||
							strings.Contains(err.Error(), tt.errs[1])
					}, "expected error to contain %q or %q, but it does not: %v", tt.errs[0], tt.errs[1], err)
				default:
					require.FailNow(t, "unexpected number of error cases")
				}

				// these should only be not-nil if action = pb.PieceAction_GET_REPAIR
				hash, originalLimit := downloader.GetHashAndLimit()
				require.Nil(t, hash)
				require.Nil(t, originalLimit)

				if tt.finalChecks != nil {
					tt.finalChecks()
				}
			})
		}
	})
}

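// TestDownloadGetRepair checks that a GET_REPAIR download also returns the
// original piece hash and the original (upload) order limit alongside the data.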
func TestDownloadGetRepair(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {

		pieceID := storj.PieceID{1}
		expectedData, ulOrderLimit, originHash := uploadPiece(
			t, ctx, pieceID, planet.StorageNodes[0], planet.Uplinks[0], planet.Satellites[0],
		)
		client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
		require.NoError(t, err)

		serialNumber := testrand.SerialNumber()

		dlOrderLimit, piecePrivateKey := GenerateOrderLimit(
			t,
			planet.Satellites[0].ID(),
			planet.StorageNodes[0].ID(),
			storj.PieceID{1},
			pb.PieceAction_GET_REPAIR,
			serialNumber,
			24*time.Hour,
			24*time.Hour,
			int64(len(expectedData)),
		)
		signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
		dlOrderLimit, err = signing.SignOrderLimit(ctx, signer, dlOrderLimit)
		require.NoError(t, err)

		downloader, err := client.Download(ctx, dlOrderLimit, piecePrivateKey, 0, int64(len(expectedData)))
		require.NoError(t, err)

		buffer := make([]byte, len(expectedData))
		n, err := downloader.Read(buffer)

		require.NoError(t, err)
		require.Equal(t, expectedData, buffer[:n])

		err = downloader.Close()
		require.NoError(t, err)

		hash, originLimit := downloader.GetHashAndLimit()
		require.NotNil(t, hash)
		require.Equal(t, originHash.Hash, hash.Hash)
		require.Equal(t, originHash.PieceId, hash.PieceId)

		require.NotNil(t, originLimit)
		require.Equal(t, originLimit.Action, ulOrderLimit.Action)
		require.Equal(t, originLimit.Limit, ulOrderLimit.Limit)
		require.Equal(t, originLimit.PieceId, ulOrderLimit.PieceId)
		require.Equal(t, originLimit.SatelliteId, ulOrderLimit.SatelliteId)
		require.Equal(t, originLimit.SerialNumber, ulOrderLimit.SerialNumber)
		require.Equal(t, originLimit.SatelliteSignature, ulOrderLimit.SatelliteSignature)
		require.Equal(t, originLimit.UplinkPublicKey, ulOrderLimit.UplinkPublicKey)
	})
}

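// TestDelete exercises the piece delete endpoint for an existing piece, a
// missing piece, a missing piece ID, and an invalid action.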
func TestDelete(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		pieceID := storj.PieceID{1}
		uploadPiece(t, ctx, pieceID, planet.StorageNodes[0], planet.Uplinks[0], planet.Satellites[0])

		nodeurl := planet.StorageNodes[0].NodeURL()
		conn, err := planet.Uplinks[0].Dialer.DialNodeURL(ctx, nodeurl)
		require.NoError(t, err)
		defer ctx.Check(conn.Close)

		client := pb.NewDRPCPiecestoreClient(conn)

		for _, tt := range []struct {
			pieceID storj.PieceID
			action  pb.PieceAction
			err     string
		}{
			{ // should successfully delete data
				pieceID: pieceID,
				action:  pb.PieceAction_DELETE,
				err:     "",
			},
			{ // should err with piece ID not found
				pieceID: storj.PieceID{99},
				action:  pb.PieceAction_DELETE,
				err:     "", // TODO should this return error
			},
			{ // should err with piece ID not specified
				pieceID: storj.PieceID{},
				action:  pb.PieceAction_DELETE,
				err:     "missing piece id",
			},
			{ // should err due to incorrect action
				pieceID: pieceID,
				action:  pb.PieceAction_GET,
				err:     "expected delete action got GET",
			},
		} {
			serialNumber := testrand.SerialNumber()

			orderLimit, _ := GenerateOrderLimit(
				t,
				planet.Satellites[0].ID(),
				planet.StorageNodes[0].ID(),
				tt.pieceID,
				tt.action,
				serialNumber,
				24*time.Hour,
				24*time.Hour,
				100,
			)
			signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
			orderLimit, err = signing.SignOrderLimit(ctx, signer, orderLimit)
			require.NoError(t, err)

			_, err := client.Delete(ctx, &pb.PieceDeleteRequest{
				Limit: orderLimit,
			})
			if tt.err != "" {
				require.Error(t, err)
				require.Contains(t, err.Error(), tt.err)
			} else {
				require.NoError(t, err)
			}
		}
	})
}

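// TestDeletePieces covers the batch DeletePieces endpoint: deleting existing
// pieces, tolerating a missing piece in the request, handling an empty
// request, and rejecting callers that are not the satellite.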
func TestDeletePieces(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		satellite := planet.Satellites[0]
		storagenode := planet.StorageNodes[0]

		nodeurl := storagenode.NodeURL()
		conn, err := planet.Satellites[0].Dialer.DialNodeURL(ctx, nodeurl)
		require.NoError(t, err)
		defer ctx.Check(conn.Close)

		client := pb.NewDRPCPiecestoreClient(conn)

		t.Run("ok", func(t *testing.T) {
			pieceIDs := []storj.PieceID{testrand.PieceID(), testrand.PieceID(), testrand.PieceID(), testrand.PieceID()}
			dataArray := make([][]byte, len(pieceIDs))
			for i, pieceID := range pieceIDs {
				dataArray[i], _, _ = uploadPiece(t, ctx, pieceID, storagenode, planet.Uplinks[0], satellite)
			}

			_, err := client.DeletePieces(ctx.Context, &pb.DeletePiecesRequest{
				PieceIds: pieceIDs,
			})
			require.NoError(t, err)

			planet.WaitForStorageNodeDeleters(ctx)

			for i, pieceID := range pieceIDs {
				_, err = downloadPiece(t, ctx, pieceID, int64(len(dataArray[i])), storagenode, planet.Uplinks[0], satellite)
				require.Error(t, err)
			}
			require.Condition(t, func() bool {
				return strings.Contains(err.Error(), "file does not exist") ||
					strings.Contains(err.Error(), "The system cannot find the path specified")
			}, "unexpected error message")
		})

		t.Run("ok: one piece to delete is missing", func(t *testing.T) {
			missingPieceID := testrand.PieceID()
			pieceIDs := []storj.PieceID{testrand.PieceID(), testrand.PieceID(), testrand.PieceID(), testrand.PieceID()}
			dataArray := make([][]byte, len(pieceIDs))
			for i, pieceID := range pieceIDs {
				dataArray[i], _, _ = uploadPiece(t, ctx, pieceID, storagenode, planet.Uplinks[0], satellite)
			}

			_, err := client.DeletePieces(ctx.Context, &pb.DeletePiecesRequest{
				PieceIds: append(pieceIDs, missingPieceID),
			})
			require.NoError(t, err)

			planet.WaitForStorageNodeDeleters(ctx)

			for i, pieceID := range pieceIDs {
				_, err = downloadPiece(t, ctx, pieceID, int64(len(dataArray[i])), storagenode, planet.Uplinks[0], satellite)
				require.Error(t, err)
			}
			require.Condition(t, func() bool {
				return strings.Contains(err.Error(), "file does not exist") ||
					strings.Contains(err.Error(), "The system cannot find the path specified")
			}, "unexpected error message")
		})

		t.Run("ok: no piece deleted", func(t *testing.T) {
			pieceID := testrand.PieceID()
			data, _, _ := uploadPiece(t, ctx, pieceID, storagenode, planet.Uplinks[0], satellite)

			_, err := client.DeletePieces(ctx.Context, &pb.DeletePiecesRequest{})
			require.NoError(t, err)

			planet.WaitForStorageNodeDeleters(ctx)

			downloaded, err := downloadPiece(t, ctx, pieceID, int64(len(data)), storagenode, planet.Uplinks[0], satellite)
			require.NoError(t, err)
			require.Equal(t, data, downloaded)
		})

		t.Run("error: permission denied", func(t *testing.T) {
			conn, err := planet.Uplinks[0].Dialer.DialNodeURL(ctx, nodeurl)
			require.NoError(t, err)
			defer ctx.Check(conn.Close)
			client := pb.NewDRPCPiecestoreClient(conn)

			pieceID := testrand.PieceID()
			data, _, _ := uploadPiece(t, ctx, pieceID, storagenode, planet.Uplinks[0], satellite)

			_, err = client.DeletePieces(ctx.Context, &pb.DeletePiecesRequest{
				PieceIds: []storj.PieceID{pieceID},
			})
			require.Error(t, err)
			require.Equal(t, rpcstatus.PermissionDenied, rpcstatus.Code(err))

			planet.WaitForStorageNodeDeleters(ctx)

			downloaded, err := downloadPiece(t, ctx, pieceID, int64(len(data)), storagenode, planet.Uplinks[0], satellite)
			require.NoError(t, err)
			require.Equal(t, data, downloaded)
		})
	})
}

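// TestTooManyRequests starts more concurrent uploads than the storage node
// allows and expects the surplus to be rejected with an Unavailable status.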
func TestTooManyRequests(t *testing.T) {
	const uplinkCount = 6
	const maxConcurrent = 3
	const expectedFailures = uplinkCount - maxConcurrent

	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: uplinkCount,
		Reconfigure: testplanet.Reconfigure{
			StorageNode: func(index int, config *storagenode.Config) {
				config.Storage2.MaxConcurrentRequests = maxConcurrent
			},
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		failedCount := int64(expectedFailures)

		defer ctx.Wait()

		for i, uplink := range planet.Uplinks {
			i, uplink := i, uplink
			ctx.Go(func() (err error) {
				storageNode := planet.StorageNodes[0]
				config := piecestore.DefaultConfig

				client, err := piecestore.Dial(ctx, uplink.Dialer, storageNode.NodeURL(), config)
				if err != nil {
					return err
				}
				defer func() {
					if cerr := client.Close(); cerr != nil {
						uplink.Log.Error("close failed", zap.Error(cerr))
						err = errs.Combine(err, cerr)
					}
				}()

				pieceID := storj.PieceID{byte(i + 1)}
				serialNumber := testrand.SerialNumber()

				orderLimit, piecePrivateKey := GenerateOrderLimit(
					t,
					planet.Satellites[0].ID(),
					planet.StorageNodes[0].ID(),
					pieceID,
					pb.PieceAction_PUT,
					serialNumber,
					24*time.Hour,
					24*time.Hour,
					int64(10000),
				)

				satelliteSigner := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
				orderLimit, err = signing.SignOrderLimit(ctx, satelliteSigner, orderLimit)
				if err != nil {
					return errs.New("signing failed: %w", err)
				}

				_, err = client.UploadReader(ctx, orderLimit, piecePrivateKey, bytes.NewReader(make([]byte, orderLimit.Limit)))
				if err != nil {
					if errs2.IsRPC(err, rpcstatus.Unavailable) {
						if atomic.AddInt64(&failedCount, -1) < 0 {
							return errs.New("too many uploads failed: %w", err)
						}
						return nil
					}
					uplink.Log.Error("upload failed", zap.Stringer("Piece ID", pieceID), zap.Error(err))
					return err
				}

				return nil
			})
		}
	})
}

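// GenerateOrderLimit builds an unsigned order limit for the given piece and
// action together with the matching piece private key; callers are expected
// to sign it with the satellite identity before use.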
func GenerateOrderLimit(t *testing.T, satellite storj.NodeID, storageNode storj.NodeID, pieceID storj.PieceID, action pb.PieceAction, serialNumber storj.SerialNumber, pieceExpiration, orderExpiration time.Duration, limit int64) (*pb.OrderLimit, storj.PiecePrivateKey) {
	piecePublicKey, piecePrivateKey, err := storj.NewPieceKey()
	require.NoError(t, err)

	now := time.Now()
	return &pb.OrderLimit{
		SatelliteId:     satellite,
		UplinkPublicKey: piecePublicKey,
		StorageNodeId:   storageNode,
		PieceId:         pieceID,
		Action:          action,
		SerialNumber:    serialNumber,
		OrderCreation:   time.Now(),
		OrderExpiration: now.Add(orderExpiration),
		PieceExpiration: now.Add(pieceExpiration),
		Limit:           limit,
	}, piecePrivateKey
}

// uploadPiece uploads a piece to the given storage node.
func uploadPiece(
	t *testing.T, ctx *testcontext.Context, piece storj.PieceID, storageNode *testplanet.StorageNode,
	uplink *testplanet.Uplink, satellite *testplanet.Satellite,
) (uploadedData []byte, _ *pb.OrderLimit, _ *pb.PieceHash) {
	t.Helper()

	client, err := uplink.DialPiecestore(ctx, storageNode)
	require.NoError(t, err)
	defer ctx.Check(client.Close)

	serialNumber := testrand.SerialNumber()
	uploadedData = testrand.Bytes(10 * memory.KiB)

	orderLimit, piecePrivateKey := GenerateOrderLimit(
		t,
		satellite.ID(),
		storageNode.ID(),
		piece,
		pb.PieceAction_PUT,
		serialNumber,
		24*time.Hour,
		24*time.Hour,
		int64(len(uploadedData)),
	)
	signer := signing.SignerFromFullIdentity(satellite.Identity)
	orderLimit, err = signing.SignOrderLimit(ctx, signer, orderLimit)
	require.NoError(t, err)

	hash, err := client.UploadReader(ctx, orderLimit, piecePrivateKey, bytes.NewReader(uploadedData))
	require.NoError(t, err)

	return uploadedData, orderLimit, hash
}

// downloadPiece downloads a piece from storageNode.
func downloadPiece(
	t *testing.T, ctx *testcontext.Context, piece storj.PieceID, limit int64,
	storageNode *testplanet.StorageNode, uplink *testplanet.Uplink, satellite *testplanet.Satellite,
) (pieceData []byte, err error) {
	t.Helper()

	serialNumber := testrand.SerialNumber()
	orderLimit, piecePrivateKey := GenerateOrderLimit(
		t,
		satellite.ID(),
		storageNode.ID(),
		piece,
		pb.PieceAction_GET,
		serialNumber,
		24*time.Hour,
		24*time.Hour,
		limit,
	)
	signer := signing.SignerFromFullIdentity(satellite.Identity)
	orderLimit, err = signing.SignOrderLimit(ctx.Context, signer, orderLimit)
	require.NoError(t, err)

	client, err := uplink.DialPiecestore(ctx, storageNode)
	require.NoError(t, err)

	downloader, err := client.Download(ctx.Context, orderLimit, piecePrivateKey, 0, limit)
	require.NoError(t, err)
	defer func() {
		if err != nil {
			// Ignore err in Close if an error happened in Download because it's also
			// returned by Close.
			_ = downloader.Close()
			return
		}

		err = downloader.Close()
	}()

	buffer := make([]byte, limit)
	n, err := downloader.Read(buffer)
	return buffer[:n], err
}

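// TestExists verifies the Exists endpoint: it rejects non-satellite callers
// and, for the satellite, reports which of the requested piece IDs are
// missing from each storage node.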
func TestExists(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		// use a non-satellite dialer to check that the request is rejected
		uplinkDialer := planet.Uplinks[0].Dialer
		conn, err := uplinkDialer.DialNodeURL(ctx, planet.StorageNodes[0].NodeURL())
		require.NoError(t, err)
		piecestore := pb.NewDRPCPiecestoreClient(conn)
		_, err = piecestore.Exists(ctx, &pb.ExistsRequest{})
		require.Error(t, err)
		require.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied))

		for i := 0; i < 15; i++ {
			err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "object"+strconv.Itoa(i), testrand.Bytes(5*memory.KiB))
			require.NoError(t, err)
		}

		segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
		require.NoError(t, err)

		existingSNPieces := map[storj.NodeID][]storj.PieceID{}

		for _, segment := range segments {
			for _, piece := range segment.Pieces {
				pieceID := segment.RootPieceID.Derive(piece.StorageNode, int32(piece.Number))
				existingSNPieces[piece.StorageNode] = append(existingSNPieces[piece.StorageNode], pieceID)
			}
		}

		dialer := planet.Satellites[0].Dialer
		for _, node := range planet.StorageNodes {
			conn, err := dialer.DialNodeURL(ctx, node.NodeURL())
			require.NoError(t, err)

			piecestore := pb.NewDRPCPiecestoreClient(conn)

			response, err := piecestore.Exists(ctx, &pb.ExistsRequest{})
			require.NoError(t, err)
			require.Empty(t, response.Missing)

			piecesToVerify := existingSNPieces[node.ID()]
			response, err = piecestore.Exists(ctx, &pb.ExistsRequest{
				PieceIds: piecesToVerify,
			})
			require.NoError(t, err)
			require.Empty(t, response.Missing)

			notExistingPieceID := testrand.PieceID()
			// add a non-existing piece to the list
			piecesToVerify = append(piecesToVerify, notExistingPieceID)
			response, err = piecestore.Exists(ctx, &pb.ExistsRequest{
				PieceIds: piecesToVerify,
			})
			require.NoError(t, err)
			require.Equal(t, []uint32{uint32(len(piecesToVerify) - 1)}, response.Missing)

			// verify single missing piece
			response, err = piecestore.Exists(ctx, &pb.ExistsRequest{
				PieceIds: []pb.PieceID{notExistingPieceID},
			})
			require.NoError(t, err)
			require.Equal(t, []uint32{0}, response.Missing)
		}

		// TODO verify that pieces from a different satellite don't leak into results
		// TODO verify a larger number of pieces
	})
}