Fix some leaks and add notes about close handling (#2334)
This commit is contained in:
parent
605e3fb730
commit
c7679b9b30
@ -43,6 +43,7 @@ func (d *GraphiteDest) Metric(application, instance string, key []byte, val floa
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// TODO(leak): free connection
|
||||||
d.conn = conn
|
d.conn = conn
|
||||||
d.buf = bufio.NewWriter(conn)
|
d.buf = bufio.NewWriter(conn)
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@ import (
|
|||||||
"github.com/golang/protobuf/ptypes"
|
"github.com/golang/protobuf/ptypes"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
"storj.io/storj/internal/memory"
|
"storj.io/storj/internal/memory"
|
||||||
"storj.io/storj/internal/version"
|
"storj.io/storj/internal/version"
|
||||||
@ -29,13 +30,10 @@ const contactWindow = time.Minute * 10
|
|||||||
|
|
||||||
type dashboardClient struct {
|
type dashboardClient struct {
|
||||||
client pb.PieceStoreInspectorClient
|
client pb.PieceStoreInspectorClient
|
||||||
|
conn *grpc.ClientConn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dash *dashboardClient) dashboard(ctx context.Context) (*pb.DashboardResponse, error) {
|
func dialDashboardClient(ctx context.Context, address string) (*dashboardClient, error) {
|
||||||
return dash.client.Dashboard(ctx, &pb.DashboardRequest{})
|
|
||||||
}
|
|
||||||
|
|
||||||
func newDashboardClient(ctx context.Context, address string) (*dashboardClient, error) {
|
|
||||||
conn, err := transport.DialAddressInsecure(ctx, address)
|
conn, err := transport.DialAddressInsecure(ctx, address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &dashboardClient{}, err
|
return &dashboardClient{}, err
|
||||||
@ -43,9 +41,18 @@ func newDashboardClient(ctx context.Context, address string) (*dashboardClient,
|
|||||||
|
|
||||||
return &dashboardClient{
|
return &dashboardClient{
|
||||||
client: pb.NewPieceStoreInspectorClient(conn),
|
client: pb.NewPieceStoreInspectorClient(conn),
|
||||||
|
conn: conn,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (dash *dashboardClient) dashboard(ctx context.Context) (*pb.DashboardResponse, error) {
|
||||||
|
return dash.client.Dashboard(ctx, &pb.DashboardRequest{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dash *dashboardClient) close() error {
|
||||||
|
return dash.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func cmdDashboard(cmd *cobra.Command, args []string) (err error) {
|
func cmdDashboard(cmd *cobra.Command, args []string) (err error) {
|
||||||
ctx := process.Ctx(cmd)
|
ctx := process.Ctx(cmd)
|
||||||
|
|
||||||
@ -56,10 +63,15 @@ func cmdDashboard(cmd *cobra.Command, args []string) (err error) {
|
|||||||
zap.S().Info("Node ID: ", ident.ID)
|
zap.S().Info("Node ID: ", ident.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
client, err := newDashboardClient(ctx, dashboardCfg.Address)
|
client, err := dialDashboardClient(ctx, dashboardCfg.Address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := client.close(); err != nil {
|
||||||
|
zap.S().Debug("closing dashboard client failed", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
data, err := client.dashboard(ctx)
|
data, err := client.dashboard(ctx)
|
||||||
|
@ -162,6 +162,7 @@ func (uplink *Uplink) DialPiecestore(ctx context.Context, destination Peer) (*pi
|
|||||||
|
|
||||||
signer := signing.SignerFromFullIdentity(uplink.Transport.Identity())
|
signer := signing.SignerFromFullIdentity(uplink.Transport.Identity())
|
||||||
|
|
||||||
|
// TODO(leak): unclear ownership semantics
|
||||||
return piecestore.NewClient(uplink.Log.Named("uplink>piecestore"), signer, conn, piecestore.DefaultConfig), nil
|
return piecestore.NewClient(uplink.Log.Named("uplink>piecestore"), signer, conn, piecestore.DefaultConfig), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -476,6 +476,7 @@ func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrder
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return Share{}, err
|
return Share{}, err
|
||||||
}
|
}
|
||||||
|
// TODO(leak): unclear ownership semantics
|
||||||
ps := piecestore.NewClient(
|
ps := piecestore.NewClient(
|
||||||
verifier.log.Named(storageNodeID.String()),
|
verifier.log.Named(storageNodeID.String()),
|
||||||
signing.SignerFromFullIdentity(verifier.transport.Identity()),
|
signing.SignerFromFullIdentity(verifier.transport.Identity()),
|
||||||
|
@ -139,6 +139,8 @@ func TestOptions_ServerOption_Peer_CA_Whitelist(t *testing.T) {
|
|||||||
conn, err := transportClient.DialNode(ctx, &target.Node, dialOption)
|
conn, err := transportClient.DialNode(ctx, &target.Node, dialOption)
|
||||||
assert.NotNil(t, conn)
|
assert.NotNil(t, conn)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.NoError(t, conn.Close())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,6 +62,7 @@ func (ec *ecClient) newPSClient(ctx context.Context, n *pb.Node) (*piecestore.Cl
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
// TODO(leak): unclear ownership semantics
|
||||||
return piecestore.NewClient(
|
return piecestore.NewClient(
|
||||||
zap.L().Named(n.Id.String()),
|
zap.L().Named(n.Id.String()),
|
||||||
signing.SignerFromFullIdentity(ec.transport.Identity()),
|
signing.SignerFromFullIdentity(ec.transport.Identity()),
|
||||||
|
@ -55,6 +55,7 @@ func TestVouchers(t *testing.T) {
|
|||||||
|
|
||||||
conn, err := tt.node.Transport.DialNode(ctx, &satellite)
|
conn, err := tt.node.Transport.DialNode(ctx, &satellite)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer ctx.Check(conn.Close)
|
||||||
|
|
||||||
client := pb.NewVouchersClient(conn)
|
client := pb.NewVouchersClient(conn)
|
||||||
|
|
||||||
|
@ -99,6 +99,7 @@ func TestUpload(t *testing.T) {
|
|||||||
|
|
||||||
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
|
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer ctx.Check(client.Close)
|
||||||
|
|
||||||
for _, tt := range []struct {
|
for _, tt := range []struct {
|
||||||
pieceID storj.PieceID
|
pieceID storj.PieceID
|
||||||
@ -183,6 +184,7 @@ func TestDownload(t *testing.T) {
|
|||||||
// upload test piece
|
// upload test piece
|
||||||
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
|
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer ctx.Check(client.Close)
|
||||||
|
|
||||||
var serialNumber storj.SerialNumber
|
var serialNumber storj.SerialNumber
|
||||||
_, _ = rand.Read(serialNumber[:])
|
_, _ = rand.Read(serialNumber[:])
|
||||||
@ -289,6 +291,7 @@ func TestDelete(t *testing.T) {
|
|||||||
// upload test piece
|
// upload test piece
|
||||||
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
|
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer ctx.Check(client.Close)
|
||||||
|
|
||||||
var serialNumber storj.SerialNumber
|
var serialNumber storj.SerialNumber
|
||||||
_, _ = rand.Read(serialNumber[:])
|
_, _ = rand.Read(serialNumber[:])
|
||||||
|
@ -129,6 +129,7 @@ func TestOrderLimitPutValidation(t *testing.T) {
|
|||||||
|
|
||||||
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
|
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer ctx.Check(client.Close)
|
||||||
|
|
||||||
signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
|
signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
|
||||||
satellite := planet.Satellites[0].Identity
|
satellite := planet.Satellites[0].Identity
|
||||||
@ -199,6 +200,7 @@ func TestOrderLimitGetValidation(t *testing.T) {
|
|||||||
{ // upload test piece
|
{ // upload test piece
|
||||||
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
|
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer ctx.Check(client.Close)
|
||||||
|
|
||||||
signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
|
signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
|
||||||
satellite := planet.Satellites[0].Identity
|
satellite := planet.Satellites[0].Identity
|
||||||
@ -253,6 +255,7 @@ func TestOrderLimitGetValidation(t *testing.T) {
|
|||||||
} {
|
} {
|
||||||
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
|
client, err := planet.Uplinks[0].DialPiecestore(ctx, planet.StorageNodes[0])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer ctx.Check(client.Close)
|
||||||
|
|
||||||
signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
|
signer := signing.SignerFromFullIdentity(planet.Satellites[0].Identity)
|
||||||
satellite := planet.Satellites[0].Identity
|
satellite := planet.Satellites[0].Identity
|
||||||
|
@ -51,6 +51,7 @@ func NewClient(log *zap.Logger, signer signing.Signer, conn *grpc.ClientConn, co
|
|||||||
return &Client{
|
return &Client{
|
||||||
log: log,
|
log: log,
|
||||||
signer: signer,
|
signer: signer,
|
||||||
|
// TODO(leak): unclear ownership semantics
|
||||||
conn: conn,
|
conn: conn,
|
||||||
client: pb.NewPiecestoreClient(conn),
|
client: pb.NewPiecestoreClient(conn),
|
||||||
config: config,
|
config: config,
|
||||||
|
Loading…
Reference in New Issue
Block a user