storj/internal/sync2/pipe.go

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information

package sync2

import (
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"sync"
	"sync/atomic"
)

// pipe is an io.Reader/io.Writer pipe backed by a ReadAtWriteAtCloser
type pipe struct {
	buffer ReadAtWriteAtCloser
	open   int32 // number of halves open (starts at 2)

	mu     sync.Mutex
	nodata sync.Cond
	read   int64
	write  int64
	limit  int64

	writerDone bool
	writerErr  error
	readerDone bool
	readerErr  error
}

// NewPipeFile returns a pipe that uses file-system to offload memory
func NewPipeFile(tempdir string) (PipeReader, PipeWriter, error) {
	tempfile, err := ioutil.TempFile(tempdir, "filepipe")
	if err != nil {
		return nil, nil, err
	}

	pipe := &pipe{
		buffer: tempfile,
		open:   2,
		limit:  math.MaxInt64,
	}
	pipe.nodata.L = &pipe.mu
	return pipeReader{pipe}, pipeWriter{pipe}, nil
}

// NewPipeMemory returns a pipe that uses an in-memory buffer
func NewPipeMemory(pipeSize int64) (PipeReader, PipeWriter, error) {
	pipe := &pipe{
		buffer: make(memory, pipeSize),
		open:   2,
		limit:  pipeSize,
	}
	pipe.nodata.L = &pipe.mu
	return pipeReader{pipe}, pipeWriter{pipe}, nil
}
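
// The following is a hypothetical usage sketch and not part of the original
// file: it shows one way a caller might drive the two halves returned by
// NewPipeMemory from separate goroutines; the buffer size and payload are
// arbitrary. The writer's Close reports io.EOF to the reader, which
// ioutil.ReadAll treats as a normal end of stream.
func exampleMemoryPipeUsage() ([]byte, error) {
	reader, writer, err := NewPipeMemory(1024)
	if err != nil {
		return nil, err
	}

	go func() {
		// produce data and signal a clean end of stream
		_, _ = writer.Write([]byte("hello pipe"))
		_ = writer.Close()
	}()

	// consume everything until the writer closes
	data, err := ioutil.ReadAll(reader)
	if closeErr := reader.Close(); err == nil {
		err = closeErr
	}
	return data, err
}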

type pipeReader struct{ pipe *pipe }
type pipeWriter struct{ pipe *pipe }

// Close implements io.Closer for the reader half
func (reader pipeReader) Close() error { return reader.CloseWithError(io.ErrClosedPipe) }

// Close implements io.Closer for the writer half
func (writer pipeWriter) Close() error { return writer.CloseWithError(io.EOF) }

// CloseWithError closes the reader half of the pipe with the given error
func (reader pipeReader) CloseWithError(err error) error {
	pipe := reader.pipe
	pipe.mu.Lock()
	if pipe.readerDone {
		pipe.mu.Unlock()
		return io.ErrClosedPipe
	}
	pipe.readerDone = true
	pipe.readerErr = err
	pipe.mu.Unlock()
	return pipe.closeHalf()
}

// CloseWithError closes the writer half of the pipe with the given error
func (writer pipeWriter) CloseWithError(err error) error {
	pipe := writer.pipe
	pipe.mu.Lock()
	if pipe.writerDone {
		pipe.mu.Unlock()
		return io.ErrClosedPipe
	}
	pipe.writerDone = true
	pipe.writerErr = err
	pipe.nodata.Broadcast()
	pipe.mu.Unlock()
	return pipe.closeHalf()
}

// closeHalf closes one side of the pipe
func (pipe *pipe) closeHalf() error {
	if atomic.AddInt32(&pipe.open, -1) == 0 {
		return pipe.buffer.Close()
	}
	return nil
}

// Write writes to the pipe returning io.ErrClosedPipe when pipeSize is reached
func (writer pipeWriter) Write(data []byte) (n int, err error) {
	pipe := writer.pipe
	pipe.mu.Lock()

	// has the reader finished?
	if pipe.readerDone {
		pipe.mu.Unlock()
		return 0, pipe.readerErr
	}

	// have we closed already
	if pipe.writerDone {
		pipe.mu.Unlock()
		return 0, io.ErrClosedPipe
	}

	// check how much room is left for writing
	canWrite := pipe.limit - pipe.write

	// no more room to write
	if canWrite == 0 {
		pipe.mu.Unlock()
		return 0, io.ErrClosedPipe
	}

	// figure out how much to write
	toWrite := int64(len(data))
	if toWrite > canWrite {
		toWrite = canWrite
	}

	writeAt := pipe.write
	pipe.mu.Unlock()

	// write data to buffer
	writeAmount, err := pipe.buffer.WriteAt(data[:toWrite], writeAt)

	pipe.mu.Lock()
	// update writing head
	pipe.write += int64(writeAmount)
	// wake up reader
	pipe.nodata.Broadcast()
	// check whether we have finished
	done := pipe.write >= pipe.limit
	pipe.mu.Unlock()

	if err == nil && done {
		err = io.ErrClosedPipe
	}
	return writeAmount, err
}
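
// Hypothetical sketch, not part of the original file: a memory pipe has a
// fixed capacity, so writing past pipeSize yields a short write together with
// io.ErrClosedPipe, matching the behaviour described above. The sizes are
// arbitrary and the reader half is ignored for brevity.
func exampleWriteLimit() (int, error) {
	_, writer, err := NewPipeMemory(4)
	if err != nil {
		return 0, err
	}
	defer func() { _ = writer.Close() }()

	// only the first 4 bytes fit: n == 4 and err == io.ErrClosedPipe
	return writer.Write([]byte("hello!"))
}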

// Read reads from the pipe returning io.EOF when writer is closed or pipeSize is reached
func (reader pipeReader) Read(data []byte) (n int, err error) {
	pipe := reader.pipe
	pipe.mu.Lock()

	// wait until we have something to read
	for pipe.read >= pipe.write {
		// has the writer finished?
		if pipe.writerDone {
			pipe.mu.Unlock()
			return 0, pipe.writerErr
		}
		// have we closed already
		if pipe.readerDone {
			pipe.mu.Unlock()
			return 0, io.ErrClosedPipe
		}
		// have we run out of the limit
		if pipe.read >= pipe.limit {
			pipe.mu.Unlock()
			return 0, io.EOF
		}
		// ok, let's wait
		pipe.nodata.Wait()
	}

	// how much is available for reading
	canRead := pipe.write - pipe.read
	// how much do they want to read?
	toRead := int64(len(data))
	if toRead > canRead {
		toRead = canRead
	}
	readAt := pipe.read
	pipe.mu.Unlock()

	// read data from the buffer
	readAmount, err := pipe.buffer.ReadAt(data[:toRead], readAt)

	pipe.mu.Lock()
	// update the reading head
	pipe.read += int64(readAmount)
	done := pipe.read >= pipe.limit
	pipe.mu.Unlock()

	if err == nil && done {
		err = io.EOF
	}
	return readAmount, err
}

// MultiPipe is a multipipe backed by a single file
type MultiPipe struct {
	pipes []pipe
	open  int64 // number of pipes open
}

// NewMultiPipeFile returns a new MultiPipe that is created in tempdir;
// if tempdir == "" the file will be created in os.TempDir
func NewMultiPipeFile(tempdir string, pipeCount, pipeSize int64) (*MultiPipe, error) {
	tempfile, err := ioutil.TempFile(tempdir, "multifilepipe")
	if err != nil {
		return nil, err
	}

	err = tempfile.Truncate(pipeCount * pipeSize)
	if err != nil {
		closeErr := tempfile.Close()
		if closeErr != nil {
			return nil, fmt.Errorf("%v/%v", err, closeErr)
		}
		return nil, err
	}

	multipipe := &MultiPipe{
		pipes: make([]pipe, pipeCount),
		open:  pipeCount,
	}
	for i := range multipipe.pipes {
		pipe := &multipipe.pipes[i]
		pipe.buffer = offsetFile{
			file:   tempfile,
			offset: int64(i) * pipeSize,
			open:   &multipipe.open,
		}
		pipe.open = 2
		pipe.limit = pipeSize
		pipe.nodata.L = &pipe.mu
	}
	return multipipe, nil
}

// NewMultiPipeMemory returns a new MultiPipe that uses a single memory buffer
func NewMultiPipeMemory(pipeCount, pipeSize int64) (*MultiPipe, error) {
	buffer := make(memory, pipeCount*pipeSize)

	multipipe := &MultiPipe{
		pipes: make([]pipe, pipeCount),
		open:  pipeCount,
	}
	for i := range multipipe.pipes {
		pipe := &multipipe.pipes[i]
		pipe.buffer = buffer[i*int(pipeSize) : (i+1)*int(pipeSize)]
		pipe.open = 2
		pipe.limit = pipeSize
		pipe.nodata.L = &pipe.mu
	}
	return multipipe, nil
}

// Pipe returns the two ends of a block stream pipe
func (multipipe *MultiPipe) Pipe(index int) (PipeReader, PipeWriter) {
	pipe := &multipipe.pipes[index]
	return pipeReader{pipe}, pipeWriter{pipe}
}
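
// Hypothetical sketch, not part of the original file: it shows how the pipes
// of a MultiPipe, all backed by one shared memory buffer, can be used
// independently by index. The pipe count, size and payload are arbitrary.
func exampleMultiPipeUsage() error {
	const pipeCount, pipeSize = 2, 1024

	multipipe, err := NewMultiPipeMemory(pipeCount, pipeSize)
	if err != nil {
		return err
	}

	for i := 0; i < pipeCount; i++ {
		reader, writer := multipipe.Pipe(i)

		go func(writer PipeWriter) {
			_, _ = writer.Write([]byte("block"))
			_ = writer.Close()
		}(writer)

		if _, err := ioutil.ReadAll(reader); err != nil {
			return err
		}
		_ = reader.Close()
	}
	return nil
}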