satellite/overlay: improve contended update checkin

Improve UpdateCheckIn on a contended row:

  name                             old time/op  new time/op delta
  UpdateCheckInContended-100x-32   2.29s ±55%   0.17s ±61%  -92.45%  (p=0.008 n=5+5)

Change-Id: I053ab9f1cff136c306e5fb57f5e355cdc0269a8c
This commit is contained in:
Egon Elbre 2021-05-16 20:09:15 +03:00
parent b426248936
commit 8f15f975a2
2 changed files with 71 additions and 3 deletions

View File

@ -10,8 +10,10 @@ import (
"time"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/errs2"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/testrand"
@ -89,6 +91,30 @@ func BenchmarkOverlay(b *testing.B) {
}
})
b.Run("UpdateCheckInContended-100x", func(b *testing.B) {
for k := 0; k < b.N; k++ {
var g errs2.Group
for i := 0; i < 100; i++ {
g.Go(func() error {
d := overlay.NodeCheckInInfo{
NodeID: all[0],
Address: &pb.NodeAddress{Address: "127.0.0.0:8080"},
LastIPPort: "127.0.0.0:8080",
LastNet: "127.0.0",
Operator: &pb.NodeOperator{
Email: "hello@example.com",
Wallet: "123123123123",
},
Version: &pb.NodeVersion{Version: "v1.0.0"},
IsUp: true,
}
return overlaydb.UpdateCheckIn(ctx, d, time.Now().UTC(), overlay.NodeSelectionConfig{})
})
}
require.NoError(b, errs.Combine(g.Wait()...))
}
})
b.Run("UpdateStatsSuccess", func(b *testing.B) {
for i := 0; i < b.N; i++ {
id := all[i%len(all)]

View File

@ -1668,7 +1668,50 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
return Error.Wrap(err)
}
query := `
// First try the fast path.
var res sql.Result
res, err = cache.db.ExecContext(ctx, `
UPDATE nodes
SET
address=$2,
last_net=$3,
protocol=$4,
email=$5,
wallet=$6,
free_disk=$7,
major=$9, minor=$10, patch=$11, hash=$12, timestamp=$13, release=$14,
last_contact_success = CASE WHEN $8::bool IS TRUE
THEN $15::timestamptz
ELSE nodes.last_contact_success
END,
last_contact_failure = CASE WHEN $8::bool IS FALSE
THEN $15::timestamptz
ELSE nodes.last_contact_failure
END,
last_ip_port=$16,
wallet_features=$17
WHERE id = $1
`, // args $1 - $4
node.NodeID.Bytes(), node.Address.GetAddress(), node.LastNet, node.Address.GetTransport(),
// args $5 - $7
node.Operator.GetEmail(), node.Operator.GetWallet(), node.Capacity.GetFreeDisk(),
// args $8
node.IsUp,
// args $9 - $14
semVer.Major, semVer.Minor, semVer.Patch, node.Version.GetCommitHash(), node.Version.Timestamp, node.Version.GetRelease(),
// args $15
timestamp,
// args $16
node.LastIPPort,
// args $17,
walletFeatures,
)
affected, affectedErr := res.RowsAffected()
if affected > 0 && err == nil && affectedErr == nil {
return nil
}
_, err = cache.db.ExecContext(ctx, `
INSERT INTO nodes
(
id, address, last_net, protocol, type,
@ -1716,8 +1759,7 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
END,
last_ip_port=$19,
wallet_features=$20;
`
_, err = cache.db.ExecContext(ctx, query,
`,
// args $1 - $5
node.NodeID.Bytes(), node.Address.GetAddress(), node.LastNet, node.Address.GetTransport(), int(pb.NodeType_STORAGE),
// args $6 - $8