Improve logic for cutting the long tail during upload (#909)

Authored by Kaloyan Raev on 2019-02-05 12:54:25 +02:00, committed by GitHub
parent 87d6410b50
commit dd76829d10
15 changed files with 225 additions and 382 deletions


@ -62,7 +62,7 @@ func Main() error {
}
readers, err := eestream.EncodeReader(context.Background(),
encryption.TransformReader(eestream.PadReader(os.Stdin,
encrypter.InBlockSize()), encrypter, 0), rs, 4*1024*1024)
encrypter.InBlockSize()), encrypter, 0), rs)
if err != nil {
return err
}
@ -77,6 +77,7 @@ func Main() error {
}
defer printError(fh.Close)
defer printError(readers[i].Close)
_, err = io.Copy(fh, readers[i])
errs <- err

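For illustration (not part of the diff): after this change EncodeReader drops the max-buffer-memory argument and returns io.ReadClosers that the caller must close. A minimal sketch of the new call shape, using a hypothetical 2-of-4 scheme and an in-memory reader in place of os.Stdin; the concurrent read-and-close loop mirrors the readAll helper in the eestream tests further down.

package main

import (
	"bytes"
	"context"
	"fmt"
	"io/ioutil"

	"github.com/vivint/infectious"
	"github.com/zeebo/errs"

	"storj.io/storj/pkg/eestream"
)

func main() {
	fc, err := infectious.NewFEC(2, 4) // hypothetical scheme: 2 required, 4 total
	if err != nil {
		panic(err)
	}
	rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, 1024), 0, 0)
	if err != nil {
		panic(err)
	}
	data := bytes.Repeat([]byte{42}, rs.StripeSize()*8) // already stripe-aligned

	// the mbm argument is gone; the result is now a slice of io.ReadClosers
	readers, err := eestream.EncodeReader(context.Background(), bytes.NewReader(data), rs)
	if err != nil {
		panic(err)
	}
	pieces := make([][]byte, len(readers))
	results := make(chan error, len(readers))
	for i := range readers {
		go func(i int) {
			var err error
			pieces[i], err = ioutil.ReadAll(readers[i])
			results <- errs.Combine(err, readers[i].Close()) // readers must now be closed
		}(i)
	}
	for range readers {
		if err := <-results; err != nil {
			panic(err)
		}
	}
	fmt.Printf("%d pieces of %d bytes each\n", len(pieces), len(pieces[0]))
}
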
go.mod

@ -99,7 +99,7 @@ require (
github.com/stretchr/testify v1.2.2
github.com/tidwall/gjson v1.1.3 // indirect
github.com/tidwall/match v0.0.0-20171002075945-1731857f09b1 // indirect
github.com/vivint/infectious v0.0.0-20180906161625-e155e6eb3575
github.com/vivint/infectious v0.0.0-20190108171102-2455b059135b
github.com/yuin/gopher-lua v0.0.0-20180918061612-799fa34954fb // indirect
github.com/zeebo/admission v0.0.0-20180821192747-f24f2a94a40c
github.com/zeebo/errs v1.1.0

go.sum

@ -320,6 +320,8 @@ github.com/tidwall/match v0.0.0-20171002075945-1731857f09b1 h1:pWIN9LOlFRCJFqWIO
github.com/tidwall/match v0.0.0-20171002075945-1731857f09b1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/vivint/infectious v0.0.0-20180906161625-e155e6eb3575 h1:t0v1w3EiqMhDYBFbzwSfUHivx3yOMJG7R7YUm1Amlh8=
github.com/vivint/infectious v0.0.0-20180906161625-e155e6eb3575/go.mod h1:5oyMAv4hrBEKqBwORFsiqIrCNCmL2qcZLQTdJLYeYIc=
github.com/vivint/infectious v0.0.0-20190108171102-2455b059135b h1:dLkqBELopfQNhe8S9ucnSf+HhiUCgK/hPIjVG0f9GlY=
github.com/vivint/infectious v0.0.0-20190108171102-2455b059135b/go.mod h1:5oyMAv4hrBEKqBwORFsiqIrCNCmL2qcZLQTdJLYeYIc=
github.com/yuin/gopher-lua v0.0.0-20180918061612-799fa34954fb h1:Jmfk7z2f/+gxVFAgPsJMuczO1uEIxZy6wytTdeZ49lg=
github.com/yuin/gopher-lua v0.0.0-20180918061612-799fa34954fb/go.mod h1:aEV29XrmTYFr3CiRxZeGHpkvbwq+prZduBqMaascyCU=
github.com/zeebo/admission v0.0.0-20180821192747-f24f2a94a40c h1:WoYvMZp+keiJz+ZogLAhwsUZvWe81W+mCnpfdgEUOl4=


@ -36,8 +36,7 @@ type decodedReader struct {
// expectedSize is the number of bytes expected to be returned by the Reader.
// mbm is the maximum memory (in bytes) to be allocated for read buffers. If
// set to 0, the minimum possible memory will be used.
func DecodeReaders(ctx context.Context, rs map[int]io.ReadCloser,
es ErasureScheme, expectedSize int64, mbm int) io.ReadCloser {
func DecodeReaders(ctx context.Context, rs map[int]io.ReadCloser, es ErasureScheme, expectedSize int64, mbm int) io.ReadCloser {
if expectedSize < 0 {
return readcloser.FatalReadCloser(Error.New("negative expected size"))
}
@ -210,3 +209,10 @@ func (dr *decodedRanger) Range(ctx context.Context, offset, length int64) (io.Re
// length might not have included all of the blocks, limit what we return
return readcloser.LimitReadCloser(r, length), nil
}
func checkMBM(mbm int) error {
if mbm < 0 {
return Error.New("negative max buffer memory")
}
return nil
}
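
For illustration (not part of the diff): checkMBM now lives in decode.go because DecodeReaders is the only function left that accepts an mbm argument. A hedged sketch of the decode side, assuming the eestream API as used in the tests later in this diff; mbm = 0 keeps the documented "minimum possible memory" behavior, while a negative value is rejected by checkMBM.

// Sketch only; not part of the commit.
package example

import (
	"context"
	"io"
	"io/ioutil"

	"storj.io/storj/pkg/eestream"
)

// decodeAll pairs the readers returned by EncodeReader with DecodeReaders.
// mbm = 0 means "minimum possible memory"; a negative value is rejected by
// checkMBM with "negative max buffer memory".
func decodeAll(ctx context.Context, pieces []io.ReadCloser, rs eestream.RedundancyStrategy, expectedSize int64) ([]byte, error) {
	readerMap := make(map[int]io.ReadCloser, len(pieces))
	for i, r := range pieces {
		readerMap[i] = r // after this change the pieces are already io.ReadClosers
	}
	decoder := eestream.DecodeReaders(ctx, readerMap, rs, expectedSize, 0)
	defer func() { _ = decoder.Close() }()
	return ioutil.ReadAll(decoder)
}
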


@ -7,9 +7,12 @@ import (
"context"
"io"
"io/ioutil"
"sync"
"time"
"os"
"go.uber.org/zap"
"storj.io/storj/internal/readcloser"
"storj.io/storj/internal/sync2"
"storj.io/storj/pkg/encryption"
"storj.io/storj/pkg/ranger"
)
@ -21,6 +24,9 @@ type ErasureScheme interface {
// Encode will take 'in' and call 'out' with erasure coded pieces.
Encode(in []byte, out func(num int, data []byte)) error
// EncodeSingle will take 'in' with the stripe and fill 'out' with the erasure share for piece 'num'.
EncodeSingle(in, out []byte, num int) error
// Decode will take a mapping of available erasure coded piece num -> data,
// 'in', and append the combined data to 'out', returning it.
Decode(out []byte, in map[int][]byte) ([]byte, error)
@ -98,260 +104,111 @@ func (rs *RedundancyStrategy) OptimalThreshold() int {
}
type encodedReader struct {
ctx context.Context
cancel context.CancelFunc
r io.Reader
rs RedundancyStrategy
inbuf []byte
eps map[int](*encodedPiece)
mux sync.Mutex
start time.Time
done int // number of readers done
}
type block struct {
i int // reader index in the map
num int64 // block number
data []byte // block data
err error // error reading the block
pieces map[int]*encodedPiece
}
// EncodeReader takes a Reader and a RedundancyStrategy and returns a slice of
// Readers.
//
// mbm is the maximum memory (in bytes) to be allocated for read buffers. If
// set to 0, the minimum possible memory will be used.
//
// When the repair threshold is reached a timer will be started with another
// 1.5x the amount of time that took so far. The Readers will be aborted as
// soon as the timer expires or the optimal threshold is reached.
func EncodeReader(ctx context.Context, r io.Reader, rs RedundancyStrategy, mbm int) ([]io.Reader, error) {
if err := checkMBM(mbm); err != nil {
// io.ReadClosers.
func EncodeReader(ctx context.Context, r io.Reader, rs RedundancyStrategy) ([]io.ReadCloser, error) {
er := &encodedReader{
rs: rs,
pieces: make(map[int]*encodedPiece, rs.TotalCount()),
}
pipeReaders, pipeWriter, err := sync2.NewTeeFile(rs.TotalCount(), os.TempDir())
if err != nil {
return nil, err
}
er := &encodedReader{
r: r,
rs: rs,
inbuf: make([]byte, rs.StripeSize()),
eps: make(map[int](*encodedPiece), rs.TotalCount()),
start: time.Now(),
}
er.ctx, er.cancel = context.WithCancel(ctx)
readers := make([]io.Reader, 0, rs.TotalCount())
readers := make([]io.ReadCloser, 0, rs.TotalCount())
for i := 0; i < rs.TotalCount(); i++ {
er.eps[i] = &encodedPiece{
er: er,
er.pieces[i] = &encodedPiece{
er: er,
pipeReader: pipeReaders[i],
num: i,
stripeBuf: make([]byte, rs.StripeSize()),
shareBuf: make([]byte, rs.ErasureShareSize()),
}
er.eps[i].ctx, er.eps[i].cancel = context.WithCancel(er.ctx)
readers = append(readers, er.eps[i])
readers = append(readers, er.pieces[i])
}
chanSize := mbm / (rs.TotalCount() * rs.ErasureShareSize())
if chanSize < 1 {
chanSize = 1
}
for i := 0; i < rs.TotalCount(); i++ {
er.eps[i].ch = make(chan block, chanSize)
}
go er.fillBuffer()
go er.fillBuffer(ctx, r, pipeWriter)
return readers, nil
}
func (er *encodedReader) fillBuffer() {
// these channels will synchronize the erasure encoder output with the
// goroutines for adding the output to the reader buffers
copiers := make(map[int]chan block, er.rs.TotalCount())
for i := 0; i < er.rs.TotalCount(); i++ {
copiers[i] = make(chan block)
// closing the channel will exit the next goroutine
defer close(copiers[i])
// kick off goroutine for parallel copy of encoded data to each
// reader buffer
go er.copyData(i, copiers[i])
}
// read from the input and encode until EOF or error
for blockNum := int64(0); ; blockNum++ {
_, err := io.ReadFull(er.r, er.inbuf)
if err != nil {
for i := range copiers {
copiers[i] <- block{i: i, num: blockNum, err: err}
}
return
}
err = er.rs.Encode(er.inbuf, func(num int, data []byte) {
b := block{
i: num,
num: blockNum,
data: make([]byte, len(data)),
}
// data is reused by infecious, so add a copy to the channel
copy(b.data, data)
// send the block to the goroutine for adding it to the reader buffer
copiers[num] <- b
})
if err != nil {
for i := range copiers {
copiers[i] <- block{i: i, num: blockNum, err: err}
}
return
}
}
}
// copyData waits for data block from the erasure encoder and copies it to the
// targeted reader buffer
func (er *encodedReader) copyData(num int, copier <-chan block) {
// close the respective buffer channel when this goroutine exits
defer er.closeReaderChannel(num)
// process the channel until closed
for b := range copier {
er.addToReader(b)
}
}
func (er *encodedReader) closeReaderChannel(num int) {
// use mutex to avoid data race with checkSlowChannel
er.mux.Lock()
defer er.mux.Unlock()
if !er.eps[num].closed {
er.eps[num].closed = true
close(er.eps[num].ch)
}
}
func (er *encodedReader) addToReader(b block) {
if er.eps[b.i].closed {
// this channel is already closed for slowness - skip it
return
}
for {
// initialize timer
timer := time.NewTimer(50 * time.Millisecond)
defer timer.Stop()
// add the encoded data to the respective reader buffer channel
select {
case er.eps[b.i].ch <- b:
return
// block for no more than 50 ms
case <-timer.C:
if er.checkSlowChannel(b.i) {
return
}
}
}
}
func (er *encodedReader) checkSlowChannel(num int) (closed bool) {
// use mutex to avoid concurrent map iteration and map write on channels
er.mux.Lock()
defer er.mux.Unlock()
// check how many buffer channels are already empty
ec := 0
for i := range er.eps {
if i != num && !er.eps[i].closed && len(er.eps[i].ch) == 0 {
ec++
}
}
// check if more than the required buffer channels are empty, i.e. the
// current channel is slow and should be closed and its context should be
// canceled
closed = ec >= er.rs.RepairThreshold()
if closed {
er.eps[num].closed = true
close(er.eps[num].ch)
er.eps[num].cancel()
}
return closed
}
// Called every time an encoded piece is done reading everything
func (er *encodedReader) readerDone() {
er.mux.Lock()
defer er.mux.Unlock()
er.done++
if er.done == er.rs.RepairThreshold() {
// repair threshold reached, wait for 1.5x the duration and cancel
// the context regardless if optimal threshold is reached
time.AfterFunc(time.Since(er.start)*3/2, er.cancel)
}
if er.done == er.rs.OptimalThreshold() {
// optimal threshold reached - cancel the context
er.cancel()
func (er *encodedReader) fillBuffer(ctx context.Context, r io.Reader, w sync2.PipeWriter) {
_, err := sync2.Copy(ctx, w, r)
err = w.CloseWithError(err)
if err != nil {
zap.S().Error(err)
}
}
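
For illustration (not part of the diff): the replacement for the old per-piece channels is a file-backed tee. One goroutine copies the whole segment into the tee once, and every piece reader consumes the same bytes at its own pace, which is what makes it safe to simply close a slow piece instead of juggling buffer channels. A small standalone sketch of that fan-out, assuming the sync2.NewTeeFile/Copy/PipeWriter API behaves as it is used above (file-backed, so readers can lag one another without blocking):

package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"os"
	"strings"

	"storj.io/storj/internal/sync2"
)

func main() {
	const fanout = 3
	readers, writer, err := sync2.NewTeeFile(fanout, os.TempDir())
	if err != nil {
		panic(err)
	}
	go func() {
		// same shape as encodedReader.fillBuffer: copy the source once,
		// then close the writer with whatever error the copy produced
		_, err := sync2.Copy(context.Background(), writer, strings.NewReader("hello tee"))
		_ = writer.CloseWithError(err)
	}()
	for i := 0; i < fanout; i++ {
		data, err := ioutil.ReadAll(readers[i]) // every reader sees the full stream
		if err != nil {
			panic(err)
		}
		_ = readers[i].Close()
		fmt.Printf("reader %d: %q\n", i, data)
	}
}
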
type encodedPiece struct {
ctx context.Context
cancel context.CancelFunc
er *encodedReader
ch chan block
closed bool
outbuf []byte
err error
er *encodedReader
pipeReader sync2.PipeReader
num int
currentStripe int64
stripeBuf []byte
shareBuf []byte
available int
err error
}
func (ep *encodedPiece) Read(p []byte) (n int, err error) {
if ep.err != nil {
return 0, ep.err
}
if len(ep.outbuf) <= 0 {
// take the next block from the channel or block if channel is empty
select {
case b, ok := <-ep.ch:
if !ok {
// channel was closed due to slowness
return 0, io.ErrUnexpectedEOF
}
if b.err != nil {
ep.err = b.err
if ep.err == io.EOF {
ep.er.readerDone()
}
return 0, ep.err
}
ep.outbuf = b.data
case <-ep.ctx.Done():
// context was canceled due to:
// - slowness
// - optimal threshold reached
// - timeout after reaching repair threshold expired
return 0, io.ErrUnexpectedEOF
if ep.available == 0 {
// take the next stripe from the segment buffer
_, err := io.ReadFull(ep.pipeReader, ep.stripeBuf)
if err != nil {
return 0, err
}
// encode the num-th erasure share
err = ep.er.rs.EncodeSingle(ep.stripeBuf, ep.shareBuf, ep.num)
if err != nil {
return 0, err
}
ep.currentStripe++
ep.available = ep.er.rs.ErasureShareSize()
}
// we have some buffer remaining for this piece. write it to the output
n = copy(p, ep.outbuf)
// slide the unused (if any) bytes to the beginning of the buffer
copy(ep.outbuf, ep.outbuf[n:])
// and shrink the buffer
ep.outbuf = ep.outbuf[:len(ep.outbuf)-n]
off := len(ep.shareBuf) - ep.available
n = copy(p, ep.shareBuf[off:])
ep.available -= n
return n, nil
}
func (ep *encodedPiece) Close() error {
return ep.pipeReader.Close()
}
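
For illustration (not part of the diff): a quick worked example of the buffer bookkeeping in encodedPiece.Read, under the usual Reed-Solomon relation StripeSize = ErasureShareSize * RequiredCount; the numbers are hypothetical.

package main

import "fmt"

func main() {
	const (
		required  = 2    // pieces needed to reconstruct
		total     = 4    // pieces produced
		shareSize = 1024 // ErasureShareSize(), hypothetical
	)
	stripeSize := shareSize * required // input bytes consumed per refill of stripeBuf

	// Each refill encodes exactly one share into shareBuf; `available` then
	// counts down as Read hands that share out in len(p)-sized chunks.
	fmt.Printf("stripe of %d bytes -> %d shares of %d bytes (expansion %.1fx)\n",
		stripeSize, total, shareSize, float64(total)/float64(required))
}
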
// EncodedRanger will take an existing Ranger and provide a means to get
// multiple Ranged sub-Readers. EncodedRanger does not match the normal Ranger
// interface.
type EncodedRanger struct {
rr ranger.Ranger
rs RedundancyStrategy
mbm int // max buffer memory
rr ranger.Ranger
rs RedundancyStrategy
}
// NewEncodedRanger from the given Ranger and RedundancyStrategy. See the
// comments for EncodeReader about the repair and optimal thresholds, and the
// max buffer memory.
func NewEncodedRanger(rr ranger.Ranger, rs RedundancyStrategy, mbm int) (*EncodedRanger, error) {
// comments for EncodeReader about the repair and success thresholds.
func NewEncodedRanger(rr ranger.Ranger, rs RedundancyStrategy) (*EncodedRanger, error) {
if rr.Size()%int64(rs.StripeSize()) != 0 {
return nil, Error.New("invalid erasure encoder and range reader combo. " +
"range reader size must be a multiple of erasure encoder block size")
}
if err := checkMBM(mbm); err != nil {
return nil, err
}
return &EncodedRanger{
rs: rs,
rr: rr,
mbm: mbm,
rs: rs,
rr: rr,
}, nil
}
@ -363,7 +220,7 @@ func (er *EncodedRanger) OutputSize() int64 {
}
// Range is like Ranger.Range, but returns a slice of Readers
func (er *EncodedRanger) Range(ctx context.Context, offset, length int64) ([]io.Reader, error) {
func (er *EncodedRanger) Range(ctx context.Context, offset, length int64) ([]io.ReadCloser, error) {
// the offset and length given may not be block-aligned, so let's figure
// out which blocks contain the request.
firstBlock, blockCount := encryption.CalcEncompassingBlocks(
@ -375,7 +232,7 @@ func (er *EncodedRanger) Range(ctx context.Context, offset, length int64) ([]io.
if err != nil {
return nil, err
}
readers, err := EncodeReader(ctx, r, er.rs, er.mbm)
readers, err := EncodeReader(ctx, r, er.rs)
if err != nil {
return nil, err
}
@ -389,14 +246,7 @@ func (er *EncodedRanger) Range(ctx context.Context, offset, length int64) ([]io.
}
// the length might be shorter than a multiple of the block size, so
// limit it
readers[i] = io.LimitReader(r, length)
readers[i] = readcloser.LimitReadCloser(r, length)
}
return readers, nil
}
func checkMBM(mbm int) error {
if mbm < 0 {
return Error.New("negative max buffer memory")
}
return nil
}


@ -58,6 +58,18 @@ func (mr *MockErasureSchemeMockRecorder) Encode(arg0, arg1 interface{}) *gomock.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Encode", reflect.TypeOf((*MockErasureScheme)(nil).Encode), arg0, arg1)
}
// EncodeSingle mocks base method
func (m *MockErasureScheme) EncodeSingle(arg0, arg1 []byte, arg2 int) error {
ret := m.ctrl.Call(m, "EncodeSingle", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// EncodeSingle indicates an expected call of EncodeSingle
func (mr *MockErasureSchemeMockRecorder) EncodeSingle(arg0, arg1, arg2 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EncodeSingle", reflect.TypeOf((*MockErasureScheme)(nil).EncodeSingle), arg0, arg1, arg2)
}
// ErasureShareSize mocks base method
func (m *MockErasureScheme) ErasureShareSize() int {
ret := m.ctrl.Call(m, "ErasureShareSize")

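For illustration (not part of the diff): a sketch of how the new EncodeSingle mock could be exercised, assuming the usual mockgen output (NewMockErasureScheme constructor, EXPECT method) and the standard "testing" and "github.com/golang/mock/gomock" imports.

// Sketch only; assumes mockgen's generated NewMockErasureScheme and EXPECT.
func TestEncodeSingleMock(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	es := NewMockErasureScheme(ctrl)
	es.EXPECT().EncodeSingle(gomock.Any(), gomock.Any(), 3).Return(nil)

	// in: one stripe, out: one erasure share for piece 3 (sizes arbitrary here)
	if err := es.EncodeSingle(make([]byte, 64), make([]byte, 32), 3); err != nil {
		t.Fatal(err)
	}
}
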

@ -17,6 +17,10 @@ func NewRSScheme(fc *infectious.FEC, erasureShareSize int) ErasureScheme {
return &rsScheme{fc: fc, erasureShareSize: erasureShareSize}
}
func (s *rsScheme) EncodeSingle(input, output []byte, num int) (err error) {
return s.fc.EncodeSingle(input, output, num)
}
func (s *rsScheme) Encode(input []byte, output func(num int, data []byte)) (
err error) {
return s.fc.Encode(input, func(s infectious.Share) {

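For illustration (not part of the diff): EncodeSingle is the reason for the infectious version bump in go.mod above. It encodes only the num-th share of a stripe, so each piece reader can produce its own share without materializing all of them. A hedged sketch against the infectious API directly, with hypothetical parameters:

package main

import (
	"bytes"
	"fmt"

	"github.com/vivint/infectious"
)

func main() {
	const required, total = 2, 4 // hypothetical scheme
	fc, err := infectious.NewFEC(required, total)
	if err != nil {
		panic(err)
	}
	input := bytes.Repeat([]byte{7}, required*256) // one stripe
	shareSize := len(input) / required

	// Encode produces every share of the stripe.
	all := make(map[int][]byte, total)
	err = fc.Encode(input, func(s infectious.Share) {
		all[s.Number] = append([]byte(nil), s.Data...) // data is reused, copy it
	})
	if err != nil {
		panic(err)
	}

	// EncodeSingle (added in the bumped infectious version) produces only
	// share num, which is what rsScheme.EncodeSingle delegates to.
	single := make([]byte, shareSize)
	if err := fc.EncodeSingle(input, single, 3); err != nil {
		panic(err)
	}
	fmt.Println("single share matches Encode output:", bytes.Equal(single, all[3]))
}
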

@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/vivint/infectious"
"github.com/zeebo/errs"
"storj.io/storj/internal/readcloser"
"storj.io/storj/pkg/encryption"
@ -45,13 +46,13 @@ func TestRS(t *testing.T) {
if err != nil {
t.Fatal(err)
}
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs, 0)
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs)
if err != nil {
t.Fatal(err)
}
readerMap := make(map[int]io.ReadCloser, len(readers))
for i, reader := range readers {
readerMap[i] = ioutil.NopCloser(reader)
readerMap[i] = reader
}
decoder := DecodeReaders(ctx, readerMap, rs, 32*1024, 0)
defer func() { assert.NoError(t, decoder.Close()) }()
@ -76,13 +77,13 @@ func TestRSUnexpectedEOF(t *testing.T) {
if err != nil {
t.Fatal(err)
}
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs, 0)
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs)
if err != nil {
t.Fatal(err)
}
readerMap := make(map[int]io.ReadCloser, len(readers))
for i, reader := range readers {
readerMap[i] = ioutil.NopCloser(reader)
readerMap[i] = reader
}
decoder := DecodeReaders(ctx, readerMap, rs, 32*1024, 0)
defer func() { assert.NoError(t, decoder.Close()) }()
@ -111,7 +112,7 @@ func TestRSRanger(t *testing.T) {
t.Fatal(err)
}
readers, err := EncodeReader(ctx, encryption.TransformReader(PadReader(ioutil.NopCloser(
bytes.NewReader(data)), encrypter.InBlockSize()), encrypter, 0), rs, 0)
bytes.NewReader(data)), encrypter.InBlockSize()), encrypter, 0), rs)
if err != nil {
t.Fatal(err)
}
@ -189,66 +190,6 @@ func TestNewRedundancyStrategy(t *testing.T) {
}
}
func TestRSEncoderInputParams(t *testing.T) {
for i, tt := range []struct {
mbm int
errString string
}{
{0, ""},
{-1, "eestream error: negative max buffer memory"},
{1024, ""},
} {
errTag := fmt.Sprintf("Test case #%d", i)
ctx := context.Background()
data := randData(32 * 1024)
fc, err := infectious.NewFEC(2, 4)
if !assert.NoError(t, err, errTag) {
continue
}
es := NewRSScheme(fc, 8*1024)
rs, err := NewRedundancyStrategy(es, 0, 0)
if !assert.NoError(t, err, errTag) {
continue
}
_, err = EncodeReader(ctx, bytes.NewReader(data), rs, tt.mbm)
if tt.errString == "" {
assert.NoError(t, err, errTag)
} else {
assert.EqualError(t, err, tt.errString, errTag)
}
}
}
func TestRSRangerInputParams(t *testing.T) {
for i, tt := range []struct {
mbm int
errString string
}{
{0, ""},
{-1, "eestream error: negative max buffer memory"},
{1024, ""},
} {
errTag := fmt.Sprintf("Test case #%d", i)
ctx := context.Background()
data := randData(32 * 1024)
fc, err := infectious.NewFEC(2, 4)
if !assert.NoError(t, err, errTag) {
continue
}
es := NewRSScheme(fc, 8*1024)
rs, err := NewRedundancyStrategy(es, 0, 0)
if !assert.NoError(t, err, errTag) {
continue
}
_, err = EncodeReader(ctx, bytes.NewReader(data), rs, tt.mbm)
if tt.errString == "" {
assert.NoError(t, err, errTag)
} else {
assert.EqualError(t, err, tt.errString, errTag)
}
}
}
// Some pieces will read error.
// Test will pass if at least required number of pieces are still good.
func TestRSErrors(t *testing.T) {
@ -448,7 +389,7 @@ func testRSProblematic(t *testing.T, tt testCase, i int, fn problematicReadClose
if !assert.NoError(t, err, errTag) {
return
}
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs, 3*1024)
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs)
if !assert.NoError(t, err, errTag) {
return
}
@ -479,18 +420,18 @@ func testRSProblematic(t *testing.T, tt testCase, i int, fn problematicReadClose
}
}
func readAll(readers []io.Reader) ([][]byte, error) {
func readAll(readers []io.ReadCloser) ([][]byte, error) {
pieces := make([][]byte, len(readers))
errs := make(chan error, len(readers))
errors := make(chan error, len(readers))
for i := range readers {
go func(i int) {
var err error
pieces[i], err = ioutil.ReadAll(readers[i])
errs <- err
errors <- errs.Combine(err, readers[i].Close())
}(i)
}
for range readers {
err := <-errs
err := <-errors
if err != nil {
return nil, err
}
@ -524,7 +465,7 @@ func TestEncoderStalledReaders(t *testing.T) {
if err != nil {
t.Fatal(err)
}
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs, 0)
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs)
if err != nil {
t.Fatal(err)
}
@ -534,9 +475,12 @@ func TestEncoderStalledReaders(t *testing.T) {
if time.Since(start) > 1*time.Second {
t.Fatalf("waited for slow reader")
}
for _, reader := range readers {
assert.NoError(t, reader.Close())
}
}
func readAllStalled(readers []io.Reader, stalled int) ([][]byte, error) {
func readAllStalled(readers []io.ReadCloser, stalled int) ([][]byte, error) {
pieces := make([][]byte, len(readers))
errs := make(chan error, len(readers))
for i := stalled; i < len(readers); i++ {
@ -567,7 +511,7 @@ func TestDecoderErrorWithStalledReaders(t *testing.T) {
if err != nil {
t.Fatal(err)
}
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs, 0)
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs)
if err != nil {
t.Fatal(err)
}


@ -352,17 +352,17 @@ func newMetainfoParts(planet *testplanet.Planet) (*kvmetainfo.DB, buckets.Store,
return nil, nil, nil, err
}
rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, int(1*memory.KB)), 3, 4)
rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, 1*memory.KB.Int()), 3, 4)
if err != nil {
return nil, nil, nil, err
}
segments := segments.NewSegmentStore(oc, ec, pdb, rs, int(8*memory.KB))
segments := segments.NewSegmentStore(oc, ec, pdb, rs, 8*memory.KB.Int())
key := new(storj.Key)
copy(key[:], TestEncKey)
streams, err := streams.NewStreamStore(segments, int64(64*memory.MB), key, int(1*memory.KB), storj.AESGCM)
streams, err := streams.NewStreamStore(segments, 64*memory.MB.Int64(), key, 1*memory.KB.Int(), storj.AESGCM)
if err != nil {
return nil, nil, nil, err
}

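For illustration (not part of the diff): the changes above swap bare int(...) and int64(...) casts for the typed conversion helpers on memory.Size. A tiny sketch, assuming the internal/memory import path the repository used at the time:

package main

import (
	"fmt"

	"storj.io/storj/internal/memory"
)

func main() {
	// memory.Size constants with typed conversion helpers replace the bare casts.
	segmentThreshold := 8 * memory.KB.Int()  // same value as int(8 * memory.KB)
	maxSegmentSize := 64 * memory.MB.Int64() // same value as int64(64 * memory.MB)
	blockSize := 1 * memory.KB.Int()         // same value as int(1 * memory.KB)
	fmt.Println(segmentThreshold, maxSegmentSize, blockSize)
}
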

@ -150,7 +150,7 @@ func TestGetObjectStream(t *testing.T) {
assertStream(ctx, t, db, streams, bucket, "empty-file", 0, []byte{})
assertStream(ctx, t, db, streams, bucket, "small-file", 4, []byte("test"))
assertStream(ctx, t, db, streams, bucket, "large-file", int64(32*memory.KB), data)
assertStream(ctx, t, db, streams, bucket, "large-file", 32*memory.KB.Int64(), data)
})
}
@ -205,7 +205,7 @@ func assertStream(ctx context.Context, t *testing.T, db *kvmetainfo.DB, streams
assert.EqualValues(t, 0, segments[0].Index)
assert.EqualValues(t, len(content), segments[0].Size)
if segments[0].Size > int64(4*memory.KB) {
if segments[0].Size > 4*memory.KB.Int64() {
assertRemoteSegment(t, segments[0])
} else {
assertInlineSegment(t, segments[0], content)


@ -677,17 +677,17 @@ func initEnv(planet *testplanet.Planet) (minio.ObjectLayer, storj.Metainfo, stre
return nil, nil, nil, err
}
rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, int(1*memory.KB)), 3, 4)
rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, 1*memory.KB.Int()), 3, 4)
if err != nil {
return nil, nil, nil, err
}
segments := segments.NewSegmentStore(oc, ec, pdb, rs, int(8*memory.KB))
segments := segments.NewSegmentStore(oc, ec, pdb, rs, 8*memory.KB.Int())
key := new(storj.Key)
copy(key[:], TestEncKey)
streams, err := streams.NewStreamStore(segments, int64(64*memory.MB), key, int(1*memory.KB), storj.AESGCM)
streams, err := streams.NewStreamStore(segments, 64*memory.MB.Int64(), key, 1*memory.KB.Int(), storj.AESGCM)
if err != nil {
return nil, nil, nil, err
}


@ -8,7 +8,6 @@ import (
"flag"
"fmt"
"io"
"log"
"time"
"github.com/zeebo/errs"
@ -140,21 +139,13 @@ func (ps *PieceStore) Put(ctx context.Context, id PieceID, data io.Reader, ttl t
defer func() {
if err := writer.Close(); err != nil && err != io.EOF {
log.Printf("failed to close writer: %s\n", err)
zap.S().Debugf("failed to close writer: %s\n", err)
}
}()
bufw := bufio.NewWriterSize(writer, 32*1024)
_, err = io.Copy(bufw, data)
if err == io.ErrUnexpectedEOF {
_ = writer.Close()
zap.S().Infof("Node cut from upload due to slow connection. Deleting piece %s...", id)
deleteErr := ps.Delete(ctx, id, authorization)
if deleteErr != nil {
return deleteErr
}
}
if err != nil {
return err
}


@ -8,6 +8,7 @@ import (
"io"
"io/ioutil"
"sort"
"sync/atomic"
"time"
"github.com/zeebo/errs"
@ -28,10 +29,8 @@ var mon = monkit.Package()
// Client defines an interface for storing erasure coded data to piece store nodes
type Client interface {
Put(ctx context.Context, nodes []*pb.Node, rs eestream.RedundancyStrategy,
pieceID psclient.PieceID, data io.Reader, expiration time.Time, pba *pb.PayerBandwidthAllocation, authorization *pb.SignedMessage) (successfulNodes []*pb.Node, err error)
Get(ctx context.Context, nodes []*pb.Node, es eestream.ErasureScheme,
pieceID psclient.PieceID, size int64, pba *pb.PayerBandwidthAllocation, authorization *pb.SignedMessage) (ranger.Ranger, error)
Put(ctx context.Context, nodes []*pb.Node, rs eestream.RedundancyStrategy, pieceID psclient.PieceID, data io.Reader, expiration time.Time, pba *pb.PayerBandwidthAllocation, authorization *pb.SignedMessage) (successfulNodes []*pb.Node, err error)
Get(ctx context.Context, nodes []*pb.Node, es eestream.ErasureScheme, pieceID psclient.PieceID, size int64, pba *pb.PayerBandwidthAllocation, authorization *pb.SignedMessage) (ranger.Ranger, error)
Delete(ctx context.Context, nodes []*pb.Node, pieceID psclient.PieceID, authorization *pb.SignedMessage) error
}
@ -59,8 +58,7 @@ func (ec *ecClient) newPSClient(ctx context.Context, n *pb.Node) (psclient.Clien
return ec.newPSClientFunc(ctx, ec.transport, n, 0)
}
func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.RedundancyStrategy,
pieceID psclient.PieceID, data io.Reader, expiration time.Time, pba *pb.PayerBandwidthAllocation, authorization *pb.SignedMessage) (successfulNodes []*pb.Node, err error) {
func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.RedundancyStrategy, pieceID psclient.PieceID, data io.Reader, expiration time.Time, pba *pb.PayerBandwidthAllocation, authorization *pb.SignedMessage) (successfulNodes []*pb.Node, err error) {
defer mon.Task()(&ctx)(&err)
if len(nodes) != rs.TotalCount() {
return nil, Error.New("size of nodes slice (%d) does not match total count (%d) of erasure scheme", len(nodes), rs.TotalCount())
@ -75,7 +73,7 @@ func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.Redun
}
padded := eestream.PadReader(ioutil.NopCloser(data), rs.StripeSize())
readers, err := eestream.EncodeReader(ctx, padded, rs, ec.memoryLimit)
readers, err := eestream.EncodeReader(ctx, padded, rs)
if err != nil {
return nil, err
}
@ -86,57 +84,48 @@ func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.Redun
}
infos := make(chan info, len(nodes))
for i, n := range nodes {
psCtx, cancel := context.WithCancel(ctx)
defer cancel()
if n != nil {
n.Type.DPanicOnInvalid("ec client Put")
start := time.Now()
for i, node := range nodes {
if node != nil {
node.Type.DPanicOnInvalid("ec client Put")
}
go func(i int, n *pb.Node) {
if n == nil {
_, err := io.Copy(ioutil.Discard, readers[i])
infos <- info{i: i, err: err}
return
}
derivedPieceID, err := pieceID.Derive(n.Id.Bytes())
if err != nil {
zap.S().Errorf("Failed deriving piece id for %s: %v", pieceID, err)
infos <- info{i: i, err: err}
return
}
ps, err := ec.newPSClient(ctx, n)
if err != nil {
zap.S().Errorf("Failed dialing for putting piece %s -> %s to node %s: %v",
pieceID, derivedPieceID, n.Id, err)
infos <- info{i: i, err: err}
return
}
err = ps.Put(ctx, derivedPieceID, readers[i], expiration, pba, authorization)
// normally the bellow call should be deferred, but doing so fails
// randomly the unit tests
err = errs.Combine(err, ps.Close())
// io.ErrUnexpectedEOF means the piece upload was interrupted due to slow connection.
// No error logging for this case.
if err != nil && err != io.ErrUnexpectedEOF {
nodeAddress := "nil"
if n.Address != nil {
nodeAddress = n.Address.Address
}
zap.S().Errorf("Failed putting piece %s -> %s to node %s (%+v): %v",
pieceID, derivedPieceID, n.Id, nodeAddress, err)
}
go func(i int, node *pb.Node) {
err := ec.putPiece(psCtx, ctx, node, pieceID, readers[i], expiration, pba, authorization)
infos <- info{i: i, err: err}
}(i, n)
}(i, node)
}
successfulNodes = make([]*pb.Node, len(nodes))
var successfulCount int
var successfulCount int32
var timer *time.Timer
for range nodes {
info := <-infos
if info.err == nil {
successfulNodes[info.i] = nodes[info.i]
successfulCount++
switch int(atomic.AddInt32(&successfulCount, 1)) {
case rs.RepairThreshold():
elapsed := time.Since(start)
more := elapsed * 3 / 2
zap.S().Infof("Repair threshold (%d nodes) reached in %.2f s. Starting a timer for %.2f s for reaching the success threshold (%d nodes)...",
rs.RepairThreshold(), elapsed.Seconds(), more.Seconds(), rs.OptimalThreshold())
timer = time.AfterFunc(more, func() {
zap.S().Infof("Timer expired. Successfully uploaded to %d nodes. Canceling the long tail...", atomic.LoadInt32(&successfulCount))
cancel()
})
case rs.OptimalThreshold():
zap.S().Infof("Success threshold (%d nodes) reached. Canceling the long tail...", rs.OptimalThreshold())
timer.Stop()
cancel()
}
}
}
@ -152,13 +141,55 @@ func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.Redun
}
}()
if successfulCount < rs.RepairThreshold() {
if int(atomic.LoadInt32(&successfulCount)) < rs.RepairThreshold() {
return nil, Error.New("successful puts (%d) less than repair threshold (%d)", successfulCount, rs.RepairThreshold())
}
return successfulNodes, nil
}
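
For illustration (not part of the diff): the long-tail cut now lives here rather than inside the encoder. Successes are counted atomically; hitting the repair threshold arms a timer for 1.5x the elapsed time, and hitting the success (optimal) threshold cancels the remaining uploads immediately. The standalone sketch below reproduces just that counting logic, with hypothetical names and randomized stand-ins for putPiece:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync/atomic"
	"time"
)

// awaitLongTail mirrors the counting in ecClient.Put: it consumes one result
// per node and cancels the shared context once enough uploads have succeeded.
func awaitLongTail(cancel context.CancelFunc, results <-chan error, nodes, repair, optimal int) int32 {
	var successful int32
	start := time.Now()
	var timer *time.Timer
	for i := 0; i < nodes; i++ {
		if err := <-results; err != nil {
			continue
		}
		switch int(atomic.AddInt32(&successful, 1)) {
		case repair:
			// repair threshold reached: give the long tail 1.5x the time spent so far
			timer = time.AfterFunc(time.Since(start)*3/2, cancel)
		case optimal:
			// success threshold reached: cut the long tail immediately
			if timer != nil {
				timer.Stop()
			}
			cancel()
		}
	}
	return atomic.LoadInt32(&successful)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const nodes, repair, optimal = 8, 4, 6
	results := make(chan error, nodes)
	for i := 0; i < nodes; i++ {
		go func() {
			// stand-in for putPiece: random duration, aborted when ctx is canceled
			select {
			case <-time.After(time.Duration(rand.Intn(200)) * time.Millisecond):
				results <- nil
			case <-ctx.Done():
				results <- ctx.Err()
			}
		}()
	}
	fmt.Println("successful uploads:", awaitLongTail(cancel, results, nodes, repair, optimal))
}
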
func (ec *ecClient) putPiece(ctx, parent context.Context, node *pb.Node, pieceID psclient.PieceID, data io.ReadCloser, expiration time.Time, pba *pb.PayerBandwidthAllocation, authorization *pb.SignedMessage) (err error) {
defer func() { err = errs.Combine(err, data.Close()) }()
if node == nil {
_, err = io.Copy(ioutil.Discard, data)
return err
}
derivedPieceID, err := pieceID.Derive(node.Id.Bytes())
if err != nil {
zap.S().Errorf("Failed deriving piece id for %s: %v", pieceID, err)
return err
}
ps, err := ec.newPSClient(ctx, node)
if err != nil {
zap.S().Errorf("Failed dialing for putting piece %s -> %s to node %s: %v",
pieceID, derivedPieceID, node.Id, err)
return err
}
err = ps.Put(ctx, derivedPieceID, data, expiration, pba, authorization)
defer func() { err = errs.Combine(err, ps.Close()) }()
// Canceled context means the piece upload was interrupted by user or due
// to slow connection. No error logging for this case.
if ctx.Err() == context.Canceled {
if parent.Err() == context.Canceled {
zap.S().Infof("Upload to node %s canceled by user.", node.Id)
} else {
zap.S().Infof("Node %s cut from upload due to slow connection.", node.Id)
}
err = context.Canceled
} else if err != nil {
nodeAddress := "nil"
if node.Address != nil {
nodeAddress = node.Address.Address
}
zap.S().Errorf("Failed putting piece %s -> %s to node %s (%+v): %v",
pieceID, derivedPieceID, node.Id, nodeAddress, err)
}
return err
}
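
For illustration (not part of the diff): putPiece receives both the cancellable per-upload context and the parent request context so it can tell a user-initiated cancel from a long-tail cut. A minimal sketch of that distinction, with hypothetical names:

package main

import (
	"context"
	"fmt"
)

// classifyCancel mirrors the check in putPiece: ctx is the child created with
// context.WithCancel(parent); only the child is canceled when a node is cut.
func classifyCancel(ctx, parent context.Context) string {
	if ctx.Err() != context.Canceled {
		return "not canceled"
	}
	if parent.Err() == context.Canceled {
		return "canceled by user"
	}
	return "cut from upload (slow node)"
}

func main() {
	parent, cancelParent := context.WithCancel(context.Background())
	child, cancelChild := context.WithCancel(parent)

	cancelChild() // long-tail cut: only the per-upload context is canceled
	fmt.Println(classifyCancel(child, parent))

	cancelParent() // user cancel propagates to the child as well
	fmt.Println(classifyCancel(child, parent))
}
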
func (ec *ecClient) Get(ctx context.Context, nodes []*pb.Node, es eestream.ErasureScheme,
pieceID psclient.PieceID, size int64, pba *pb.PayerBandwidthAllocation, authorization *pb.SignedMessage) (rr ranger.Ranger, err error) {
defer mon.Task()(&ctx)(&err)


@ -83,33 +83,29 @@ TestLoop:
for i, tt := range []struct {
nodes []*pb.Node
min int
mbm int
badInput bool
errs []error
errString string
}{
{[]*pb.Node{}, 0, 0, true, []error{},
{[]*pb.Node{}, 0, true, []error{},
fmt.Sprintf("ecclient error: size of nodes slice (0) does not match total count (%v) of erasure scheme", n)},
{[]*pb.Node{node0, node1, node2, node3}, 0, -1, true,
[]error{nil, nil, nil, nil},
"eestream error: negative max buffer memory"},
{[]*pb.Node{node0, node1, node0, node3}, 0, 0, true,
{[]*pb.Node{node0, node1, node0, node3}, 0, true,
[]error{nil, nil, nil, nil},
"ecclient error: duplicated nodes are not allowed"},
{[]*pb.Node{node0, node1, node2, node3}, 0, 0, false,
{[]*pb.Node{node0, node1, node2, node3}, 0, false,
[]error{nil, nil, nil, nil}, ""},
{[]*pb.Node{node0, node1, node2, node3}, 0, 0, false,
{[]*pb.Node{node0, node1, node2, node3}, 0, false,
[]error{nil, ErrDialFailed, nil, nil},
"ecclient error: successful puts (3) less than repair threshold (4)"},
{[]*pb.Node{node0, node1, node2, node3}, 0, 0, false,
{[]*pb.Node{node0, node1, node2, node3}, 0, false,
[]error{nil, ErrOpFailed, nil, nil},
"ecclient error: successful puts (3) less than repair threshold (4)"},
{[]*pb.Node{node0, node1, node2, node3}, 2, 0, false,
{[]*pb.Node{node0, node1, node2, node3}, 2, false,
[]error{nil, ErrDialFailed, nil, nil}, ""},
{[]*pb.Node{node0, node1, node2, node3}, 2, 0, false,
{[]*pb.Node{node0, node1, node2, node3}, 2, false,
[]error{ErrOpFailed, ErrDialFailed, nil, ErrDialFailed},
"ecclient error: successful puts (1) less than repair threshold (2)"},
{[]*pb.Node{nil, nil, node2, node3}, 2, 0, false,
{[]*pb.Node{nil, nil, node2, node3}, 2, false,
[]error{nil, nil, nil, nil}, ""},
} {
errTag := fmt.Sprintf("Test case #%d", i)
@ -149,7 +145,7 @@ TestLoop:
continue
}
r := io.LimitReader(rand.Reader, int64(size))
ec := ecClient{newPSClientFunc: mockNewPSClient(clients), memoryLimit: tt.mbm}
ec := ecClient{newPSClientFunc: mockNewPSClient(clients)}
successfulNodes, err := ec.Put(ctx, tt.nodes, rs, id, r, ttl, nil, nil)


@ -63,7 +63,13 @@ type segmentStore struct {
// NewSegmentStore creates a new instance of segmentStore
func NewSegmentStore(oc overlay.Client, ec ecclient.Client, pdb pdbclient.Client, rs eestream.RedundancyStrategy, threshold int) Store {
return &segmentStore{oc: oc, ec: ec, pdb: pdb, rs: rs, thresholdSize: threshold}
return &segmentStore{
oc: oc,
ec: ec,
pdb: pdb,
rs: rs,
thresholdSize: threshold,
}
}
// Meta retrieves the metadata of the segment