satellite/orders: ignore context canceled when updating bucket bandwidth
Orders from storage nodes are received by SettlementWithWindowFinal method. There is a stream which receives all orders and after getting all orders we are inserting into DB storagenode and bucket bandwidth. Problem is with bucket bandwidth which is stored through cache which is often using context from SettlementWithWindowFinal stream to perform DB inserts and its doing this in separate goroutine. Because of that is possible that SettlementWithWindowFinal is finished before flushing was finished and context is canceled while doing insert into DB Change-Id: I3a72c86390e9aedc060f6b082bb059f1406231ee
This commit is contained in:
parent
51a0d43eef
commit
252c437b0e
@ -15,6 +15,7 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/context2"
|
||||
"storj.io/common/identity"
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/rpc/rpcstatus"
|
||||
@ -386,6 +387,10 @@ func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_Settlem
|
||||
)
|
||||
|
||||
if status == pb.SettlementWithWindowResponse_ACCEPTED && !alreadyProcessed {
|
||||
// we would like to update bandwidth even if context was canceled because
|
||||
// underlying implementation is flushing cache using this context in separate
|
||||
// goroutine so it can be executed after this stream will be closed
|
||||
ctx := context2.WithoutCancellation(ctx)
|
||||
for bucketIDAction, bwAmount := range bucketSettled {
|
||||
err = endpoint.DB.UpdateBucketBandwidthSettle(ctx,
|
||||
bucketIDAction.projectID, []byte(bucketIDAction.bucketname), bucketIDAction.action, bwAmount.Settled, bwAmount.Dead, time.Unix(0, window),
|
||||
|
@ -6,20 +6,26 @@ package orders_test
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/signing"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/internalpb"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/orders"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
)
|
||||
@ -212,3 +218,113 @@ func TestUpdateBucketBandwidth(t *testing.T) {
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func TestEndpointAndCacheContextCanceled(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||
config.Orders.FlushBatchSize = 3
|
||||
},
|
||||
},
|
||||
},
|
||||
func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
satellite := planet.Satellites[0]
|
||||
storagenode := planet.StorageNodes[0]
|
||||
ordersDB := planet.Satellites[0].Orders.DB
|
||||
|
||||
now := time.Now()
|
||||
|
||||
// create orders to trigger RollupsWriteCache flush
|
||||
projectID := testrand.UUID()
|
||||
requests := []*pb.SettlementRequest{}
|
||||
singleOrderAmount := int64(50)
|
||||
for i := 0; i < 3; i++ {
|
||||
piecePublicKey, piecePrivateKey, err := storj.NewPieceKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
bucketname := "testbucket" + strconv.Itoa(i)
|
||||
|
||||
bucketLocation := metabase.BucketLocation{
|
||||
ProjectID: projectID,
|
||||
BucketName: bucketname,
|
||||
}
|
||||
|
||||
serialNumber := testrand.SerialNumber()
|
||||
key := satellite.Config.Orders.EncryptionKeys.Default
|
||||
encrypted, err := key.EncryptMetadata(
|
||||
serialNumber,
|
||||
&internalpb.OrderLimitMetadata{
|
||||
CompactProjectBucketPrefix: bucketLocation.CompactPrefix(),
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
limit := &pb.OrderLimit{
|
||||
SerialNumber: serialNumber,
|
||||
SatelliteId: satellite.ID(),
|
||||
UplinkPublicKey: piecePublicKey,
|
||||
StorageNodeId: storagenode.ID(),
|
||||
PieceId: storj.NewPieceID(),
|
||||
Action: pb.PieceAction_GET,
|
||||
Limit: 1000,
|
||||
PieceExpiration: time.Time{},
|
||||
OrderCreation: now,
|
||||
OrderExpiration: now.Add(24 * time.Hour),
|
||||
EncryptedMetadataKeyId: key.ID[:],
|
||||
EncryptedMetadata: encrypted,
|
||||
}
|
||||
|
||||
orderLimit, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit)
|
||||
require.NoError(t, err)
|
||||
|
||||
order, err := signing.SignUplinkOrder(ctx, piecePrivateKey, &pb.Order{
|
||||
SerialNumber: serialNumber,
|
||||
Amount: singleOrderAmount,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
requests = append(requests, &pb.SettlementRequest{
|
||||
Limit: orderLimit,
|
||||
Order: order,
|
||||
})
|
||||
}
|
||||
|
||||
conn, err := storagenode.Dialer.DialNodeURL(ctx, storj.NodeURL{ID: satellite.ID(), Address: satellite.Addr()})
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(conn.Close)
|
||||
|
||||
stream, err := pb.NewDRPCOrdersClient(conn).SettlementWithWindow(ctx)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(stream.Close)
|
||||
|
||||
for _, request := range requests {
|
||||
err := stream.Send(&pb.SettlementRequest{
|
||||
Limit: request.Limit,
|
||||
Order: request.Order,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
resp, err := stream.CloseAndRecv()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, pb.SettlementWithWindowResponse_ACCEPTED, resp.Status)
|
||||
|
||||
rwc := ordersDB.(*orders.RollupsWriteCache)
|
||||
whenDone := rwc.OnNextFlush()
|
||||
|
||||
// make sure flushing is done
|
||||
select {
|
||||
case <-whenDone:
|
||||
break
|
||||
case <-ctx.Done():
|
||||
t.Fatal(ctx.Err())
|
||||
}
|
||||
|
||||
// verify that orders were stored in DB
|
||||
bucketBandwidth, err := getSettledBandwidth(ctx, planet.Satellites[0].DB.ProjectAccounting(), projectID, now)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, singleOrderAmount*int64(len(requests)), bucketBandwidth)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user