From 414648d660dfe363716acf6dc60faf8bfd2ca8f5 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Tue, 25 Jun 2019 18:36:23 +0300 Subject: [PATCH] Fix some metainfo.Client leaks (#2327) --- internal/testplanet/uplink.go | 5 +- lib/uplink/bucket.go | 2 + lib/uplink/project.go | 9 ++-- lib/uplink/uplink.go | 13 +++-- pkg/metainfo/kvmetainfo/buckets_test.go | 1 + pkg/metainfo/kvmetainfo/metainfo.go | 4 +- pkg/metainfo/kvmetainfo/temputils.go | 2 +- pkg/miniogw/gateway_test.go | 1 + pkg/storage/segments/store.go | 4 +- pkg/storage/segments/store_test.go | 1 + satellite/metainfo/metainfo_test.go | 10 +++- uplink/config.go | 3 +- uplink/metainfo/client.go | 72 ++++++++++++------------- 13 files changed, 71 insertions(+), 56 deletions(-) diff --git a/internal/testplanet/uplink.go b/internal/testplanet/uplink.go index c53201076..106c231ae 100644 --- a/internal/testplanet/uplink.go +++ b/internal/testplanet/uplink.go @@ -147,9 +147,8 @@ func (uplink *Uplink) Local() pb.Node { return uplink.Info } func (uplink *Uplink) Shutdown() error { return nil } // DialMetainfo dials destination with apikey and returns metainfo Client -func (uplink *Uplink) DialMetainfo(ctx context.Context, destination Peer, apikey string) (metainfo.Client, error) { - // TODO: handle disconnect - return metainfo.NewClient(ctx, uplink.Transport, destination.Addr(), apikey) +func (uplink *Uplink) DialMetainfo(ctx context.Context, destination Peer, apikey string) (*metainfo.Client, error) { + return metainfo.Dial(ctx, uplink.Transport, destination.Addr(), apikey) } // DialPiecestore dials destination storagenode and returns a piecestore client. diff --git a/lib/uplink/bucket.go b/lib/uplink/bucket.go index 0cf2abe92..c190bef45 100644 --- a/lib/uplink/bucket.go +++ b/lib/uplink/bucket.go @@ -27,6 +27,8 @@ type Bucket struct { streams streams.Store } +// TODO: move the object related OpenObject to object.go + // OpenObject returns an Object handle, if authorized. func (b *Bucket) OpenObject(ctx context.Context, path storj.Path) (o *Object, err error) { defer mon.Task()(&ctx)(&err) diff --git a/lib/uplink/project.go b/lib/uplink/project.go index a8c1134e9..7b1d5db11 100644 --- a/lib/uplink/project.go +++ b/lib/uplink/project.go @@ -24,7 +24,7 @@ import ( type Project struct { uplinkCfg *Config tc transport.Client - metainfo metainfo.Client + metainfo *metainfo.Client project *kvmetainfo.Project maxInlineSize memory.Size } @@ -144,6 +144,8 @@ func (p *Project) GetBucketInfo(ctx context.Context, bucket string) (b storj.Buc return b, cfg, nil } +// TODO: move the bucket related OpenBucket to bucket.go + // OpenBucket returns a Bucket handle with the given EncryptionAccess // information. func (p *Project) OpenBucket(ctx context.Context, bucketName string, access *EncryptionAccess) (b *Bucket, err error) { @@ -200,8 +202,3 @@ func (p *Project) OpenBucket(ctx context.Context, bucketName string, access *Enc streams: streamStore, }, nil } - -// Close closes the Project. -func (p *Project) Close() error { - return nil -} diff --git a/lib/uplink/uplink.go b/lib/uplink/uplink.go index ea61f903a..b4065db5e 100644 --- a/lib/uplink/uplink.go +++ b/lib/uplink/uplink.go @@ -120,11 +120,13 @@ func NewUplink(ctx context.Context, cfg *Config) (_ *Uplink, err error) { }, nil } +// TODO: move the project related OpenProject and Close to project.go + // OpenProject returns a Project handle with the given APIKey func (u *Uplink) OpenProject(ctx context.Context, satelliteAddr string, apiKey APIKey) (p *Project, err error) { defer mon.Task()(&ctx)(&err) - m, err := metainfo.NewClient(ctx, u.tc, satelliteAddr, apiKey.Serialize()) + m, err := metainfo.Dial(ctx, u.tc, satelliteAddr, apiKey.Serialize()) if err != nil { return nil, err } @@ -143,9 +145,12 @@ func (u *Uplink) OpenProject(ctx context.Context, satelliteAddr string, apiKey A }, nil } -// Close closes the Uplink. This may not do anything at present, but should -// still be called to allow forward compatibility. No Project or Bucket -// objects using this Uplink should be used after calling Close. +// Close closes the Project. Opened buckets or objects must not be used after calling Close. +func (p *Project) Close() error { + return p.metainfo.Close() +} + +// Close closes the Uplink. Opened projects, buckets or objects must not be used after calling Close. func (u *Uplink) Close() error { return nil } diff --git a/pkg/metainfo/kvmetainfo/buckets_test.go b/pkg/metainfo/kvmetainfo/buckets_test.go index 93fb14871..c874225a2 100644 --- a/pkg/metainfo/kvmetainfo/buckets_test.go +++ b/pkg/metainfo/kvmetainfo/buckets_test.go @@ -312,6 +312,7 @@ func newMetainfoParts(planet *testplanet.Planet) (*kvmetainfo.DB, streams.Store, if err != nil { return nil, nil, err } + // TODO(leak): call metainfo.Close somehow ec := ecclient.NewClient(planet.Uplinks[0].Transport, 0) fc, err := infectious.NewFEC(2, 4) diff --git a/pkg/metainfo/kvmetainfo/metainfo.go b/pkg/metainfo/kvmetainfo/metainfo.go index 46ef42401..8a13c05b8 100644 --- a/pkg/metainfo/kvmetainfo/metainfo.go +++ b/pkg/metainfo/kvmetainfo/metainfo.go @@ -30,7 +30,7 @@ var _ storj.Metainfo = (*DB)(nil) type DB struct { project *Project - metainfo metainfo.Client + metainfo *metainfo.Client streams streams.Store segments segments.Store @@ -39,7 +39,7 @@ type DB struct { } // New creates a new metainfo database -func New(project *Project, metainfo metainfo.Client, streams streams.Store, segments segments.Store, encStore *encryption.Store) *DB { +func New(project *Project, metainfo *metainfo.Client, streams streams.Store, segments segments.Store, encStore *encryption.Store) *DB { return &DB{ project: project, metainfo: metainfo, diff --git a/pkg/metainfo/kvmetainfo/temputils.go b/pkg/metainfo/kvmetainfo/temputils.go index 926f5f32b..608ea86df 100644 --- a/pkg/metainfo/kvmetainfo/temputils.go +++ b/pkg/metainfo/kvmetainfo/temputils.go @@ -22,7 +22,7 @@ var ( ) // SetupProject creates a project with temporary values until we can figure out how to bypass encryption related setup -func SetupProject(m metainfo.Client) (*Project, error) { +func SetupProject(m *metainfo.Client) (*Project, error) { whoCares := 1 // TODO: find a better way to do this fc, err := infectious.NewFEC(whoCares, whoCares) if err != nil { diff --git a/pkg/miniogw/gateway_test.go b/pkg/miniogw/gateway_test.go index 213cb7f35..01d9dcf1b 100644 --- a/pkg/miniogw/gateway_test.go +++ b/pkg/miniogw/gateway_test.go @@ -688,6 +688,7 @@ func initEnv(ctx context.Context, planet *testplanet.Planet) (minio.ObjectLayer, if err != nil { return nil, nil, nil, err } + // TODO(leak): close m metainfo.Client somehow ec := ecclient.NewClient(planet.Uplinks[0].Transport, 0) fc, err := infectious.NewFEC(2, 4) diff --git a/pkg/storage/segments/store.go b/pkg/storage/segments/store.go index 45646039b..181d133cd 100644 --- a/pkg/storage/segments/store.go +++ b/pkg/storage/segments/store.go @@ -53,7 +53,7 @@ type Store interface { } type segmentStore struct { - metainfo metainfo.Client + metainfo *metainfo.Client ec ecclient.Client rs eestream.RedundancyStrategy thresholdSize int @@ -61,7 +61,7 @@ type segmentStore struct { } // NewSegmentStore creates a new instance of segmentStore -func NewSegmentStore(metainfo metainfo.Client, ec ecclient.Client, rs eestream.RedundancyStrategy, threshold int, maxEncryptedSegmentSize int64) Store { +func NewSegmentStore(metainfo *metainfo.Client, ec ecclient.Client, rs eestream.RedundancyStrategy, threshold int, maxEncryptedSegmentSize int64) Store { return &segmentStore{ metainfo: metainfo, ec: ec, diff --git a/pkg/storage/segments/store_test.go b/pkg/storage/segments/store_test.go index b71bb8b0c..1c15184ac 100644 --- a/pkg/storage/segments/store_test.go +++ b/pkg/storage/segments/store_test.go @@ -268,6 +268,7 @@ func runTest(t *testing.T, test func(t *testing.T, ctx *testcontext.Context, pla metainfo, err := planet.Uplinks[0].DialMetainfo(context.Background(), planet.Satellites[0], TestAPIKey) require.NoError(t, err) + defer ctx.Check(metainfo.Close) ec := ecclient.NewClient(planet.Uplinks[0].Transport, 0) fc, err := infectious.NewFEC(2, 4) diff --git a/satellite/metainfo/metainfo_test.go b/satellite/metainfo/metainfo_test.go index 99e8e88c2..7baf9e3b7 100644 --- a/satellite/metainfo/metainfo_test.go +++ b/satellite/metainfo/metainfo_test.go @@ -54,6 +54,7 @@ func TestInvalidAPIKey(t *testing.T) { for _, invalidAPIKey := range []string{"", "invalid", "testKey"} { client, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], invalidAPIKey) require.NoError(t, err) + defer ctx.Check(client.Close) _, _, err = client.CreateSegment(ctx, "hello", "world", 1, &pb.RedundancyScheme{}, 123, time.Now()) assertUnauthenticated(t, err, false) @@ -167,6 +168,7 @@ func TestRestrictedAPIKey(t *testing.T) { client, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], restrictedKey.Serialize()) require.NoError(t, err) + defer ctx.Check(client.Close) _, _, err = client.CreateSegment(ctx, "testbucket", "testpath", 1, &pb.RedundancyScheme{}, 123, time.Now()) assertUnauthenticated(t, err, test.CreateSegmentAllowed) @@ -296,6 +298,7 @@ func TestCommitSegment(t *testing.T) { metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) + defer ctx.Check(metainfo.Close) { // error if pointer is nil @@ -362,6 +365,7 @@ func TestCreateSegment(t *testing.T) { metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) + defer ctx.Check(metainfo.Close) for _, r := range []struct { rs *pb.RedundancyScheme @@ -456,6 +460,7 @@ func TestDoubleCommitSegment(t *testing.T) { metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) + defer ctx.Check(metainfo.Close) pointer, limits := runCreateSegment(ctx, t, metainfo) @@ -533,6 +538,7 @@ func TestCommitSegmentPointer(t *testing.T) { metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) + defer ctx.Check(metainfo.Close) for _, test := range tests { pointer, limits := runCreateSegment(ctx, t, metainfo) @@ -561,6 +567,7 @@ func TestSetAttribution(t *testing.T) { metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) + defer ctx.Check(metainfoClient.Close) partnerID, err := uuid.New() require.NoError(t, err) @@ -585,7 +592,7 @@ func TestSetAttribution(t *testing.T) { }) } -func runCreateSegment(ctx context.Context, t *testing.T, metainfo metainfo.Client) (*pb.Pointer, []*pb.OrderLimit2) { +func runCreateSegment(ctx context.Context, t *testing.T, metainfo *metainfo.Client) (*pb.Pointer, []*pb.OrderLimit2) { pointer := createTestPointer(t) expirationDate, err := ptypes.Timestamp(pointer.ExpirationDate) require.NoError(t, err) @@ -645,6 +652,7 @@ func TestBucketNameValidation(t *testing.T) { metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) require.NoError(t, err) + defer ctx.Check(metainfo.Close) rs := &pb.RedundancyScheme{ MinReq: 1, diff --git a/uplink/config.go b/uplink/config.go index e2b79485e..ed82bbfa3 100644 --- a/uplink/config.go +++ b/uplink/config.go @@ -93,10 +93,11 @@ func (c Config) GetMetainfo(ctx context.Context, identity *identity.FullIdentity return nil, nil, errors.New("satellite address not specified") } - m, err := metainfo.NewClient(ctx, tc, c.Client.SatelliteAddr, c.Client.APIKey) + m, err := metainfo.Dial(ctx, tc, c.Client.SatelliteAddr, c.Client.APIKey) if err != nil { return nil, nil, Error.New("failed to connect to metainfo service: %v", err) } + // TODO: handle closing of m project, err := kvmetainfo.SetupProject(m) if err != nil { diff --git a/uplink/metainfo/client.go b/uplink/metainfo/client.go index dcf8e8eac..f757c34ca 100644 --- a/uplink/metainfo/client.go +++ b/uplink/metainfo/client.go @@ -30,19 +30,12 @@ var ( Error = errs.Class("metainfo error") ) -// Metainfo creates a grpcClient -type Metainfo struct { +// Client creates a grpcClient +type Client struct { client pb.MetainfoClient + conn *grpc.ClientConn } -// New used as a public function -func New(gcclient pb.MetainfoClient) (metainfo *Metainfo) { - return &Metainfo{client: gcclient} -} - -// a compiler trick to make sure *Metainfo implements Client -var _ Client = (*Metainfo)(nil) - // ListItem is a single item in a listing type ListItem struct { Path storj.Path @@ -50,19 +43,15 @@ type ListItem struct { IsPrefix bool } -// Client interface for the Metainfo service -type Client interface { - CreateSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, redundancy *pb.RedundancyScheme, maxEncryptedSegmentSize int64, expiration time.Time) ([]*pb.AddressedOrderLimit, storj.PieceID, error) - CommitSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, pointer *pb.Pointer, originalLimits []*pb.OrderLimit2) (*pb.Pointer, error) - SegmentInfo(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (*pb.Pointer, error) - ReadSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (*pb.Pointer, []*pb.AddressedOrderLimit, error) - DeleteSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) ([]*pb.AddressedOrderLimit, error) - ListSegments(ctx context.Context, bucket string, prefix, startAfter, endBefore storj.Path, recursive bool, limit int32, metaFlags uint32) (items []ListItem, more bool, err error) - SetAttribution(ctx context.Context, bucket string, partnerID uuid.UUID) error +// New used as a public function +func New(client pb.MetainfoClient) *Client { + return &Client{ + client: client, + } } -// NewClient initializes a new metainfo client -func NewClient(ctx context.Context, tc transport.Client, address string, apiKey string) (*Metainfo, error) { +// Dial dials to metainfo endpoint with the specified api key. +func Dial(ctx context.Context, tc transport.Client, address string, apiKey string) (*Client, error) { apiKeyInjector := grpcauth.NewAPIKeyInjector(apiKey) conn, err := tc.DialAddress( ctx, @@ -73,11 +62,22 @@ func NewClient(ctx context.Context, tc transport.Client, address string, apiKey return nil, Error.Wrap(err) } - return &Metainfo{client: pb.NewMetainfoClient(conn)}, nil + return &Client{ + client: pb.NewMetainfoClient(conn), + conn: conn, + }, nil +} + +// Close closes the dialed connection. +func (client *Client) Close() error { + if client.conn != nil { + return Error.Wrap(client.conn.Close()) + } + return nil } // CreateSegment requests the order limits for creating a new segment -func (metainfo *Metainfo) CreateSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, redundancy *pb.RedundancyScheme, maxEncryptedSegmentSize int64, expiration time.Time) (limits []*pb.AddressedOrderLimit, rootPieceID storj.PieceID, err error) { +func (client *Client) CreateSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, redundancy *pb.RedundancyScheme, maxEncryptedSegmentSize int64, expiration time.Time) (limits []*pb.AddressedOrderLimit, rootPieceID storj.PieceID, err error) { defer mon.Task()(&ctx)(&err) var exp *timestamp.Timestamp @@ -88,7 +88,7 @@ func (metainfo *Metainfo) CreateSegment(ctx context.Context, bucket string, path } } - response, err := metainfo.client.CreateSegment(ctx, &pb.SegmentWriteRequest{ + response, err := client.client.CreateSegment(ctx, &pb.SegmentWriteRequest{ Bucket: []byte(bucket), Path: []byte(path), Segment: segmentIndex, @@ -104,10 +104,10 @@ func (metainfo *Metainfo) CreateSegment(ctx context.Context, bucket string, path } // CommitSegment requests to store the pointer for the segment -func (metainfo *Metainfo) CommitSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, pointer *pb.Pointer, originalLimits []*pb.OrderLimit2) (savedPointer *pb.Pointer, err error) { +func (client *Client) CommitSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, pointer *pb.Pointer, originalLimits []*pb.OrderLimit2) (savedPointer *pb.Pointer, err error) { defer mon.Task()(&ctx)(&err) - response, err := metainfo.client.CommitSegment(ctx, &pb.SegmentCommitRequest{ + response, err := client.client.CommitSegment(ctx, &pb.SegmentCommitRequest{ Bucket: []byte(bucket), Path: []byte(path), Segment: segmentIndex, @@ -122,10 +122,10 @@ func (metainfo *Metainfo) CommitSegment(ctx context.Context, bucket string, path } // SegmentInfo requests the pointer of a segment -func (metainfo *Metainfo) SegmentInfo(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (pointer *pb.Pointer, err error) { +func (client *Client) SegmentInfo(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (pointer *pb.Pointer, err error) { defer mon.Task()(&ctx)(&err) - response, err := metainfo.client.SegmentInfo(ctx, &pb.SegmentInfoRequest{ + response, err := client.client.SegmentInfo(ctx, &pb.SegmentInfoRequest{ Bucket: []byte(bucket), Path: []byte(path), Segment: segmentIndex, @@ -141,10 +141,10 @@ func (metainfo *Metainfo) SegmentInfo(ctx context.Context, bucket string, path s } // ReadSegment requests the order limits for reading a segment -func (metainfo *Metainfo) ReadSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (pointer *pb.Pointer, limits []*pb.AddressedOrderLimit, err error) { +func (client *Client) ReadSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (pointer *pb.Pointer, limits []*pb.AddressedOrderLimit, err error) { defer mon.Task()(&ctx)(&err) - response, err := metainfo.client.DownloadSegment(ctx, &pb.SegmentDownloadRequest{ + response, err := client.client.DownloadSegment(ctx, &pb.SegmentDownloadRequest{ Bucket: []byte(bucket), Path: []byte(path), Segment: segmentIndex, @@ -178,10 +178,10 @@ func getLimitByStorageNodeID(limits []*pb.AddressedOrderLimit, storageNodeID sto } // DeleteSegment requests the order limits for deleting a segment -func (metainfo *Metainfo) DeleteSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (limits []*pb.AddressedOrderLimit, err error) { +func (client *Client) DeleteSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (limits []*pb.AddressedOrderLimit, err error) { defer mon.Task()(&ctx)(&err) - response, err := metainfo.client.DeleteSegment(ctx, &pb.SegmentDeleteRequest{ + response, err := client.client.DeleteSegment(ctx, &pb.SegmentDeleteRequest{ Bucket: []byte(bucket), Path: []byte(path), Segment: segmentIndex, @@ -197,10 +197,10 @@ func (metainfo *Metainfo) DeleteSegment(ctx context.Context, bucket string, path } // ListSegments lists the available segments -func (metainfo *Metainfo) ListSegments(ctx context.Context, bucket string, prefix, startAfter, endBefore storj.Path, recursive bool, limit int32, metaFlags uint32) (items []ListItem, more bool, err error) { +func (client *Client) ListSegments(ctx context.Context, bucket string, prefix, startAfter, endBefore storj.Path, recursive bool, limit int32, metaFlags uint32) (items []ListItem, more bool, err error) { defer mon.Task()(&ctx)(&err) - response, err := metainfo.client.ListSegments(ctx, &pb.ListSegmentsRequest{ + response, err := client.client.ListSegments(ctx, &pb.ListSegmentsRequest{ Bucket: []byte(bucket), Prefix: []byte(prefix), StartAfter: []byte(startAfter), @@ -227,10 +227,10 @@ func (metainfo *Metainfo) ListSegments(ctx context.Context, bucket string, prefi } // SetAttribution tries to set the attribution information on the bucket. -func (metainfo *Metainfo) SetAttribution(ctx context.Context, bucket string, partnerID uuid.UUID) (err error) { +func (client *Client) SetAttribution(ctx context.Context, bucket string, partnerID uuid.UUID) (err error) { defer mon.Task()(&ctx)(&err) - _, err = metainfo.client.SetAttribution(ctx, &pb.SetAttributionRequest{ + _, err = client.client.SetAttribution(ctx, &pb.SetAttributionRequest{ PartnerId: partnerID[:], // TODO: implement storj.UUID that can be sent using pb BucketName: []byte(bucket), })