Fix some metainfo.Client leaks (#2327)

Egon Elbre 2019-06-25 18:36:23 +03:00 committed by GitHub
parent 35f2ab5ded
commit 414648d660
13 changed files with 71 additions and 56 deletions


@@ -147,9 +147,8 @@ func (uplink *Uplink) Local() pb.Node { return uplink.Info }
func (uplink *Uplink) Shutdown() error { return nil }
// DialMetainfo dials destination with apikey and returns metainfo Client
func (uplink *Uplink) DialMetainfo(ctx context.Context, destination Peer, apikey string) (metainfo.Client, error) {
// TODO: handle disconnect
return metainfo.NewClient(ctx, uplink.Transport, destination.Addr(), apikey)
func (uplink *Uplink) DialMetainfo(ctx context.Context, destination Peer, apikey string) (*metainfo.Client, error) {
return metainfo.Dial(ctx, uplink.Transport, destination.Addr(), apikey)
}
// DialPiecestore dials destination storagenode and returns a piecestore client.
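
Because DialMetainfo now returns a concrete *metainfo.Client that owns its connection, the caller is responsible for closing it. A minimal sketch of the pattern the test hunks below adopt (ctx being the testcontext.Context and planet the testplanet.Planet those tests already use, apiKey a valid key):

    metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
    require.NoError(t, err)
    defer ctx.Check(metainfoClient.Close) // close the dialed connection when the test finishes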


@@ -27,6 +27,8 @@ type Bucket struct {
streams streams.Store
}
// TODO: move the object related OpenObject to object.go
// OpenObject returns an Object handle, if authorized.
func (b *Bucket) OpenObject(ctx context.Context, path storj.Path) (o *Object, err error) {
defer mon.Task()(&ctx)(&err)


@@ -24,7 +24,7 @@ import (
type Project struct {
uplinkCfg *Config
tc transport.Client
metainfo metainfo.Client
metainfo *metainfo.Client
project *kvmetainfo.Project
maxInlineSize memory.Size
}
@@ -144,6 +144,8 @@ func (p *Project) GetBucketInfo(ctx context.Context, bucket string) (b storj.Buc
return b, cfg, nil
}
// TODO: move the bucket related OpenBucket to bucket.go
// OpenBucket returns a Bucket handle with the given EncryptionAccess
// information.
func (p *Project) OpenBucket(ctx context.Context, bucketName string, access *EncryptionAccess) (b *Bucket, err error) {
@@ -200,8 +202,3 @@ func (p *Project) OpenBucket(ctx context.Context, bucketName string, access *Enc
streams: streamStore,
}, nil
}
// Close closes the Project.
func (p *Project) Close() error {
return nil
}


@@ -120,11 +120,13 @@ func NewUplink(ctx context.Context, cfg *Config) (_ *Uplink, err error) {
}, nil
}
// TODO: move the project related OpenProject and Close to project.go
// OpenProject returns a Project handle with the given APIKey
func (u *Uplink) OpenProject(ctx context.Context, satelliteAddr string, apiKey APIKey) (p *Project, err error) {
defer mon.Task()(&ctx)(&err)
m, err := metainfo.NewClient(ctx, u.tc, satelliteAddr, apiKey.Serialize())
m, err := metainfo.Dial(ctx, u.tc, satelliteAddr, apiKey.Serialize())
if err != nil {
return nil, err
}
@@ -143,9 +145,12 @@ func (u *Uplink) OpenProject(ctx context.Context, satelliteAddr string, apiKey A
}, nil
}
// Close closes the Uplink. This may not do anything at present, but should
// still be called to allow forward compatibility. No Project or Bucket
// objects using this Uplink should be used after calling Close.
// Close closes the Project. Opened buckets or objects must not be used after calling Close.
func (p *Project) Close() error {
return p.metainfo.Close()
}
// Close closes the Uplink. Opened projects, buckets or objects must not be used after calling Close.
func (u *Uplink) Close() error {
return nil
}
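
With Close now defined on both handles, a consumer of the package is expected to close whatever it opens. A rough usage sketch; cfg, satelliteAddr, apiKey, and the variable names are illustrative assumptions, not code from this commit:

    ul, err := NewUplink(ctx, cfg)
    if err != nil {
        return err
    }
    defer func() { _ = ul.Close() }() // returns nil today, but callers should not rely on that

    project, err := ul.OpenProject(ctx, satelliteAddr, apiKey)
    if err != nil {
        return err
    }
    defer func() { _ = project.Close() }() // releases the metainfo connection dialed in OpenProject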


@@ -312,6 +312,7 @@ func newMetainfoParts(planet *testplanet.Planet) (*kvmetainfo.DB, streams.Store,
if err != nil {
return nil, nil, err
}
// TODO(leak): call metainfo.Close somehow
ec := ecclient.NewClient(planet.Uplinks[0].Transport, 0)
fc, err := infectious.NewFEC(2, 4)


@@ -30,7 +30,7 @@ var _ storj.Metainfo = (*DB)(nil)
type DB struct {
project *Project
metainfo metainfo.Client
metainfo *metainfo.Client
streams streams.Store
segments segments.Store
@@ -39,7 +39,7 @@ type DB struct {
}
// New creates a new metainfo database
func New(project *Project, metainfo metainfo.Client, streams streams.Store, segments segments.Store, encStore *encryption.Store) *DB {
func New(project *Project, metainfo *metainfo.Client, streams streams.Store, segments segments.Store, encStore *encryption.Store) *DB {
return &DB{
project: project,
metainfo: metainfo,


@@ -22,7 +22,7 @@ var (
)
// SetupProject creates a project with temporary values until we can figure out how to bypass encryption related setup
func SetupProject(m metainfo.Client) (*Project, error) {
func SetupProject(m *metainfo.Client) (*Project, error) {
whoCares := 1 // TODO: find a better way to do this
fc, err := infectious.NewFEC(whoCares, whoCares)
if err != nil {


@@ -688,6 +688,7 @@ func initEnv(ctx context.Context, planet *testplanet.Planet) (minio.ObjectLayer,
if err != nil {
return nil, nil, nil, err
}
// TODO(leak): close m metainfo.Client somehow
ec := ecclient.NewClient(planet.Uplinks[0].Transport, 0)
fc, err := infectious.NewFEC(2, 4)
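
Both newMetainfoParts and initEnv above keep a TODO(leak), because the metainfo client they dial has no owner that can close it. A possible later fix, purely a sketch and not part of this commit (it assumes a ctx and apiKey are in scope and that the helper's signature would grow an extra return value), is to hand the client back to the test so it owns the lifetime:

    metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
    if err != nil {
        return nil, nil, err
    }
    // return metainfoClient alongside the existing values; the test can then
    // defer ctx.Check(metainfoClient.Close) next to its other teardown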


@@ -53,7 +53,7 @@ type Store interface {
}
type segmentStore struct {
metainfo metainfo.Client
metainfo *metainfo.Client
ec ecclient.Client
rs eestream.RedundancyStrategy
thresholdSize int
@@ -61,7 +61,7 @@ type segmentStore struct {
}
// NewSegmentStore creates a new instance of segmentStore
func NewSegmentStore(metainfo metainfo.Client, ec ecclient.Client, rs eestream.RedundancyStrategy, threshold int, maxEncryptedSegmentSize int64) Store {
func NewSegmentStore(metainfo *metainfo.Client, ec ecclient.Client, rs eestream.RedundancyStrategy, threshold int, maxEncryptedSegmentSize int64) Store {
return &segmentStore{
metainfo: metainfo,
ec: ec,


@@ -268,6 +268,7 @@ func runTest(t *testing.T, test func(t *testing.T, ctx *testcontext.Context, pla
metainfo, err := planet.Uplinks[0].DialMetainfo(context.Background(), planet.Satellites[0], TestAPIKey)
require.NoError(t, err)
defer ctx.Check(metainfo.Close)
ec := ecclient.NewClient(planet.Uplinks[0].Transport, 0)
fc, err := infectious.NewFEC(2, 4)


@@ -54,6 +54,7 @@ func TestInvalidAPIKey(t *testing.T) {
for _, invalidAPIKey := range []string{"", "invalid", "testKey"} {
client, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], invalidAPIKey)
require.NoError(t, err)
defer ctx.Check(client.Close)
_, _, err = client.CreateSegment(ctx, "hello", "world", 1, &pb.RedundancyScheme{}, 123, time.Now())
assertUnauthenticated(t, err, false)
@@ -167,6 +168,7 @@ func TestRestrictedAPIKey(t *testing.T) {
client, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], restrictedKey.Serialize())
require.NoError(t, err)
defer ctx.Check(client.Close)
_, _, err = client.CreateSegment(ctx, "testbucket", "testpath", 1, &pb.RedundancyScheme{}, 123, time.Now())
assertUnauthenticated(t, err, test.CreateSegmentAllowed)
@@ -296,6 +298,7 @@ func TestCommitSegment(t *testing.T) {
metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(metainfo.Close)
{
// error if pointer is nil
@@ -362,6 +365,7 @@ func TestCreateSegment(t *testing.T) {
metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(metainfo.Close)
for _, r := range []struct {
rs *pb.RedundancyScheme
@@ -456,6 +460,7 @@ func TestDoubleCommitSegment(t *testing.T) {
metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(metainfo.Close)
pointer, limits := runCreateSegment(ctx, t, metainfo)
@@ -533,6 +538,7 @@ func TestCommitSegmentPointer(t *testing.T) {
metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(metainfo.Close)
for _, test := range tests {
pointer, limits := runCreateSegment(ctx, t, metainfo)
@@ -561,6 +567,7 @@ func TestSetAttribution(t *testing.T) {
metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(metainfoClient.Close)
partnerID, err := uuid.New()
require.NoError(t, err)
@@ -585,7 +592,7 @@ func TestSetAttribution(t *testing.T) {
})
}
func runCreateSegment(ctx context.Context, t *testing.T, metainfo metainfo.Client) (*pb.Pointer, []*pb.OrderLimit2) {
func runCreateSegment(ctx context.Context, t *testing.T, metainfo *metainfo.Client) (*pb.Pointer, []*pb.OrderLimit2) {
pointer := createTestPointer(t)
expirationDate, err := ptypes.Timestamp(pointer.ExpirationDate)
require.NoError(t, err)
@@ -645,6 +652,7 @@ func TestBucketNameValidation(t *testing.T) {
metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(metainfo.Close)
rs := &pb.RedundancyScheme{
MinReq: 1,


@@ -93,10 +93,11 @@ func (c Config) GetMetainfo(ctx context.Context, identity *identity.FullIdentity
return nil, nil, errors.New("satellite address not specified")
}
m, err := metainfo.NewClient(ctx, tc, c.Client.SatelliteAddr, c.Client.APIKey)
m, err := metainfo.Dial(ctx, tc, c.Client.SatelliteAddr, c.Client.APIKey)
if err != nil {
return nil, nil, Error.New("failed to connect to metainfo service: %v", err)
}
// TODO: handle closing of m
project, err := kvmetainfo.SetupProject(m)
if err != nil {
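
The gateway's GetMetainfo still leaks the client it dials, which the new "// TODO: handle closing of m" comment marks. One possible way to at least cover the failure path, sketched here as an assumption rather than part of this commit (errs.Combine being the error-combining helper used elsewhere in the codebase), is to close the client when SetupProject fails:

    project, err := kvmetainfo.SetupProject(m)
    if err != nil {
        // close the freshly dialed metainfo client so the connection does not leak
        return nil, nil, errs.Combine(err, m.Close())
    }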


@@ -30,19 +30,12 @@ var (
Error = errs.Class("metainfo error")
)
// Metainfo creates a grpcClient
type Metainfo struct {
// Client creates a grpcClient
type Client struct {
client pb.MetainfoClient
conn *grpc.ClientConn
}
// New used as a public function
func New(gcclient pb.MetainfoClient) (metainfo *Metainfo) {
return &Metainfo{client: gcclient}
}
// a compiler trick to make sure *Metainfo implements Client
var _ Client = (*Metainfo)(nil)
// ListItem is a single item in a listing
type ListItem struct {
Path storj.Path
@@ -50,19 +43,15 @@ type ListItem struct {
IsPrefix bool
}
// Client interface for the Metainfo service
type Client interface {
CreateSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, redundancy *pb.RedundancyScheme, maxEncryptedSegmentSize int64, expiration time.Time) ([]*pb.AddressedOrderLimit, storj.PieceID, error)
CommitSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, pointer *pb.Pointer, originalLimits []*pb.OrderLimit2) (*pb.Pointer, error)
SegmentInfo(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (*pb.Pointer, error)
ReadSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (*pb.Pointer, []*pb.AddressedOrderLimit, error)
DeleteSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) ([]*pb.AddressedOrderLimit, error)
ListSegments(ctx context.Context, bucket string, prefix, startAfter, endBefore storj.Path, recursive bool, limit int32, metaFlags uint32) (items []ListItem, more bool, err error)
SetAttribution(ctx context.Context, bucket string, partnerID uuid.UUID) error
// New used as a public function
func New(client pb.MetainfoClient) *Client {
return &Client{
client: client,
}
}
// NewClient initializes a new metainfo client
func NewClient(ctx context.Context, tc transport.Client, address string, apiKey string) (*Metainfo, error) {
// Dial dials to metainfo endpoint with the specified api key.
func Dial(ctx context.Context, tc transport.Client, address string, apiKey string) (*Client, error) {
apiKeyInjector := grpcauth.NewAPIKeyInjector(apiKey)
conn, err := tc.DialAddress(
ctx,
@@ -73,11 +62,22 @@ func NewClient(ctx context.Context, tc transport.Client, address string, apiKey
return nil, Error.Wrap(err)
}
return &Metainfo{client: pb.NewMetainfoClient(conn)}, nil
return &Client{
client: pb.NewMetainfoClient(conn),
conn: conn,
}, nil
}
// Close closes the dialed connection.
func (client *Client) Close() error {
if client.conn != nil {
return Error.Wrap(client.conn.Close())
}
return nil
}
// CreateSegment requests the order limits for creating a new segment
func (metainfo *Metainfo) CreateSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, redundancy *pb.RedundancyScheme, maxEncryptedSegmentSize int64, expiration time.Time) (limits []*pb.AddressedOrderLimit, rootPieceID storj.PieceID, err error) {
func (client *Client) CreateSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, redundancy *pb.RedundancyScheme, maxEncryptedSegmentSize int64, expiration time.Time) (limits []*pb.AddressedOrderLimit, rootPieceID storj.PieceID, err error) {
defer mon.Task()(&ctx)(&err)
var exp *timestamp.Timestamp
@@ -88,7 +88,7 @@ func (metainfo *Metainfo) CreateSegment(ctx context.Context, bucket string, path
}
}
response, err := metainfo.client.CreateSegment(ctx, &pb.SegmentWriteRequest{
response, err := client.client.CreateSegment(ctx, &pb.SegmentWriteRequest{
Bucket: []byte(bucket),
Path: []byte(path),
Segment: segmentIndex,
@@ -104,10 +104,10 @@ func (metainfo *Metainfo) CreateSegment(ctx context.Context, bucket string, path
}
// CommitSegment requests to store the pointer for the segment
func (metainfo *Metainfo) CommitSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, pointer *pb.Pointer, originalLimits []*pb.OrderLimit2) (savedPointer *pb.Pointer, err error) {
func (client *Client) CommitSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, pointer *pb.Pointer, originalLimits []*pb.OrderLimit2) (savedPointer *pb.Pointer, err error) {
defer mon.Task()(&ctx)(&err)
response, err := metainfo.client.CommitSegment(ctx, &pb.SegmentCommitRequest{
response, err := client.client.CommitSegment(ctx, &pb.SegmentCommitRequest{
Bucket: []byte(bucket),
Path: []byte(path),
Segment: segmentIndex,
@@ -122,10 +122,10 @@ func (metainfo *Metainfo) CommitSegment(ctx context.Context, bucket string, path
}
// SegmentInfo requests the pointer of a segment
func (metainfo *Metainfo) SegmentInfo(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (pointer *pb.Pointer, err error) {
func (client *Client) SegmentInfo(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (pointer *pb.Pointer, err error) {
defer mon.Task()(&ctx)(&err)
response, err := metainfo.client.SegmentInfo(ctx, &pb.SegmentInfoRequest{
response, err := client.client.SegmentInfo(ctx, &pb.SegmentInfoRequest{
Bucket: []byte(bucket),
Path: []byte(path),
Segment: segmentIndex,
@@ -141,10 +141,10 @@ func (metainfo *Metainfo) SegmentInfo(ctx context.Context, bucket string, path s
}
// ReadSegment requests the order limits for reading a segment
func (metainfo *Metainfo) ReadSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (pointer *pb.Pointer, limits []*pb.AddressedOrderLimit, err error) {
func (client *Client) ReadSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (pointer *pb.Pointer, limits []*pb.AddressedOrderLimit, err error) {
defer mon.Task()(&ctx)(&err)
response, err := metainfo.client.DownloadSegment(ctx, &pb.SegmentDownloadRequest{
response, err := client.client.DownloadSegment(ctx, &pb.SegmentDownloadRequest{
Bucket: []byte(bucket),
Path: []byte(path),
Segment: segmentIndex,
@@ -178,10 +178,10 @@ func getLimitByStorageNodeID(limits []*pb.AddressedOrderLimit, storageNodeID sto
}
// DeleteSegment requests the order limits for deleting a segment
func (metainfo *Metainfo) DeleteSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (limits []*pb.AddressedOrderLimit, err error) {
func (client *Client) DeleteSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (limits []*pb.AddressedOrderLimit, err error) {
defer mon.Task()(&ctx)(&err)
response, err := metainfo.client.DeleteSegment(ctx, &pb.SegmentDeleteRequest{
response, err := client.client.DeleteSegment(ctx, &pb.SegmentDeleteRequest{
Bucket: []byte(bucket),
Path: []byte(path),
Segment: segmentIndex,
@@ -197,10 +197,10 @@ func (metainfo *Metainfo) DeleteSegment(ctx context.Context, bucket string, path
}
// ListSegments lists the available segments
func (metainfo *Metainfo) ListSegments(ctx context.Context, bucket string, prefix, startAfter, endBefore storj.Path, recursive bool, limit int32, metaFlags uint32) (items []ListItem, more bool, err error) {
func (client *Client) ListSegments(ctx context.Context, bucket string, prefix, startAfter, endBefore storj.Path, recursive bool, limit int32, metaFlags uint32) (items []ListItem, more bool, err error) {
defer mon.Task()(&ctx)(&err)
response, err := metainfo.client.ListSegments(ctx, &pb.ListSegmentsRequest{
response, err := client.client.ListSegments(ctx, &pb.ListSegmentsRequest{
Bucket: []byte(bucket),
Prefix: []byte(prefix),
StartAfter: []byte(startAfter),
@@ -227,10 +227,10 @@ func (metainfo *Metainfo) ListSegments(ctx context.Context, bucket string, prefi
}
// SetAttribution tries to set the attribution information on the bucket.
func (metainfo *Metainfo) SetAttribution(ctx context.Context, bucket string, partnerID uuid.UUID) (err error) {
func (client *Client) SetAttribution(ctx context.Context, bucket string, partnerID uuid.UUID) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = metainfo.client.SetAttribution(ctx, &pb.SetAttributionRequest{
_, err = client.client.SetAttribution(ctx, &pb.SetAttributionRequest{
PartnerId: partnerID[:], // TODO: implement storj.UUID that can be sent using pb
BucketName: []byte(bucket),
})