From a4d1070d688581e88d6ccca523374e679234f043 Mon Sep 17 00:00:00 2001 From: JT Olio Date: Wed, 5 Sep 2018 10:08:39 -0700 Subject: [PATCH] storage/ec: fix some Get socket leaks (#308) * storage/ec: fix some Get socket leaks * fix linters --- pkg/eestream/pad.go | 2 +- pkg/ranger/reader.go | 20 ++++++++++++++++---- pkg/ranger/reader_test.go | 2 +- pkg/storage/ec/client.go | 14 ++++++++++++-- pkg/storage/streams/store.go | 10 ++++++++-- 5 files changed, 38 insertions(+), 10 deletions(-) diff --git a/pkg/eestream/pad.go b/pkg/eestream/pad.go index d46c5a591..3c5333f97 100644 --- a/pkg/eestream/pad.go +++ b/pkg/eestream/pad.go @@ -33,7 +33,7 @@ func makePadding(dataLen int64, blockSize int) []byte { // Pad takes a Ranger and returns another Ranger that is a multiple of // blockSize in length. The return value padding is a convenience to report how // much padding was added. -func Pad(data ranger.Ranger, blockSize int) ( +func Pad(data ranger.RangeCloser, blockSize int) ( rr ranger.Ranger, padding int) { paddingBytes := makePadding(data.Size(), blockSize) return ranger.Concat(data, ranger.ByteRanger(paddingBytes)), len(paddingBytes) diff --git a/pkg/ranger/reader.go b/pkg/ranger/reader.go index aa5b29f2c..8ca4ead6b 100644 --- a/pkg/ranger/reader.go +++ b/pkg/ranger/reader.go @@ -59,14 +59,17 @@ func (b ByteRanger) Range(ctx context.Context, offset, length int64) (io.ReadClo return ioutil.NopCloser(bytes.NewReader(b[offset : offset+length])), nil } +// Close is a no-op +func (b ByteRanger) Close() error { return nil } + // ByteRangeCloser turns a byte slice into a RangeCloser func ByteRangeCloser(data []byte) RangeCloser { return NopCloser(ByteRanger(data)) } type concatReader struct { - r1 Ranger - r2 Ranger + r1 RangeCloser + r2 RangeCloser } func (c *concatReader) Size() int64 { @@ -92,12 +95,21 @@ func (c *concatReader) Range(ctx context.Context, offset, length int64) (io.Read })), nil } -func concat2(r1, r2 Ranger) Ranger { +func (c *concatReader) Close() error { + err1 := c.r1.Close() + err2 := c.r2.Close() + if err1 != nil { + return err1 + } + return err2 +} + +func concat2(r1, r2 RangeCloser) RangeCloser { return &concatReader{r1: r1, r2: r2} } // Concat concatenates Rangers -func Concat(r ...Ranger) Ranger { +func Concat(r ...RangeCloser) RangeCloser { switch len(r) { case 0: return ByteRanger(nil) diff --git a/pkg/ranger/reader_test.go b/pkg/ranger/reader_test.go index 698ea9ed8..0dce0a4d6 100644 --- a/pkg/ranger/reader_test.go +++ b/pkg/ranger/reader_test.go @@ -76,7 +76,7 @@ func TestConcatReader(t *testing.T) { {[]string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"}, 12, 7, 3, "hij"}, } { - var readers []Ranger + var readers []RangeCloser for _, data := range example.data { readers = append(readers, ByteRanger([]byte(data))) } diff --git a/pkg/storage/ec/client.go b/pkg/storage/ec/client.go index 25236fa56..aeb6146b2 100644 --- a/pkg/storage/ec/client.go +++ b/pkg/storage/ec/client.go @@ -152,8 +152,10 @@ func (ec *ecClient) Get(ctx context.Context, nodes []*proto.Node, es eestream.Er if err != nil { zap.S().Errorf("Failed getting piece %s -> %s from node %s: %v", pieceID, derivedPieceID, n.GetId(), err) + ch <- rangerInfo{i: i, rr: nil, err: err} + return } - ch <- rangerInfo{i: i, rr: rr, err: err} + ch <- rangerInfo{i: i, rr: rr, err: nil} }(i, n) } for range nodes { @@ -164,9 +166,17 @@ func (ec *ecClient) Get(ctx context.Context, nodes []*proto.Node, es eestream.Er } rr, err = eestream.Decode(rrs, es, ec.mbm) if err != nil { + for _, rr := range rrs { + _ = rr.Close() + } return nil, err } - return eestream.Unpad(rr, int(paddedSize-size)) + uprr, err := eestream.Unpad(rr, int(paddedSize-size)) + if err != nil { + _ = rr.Close() + return nil, err + } + return uprr, nil } func (ec *ecClient) Delete(ctx context.Context, nodes []*proto.Node, pieceID client.PieceID) (err error) { diff --git a/pkg/storage/streams/store.go b/pkg/storage/streams/store.go index 388c346a1..b16be9e2c 100644 --- a/pkg/storage/streams/store.go +++ b/pkg/storage/streams/store.go @@ -144,20 +144,26 @@ func (s *streamStore) Get(ctx context.Context, path paths.Path) ( msi := streamspb.MetaStreamInfo{} err = proto.Unmarshal(lastSegmentMeta.Data, &msi) if err != nil { + _ = lastRangerCloser.Close() return nil, Meta{}, err } newMeta, err := convertMeta(lastSegmentMeta) if err != nil { + _ = lastRangerCloser.Close() return nil, Meta{}, err } - var rangers []ranger.Ranger + var rangers []ranger.RangeCloser for i := 0; i < int(msi.NumberOfSegments); i++ { currentPath := fmt.Sprintf("s%d", i) rangeCloser, _, err := s.segments.Get(ctx, path.Prepend(currentPath)) if err != nil { + for _, ranger := range rangers { + _ = ranger.Close() + } + _ = lastRangerCloser.Close() return nil, Meta{}, err } @@ -168,7 +174,7 @@ func (s *streamStore) Get(ctx context.Context, path paths.Path) ( catRangers := ranger.Concat(rangers...) - return ranger.NopCloser(catRangers), newMeta, nil + return catRangers, newMeta, nil } // Meta implements Store.Meta