From e1379bea0f72f457b67fd4333b6c10c539e1198c Mon Sep 17 00:00:00 2001 From: TungHoang Date: Thu, 1 Jul 2021 08:59:04 +0200 Subject: [PATCH] storagenode/piecestore: allow rejecting slow clients Estimate speed of the uploads by calculating the time a client has been in a non-congested state. Storage node is uncongested when it has used up over 80% of its maximum concurrent requests capacity. Currently it's disabled by default. --- storagenode/piecestore/endpoint.go | 68 +++++++++++++++++++++++ storagenode/piecestore/endpoint_test.go | 74 +++++++++++++++++++++++++ 2 files changed, 142 insertions(+) diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index e7179508d..f374132af 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -64,6 +64,10 @@ type Config struct { ReportCapacityThreshold memory.Size `help:"threshold below which to immediately notify satellite of capacity" default:"500MB" hidden:"true"` MaxUsedSerialsSize memory.Size `help:"amount of memory allowed for used serials store - once surpassed, serials will be dropped at random" default:"1MB"` + MinUploadSpeed memory.Size `help:"a client upload speed should not be lower than MinUploadSpeed in bytes-per-second (E.g: 1Mb), otherwise, it will be flagged as slow-connection and potentially be closed" default:"0Mb"` + MinUploadSpeedGraceDuration time.Duration `help:"if MinUploadSpeed is configured, after a period of time after the client initiated the upload, the server will flag unusually slow upload client" default:"0h0m10s"` + MinUploadSpeedCongestionThreshold float64 `help:"if the portion defined by the total number of alive connection per MaxConcurrentRequest reaches this threshold, a slow upload client will no longer be monitored and flagged" default:"0.8"` + Trust trust.Config Monitor monitor.Config @@ -314,7 +318,18 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err largestOrder := pb.Order{} defer commitOrderToStore(ctx, &largestOrder) + // monitor speed of upload client to flag out slow uploads. + speedEstimate := speedEstimation{ + grace: endpoint.config.MinUploadSpeedGraceDuration, + limit: endpoint.config.MinUploadSpeed, + } + for { + + if err := speedEstimate.EnsureLimit(memory.Size(pieceWriter.Size()), endpoint.isCongested(), time.Now()); err != nil { + return rpcstatus.Wrap(rpcstatus.Aborted, err) + } + // TODO: reuse messages to avoid allocations // N.B.: we are only allowed to use message if the returned error is nil. it would be @@ -419,6 +434,17 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err } } +// isCongested identifies state of congestion. If the total number of +// connections is above 80% of the MaxConcurrentRequests, then it is defined +// as congestion. +func (endpoint *Endpoint) isCongested() bool { + + requestCongestionThreshold := int32(float64(endpoint.config.MaxConcurrentRequests) * endpoint.config.MinUploadSpeedCongestionThreshold) + + connectionCount := atomic.LoadInt32(&endpoint.liveRequests) + return connectionCount > requestCongestionThreshold +} + // Download handles Downloading a piece on piecestore. func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err error) { ctx := stream.Context() @@ -765,3 +791,45 @@ func min(a, b int64) int64 { } return b } + +// speedEstimation monitors state of incoming traffic. It would signal slow-speed +// client in non-congested traffic condition. +type speedEstimation struct { + // grace indicates a certain period of time before the observator kicks in + grace time.Duration + // limit for flagging slow connections. Speed below this limit is considered to be slow. + limit memory.Size + // uncongestedTime indicates the duration of connection, measured in non-congested state + uncongestedTime time.Duration + lastChecked time.Time +} + +// EnsureLimit makes sure that in non-congested condition, a slow-upload client will be flagged out. +func (estimate *speedEstimation) EnsureLimit(transferred memory.Size, congested bool, now time.Time) error { + if estimate.lastChecked.IsZero() { + estimate.lastChecked = now + return nil + } + + delta := now.Sub(estimate.lastChecked) + estimate.lastChecked = now + + // In congested condition, the speed check would produce false-positive results, + // thus it shall be skipped. + if congested { + return nil + } + + estimate.uncongestedTime += delta + if estimate.uncongestedTime <= 0 || estimate.uncongestedTime <= estimate.grace { + // not enough data + return nil + } + bytesPerSec := float64(transferred) / estimate.uncongestedTime.Seconds() + + if bytesPerSec < float64(estimate.limit) { + return errs.New("speed too low, current:%v < limit:%v", bytesPerSec, estimate.limit) + } + + return nil +} diff --git a/storagenode/piecestore/endpoint_test.go b/storagenode/piecestore/endpoint_test.go index 163ead7c1..e30afc52c 100644 --- a/storagenode/piecestore/endpoint_test.go +++ b/storagenode/piecestore/endpoint_test.go @@ -160,6 +160,80 @@ func TestUpload(t *testing.T) { }) } +// TestSlowUpload tries to mock a SlowLoris attack. +func TestSlowUpload(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + + StorageNode: func(index int, config *storagenode.Config) { + // Set MinUploadSpeed to extremely high to indicates that + // client upload rate is slow (relative to node's standards) + config.Storage2.MinUploadSpeed = 10000000 * memory.MB + + // Storage Node waits only few microsecond before starting the measurement + // of upload rate to flag unsually slow connection + config.Storage2.MinUploadSpeedGraceDuration = 500 * time.Microsecond + + config.Storage2.MinUploadSpeedCongestionThreshold = 0.8 + }, + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0]) + require.NoError(t, err) + defer ctx.Check(client.Close) + + for _, tt := range []struct { + pieceID storj.PieceID + contentLength memory.Size + action pb.PieceAction + err string + }{ + { // connection should be aborted + pieceID: storj.PieceID{1}, + // As the server node only starts flagging unusually slow connection + // after 500 micro seconds, the file should be big enough to ensure the connection is still open. + contentLength: 50 * memory.MB, + action: pb.PieceAction_PUT, + err: "speed too low", + }, + } { + data := testrand.Bytes(tt.contentLength) + + serialNumber := testrand.SerialNumber() + + orderLimit, piecePrivateKey := GenerateOrderLimit( + t, + planet.Satellites[0].ID(), + planet.StorageNodes[0].ID(), + tt.pieceID, + tt.action, + serialNumber, + 24*time.Hour, + 24*time.Hour, + int64(len(data)), + ) + signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity) + orderLimit, err = signing.SignOrderLimit(ctx, signer, orderLimit) + require.NoError(t, err) + + pieceHash, err := client.UploadReader(ctx, orderLimit, piecePrivateKey, bytes.NewReader(data)) + + if tt.err != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tt.err) + } else { + require.NoError(t, err) + + expectedHash := pkcrypto.SHA256Hash(data) + assert.Equal(t, expectedHash, pieceHash.Hash) + + signee := signing.SignerFromFullIdentity(planet.StorageNodes[0].Identity) + require.NoError(t, signing.VerifyPieceHashSignature(ctx, signee, pieceHash)) + } + } + }) +} func TestUploadOverAvailable(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,