satellite/satellitedb: serialize UpdateStats and BatchUpdateStats transactions
Since we increased the number of audit workers from 1 to 2, we need to make sure concurrent updates do not trample each other. We can do this by serializing the transactions.

Change-Id: If1b2f71cabe3c779c12ffa33c0c3271778ac3ae0
commit bad299b541
parent 95a7403802
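The fix relies on SERIALIZABLE isolation: under it, two concurrent read-modify-write transactions on the same node row cannot both commit, so one of them fails instead of silently overwriting the other's update. A minimal standalone sketch of the same pattern using Go's database/sql follows; the nodes table, audit_count column, and lib/pq driver are illustrative assumptions, not taken from this commit:

    package example

    import (
        "context"
        "database/sql"

        _ "github.com/lib/pq" // hypothetical driver choice for this sketch
    )

    // incrementAuditCount performs a read-modify-write under SERIALIZABLE
    // isolation. Without it, two concurrent callers could both read the
    // same old count and each write count+1, losing one increment.
    func incrementAuditCount(ctx context.Context, db *sql.DB, nodeID []byte) error {
        tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
        if err != nil {
            return err
        }
        defer func() { _ = tx.Rollback() }() // no-op once Commit succeeds

        var count int64
        err = tx.QueryRowContext(ctx,
            `SELECT audit_count FROM nodes WHERE id = $1`, nodeID).Scan(&count)
        if err != nil {
            return err
        }
        _, err = tx.ExecContext(ctx,
            `UPDATE nodes SET audit_count = $1 WHERE id = $2`, count+1, nodeID)
        if err != nil {
            return err
        }
        return tx.Commit()
    }

The commit itself cannot pass sql.TxOptions through the generated dbx wrapper, which is presumably why it issues SET TRANSACTION ISOLATION LEVEL SERIALIZABLE as the first statement inside WithTx instead, as the hunks below show.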
@@ -15,6 +15,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest"
+	"golang.org/x/sync/errgroup"
 
 	"storj.io/common/memory"
 	"storj.io/common/pb"
@@ -808,6 +809,54 @@ func TestSuspendedSelection(t *testing.T) {
 	})
 }
 
+func TestConcurrentAudit(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		planet.Satellites[0].Audit.Chore.Loop.Stop()
+		data := testrand.Bytes(10 * memory.MB)
+		err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "testpath", data)
+		require.NoError(t, err)
+		var group errgroup.Group
+		n := 5
+		for i := 0; i < n; i++ {
+			group.Go(func() error {
+				_, err := planet.Satellites[0].Overlay.Service.UpdateStats(ctx, &overlay.UpdateRequest{
+					NodeID:       planet.StorageNodes[0].ID(),
+					AuditOutcome: overlay.AuditSuccess,
+					IsUp:         true,
+				})
+				return err
+			})
+		}
+		err = group.Wait()
+		require.NoError(t, err)
+
+		node, err := planet.Satellites[0].DB.OverlayCache().Get(ctx, planet.StorageNodes[0].ID())
+		require.NoError(t, err)
+		require.Equal(t, int64(n), node.Reputation.AuditCount)
+
+		for i := 0; i < n; i++ {
+			group.Go(func() error {
+				_, err := planet.Satellites[0].Overlay.Service.BatchUpdateStats(ctx, []*overlay.UpdateRequest{
+					{
+						NodeID:       planet.StorageNodes[0].ID(),
+						AuditOutcome: overlay.AuditSuccess,
+						IsUp:         true,
+					},
+				})
+				return err
+			})
+		}
+		err = group.Wait()
+		require.NoError(t, err)
+
+		node, err = planet.Satellites[0].DB.OverlayCache().Get(ctx, planet.StorageNodes[0].ID())
+		require.NoError(t, err)
+		require.Equal(t, int64(n*2), node.Reputation.AuditCount)
+	})
+}
+
 func getNodeInfo(nodeID storj.NodeID) overlay.NodeCheckInInfo {
 	return overlay.NodeCheckInInfo{
 		NodeID: nodeID,
@@ -344,6 +344,10 @@ func (cache *overlaycache) BatchUpdateStats(ctx context.Context, updateRequests
 
 	doAppendAll := true
 	err = cache.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
+		_, err = tx.Tx.ExecContext(ctx, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
+		if err != nil {
+			return err
+		}
 		var allSQL string
 		for _, updateReq := range updateSlice {
 			dbNode, err := tx.Get_Node_By_Id(ctx, dbx.Node_Id(updateReq.NodeID.Bytes()))
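Note that in PostgreSQL the isolation level can only be changed before the first query or data-modification statement of a transaction, which is why the SET TRANSACTION statement is the very first thing executed inside WithTx here and in UpdateStats below.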
@@ -416,6 +420,10 @@ func (cache *overlaycache) UpdateStats(ctx context.Context, updateReq *overlay.U
 
 	var dbNode *dbx.Node
 	err = cache.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) (err error) {
+		_, err = tx.Tx.ExecContext(ctx, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
+		if err != nil {
+			return err
+		}
 		dbNode, err = tx.Get_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()))
 		if err != nil {
 			return err
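One operational caveat with SERIALIZABLE isolation: rather than blocking, the database aborts one of the conflicting transactions with a serialization failure (SQLSTATE 40001), so callers generally need to retry. A hedged sketch of such a retry loop, assuming database/sql with the lib/pq driver; whether the satellite's own WithTx wrapper already retries these errors is not visible in this diff:

    package example

    import (
        "context"
        "database/sql"
        "errors"

        "github.com/lib/pq" // hypothetical driver choice for this sketch
    )

    // withSerializableRetry runs fn inside a SERIALIZABLE transaction and
    // retries when the database aborts it with a serialization failure
    // (SQLSTATE 40001). Illustrative sketch only; all names are hypothetical.
    func withSerializableRetry(ctx context.Context, db *sql.DB, fn func(*sql.Tx) error) error {
        const maxAttempts = 5
        var err error
        for attempt := 0; attempt < maxAttempts; attempt++ {
            var tx *sql.Tx
            tx, err = db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
            if err != nil {
                return err
            }
            if err = fn(tx); err == nil {
                err = tx.Commit()
            } else {
                _ = tx.Rollback()
            }
            var pqErr *pq.Error
            if err == nil || !errors.As(err, &pqErr) || pqErr.Code != "40001" {
                return err // success, or an error that retrying will not fix
            }
        }
        return err // still conflicting after maxAttempts
    }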