storage/ec: fix some Get socket leaks (#308)

* storage/ec: fix some Get socket leaks

* fix linters
This commit is contained in:
JT Olio 2018-09-05 10:08:39 -07:00 committed by GitHub
parent 2686476364
commit a4d1070d68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 38 additions and 10 deletions

View File

@ -33,7 +33,7 @@ func makePadding(dataLen int64, blockSize int) []byte {
// Pad takes a Ranger and returns another Ranger that is a multiple of
// blockSize in length. The return value padding is a convenience to report how
// much padding was added.
func Pad(data ranger.Ranger, blockSize int) (
func Pad(data ranger.RangeCloser, blockSize int) (
rr ranger.Ranger, padding int) {
paddingBytes := makePadding(data.Size(), blockSize)
return ranger.Concat(data, ranger.ByteRanger(paddingBytes)), len(paddingBytes)

View File

@ -59,14 +59,17 @@ func (b ByteRanger) Range(ctx context.Context, offset, length int64) (io.ReadClo
return ioutil.NopCloser(bytes.NewReader(b[offset : offset+length])), nil
}
// Close is a no-op
func (b ByteRanger) Close() error { return nil }
// ByteRangeCloser turns a byte slice into a RangeCloser
func ByteRangeCloser(data []byte) RangeCloser {
return NopCloser(ByteRanger(data))
}
type concatReader struct {
r1 Ranger
r2 Ranger
r1 RangeCloser
r2 RangeCloser
}
func (c *concatReader) Size() int64 {
@ -92,12 +95,21 @@ func (c *concatReader) Range(ctx context.Context, offset, length int64) (io.Read
})), nil
}
func concat2(r1, r2 Ranger) Ranger {
func (c *concatReader) Close() error {
err1 := c.r1.Close()
err2 := c.r2.Close()
if err1 != nil {
return err1
}
return err2
}
func concat2(r1, r2 RangeCloser) RangeCloser {
return &concatReader{r1: r1, r2: r2}
}
// Concat concatenates Rangers
func Concat(r ...Ranger) Ranger {
func Concat(r ...RangeCloser) RangeCloser {
switch len(r) {
case 0:
return ByteRanger(nil)

View File

@ -76,7 +76,7 @@ func TestConcatReader(t *testing.T) {
{[]string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"},
12, 7, 3, "hij"},
} {
var readers []Ranger
var readers []RangeCloser
for _, data := range example.data {
readers = append(readers, ByteRanger([]byte(data)))
}

View File

@ -152,8 +152,10 @@ func (ec *ecClient) Get(ctx context.Context, nodes []*proto.Node, es eestream.Er
if err != nil {
zap.S().Errorf("Failed getting piece %s -> %s from node %s: %v",
pieceID, derivedPieceID, n.GetId(), err)
ch <- rangerInfo{i: i, rr: nil, err: err}
return
}
ch <- rangerInfo{i: i, rr: rr, err: err}
ch <- rangerInfo{i: i, rr: rr, err: nil}
}(i, n)
}
for range nodes {
@ -164,9 +166,17 @@ func (ec *ecClient) Get(ctx context.Context, nodes []*proto.Node, es eestream.Er
}
rr, err = eestream.Decode(rrs, es, ec.mbm)
if err != nil {
for _, rr := range rrs {
_ = rr.Close()
}
return nil, err
}
return eestream.Unpad(rr, int(paddedSize-size))
uprr, err := eestream.Unpad(rr, int(paddedSize-size))
if err != nil {
_ = rr.Close()
return nil, err
}
return uprr, nil
}
func (ec *ecClient) Delete(ctx context.Context, nodes []*proto.Node, pieceID client.PieceID) (err error) {

View File

@ -144,20 +144,26 @@ func (s *streamStore) Get(ctx context.Context, path paths.Path) (
msi := streamspb.MetaStreamInfo{}
err = proto.Unmarshal(lastSegmentMeta.Data, &msi)
if err != nil {
_ = lastRangerCloser.Close()
return nil, Meta{}, err
}
newMeta, err := convertMeta(lastSegmentMeta)
if err != nil {
_ = lastRangerCloser.Close()
return nil, Meta{}, err
}
var rangers []ranger.Ranger
var rangers []ranger.RangeCloser
for i := 0; i < int(msi.NumberOfSegments); i++ {
currentPath := fmt.Sprintf("s%d", i)
rangeCloser, _, err := s.segments.Get(ctx, path.Prepend(currentPath))
if err != nil {
for _, ranger := range rangers {
_ = ranger.Close()
}
_ = lastRangerCloser.Close()
return nil, Meta{}, err
}
@ -168,7 +174,7 @@ func (s *streamStore) Get(ctx context.Context, path paths.Path) (
catRangers := ranger.Concat(rangers...)
return ranger.NopCloser(catRangers), newMeta, nil
return catRangers, newMeta, nil
}
// Meta implements Store.Meta