uplink/ecclient: performance - close connections faster (#2757)

This commit is contained in:
Maximillian von Briesen 2019-08-14 10:03:51 -04:00 committed by GitHub
parent a496e26e44
commit 3a82b63974
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 24 additions and 15 deletions

View File

@ -489,16 +489,16 @@ type lazyPieceReader struct {
offset int64
length int64
mu sync.RWMutex
mu sync.Mutex
isClosed bool
closers []io.Closer
piecestore.Downloader
client *piecestore.Client
}
func (lr *lazyPieceReader) Read(data []byte) (_ int, err error) {
lr.mu.RLock()
defer lr.mu.RUnlock()
lr.mu.Lock()
defer lr.mu.Unlock()
if lr.isClosed {
return 0, io.EOF
@ -509,7 +509,7 @@ func (lr *lazyPieceReader) Read(data []byte) (_ int, err error) {
return 0, err
}
lr.Downloader = downloader
lr.closers = []io.Closer{downloader, client}
lr.client = client
}
return lr.Downloader.Read(data)
@ -541,8 +541,11 @@ func (lr *lazyPieceReader) Close() (err error) {
}
lr.isClosed = true
for _, c := range lr.closers {
err = errs.Combine(err, c.Close())
if lr.Downloader != nil {
err = errs.Combine(err, lr.Downloader.Close())
}
if lr.client != nil {
err = errs.Combine(err, lr.client.Close())
}
return err
}

View File

@ -42,7 +42,7 @@ type decodedReader struct {
// set to 0, the minimum possible memory will be used.
// if forceErrorDetection is set to true then k+1 pieces will be always
// required for decoding, so corrupted pieces can be detected.
func DecodeReaders(ctx context.Context, log *zap.Logger, rs map[int]io.ReadCloser, es ErasureScheme, expectedSize int64, mbm int, forceErrorDetection bool) io.ReadCloser {
func DecodeReaders(ctx context.Context, cancel func(), log *zap.Logger, rs map[int]io.ReadCloser, es ErasureScheme, expectedSize int64, mbm int, forceErrorDetection bool) io.ReadCloser {
defer mon.Task()(&ctx)(nil)
if expectedSize < 0 {
return readcloser.FatalReadCloser(Error.New("negative expected size"))
@ -63,7 +63,7 @@ func DecodeReaders(ctx context.Context, log *zap.Logger, rs map[int]io.ReadClose
outbuf: make([]byte, 0, es.StripeSize()),
expectedStripes: expectedSize / int64(es.StripeSize()),
}
dr.ctx, dr.cancel = context.WithCancel(ctx)
dr.ctx, dr.cancel = ctx, cancel
// Kick off a goroutine to watch for context cancelation.
go func() {
<-dr.ctx.Done()
@ -192,6 +192,8 @@ func (dr *decodedRanger) Size() int64 {
func (dr *decodedRanger) Range(ctx context.Context, offset, length int64) (_ io.ReadCloser, err error) {
defer mon.Task()(&ctx)(&err)
ctx, cancel := context.WithCancel(ctx)
// offset and length might not be block-aligned. figure out which
// blocks contain this request
firstBlock, blockCount := encryption.CalcEncompassingBlocks(offset, length, dr.es.StripeSize())
@ -208,7 +210,7 @@ func (dr *decodedRanger) Range(ctx context.Context, offset, length int64) (_ io.
}
// decode from all those ranges
r := DecodeReaders(ctx, dr.log, readers, dr.es, blockCount*int64(dr.es.StripeSize()), dr.mbm, dr.forceErrorDetection)
r := DecodeReaders(ctx, cancel, dr.log, readers, dr.es, blockCount*int64(dr.es.StripeSize()), dr.mbm, dr.forceErrorDetection)
// offset might start a few bytes in, potentially discard the initial bytes
_, err = io.CopyN(ioutil.Discard, r, offset-firstBlock*int64(dr.es.StripeSize()))
if err != nil {

View File

@ -50,7 +50,8 @@ func TestRS(t *testing.T) {
for i, reader := range readers {
readerMap[i] = reader
}
decoder := DecodeReaders(ctx, zaptest.NewLogger(t), readerMap, rs, 32*1024, 0, false)
ctx, cancel := context.WithCancel(ctx)
decoder := DecodeReaders(ctx, cancel, zaptest.NewLogger(t), readerMap, rs, 32*1024, 0, false)
defer func() { assert.NoError(t, decoder.Close()) }()
data2, err := ioutil.ReadAll(decoder)
if err != nil {
@ -81,7 +82,8 @@ func TestRSUnexpectedEOF(t *testing.T) {
for i, reader := range readers {
readerMap[i] = reader
}
decoder := DecodeReaders(ctx, zaptest.NewLogger(t), readerMap, rs, 32*1024, 0, false)
ctx, cancel := context.WithCancel(ctx)
decoder := DecodeReaders(ctx, cancel, zaptest.NewLogger(t), readerMap, rs, 32*1024, 0, false)
defer func() { assert.NoError(t, decoder.Close()) }()
// Try ReadFull more data from DecodeReaders than available
data2 := make([]byte, len(data)+1024)
@ -406,7 +408,8 @@ func testRSProblematic(t *testing.T, tt testCase, i int, fn problematicReadClose
for i := tt.problematic; i < tt.total; i++ {
readerMap[i] = ioutil.NopCloser(bytes.NewReader(pieces[i]))
}
decoder := DecodeReaders(ctx, zaptest.NewLogger(t), readerMap, rs, int64(tt.dataSize), 3*1024, false)
ctx, cancel := context.WithCancel(ctx)
decoder := DecodeReaders(ctx, cancel, zaptest.NewLogger(t), readerMap, rs, int64(tt.dataSize), 3*1024, false)
defer func() { assert.NoError(t, decoder.Close()) }()
data2, err := ioutil.ReadAll(decoder)
if tt.fail {
@ -532,7 +535,8 @@ func TestDecoderErrorWithStalledReaders(t *testing.T) {
for i := 7; i < 20; i++ {
readerMap[i] = readcloser.FatalReadCloser(errors.New("I am an error piece"))
}
decoder := DecodeReaders(ctx, zaptest.NewLogger(t), readerMap, rs, int64(10*1024), 0, false)
ctx, cancel := context.WithCancel(ctx)
decoder := DecodeReaders(ctx, cancel, zaptest.NewLogger(t), readerMap, rs, int64(10*1024), 0, false)
defer func() { assert.NoError(t, decoder.Close()) }()
// record the time for reading the data from the decoder
start := time.Now()

View File

@ -242,7 +242,7 @@ func (client *Download) Close() (err error) {
}()
client.closeWithError(nil)
return client.closingError
return errs2.IgnoreCanceled(client.closingError)
}
// ReadBuffer implements buffered reading with an error.