storagenode/piecestore: add Exists endpoint
Adds new method Exists which can be used to verify which requested piece ids exists on storage node. Will verify only pieces which belongs to the satellite that used that endpoint. Minum WASM size was increased a bit. https://github.com/storj/storj/issues/5415 Change-Id: Ia5f9cadeb526541b2776a8973eb7d50133ad8636
This commit is contained in:
parent
678bb12d4b
commit
5110803102
2
go.mod
2
go.mod
@ -52,7 +52,7 @@ require (
|
||||
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
|
||||
gopkg.in/segmentio/analytics-go.v3 v3.1.0
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776
|
||||
storj.io/common v0.0.0-20221123115229-fed3e6651b63
|
||||
storj.io/common v0.0.0-20221215155610-3715c7f7ce66
|
||||
storj.io/drpc v0.0.32
|
||||
storj.io/monkit-jaeger v0.0.0-20220915074555-d100d7589f41
|
||||
storj.io/private v0.0.0-20221108123115-3a27297f0b78
|
||||
|
4
go.sum
4
go.sum
@ -951,8 +951,8 @@ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8
|
||||
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
|
||||
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
|
||||
storj.io/common v0.0.0-20220719163320-cd2ef8e1b9b0/go.mod h1:mCYV6Ud5+cdbuaxdPD5Zht/HYaIn0sffnnws9ErkrMQ=
|
||||
storj.io/common v0.0.0-20221123115229-fed3e6651b63 h1:OuleF/3FvZe3Nnu6NdwVr+FvCXjfD4iNNdgfI2kcs3k=
|
||||
storj.io/common v0.0.0-20221123115229-fed3e6651b63/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
|
||||
storj.io/common v0.0.0-20221215155610-3715c7f7ce66 h1:T9IqZ+Hi8EnAuZ0Te9FEPRR10UffbDhGY4D/SvZdmqg=
|
||||
storj.io/common v0.0.0-20221215155610-3715c7f7ce66/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
|
||||
storj.io/drpc v0.0.32 h1:5p5ZwsK/VOgapaCu+oxaPVwO6UwIs+iwdMiD50+R4PI=
|
||||
storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
|
||||
storj.io/monkit-jaeger v0.0.0-20220915074555-d100d7589f41 h1:SVuEocEhZfFc13J1AmlVLitdGXTVrvmbzN4Z9C9Ms40=
|
||||
|
@ -169,6 +169,7 @@ func (planet *Planet) newStorageNode(ctx context.Context, prefix string, index,
|
||||
ReportCapacityThreshold: 100 * memory.MB,
|
||||
DeleteQueueSize: 10000,
|
||||
DeleteWorkers: 1,
|
||||
ExistsCheckWorkers: 5,
|
||||
Orders: orders.Config{
|
||||
SenderInterval: defaultInterval,
|
||||
SenderTimeout: 10 * time.Minute,
|
||||
|
@ -147,6 +147,9 @@ func (mock *piecestoreMock) Retain(ctx context.Context, retain *pb.RetainRequest
|
||||
func (mock *piecestoreMock) RestoreTrash(context.Context, *pb.RestoreTrashRequest) (*pb.RestoreTrashResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (mock *piecestoreMock) Exists(context.Context, *pb.ExistsRequest) (*pb.ExistsResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestDownloadFromUnresponsiveNode(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
|
@ -10,7 +10,7 @@ trap cleanup EXIT
|
||||
|
||||
cd satellite/console/wasm && pwd && GOOS=js GOARCH=wasm go build -o main.wasm .
|
||||
BUILD_SIZE=$(stat -c %s main.wasm)
|
||||
CURRENT_SIZE=10000000
|
||||
CURRENT_SIZE=10009000
|
||||
if [ $BUILD_SIZE -gt $CURRENT_SIZE ]; then
|
||||
echo "Wasm size is too big, was $CURRENT_SIZE but now it is $BUILD_SIZE"
|
||||
exit 1
|
||||
|
@ -728,6 +728,14 @@ func (store *Store) CheckWritability(ctx context.Context) error {
|
||||
return store.blobs.CheckWritability(ctx)
|
||||
}
|
||||
|
||||
// Stat looks up disk metadata on the blob file.
|
||||
func (store *Store) Stat(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (storage.BlobInfo, error) {
|
||||
return store.blobs.Stat(ctx, storage.BlobRef{
|
||||
Namespace: satellite.Bytes(),
|
||||
Key: pieceID.Bytes(),
|
||||
})
|
||||
}
|
||||
|
||||
type storedPieceAccess struct {
|
||||
storage.BlobInfo
|
||||
store *Store
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -58,6 +59,7 @@ type Config struct {
|
||||
MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected. 0 represents unlimited." default:"0"`
|
||||
DeleteWorkers int `help:"how many piece delete workers" default:"1"`
|
||||
DeleteQueueSize int `help:"size of the piece delete queue" default:"10000"`
|
||||
ExistsCheckWorkers int `help:"how many workers to use to check if satellite pieces exists" default:"5"`
|
||||
OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"1h0m0s"`
|
||||
CacheSyncInterval time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"`
|
||||
PieceScanOnStartup bool `help:"if set to true, all pieces disk usage is recalculated on startup" default:"true"`
|
||||
@ -189,6 +191,66 @@ func (endpoint *Endpoint) DeletePieces(
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Exists check if pieces from the list exists on storage node. Request will
|
||||
// accept only connections from trusted satellite and will check pieces only
|
||||
// for that satellite.
|
||||
func (endpoint *Endpoint) Exists(
|
||||
ctx context.Context, req *pb.ExistsRequest,
|
||||
) (_ *pb.ExistsResponse, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
peer, err := identity.PeerIdentityFromContext(ctx)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
|
||||
}
|
||||
|
||||
err = endpoint.trust.VerifySatelliteID(ctx, peer.ID)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "piecestore.exists called with untrusted ID")
|
||||
}
|
||||
|
||||
if len(req.PieceIds) == 0 {
|
||||
return &pb.ExistsResponse{}, nil
|
||||
}
|
||||
|
||||
limiter := sync2.NewLimiter(endpoint.config.ExistsCheckWorkers)
|
||||
var mu sync.Mutex
|
||||
|
||||
missing := make([]uint32, 0, 100)
|
||||
|
||||
addMissing := func(index int) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
missing = append(missing, uint32(index))
|
||||
}
|
||||
|
||||
for index, pieceID := range req.PieceIds {
|
||||
index := index
|
||||
pieceID := pieceID
|
||||
|
||||
ok := limiter.Go(ctx, func() {
|
||||
_, err := endpoint.store.Stat(ctx, peer.ID, pieceID)
|
||||
if err != nil {
|
||||
if errs.Is(err, os.ErrNotExist) {
|
||||
addMissing(index)
|
||||
}
|
||||
endpoint.log.Debug("failed to stat piece", zap.String("Piece ID", pieceID.String()), zap.String("Satellite ID", peer.ID.String()), zap.Error(err))
|
||||
return
|
||||
}
|
||||
})
|
||||
if !ok {
|
||||
return &pb.ExistsResponse{}, rpcstatus.Wrap(rpcstatus.Internal, ctx.Err())
|
||||
}
|
||||
}
|
||||
|
||||
limiter.Wait()
|
||||
|
||||
return &pb.ExistsResponse{
|
||||
Missing: missing,
|
||||
}, rpcstatus.Wrap(rpcstatus.Internal, err)
|
||||
}
|
||||
|
||||
// Upload handles uploading a piece on piece store.
|
||||
func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err error) {
|
||||
ctx := stream.Context()
|
||||
|
@ -6,6 +6,7 @@ package piecestore_test
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
@ -796,3 +797,73 @@ func downloadPiece(
|
||||
n, err := downloader.Read(buffer)
|
||||
return buffer[:n], err
|
||||
}
|
||||
|
||||
func TestExists(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
// use non satellite dialer to check if request will be rejected
|
||||
uplinkDialer := planet.Uplinks[0].Dialer
|
||||
conn, err := uplinkDialer.DialNodeURL(ctx, planet.StorageNodes[0].NodeURL())
|
||||
require.NoError(t, err)
|
||||
piecestore := pb.NewDRPCPiecestoreClient(conn)
|
||||
_, err = piecestore.Exists(ctx, &pb.ExistsRequest{})
|
||||
require.Error(t, err)
|
||||
require.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied))
|
||||
|
||||
for i := 0; i < 15; i++ {
|
||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "object"+strconv.Itoa(i), testrand.Bytes(5*memory.KiB))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
existingSNPieces := map[storj.NodeID][]storj.PieceID{}
|
||||
|
||||
for _, segment := range segments {
|
||||
for _, piece := range segment.Pieces {
|
||||
pieceID := segment.RootPieceID.Derive(piece.StorageNode, int32(piece.Number))
|
||||
existingSNPieces[piece.StorageNode] = append(existingSNPieces[piece.StorageNode], pieceID)
|
||||
}
|
||||
}
|
||||
|
||||
dialer := planet.Satellites[0].Dialer
|
||||
for _, node := range planet.StorageNodes {
|
||||
conn, err := dialer.DialNodeURL(ctx, node.NodeURL())
|
||||
require.NoError(t, err)
|
||||
|
||||
piecestore := pb.NewDRPCPiecestoreClient(conn)
|
||||
|
||||
response, err := piecestore.Exists(ctx, &pb.ExistsRequest{})
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, response.Missing)
|
||||
|
||||
piecesToVerify := existingSNPieces[node.ID()]
|
||||
response, err = piecestore.Exists(ctx, &pb.ExistsRequest{
|
||||
PieceIds: piecesToVerify,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, response.Missing)
|
||||
|
||||
notExistingPieceID := testrand.PieceID()
|
||||
// add not existing piece to the list
|
||||
piecesToVerify = append(piecesToVerify, notExistingPieceID)
|
||||
response, err = piecestore.Exists(ctx, &pb.ExistsRequest{
|
||||
PieceIds: piecesToVerify,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []uint32{uint32(len(piecesToVerify) - 1)}, response.Missing)
|
||||
|
||||
// verify single missing piece
|
||||
response, err = piecestore.Exists(ctx, &pb.ExistsRequest{
|
||||
PieceIds: []pb.PieceID{notExistingPieceID},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []uint32{0}, response.Missing)
|
||||
}
|
||||
|
||||
// TODO verify that pieces from different satellite doesn't leak into results
|
||||
// TODO verify larger number of pieces
|
||||
})
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ require (
|
||||
github.com/zeebo/errs v1.3.0
|
||||
go.uber.org/zap v1.21.0
|
||||
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde
|
||||
storj.io/common v0.0.0-20221123115229-fed3e6651b63
|
||||
storj.io/common v0.0.0-20221215155610-3715c7f7ce66
|
||||
storj.io/private v0.0.0-20221108123115-3a27297f0b78
|
||||
storj.io/storj v1.63.1
|
||||
storj.io/storjscan v0.0.0-20220926140643-1623c3b391b0
|
||||
|
@ -1262,8 +1262,9 @@ storj.io/common v0.0.0-20220719163320-cd2ef8e1b9b0/go.mod h1:mCYV6Ud5+cdbuaxdPD5
|
||||
storj.io/common v0.0.0-20220802175255-aae0c09ec9d4/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
|
||||
storj.io/common v0.0.0-20220829171748-14b0a3c9565e/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
|
||||
storj.io/common v0.0.0-20220915180246-7826900e2b06/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
|
||||
storj.io/common v0.0.0-20221123115229-fed3e6651b63 h1:OuleF/3FvZe3Nnu6NdwVr+FvCXjfD4iNNdgfI2kcs3k=
|
||||
storj.io/common v0.0.0-20221123115229-fed3e6651b63/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
|
||||
storj.io/common v0.0.0-20221215155610-3715c7f7ce66 h1:T9IqZ+Hi8EnAuZ0Te9FEPRR10UffbDhGY4D/SvZdmqg=
|
||||
storj.io/common v0.0.0-20221215155610-3715c7f7ce66/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
|
||||
storj.io/drpc v0.0.32 h1:5p5ZwsK/VOgapaCu+oxaPVwO6UwIs+iwdMiD50+R4PI=
|
||||
storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
|
||||
storj.io/monkit-jaeger v0.0.0-20220726162929-c3a9898b5bca/go.mod h1:iK+dmHZZXQlW7ahKdNSOo+raMk5BDL2wbD62FIeXLWs=
|
||||
|
@ -10,7 +10,7 @@ require (
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/stretchr/testify v1.7.0
|
||||
go.uber.org/zap v1.17.0
|
||||
storj.io/common v0.0.0-20221123115229-fed3e6651b63
|
||||
storj.io/common v0.0.0-20221215155610-3715c7f7ce66
|
||||
storj.io/gateway-mt v1.18.1-0.20211210081136-cada9a567d31
|
||||
storj.io/private v0.0.0-20221108123115-3a27297f0b78
|
||||
storj.io/storj v0.12.1-0.20221125175451-ef4b564b82f7
|
||||
|
@ -1501,8 +1501,9 @@ storj.io/common v0.0.0-20210916151047-6aaeb34bb916/go.mod h1:objobGrIWQwhmTSpSm6
|
||||
storj.io/common v0.0.0-20211102144601-401a79f0706a/go.mod h1:a2Kw7Uipu929OFANfWKLHRoD0JfhgssikEvimd6hbSQ=
|
||||
storj.io/common v0.0.0-20220719163320-cd2ef8e1b9b0/go.mod h1:mCYV6Ud5+cdbuaxdPD5Zht/HYaIn0sffnnws9ErkrMQ=
|
||||
storj.io/common v0.0.0-20220915180246-7826900e2b06/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
|
||||
storj.io/common v0.0.0-20221123115229-fed3e6651b63 h1:OuleF/3FvZe3Nnu6NdwVr+FvCXjfD4iNNdgfI2kcs3k=
|
||||
storj.io/common v0.0.0-20221123115229-fed3e6651b63/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
|
||||
storj.io/common v0.0.0-20221215155610-3715c7f7ce66 h1:T9IqZ+Hi8EnAuZ0Te9FEPRR10UffbDhGY4D/SvZdmqg=
|
||||
storj.io/common v0.0.0-20221215155610-3715c7f7ce66/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
|
||||
storj.io/dotworld v0.0.0-20210324183515-0d11aeccd840/go.mod h1:KU9YvEgRrMMiWLvH8pzn1UkoCoxggKIPvQxmNdx7aXQ=
|
||||
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
|
||||
storj.io/drpc v0.0.24/go.mod h1:ofQUDPQbbIymRDKE0tms48k8bLP5Y+dsI9CbXGv3gko=
|
||||
|
Loading…
Reference in New Issue
Block a user