From 1feef7105b7634df0700b250dda13a97b4a3be82 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Thu, 20 Dec 2018 16:51:39 +0200 Subject: [PATCH] Implement memory and file pipe (#894) --- internal/sync2/io.go | 81 +++++++++ internal/sync2/limiter_test.go | 8 +- internal/sync2/pipe.go | 280 ++++++++++++++++++++++++++++++++ internal/sync2/pipe_test.go | 93 +++++++++++ internal/sync2/throttle_test.go | 8 +- 5 files changed, 464 insertions(+), 6 deletions(-) create mode 100644 internal/sync2/io.go create mode 100644 internal/sync2/pipe.go create mode 100644 internal/sync2/pipe_test.go diff --git a/internal/sync2/io.go b/internal/sync2/io.go new file mode 100644 index 000000000..68328e051 --- /dev/null +++ b/internal/sync2/io.go @@ -0,0 +1,81 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information + +package sync2 + +import ( + "io" + "os" + "sync/atomic" +) + +// ReadAtWriteAtCloser implements all io.ReaderAt, io.WriterAt and io.Closer +type ReadAtWriteAtCloser interface { + io.ReaderAt + io.WriterAt + io.Closer +} + +// PipeWriter allows closing the writer with an error +type PipeWriter interface { + io.WriteCloser + CloseWithError(reason error) error +} + +// PipeReader allows closing the reader with an error +type PipeReader interface { + io.ReadCloser + CloseWithError(reason error) error +} + +// memory implements ReadAtWriteAtCloser on a memory buffer +type memory []byte + +// Size returns size of memory buffer +func (memory memory) Size() int { return len(memory) } + +// ReadAt implements io.ReaderAt methods +func (memory memory) ReadAt(data []byte, at int64) (amount int, err error) { + if at > int64(len(memory)) { + return 0, io.ErrClosedPipe + } + amount = copy(data, memory[at:]) + return amount, nil +} + +// WriteAt implements io.WriterAt methods +func (memory memory) WriteAt(data []byte, at int64) (amount int, err error) { + if at > int64(len(memory)) { + return 0, io.ErrClosedPipe + } + amount = copy(memory[at:], data) + return amount, nil +} + +// Close implements io.Closer implementation +func (memory memory) Close() error { return nil } + +// offsetFile implements ReadAt, WriteAt offset to the file with reference counting +type offsetFile struct { + file *os.File + offset int64 + open *int64 // pointer to MultiPipe.open +} + +// ReadAt implements io.ReaderAt methods +func (file offsetFile) ReadAt(data []byte, at int64) (amount int, err error) { + return file.file.ReadAt(data, file.offset+at) +} + +// WriteAt implements io.WriterAt methods +func (file offsetFile) WriteAt(data []byte, at int64) (amount int, err error) { + return file.file.WriteAt(data, file.offset+at) +} + +// Close implements io.Closer methods +func (file offsetFile) Close() error { + if atomic.AddInt64(file.open, -1) == 0 { + return file.Close() + } + return nil +} diff --git a/internal/sync2/limiter_test.go b/internal/sync2/limiter_test.go index 3a6e71881..fa9e2133a 100644 --- a/internal/sync2/limiter_test.go +++ b/internal/sync2/limiter_test.go @@ -1,19 +1,21 @@ // Copyright (C) 2018 Storj Labs, Inc. // See LICENSE for copying information -package sync2 +package sync2_test import ( "context" "sync/atomic" "testing" "time" + + "storj.io/storj/internal/sync2" ) func TestLimiterLimiting(t *testing.T) { const N, Limit = 1000, 10 ctx := context.Background() - limiter := NewLimiter(Limit) + limiter := sync2.NewLimiter(Limit) counter := int32(0) for i := 0; i < N; i++ { limiter.Go(ctx, func() { @@ -29,7 +31,7 @@ func TestLimiterLimiting(t *testing.T) { func TestLimiterCancelling(t *testing.T) { const N, Limit = 1000, 10 - limiter := NewLimiter(Limit) + limiter := sync2.NewLimiter(Limit) ctx, cancel := context.WithCancel(context.Background()) diff --git a/internal/sync2/pipe.go b/internal/sync2/pipe.go new file mode 100644 index 000000000..8365d6174 --- /dev/null +++ b/internal/sync2/pipe.go @@ -0,0 +1,280 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information + +package sync2 + +import ( + "fmt" + "io" + "io/ioutil" + "math" + "sync" + "sync/atomic" +) + +// pipe is a io.Reader/io.Writer pipe backed by ReadAtWriteAtCloser +type pipe struct { + buffer ReadAtWriteAtCloser + open int32 // number of halves open (starts at 2) + + mu sync.Mutex + nodata sync.Cond + read int64 + write int64 + limit int64 + + writerDone bool + writerErr error + + readerDone bool + readerErr error +} + +// NewPipeFile returns a pipe that uses file-system to offload memory +func NewPipeFile(tempdir string) (PipeReader, PipeWriter, error) { + tempfile, err := ioutil.TempFile(tempdir, "filepipe") + if err != nil { + return nil, nil, err + } + + pipe := &pipe{ + buffer: tempfile, + open: 2, + limit: math.MaxInt64, + } + pipe.nodata.L = &pipe.mu + + return pipeReader{pipe}, pipeWriter{pipe}, nil +} + +// NewPipeMemory returns a pipe that uses an inmemory buffer +func NewPipeMemory(pipeSize int64) (PipeReader, PipeWriter, error) { + pipe := &pipe{ + buffer: make(memory, pipeSize), + open: 2, + limit: pipeSize, + } + pipe.nodata.L = &pipe.mu + return pipeReader{pipe}, pipeWriter{pipe}, nil +} + +type pipeReader struct{ pipe *pipe } +type pipeWriter struct{ pipe *pipe } + +// Close implements io.Reader Close +func (reader pipeReader) Close() error { return reader.CloseWithError(io.ErrClosedPipe) } + +// Close implements io.Writer Close +func (writer pipeWriter) Close() error { return writer.CloseWithError(io.EOF) } + +// CloseWithError implements closing with error +func (reader pipeReader) CloseWithError(err error) error { + pipe := reader.pipe + pipe.mu.Lock() + if pipe.readerDone { + pipe.mu.Unlock() + return io.ErrClosedPipe + } + pipe.readerDone = true + pipe.readerErr = err + pipe.mu.Unlock() + + return pipe.closeHalf() +} + +// CloseWithError implements closing with error +func (writer pipeWriter) CloseWithError(err error) error { + pipe := writer.pipe + pipe.mu.Lock() + if pipe.writerDone { + pipe.mu.Unlock() + return io.ErrClosedPipe + } + pipe.writerDone = true + pipe.writerErr = err + pipe.nodata.Broadcast() + pipe.mu.Unlock() + + return pipe.closeHalf() +} + +// closeHalf closes one side of the pipe +func (pipe *pipe) closeHalf() error { + if atomic.AddInt32(&pipe.open, -1) == 0 { + return pipe.buffer.Close() + } + return nil +} + +// Write writes to the pipe returning io.ErrClosedPipe when pipeSize is reached +func (writer pipeWriter) Write(data []byte) (n int, err error) { + pipe := writer.pipe + pipe.mu.Lock() + + // has the reader finished? + if pipe.readerDone { + pipe.mu.Unlock() + return 0, pipe.readerErr + } + + // have we closed already + if pipe.writerDone { + pipe.mu.Unlock() + return 0, io.ErrClosedPipe + } + + // check how much do they want to write + canWrite := pipe.limit - pipe.write + + // no more room to write + if canWrite == 0 { + pipe.mu.Unlock() + return 0, io.ErrClosedPipe + } + + // figure out how much to write + toWrite := int64(len(data)) + if toWrite > canWrite { + toWrite = canWrite + } + + writeAt := pipe.write + pipe.mu.Unlock() + + // write data to buffer + writeAmount, err := pipe.buffer.WriteAt(data[:toWrite], writeAt) + + pipe.mu.Lock() + // update writing head + pipe.write += int64(writeAmount) + // wake up reader + pipe.nodata.Broadcast() + // check whether we have finished + done := pipe.write >= pipe.limit + pipe.mu.Unlock() + + if err == nil && done { + err = io.ErrClosedPipe + } + return writeAmount, err +} + +// Read reads from the pipe returning io.EOF when writer is closed or pipeSize is reached +func (reader pipeReader) Read(data []byte) (n int, err error) { + pipe := reader.pipe + pipe.mu.Lock() + // wait until we have something to read + for pipe.read >= pipe.write { + // has the writer finished? + if pipe.writerDone { + pipe.mu.Unlock() + return 0, pipe.writerErr + } + + // have we closed already + if pipe.readerDone { + pipe.mu.Unlock() + return 0, io.ErrClosedPipe + } + + // have we run out of the limit + if pipe.read >= pipe.limit { + pipe.mu.Unlock() + return 0, io.EOF + } + + // ok, lets wait + pipe.nodata.Wait() + } + + // how much there's available for reading + canRead := pipe.write - pipe.read + // how much do they want to read? + toRead := int64(len(data)) + if toRead > canRead { + toRead = canRead + } + readAt := pipe.read + pipe.mu.Unlock() + + // read data + readAmount, err := pipe.buffer.ReadAt(data[:toRead], readAt) + + pipe.mu.Lock() + // update info on how much we have read + pipe.read += int64(readAmount) + done := pipe.read >= pipe.limit + pipe.mu.Unlock() + + if err == nil && done { + err = io.EOF + } + return readAmount, err +} + +// MultiPipe is a multipipe backed by a single file +type MultiPipe struct { + pipes []pipe + open int64 // number of pipes open +} + +// NewMultiPipeFile returns a new MultiPipe that is created in tempdir +// if tempdir == "" the fill will be created it into os.TempDir +func NewMultiPipeFile(tempdir string, pipeCount, pipeSize int64) (*MultiPipe, error) { + tempfile, err := ioutil.TempFile(tempdir, "multifilepipe") + if err != nil { + return nil, err + } + + err = tempfile.Truncate(pipeCount * pipeSize) + if err != nil { + closeErr := tempfile.Close() + if closeErr != nil { + return nil, fmt.Errorf("%v/%v", err, closeErr) + } + return nil, err + } + + multipipe := &MultiPipe{ + pipes: make([]pipe, pipeCount), + open: pipeCount, + } + + for i := range multipipe.pipes { + pipe := &multipipe.pipes[i] + pipe.buffer = offsetFile{ + file: tempfile, + offset: int64(i) * pipeSize, + open: &multipipe.open, + } + pipe.limit = pipeSize + pipe.nodata.L = &pipe.mu + } + + return multipipe, nil +} + +// NewMultiPipeMemory returns a new MultiPipe that is using a memory buffer +func NewMultiPipeMemory(pipeCount, pipeSize int64) (*MultiPipe, error) { + buffer := make(memory, pipeCount*pipeSize) + + multipipe := &MultiPipe{ + pipes: make([]pipe, pipeCount), + open: pipeCount, + } + + for i := range multipipe.pipes { + pipe := &multipipe.pipes[i] + pipe.buffer = buffer[i*int(pipeSize) : (i+1)*int(pipeSize)] + pipe.limit = pipeSize + pipe.nodata.L = &pipe.mu + } + + return multipipe, nil +} + +// Pipe returns the two ends of a block stream pipe +func (multipipe *MultiPipe) Pipe(index int) (PipeReader, PipeWriter) { + pipe := &multipipe.pipes[index] + return pipeReader{pipe}, pipeWriter{pipe} +} diff --git a/internal/sync2/pipe_test.go b/internal/sync2/pipe_test.go new file mode 100644 index 000000000..d66c6b2fd --- /dev/null +++ b/internal/sync2/pipe_test.go @@ -0,0 +1,93 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information + +package sync2_test + +import ( + "errors" + "io" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" + + "storj.io/storj/internal/sync2" +) + +func TestPipe_Basic(t *testing.T) { + testPipes(t, func(t *testing.T, reader sync2.PipeReader, writer sync2.PipeWriter) { + var group errgroup.Group + group.Go(func() error { + n, err := writer.Write([]byte{1, 2, 3}) + assert.Equal(t, n, 3) + assert.NoError(t, err) + + n, err = writer.Write([]byte{1, 2, 3}) + assert.Equal(t, n, 3) + assert.NoError(t, err) + + assert.NoError(t, writer.Close()) + return nil + }) + + group.Go(func() error { + data, err := ioutil.ReadAll(reader) + assert.Equal(t, []byte{1, 2, 3, 1, 2, 3}, data) + if err != nil { + assert.Equal(t, io.EOF, err) + } + assert.NoError(t, reader.Close()) + return nil + }) + + assert.NoError(t, group.Wait()) + }) +} + +func TestPipe_CloseWithError(t *testing.T) { + testPipes(t, func(t *testing.T, reader sync2.PipeReader, writer sync2.PipeWriter) { + var failure = errors.New("write failure") + + var group errgroup.Group + group.Go(func() error { + n, err := writer.Write([]byte{1, 2, 3}) + assert.Equal(t, n, 3) + assert.NoError(t, err) + + err = writer.CloseWithError(failure) + assert.NoError(t, err) + + return nil + }) + + group.Go(func() error { + data, err := ioutil.ReadAll(reader) + assert.Equal(t, []byte{1, 2, 3}, data) + if err != nil { + assert.Equal(t, failure, err) + } + assert.NoError(t, reader.Close()) + return nil + }) + + assert.NoError(t, group.Wait()) + }) +} + +func testPipes(t *testing.T, test func(t *testing.T, reader sync2.PipeReader, writer sync2.PipeWriter)) { + t.Run("Memory", func(t *testing.T) { + reader, writer, err := sync2.NewPipeFile("") + if err != nil { + t.Fatal(err) + } + test(t, reader, writer) + }) + t.Run("File", func(t *testing.T) { + reader, writer, err := sync2.NewPipeMemory(1024) + if err != nil { + t.Fatal(err) + } + test(t, reader, writer) + }) +} diff --git a/internal/sync2/throttle_test.go b/internal/sync2/throttle_test.go index e78248b4f..e2359e59f 100644 --- a/internal/sync2/throttle_test.go +++ b/internal/sync2/throttle_test.go @@ -1,7 +1,7 @@ // Copyright (C) 2018 Storj Labs, Inc. // See LICENSE for copying information -package sync2 +package sync2_test import ( "fmt" @@ -11,10 +11,12 @@ import ( "sync/atomic" "testing" "time" + + "storj.io/storj/internal/sync2" ) func ExampleThrottle() { - throttle := NewThrottle() + throttle := sync2.NewThrottle() var wg sync.WaitGroup // consumer @@ -58,7 +60,7 @@ func ExampleThrottle() { } func TestThrottleBasic(t *testing.T) { - throttle := NewThrottle() + throttle := sync2.NewThrottle() var stage int64 c := make(chan error, 1)