storagenode/piecestore: allow rejecting slow clients

Estimate the speed of uploads by accumulating the time a client has spent in a non-congested state. The storage node is considered congested when it has used up over 80% of its maximum concurrent requests capacity.

Currently it's disabled by default.
This commit is contained in:
TungHoang 2021-07-01 08:59:04 +02:00 committed by GitHub
parent 4335b21332
commit e1379bea0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 142 additions and 0 deletions

View File

@ -64,6 +64,10 @@ type Config struct {
ReportCapacityThreshold memory.Size `help:"threshold below which to immediately notify satellite of capacity" default:"500MB" hidden:"true"`
MaxUsedSerialsSize memory.Size `help:"amount of memory allowed for used serials store - once surpassed, serials will be dropped at random" default:"1MB"`
MinUploadSpeed memory.Size `help:"a client upload speed should not be lower than MinUploadSpeed in bytes-per-second (E.g: 1Mb), otherwise, it will be flagged as slow-connection and potentially be closed" default:"0Mb"`
MinUploadSpeedGraceDuration time.Duration `help:"if MinUploadSpeed is configured, after a period of time after the client initiated the upload, the server will flag unusually slow upload client" default:"0h0m10s"`
MinUploadSpeedCongestionThreshold float64 `help:"if the portion defined by the total number of alive connection per MaxConcurrentRequest reaches this threshold, a slow upload client will no longer be monitored and flagged" default:"0.8"`
Trust trust.Config
Monitor monitor.Config
@ -314,7 +318,18 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
largestOrder := pb.Order{}
defer commitOrderToStore(ctx, &largestOrder)
// monitor speed of upload client to flag out slow uploads.
speedEstimate := speedEstimation{
grace: endpoint.config.MinUploadSpeedGraceDuration,
limit: endpoint.config.MinUploadSpeed,
}
for {
if err := speedEstimate.EnsureLimit(memory.Size(pieceWriter.Size()), endpoint.isCongested(), time.Now()); err != nil {
return rpcstatus.Wrap(rpcstatus.Aborted, err)
}
// TODO: reuse messages to avoid allocations
// N.B.: we are only allowed to use message if the returned error is nil. it would be
@ -419,6 +434,17 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
}
}
// isCongested reports whether the endpoint is currently congested: the
// number of live requests exceeds the configured fraction
// (MinUploadSpeedCongestionThreshold, 0.8 by default) of MaxConcurrentRequests.
func (endpoint *Endpoint) isCongested() bool {
	live := atomic.LoadInt32(&endpoint.liveRequests)
	threshold := float64(endpoint.config.MaxConcurrentRequests) * endpoint.config.MinUploadSpeedCongestionThreshold
	return live > int32(threshold)
}
// Download handles Downloading a piece on piecestore.
func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err error) {
ctx := stream.Context()
@ -765,3 +791,45 @@ func min(a, b int64) int64 {
}
return b
}
// speedEstimation tracks the progress of a single upload so that a
// slow-speed client can be flagged while traffic is not congested.
type speedEstimation struct {
	// grace is the minimum accumulated uncongested time that must elapse
	// before the speed check takes effect; within it, nothing is flagged.
	grace time.Duration
	// limit is the minimum acceptable transfer rate in bytes per second;
	// a measured speed below this limit is considered to be slow.
	limit memory.Size
	// uncongestedTime accumulates the elapsed connection time spent in a
	// non-congested state; congested intervals are excluded.
	uncongestedTime time.Duration
	// lastChecked is the timestamp of the previous EnsureLimit call;
	// it stays zero until the first call.
	lastChecked time.Time
}
// EnsureLimit folds the time elapsed since the previous call into the
// estimate and returns an error once a client, measured over non-congested
// time only, falls below the configured minimum upload speed.
//
// transferred is the total number of bytes received so far, congested
// reports the node's current congestion state, and now is the current time.
func (estimate *speedEstimation) EnsureLimit(transferred memory.Size, congested bool, now time.Time) error {
	// First observation: record the timestamp, nothing to measure yet.
	if estimate.lastChecked.IsZero() {
		estimate.lastChecked = now
		return nil
	}

	elapsed := now.Sub(estimate.lastChecked)
	estimate.lastChecked = now

	// Under congestion a low rate is expected, so the check would produce
	// false positives; skip it and exclude this interval from the total.
	if congested {
		return nil
	}
	estimate.uncongestedTime += elapsed

	// Wait until enough uncongested time has accumulated to judge speed.
	if estimate.uncongestedTime <= 0 || estimate.uncongestedTime <= estimate.grace {
		return nil
	}

	if bytesPerSec := float64(transferred) / estimate.uncongestedTime.Seconds(); bytesPerSec < float64(estimate.limit) {
		return errs.New("speed too low, current:%v < limit:%v", bytesPerSec, estimate.limit)
	}
	return nil
}

View File

@ -160,6 +160,80 @@ func TestUpload(t *testing.T) {
})
}
// TestSlowUpload simulates a SlowLoris-style attack: the node is configured
// with an unreachably high minimum upload speed, so any real client is
// flagged as too slow and its upload is aborted.
func TestSlowUpload(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			StorageNode: func(index int, config *storagenode.Config) {
				// An absurdly high minimum speed guarantees that every
				// client's upload rate is slow by the node's standards.
				config.Storage2.MinUploadSpeed = 10000000 * memory.MB
				// Start measuring the upload rate after only a few
				// microseconds so unusually slow connections get flagged
				// without keeping the test connection open for long.
				config.Storage2.MinUploadSpeedGraceDuration = 500 * time.Microsecond
				config.Storage2.MinUploadSpeedCongestionThreshold = 0.8
			},
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
		require.NoError(t, err)
		defer ctx.Check(client.Close)

		testCases := []struct {
			pieceID       storj.PieceID
			contentLength memory.Size
			action        pb.PieceAction
			err           string
		}{
			{ // the server is expected to abort this connection
				pieceID: storj.PieceID{1},
				// The payload must be big enough that the transfer is still
				// in flight once the 500-microsecond grace period elapses.
				contentLength: 50 * memory.MB,
				action:        pb.PieceAction_PUT,
				err:           "speed too low",
			},
		}
		for _, tc := range testCases {
			data := testrand.Bytes(tc.contentLength)
			serialNumber := testrand.SerialNumber()

			orderLimit, piecePrivateKey := GenerateOrderLimit(
				t,
				planet.Satellites[0].ID(),
				planet.StorageNodes[0].ID(),
				tc.pieceID,
				tc.action,
				serialNumber,
				24*time.Hour,
				24*time.Hour,
				int64(len(data)),
			)
			signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
			orderLimit, err = signing.SignOrderLimit(ctx, signer, orderLimit)
			require.NoError(t, err)

			pieceHash, err := client.UploadReader(ctx, orderLimit, piecePrivateKey, bytes.NewReader(data))
			if tc.err != "" {
				require.Error(t, err)
				require.Contains(t, err.Error(), tc.err)
				continue
			}
			require.NoError(t, err)

			expectedHash := pkcrypto.SHA256Hash(data)
			assert.Equal(t, expectedHash, pieceHash.Hash)

			signee := signing.SignerFromFullIdentity(planet.StorageNodes[0].Identity)
			require.NoError(t, signing.VerifyPieceHashSignature(ctx, signee, pieceHash))
		}
	})
}
func TestUploadOverAvailable(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,