storagenode/piecestore: fix ingress graph skewed by larger signed orders

Storage nodes are currently receiving larger signed orders due to
a performance optimization in uplink. This skews the ingress graph,
because the storage node plots the graph from the signed order
amount rather than from the bytes actually uploaded. This change
plots ingress from the actually uploaded bytes instead.

The egress graph could have a similar issue whenever the order
amount exceeds the bytes actually downloaded, but since we pay for
orders whether they are fulfilled or not, we keep using the order
amount for the egress graph.
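For illustration, a minimal self-contained sketch of the pattern this
change introduces (simplified stand-in types, not the actual
storagenode code): the order-commit helper accepts an optional
callback supplying the amount to record, so the upload path can
report the bytes actually written while the download path keeps
reporting the signed order amount.

package main

import "fmt"

// order stands in for pb.Order: the signed Amount may exceed the
// bytes actually transferred.
type order struct{ Amount int64 }

// commitOrder mirrors the shape of the change: when amountFunc is
// non-nil, its result is recorded instead of the signed amount.
func commitOrder(o *order, amountFunc func() int64) int64 {
	amount := o.Amount
	if amountFunc != nil {
		amount = amountFunc()
	}
	return amount
}

func main() {
	o := &order{Amount: 4096}    // uplink signed a larger order ...
	uploadedBytes := int64(1024) // ... but only 1 KiB actually arrived

	// Upload: record the bytes actually written (fixes the ingress graph).
	fmt.Println(commitOrder(o, func() int64 { return uploadedBytes })) // 1024

	// Download: keep the signed amount, since orders are paid whether
	// fulfilled or not (egress graph intentionally unchanged).
	fmt.Println(commitOrder(o, nil)) // 4096
}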

Resolves https://github.com/storj/storj/issues/5853

Change-Id: I2af7ee3ff249801ce07714bba055370ebd597c6e
Authored by Clement Sam on 2023-05-19 14:19:54 +00:00; committed by Clement Sam
parent bbdeb1eeb8
commit b6026b9ff3
2 changed files with 28 additions and 5 deletions

storagenode/piecestore/endpoint.go

@@ -399,7 +399,9 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err error) {
 		return rpcstatus.Wrap(rpcstatus.InvalidArgument, err)
 	}
 	largestOrder := pb.Order{}
-	defer commitOrderToStore(ctx, &largestOrder)
+	defer commitOrderToStore(ctx, &largestOrder, func() int64 {
+		return pieceWriter.Size()
+	})
 
 	// monitor speed of upload client to flag out slow uploads.
 	speedEstimate := speedEstimation{
@@ -730,7 +732,14 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err error) {
 	if err != nil {
 		return err
 	}
-	defer commitOrderToStore(ctx, &largestOrder)
+	defer func() {
+		order := &largestOrder
+		commitOrderToStore(ctx, order, func() int64 {
+			// for downloads, we store the order amount for the egress graph instead
+			// of the bytes actually downloaded
+			return order.Amount
+		})
+	}()
 
 	// ensure that we always terminate sending goroutine
 	defer throttle.Fail(io.EOF)
@@ -823,7 +832,7 @@ func (endpoint *Endpoint) sendData(ctx context.Context, stream pb.DRPCPiecestore
 }
 
 // beginSaveOrder saves the order with all necessary information. It assumes it has been already verified.
-func (endpoint *Endpoint) beginSaveOrder(limit *pb.OrderLimit) (_commit func(ctx context.Context, order *pb.Order), err error) {
+func (endpoint *Endpoint) beginSaveOrder(limit *pb.OrderLimit) (_commit func(ctx context.Context, order *pb.Order, amountFunc func() int64), err error) {
 	defer mon.Task()(nil)(&err)
 
 	commit, err := endpoint.ordersStore.BeginEnqueue(limit.SatelliteId, limit.OrderCreation)
@@ -832,7 +841,7 @@ func (endpoint *Endpoint) beginSaveOrder(limit *pb.OrderLimit) (_commit func(ctx context.Context, order *pb.Order), err error) {
 	}
 
 	done := false
-	return func(ctx context.Context, order *pb.Order) {
+	return func(ctx context.Context, order *pb.Order, amountFunc func() int64) {
 		if done {
 			return
 		}
@@ -851,8 +860,12 @@ func (endpoint *Endpoint) beginSaveOrder(limit *pb.OrderLimit) (_commit func(ctx context.Context, order *pb.Order), err error) {
 		if err != nil {
 			endpoint.log.Error("failed to add order", zap.Error(err))
 		} else {
+			amount := order.Amount
+			if amountFunc != nil {
+				amount = amountFunc()
+			}
 			// We always want to save order to the database to be able to settle.
-			err = endpoint.usage.Add(context2.WithoutCancellation(ctx), limit.SatelliteId, limit.Action, order.Amount, time.Now())
+			err = endpoint.usage.Add(context2.WithoutCancellation(ctx), limit.SatelliteId, limit.Action, amount, time.Now())
 			if err != nil {
 				endpoint.log.Error("failed to add bandwidth usage", zap.Error(err))
 			}
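A note on the deferred commits above: the amount callback is invoked
only when the deferred call fires, after the transfer has finished,
so it observes the final pieceWriter.Size() (or largestOrder.Amount)
rather than the value at the time the defer statement runs. A small
sketch of that evaluation order, with hypothetical names:

package main

import "fmt"

// commit mimics commitOrderToStore taking an amount callback: the
// callback runs only when the deferred call itself runs.
func commit(amountFunc func() int64) {
	fmt.Println("recorded:", amountFunc())
}

func upload() {
	var written int64
	// Arguments to defer are evaluated immediately, but the closure
	// body is not: if commit took a plain int64 and we deferred
	// commit(written) here, it would record 0.
	defer commit(func() int64 { return written })

	written += 512 // simulated chunks arriving
	written += 512
} // prints "recorded: 1024" on return

func main() { upload() }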

storagenode/piecestore/endpoint_test.go

@@ -26,6 +26,7 @@ import (
 	"storj.io/common/storj"
 	"storj.io/common/testcontext"
 	"storj.io/common/testrand"
+	"storj.io/storj/private/date"
 	"storj.io/storj/private/testplanet"
 	"storj.io/storj/storagenode"
 	"storj.io/storj/storagenode/bandwidth"
@@ -116,6 +117,8 @@ func TestUpload(t *testing.T) {
 		require.NoError(t, err)
 		defer ctx.Check(client.Close)
 
+		var expectedIngressAmount int64
+
 		for _, tt := range []struct {
 			pieceID       storj.PieceID
 			contentLength memory.Size
@@ -183,8 +186,15 @@ func TestUpload(t *testing.T) {
 				signee := signing.SignerFromFullIdentity(planet.StorageNodes[0].Identity)
 				require.NoError(t, signing.VerifyPieceHashSignature(ctx, signee, pieceHash))
+
+				expectedIngressAmount += int64(len(data)) // assuming all data is uploaded
 			}
 		}
+
+		from, to := date.MonthBoundary(time.Now().UTC())
+		summary, err := planet.StorageNodes[0].DB.Bandwidth().SatelliteIngressSummary(ctx, planet.Satellites[0].ID(), from, to)
+		require.NoError(t, err)
+		require.Equal(t, expectedIngressAmount, summary.Put)
 	})
 }
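For intuition on what the new assertion guards against, a rough
sketch with invented numbers: summing signed order amounts (the old
ingress behavior) overstates ingress whenever orders are larger than
the uploads they cover, while summing uploaded bytes matches what
the test now expects in summary.Put.

package main

import "fmt"

// transfer pairs a signed order amount with the bytes that actually
// arrived; the numbers below are invented for illustration.
type transfer struct {
	orderAmount int64
	uploaded    int64
}

func main() {
	transfers := []transfer{
		{orderAmount: 2560, uploaded: 2048},
		{orderAmount: 9216, uploaded: 8192},
	}

	var byOrder, byBytes int64
	for _, tr := range transfers {
		byOrder += tr.orderAmount // what the old ingress graph summed
		byBytes += tr.uploaded    // what summary.Put should now report
	}

	fmt.Println(byOrder, byBytes) // 11776 10240: the old graph was inflated
}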