cmd/uplink: add buffering while writing to stdout

Currently, piping to stdout is synchronous, so the --parallelism flag
gives no advantage. This change adds buffering while writing to
stdout. Each part is first read into a buffer and flushed only once
all data from that part has been read.

https://github.com/storj/uplink/issues/105

Change-Id: I07bec0f4864dc4fccb42224e450d85d4d196f2ee
Michał Niewrzał 2022-06-01 08:19:09 +02:00 committed by Storj Robot
parent 7608ea7af0
commit 7e387af010
3 changed files with 241 additions and 0 deletions
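
For illustration only (not part of the commit): the sketch below is a simplified stand-in for the real parallelCopy and shows the pattern this change enables. Parts are fetched concurrently, each into its own buffer, so the synchronous flush to stdout becomes a cheap in-memory copy. The real code flushes each part as soon as it and all earlier parts complete; here all parts are awaited at once for brevity.

package main

import (
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
)

// fetchPart stands in for a ranged download of one part of the object.
func fetchPart(i int) io.Reader {
	return strings.NewReader(fmt.Sprintf("part %d data\n", i))
}

func main() {
	const parts = 4
	bufs := make([][]byte, parts)
	var wg sync.WaitGroup

	// Fetch all parts concurrently, each into its own in-memory buffer.
	for i := 0; i < parts; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			b, err := io.ReadAll(fetchPart(i))
			if err == nil {
				bufs[i] = b
			}
		}(i)
	}
	wg.Wait()

	// Flush sequentially, in part order; each write is now a fast
	// memory copy instead of a blocking network read.
	for _, b := range bufs {
		_, _ = os.Stdout.Write(b)
	}
}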

cmd/uplink/cmd_cp.go

@@ -182,6 +182,10 @@ func (c *cmdCp) copyRecursive(ctx clingy.Context, fs ulfs.Filesystem) error {
 	}
 
 	addError := func(err error) {
+		if err == nil {
+			return
+		}
+
 		mu.Lock()
 		defer mu.Unlock()
@@ -340,6 +344,10 @@ func (c *cmdCp) parallelCopy(
 	defer cancel()
 
 	addError := func(err error) {
+		if err == nil {
+			return
+		}
+
 		mu.Lock()
 		defer mu.Unlock()
@@ -349,6 +357,12 @@ func (c *cmdCp) parallelCopy(
 		cancel()
 	}
 
+	var readBufs *ulfs.BytesPool
+	if p > 1 && c.dest.Std() {
+		// Create the read buffer pool only for downloads to stdout with parallelism > 1.
+		readBufs = ulfs.NewBytesPool(int(chunkSize))
+	}
+
 	for i := 0; length != 0; i++ {
 		i := i
@@ -378,6 +392,13 @@ func (c *cmdCp) parallelCopy(
 			defer func() { _ = rh.Close() }()
 			defer func() { _ = wh.Abort() }()
 
+			if readBufs != nil {
+				buf := readBufs.Get()
+				defer readBufs.Put(buf)
+
+				rh = ulfs.NewBufferedReadHandle(ctx, rh, buf)
+			}
+
 			var w io.Writer = wh
 			if bar != nil {
 				bar.SetTotal(rh.Info().ContentLength).Start()
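
A note on memory (our estimate, not stated in the commit): the pool bounds the extra memory at roughly parallelism × chunk size, because each in-flight part holds exactly one pooled buffer and returns it via the deferred Put. With --parallelism 8 and a 64 MiB chunk size (illustrative numbers), that is at most 8 × 64 MiB = 512 MiB of buffers.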

cmd/uplink/ulfs/buffer.go (new file)

@@ -0,0 +1,99 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package ulfs

import (
	"context"
	"errors"
	"io"
	"sync"
)

// BufferedReadHandle wraps a ReadHandle with an in-memory buffer.
type BufferedReadHandle struct {
	ctx    context.Context
	reader ReadHandle
	buf    []byte
	ready  bool
	size   int
	pos    int
}

// NewBufferedReadHandle wraps reader with buf.
func NewBufferedReadHandle(ctx context.Context, reader ReadHandle, buf []byte) ReadHandle {
	return &BufferedReadHandle{
		ctx:    ctx,
		reader: reader,
		buf:    buf,
	}
}

// Read will first read the entire content of the wrapped reader into the
// internal buffer before returning.
func (b *BufferedReadHandle) Read(p []byte) (int, error) {
	// Read out the reader to fill up buf before returning the first byte.
	if !b.ready {
		n, err := io.ReadFull(b.reader, b.buf)
		if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
			return 0, err
		}

		b.ready = true
		b.size = n
	}

	n := copy(p, b.buf[b.pos:b.size])
	if n == 0 {
		return 0, io.EOF
	}

	b.pos += n

	return n, nil
}

// Close closes the wrapped ReadHandle.
func (b *BufferedReadHandle) Close() error {
	return b.reader.Close()
}

// Info returns Info of the wrapped ReadHandle.
func (b *BufferedReadHandle) Info() ObjectInfo { return b.reader.Info() }

// BytesPool is a fixed-size pool of []byte.
type BytesPool struct {
	size int
	mu   sync.Mutex
	free [][]byte
}

// NewBytesPool creates a pool for []byte slices of length `size`.
func NewBytesPool(size int) *BytesPool {
	return &BytesPool{
		size: size,
	}
}

// Get returns a new []byte from the pool.
func (pool *BytesPool) Get() []byte {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	if len(pool.free) > 0 {
		n := len(pool.free)
		last := pool.free[n-1]
		pool.free = pool.free[:n-1]
		return last
	}

	return make([]byte, pool.size)
}

// Put releases buf back to the pool.
func (pool *BytesPool) Put(buf []byte) {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	pool.free = append(pool.free, buf)
}
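
As a usage sketch, here is a hypothetical Example function (not part of the commit) that could sit next to buffer.go in package ulfs. It demonstrates the LIFO reuse that TestBufferPool below asserts: Put pushes a buffer onto the free list, and the next Get pops that same backing array, contents intact.

package ulfs

import "fmt"

func ExampleBytesPool() {
	pool := NewBytesPool(4)

	// The free list is empty, so the first Get allocates a fresh slice.
	a := pool.Get()
	copy(a, "abcd")

	// Put pushes the buffer onto the free list...
	pool.Put(a)

	// ...and the next Get pops it back, data and all. Callers must not
	// assume a pooled buffer is zeroed.
	b := pool.Get()
	fmt.Println(string(b[:4]))

	// Output: abcd
}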

cmd/uplink/ulfs/buffer_test.go (new file)

@@ -0,0 +1,121 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package ulfs

import (
	"bytes"
	"io"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"storj.io/common/memory"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/storj/cmd/uplink/ulloc"
)

type testReadHandle struct {
	bytes.Reader
	info   ObjectInfo
	closed bool
}

func newTestReadHandle(content []byte, info ObjectInfo) *testReadHandle {
	return &testReadHandle{
		Reader: *bytes.NewReader(content),
		info:   info,
	}
}

func (rh *testReadHandle) Close() error {
	rh.closed = true
	return nil
}

func (rh *testReadHandle) Info() ObjectInfo {
	return rh.info
}

func TestBufferedReadHandle(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	size := 1 * memory.KiB
	content := testrand.Bytes(size)
	info := ObjectInfo{
		Loc:           ulloc.NewLocal("/test/path"),
		Created:       time.Now(),
		ContentLength: size.Int64(),
	}
	rh := newTestReadHandle(content, info)
	buf := make([]byte, size.Int())

	// Check that ObjectInfo is passed through correctly.
	bufrh := NewBufferedReadHandle(ctx, rh, buf)
	assert.Equal(t, info, bufrh.Info())

	// Byte slice for the read content.
	read := make([]byte, size.Int())

	// Read just one byte.
	n, err := bufrh.Read(read[:1])
	require.NoError(t, err)
	assert.Equal(t, 1, n)
	assert.Equal(t, content[0], read[0])

	// Check that the buffer has the content.
	assert.Equal(t, content, buf)

	// Read the rest.
	n, err = bufrh.Read(read[1:])
	require.NoError(t, err)
	assert.Equal(t, size.Int()-1, n)
	assert.Equal(t, content, read)

	// Reading more should return io.EOF.
	n, err = bufrh.Read(read)
	require.EqualError(t, err, io.EOF.Error())
	assert.Zero(t, n)

	// Check that Close closes the underlying reader.
	err = bufrh.Close()
	require.NoError(t, err)
	assert.True(t, rh.closed)
}

func TestBufferPool(t *testing.T) {
	// Create a pool of 1 KiB buffers.
	bufSize := 1 * memory.KiB.Int()
	pool := NewBytesPool(bufSize)

	// Get the first []byte.
	buf1 := pool.Get()
	require.Len(t, buf1, bufSize)

	// Write something to buf1.
	copy(buf1, "first")

	// Get a second []byte.
	buf2 := pool.Get()
	require.Len(t, buf2, bufSize)

	// Write something to buf2.
	copy(buf2, "second")

	// The two []byte should be different.
	assert.NotEqual(t, buf1, buf2)

	// Put buf2 back into the pool.
	pool.Put(buf2)

	// Get a buffer back from the pool.
	buf3 := pool.Get()
	require.Len(t, buf3, bufSize)

	// It should be the same buffer as buf2.
	assert.Equal(t, buf2, buf3)
}