storagenode: decline uploads when there are too many live requests (#2397)

Egon Elbre 2019-07-03 16:47:55 +03:00 committed by GitHub
parent f1e670c9be
commit 38f3d860a4
3 changed files with 147 additions and 2 deletions
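
The heart of the change is a simple admission gate: every piecestore request bumps an atomic counter on the endpoint, and Upload refuses new work once that counter exceeds MaxConcurrentRequests. Below is a minimal standalone sketch of the same pattern, with invented names and a plain error in place of the gRPC Unavailable status the endpoint returns; it is an illustration, not the endpoint code itself.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// gate is an illustrative stand-in for the endpoint's liveRequests counter;
// the type, field, and method names are invented for this sketch.
type gate struct {
	live int32 // touched only via sync/atomic
	max  int32
}

var errOverloaded = errors.New("storage node overloaded")

// tryEnter increments first and checks the limit afterwards, mirroring how
// Upload counts itself before deciding whether to reject.
func (g *gate) tryEnter() (release func(), err error) {
	if atomic.AddInt32(&g.live, 1) > g.max {
		atomic.AddInt32(&g.live, -1)
		return nil, errOverloaded
	}
	return func() { atomic.AddInt32(&g.live, -1) }, nil
}

func main() {
	g := &gate{max: 2}
	for i := 0; i < 4; i++ {
		release, err := g.tryEnter()
		if err != nil {
			fmt.Println("request", i, "rejected:", err)
			continue
		}
		fmt.Println("request", i, "accepted")
		// hold the slot for the rest of main so later requests see a full gate
		defer release()
	}
}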


@@ -109,6 +109,8 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatelliteIDs []strin
StaticDir: filepath.Join(developmentRoot, "web/operator/"),
},
Storage2: piecestore.Config{
ExpirationGracePeriod: 0,
MaxConcurrentRequests: 100,
OrderLimitGracePeriod: time.Hour * 24,
Sender: orders.SenderConfig{
Interval: time.Hour,


@@ -7,6 +7,7 @@ import (
"context"
"io"
"os"
"sync/atomic"
"time"
"github.com/golang/protobuf/ptypes"
@@ -55,6 +56,7 @@ type OldConfig struct {
// Config defines parameters for piecestore endpoint.
type Config struct {
ExpirationGracePeriod time.Duration `help:"how soon before expiration date should things be considered expired" default:"48h0m0s"`
MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected." default:"6"`
OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"1h0m0s"`
Monitor monitor.Config
@@ -75,6 +77,8 @@ type Endpoint struct {
orders orders.DB
usage bandwidth.DB
usedSerials UsedSerials
liveRequests int32
}
// NewEndpoint creates a new piecestore endpoint.
@@ -92,6 +96,8 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, moni
orders: orders,
usage: usage,
usedSerials: usedSerials,
liveRequests: 0,
}, nil
}
@@ -99,6 +105,9 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, moni
func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequest) (_ *pb.PieceDeleteResponse, err error) {
defer mon.Task()(&ctx)(&err)
atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveRequests, -1)
if delete.Limit.Action != pb.PieceAction_DELETE {
return nil, Error.New("expected delete action got %v", delete.Limit.Action) // TODO: report grpc status unauthorized or bad request
}
@@ -129,6 +138,15 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ
func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error) {
ctx := stream.Context()
defer mon.Task()(&ctx)(&err)
liveRequests := atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveRequests, -1)
if int(liveRequests) > endpoint.config.MaxConcurrentRequests {
endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveRequests))
return status.Error(codes.Unavailable, "storage node overloaded")
}
startTime := time.Now().UTC()
// TODO: set connection timeouts
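
On the uplink side this rejection arrives as a gRPC Unavailable status ("storage node overloaded"); the test added below recognizes it with errs2.IsRPC(err, codes.Unavailable). A caller that wants to ride out a briefly overloaded node could wrap a single attempt in a retry loop along these lines; retryIfOverloaded and the fn callback are hypothetical and not part of this commit or the uplink library.

package retry

import (
	"context"
	"time"

	"github.com/zeebo/errs"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// retryIfOverloaded retries fn with exponential backoff while the storage
// node keeps answering codes.Unavailable. fn stands in for one upload
// attempt and is purely illustrative.
func retryIfOverloaded(ctx context.Context, attempts int, fn func() error) error {
	backoff := 100 * time.Millisecond
	for i := 0; i < attempts; i++ {
		err := fn()
		if err == nil {
			return nil
		}
		if status.Code(err) != codes.Unavailable {
			return err // a different failure; do not keep retrying
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
			backoff *= 2
		}
	}
	return errs.New("node still overloaded after %d attempts", attempts)
}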
@@ -322,6 +340,10 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error)
func (endpoint *Endpoint) Download(stream pb.Piecestore_DownloadServer) (err error) {
ctx := stream.Context()
defer mon.Task()(&ctx)(&err)
atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveRequests, -1)
startTime := time.Now().UTC()
// TODO: set connection timeouts


@@ -6,13 +6,20 @@ package piecestore_test
import (
"io"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
@@ -21,6 +28,7 @@ import (
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/pkcrypto"
"storj.io/storj/pkg/storj"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/bandwidth"
"storj.io/storj/uplink/piecestore"
)
@@ -361,15 +369,129 @@ func TestDelete(t *testing.T) {
}
}
func TestTooManyRequests(t *testing.T) {
t.Skip("flaky, because of EOF issues")
ctx := testcontext.New(t)
defer ctx.Cleanup()
const uplinkCount = 6
const maxConcurrent = 3
const expectedFailures = uplinkCount - maxConcurrent
log := zaptest.NewLogger(t)
planet, err := testplanet.NewCustom(log, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: uplinkCount,
Reconfigure: testplanet.Reconfigure{
StorageNode: func(index int, config *storagenode.Config) {
config.Storage2.MaxConcurrentRequests = maxConcurrent
},
},
})
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
doneWaiting := make(chan struct{})
failedCount := int64(expectedFailures)
uploads, _ := errgroup.WithContext(ctx)
defer ctx.Check(uploads.Wait)
for i, uplink := range planet.Uplinks {
i, uplink := i, uplink
uploads.Go(func() (err error) {
storageNode := planet.StorageNodes[0].Local()
signer := signing.SignerFromFullIdentity(uplink.Transport.Identity())
config := piecestore.DefaultConfig
config.UploadBufferSize = 0 // disable buffering so we can detect write error early
client, err := piecestore.Dial(ctx, uplink.Transport, &storageNode.Node, uplink.Log, signer, config)
if err != nil {
return err
}
defer func() {
if cerr := client.Close(); cerr != nil {
uplink.Log.Error("close failed", zap.Error(cerr))
err = errs.Combine(err, cerr)
}
}()
pieceID := storj.PieceID{byte(i + 1)}
serialNumber := testrand.SerialNumber()
orderLimit := GenerateOrderLimit(
t,
planet.Satellites[0].ID(),
uplink.ID(),
planet.StorageNodes[0].ID(),
pieceID,
pb.PieceAction_PUT,
serialNumber,
24*time.Hour,
24*time.Hour,
int64(10000),
)
satelliteSigner := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
orderLimit, err = signing.SignOrderLimit(ctx, satelliteSigner, orderLimit)
if err != nil {
return err
}
upload, err := client.Upload(ctx, orderLimit)
if err != nil {
if errs2.IsRPC(err, codes.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("upload failed", zap.Stringer("Piece ID", pieceID), zap.Error(err))
return err
}
_, err = upload.Write(make([]byte, orderLimit.Limit))
if err != nil {
if errs2.IsRPC(err, codes.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("write failed", zap.Stringer("Piece ID", pieceID), zap.Error(err))
return err
}
_, err = upload.Commit(ctx)
if err != nil {
if errs2.IsRPC(err, codes.Unavailable) {
if atomic.AddInt64(&failedCount, -1) == 0 {
close(doneWaiting)
}
return nil
}
uplink.Log.Error("commit failed", zap.Stringer("Piece ID", pieceID), zap.Error(err))
return err
}
return nil
})
}
}
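
The upload, write, and commit branches above repeat the same classification: an Unavailable rejection is expected and counted, anything else fails the test. If this test is revisited to fix the EOF flakiness it is currently skipped for, that logic could be pulled into a small helper in the same test file, after which each branch reduces to: if countRejected(err, &failedCount, doneWaiting) { return nil }. This is an illustrative refactoring, not part of the commit; it reuses the errs2, codes, and atomic imports already present in the file.

// countRejected reports whether err is the expected overload rejection; if it
// is, the remaining-failure counter is decremented and done is closed once the
// last expected rejection has been observed.
func countRejected(err error, remaining *int64, done chan struct{}) bool {
	if !errs2.IsRPC(err, codes.Unavailable) {
		return false
	}
	if atomic.AddInt64(remaining, -1) == 0 {
		close(done)
	}
	return true
}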
func GenerateOrderLimit(t *testing.T, satellite storj.NodeID, uplink storj.NodeID, storageNode storj.NodeID, pieceID storj.PieceID,
action pb.PieceAction, serialNumber storj.SerialNumber, pieceExpiration, orderExpiration time.Duration, limit int64) *pb.OrderLimit {
pe, err := ptypes.TimestampProto(time.Now().Add(pieceExpiration))
require.NoError(t, err)
oe, err := ptypes.TimestampProto(time.Now().Add(orderExpiration))
require.NoError(t, err)
-orderLimit := &pb.OrderLimit{
+return &pb.OrderLimit{
SatelliteId: satellite,
UplinkId: uplink,
StorageNodeId: storageNode,
@@ -381,5 +503,4 @@ func GenerateOrderLimit(t *testing.T, satellite storj.NodeID, uplink storj.NodeI
PieceExpiration: pe,
Limit: limit,
}
-return orderLimit
}