ECClient pads data to fit RS block size (#199)
This commit is contained in:
parent
c9c2a93c31
commit
aac7c6fbd1
@ -78,12 +78,12 @@ func (c *crcChecker) Transform(out, in []byte, blockOffset int64) (
|
||||
|
||||
// addCRC is a Ranger constructor, given a specific crc table and an existing
|
||||
// un-crced Ranger
|
||||
func addCRC(data ranger.Ranger, tab *crc32.Table) (ranger.Ranger, error) {
|
||||
func addCRC(data ranger.RangeCloser, tab *crc32.Table) (ranger.RangeCloser, error) {
|
||||
return Transform(data, newCRCAdder(tab))
|
||||
}
|
||||
|
||||
// checkCRC is a Ranger constructor, given a specific crc table and an existing
|
||||
// crced Ranger
|
||||
func checkCRC(data ranger.Ranger, tab *crc32.Table) (ranger.Ranger, error) {
|
||||
func checkCRC(data ranger.RangeCloser, tab *crc32.Table) (ranger.RangeCloser, error) {
|
||||
return Transform(data, newCRCChecker(tab))
|
||||
}
|
||||
|
@ -48,7 +48,8 @@ func DecodeReaders(ctx context.Context, rs map[int]io.ReadCloser,
|
||||
}
|
||||
if expectedSize%int64(es.DecodedBlockSize()) != 0 {
|
||||
return readcloser.FatalReadCloser(
|
||||
Error.New("expected size not a factor decoded block size"))
|
||||
Error.New("expected size (%d) not a factor decoded block size (%d)",
|
||||
expectedSize, es.DecodedBlockSize()))
|
||||
}
|
||||
if err := checkMBM(mbm); err != nil {
|
||||
return readcloser.FatalReadCloser(err)
|
||||
@ -252,11 +253,12 @@ func Decode(rrs map[int]ranger.RangeCloser, es ErasureScheme, mbm int) (ranger.R
|
||||
}
|
||||
}
|
||||
if size == -1 {
|
||||
return ranger.NopCloser(ranger.ByteRanger(nil)), nil
|
||||
return ranger.ByteRangeCloser(nil), nil
|
||||
}
|
||||
if size%int64(es.EncodedBlockSize()) != 0 {
|
||||
return nil, Error.New("invalid erasure decoder and range reader combo. " +
|
||||
"range reader size must be a multiple of erasure encoder block size")
|
||||
return nil, Error.New("invalid erasure decoder and range reader combo. "+
|
||||
"range reader size (%d) must be a multiple of erasure encoder block size (%d)",
|
||||
size, es.EncodedBlockSize())
|
||||
}
|
||||
return &decodedRanger{
|
||||
es: es,
|
||||
@ -304,7 +306,7 @@ func (dr *decodedRanger) Range(ctx context.Context, offset, length int64) (io.Re
|
||||
}
|
||||
}
|
||||
// decode from all those ranges
|
||||
r := DecodeReaders(ctx, readers, dr.es, length, dr.mbm)
|
||||
r := DecodeReaders(ctx, readers, dr.es, blockCount*int64(dr.es.DecodedBlockSize()), dr.mbm)
|
||||
// offset might start a few bytes in, potentially discard the initial bytes
|
||||
_, err := io.CopyN(ioutil.Discard, r,
|
||||
offset-firstBlock*int64(dr.es.DecodedBlockSize()))
|
||||
|
@ -42,13 +42,13 @@ func Pad(data ranger.Ranger, blockSize int) (
|
||||
// Unpad takes a previously padded Ranger data source and returns an unpadded
|
||||
// ranger, given the amount of padding. This is preferable to UnpadSlow if you
|
||||
// can swing it.
|
||||
func Unpad(data ranger.Ranger, padding int) (ranger.Ranger, error) {
|
||||
func Unpad(data ranger.RangeCloser, padding int) (ranger.RangeCloser, error) {
|
||||
return ranger.Subrange(data, 0, data.Size()-int64(padding))
|
||||
}
|
||||
|
||||
// UnpadSlow is like Unpad, but does not require the amount of padding.
|
||||
// UnpadSlow will have to do extra work to make up for this missing information.
|
||||
func UnpadSlow(ctx context.Context, data ranger.Ranger) (ranger.Ranger, error) {
|
||||
func UnpadSlow(ctx context.Context, data ranger.RangeCloser) (ranger.RangeCloser, error) {
|
||||
r, err := data.Range(ctx, data.Size()-uint32Size, uint32Size)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
|
@ -44,7 +44,7 @@ func TestPad(t *testing.T) {
|
||||
if int64(padding+len(example.data)) != padded.Size() {
|
||||
t.Fatalf("invalid padding")
|
||||
}
|
||||
unpadded, err := Unpad(padded, padding)
|
||||
unpadded, err := Unpad(ranger.NopCloser(padded), padding)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error")
|
||||
}
|
||||
@ -59,7 +59,7 @@ func TestPad(t *testing.T) {
|
||||
if !bytes.Equal(data, []byte(example.data)) {
|
||||
t.Fatalf("mismatch")
|
||||
}
|
||||
unpadded, err = UnpadSlow(ctx, padded)
|
||||
unpadded, err = UnpadSlow(ctx, ranger.NopCloser(padded))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error")
|
||||
}
|
||||
|
@ -113,7 +113,7 @@ func TestRSRanger(t *testing.T) {
|
||||
}
|
||||
rrs := map[int]ranger.RangeCloser{}
|
||||
for i, piece := range pieces {
|
||||
rrs[i] = ranger.NopCloser(ranger.ByteRanger(piece))
|
||||
rrs[i] = ranger.ByteRangeCloser(piece)
|
||||
}
|
||||
decrypter, err := NewAESGCMDecrypter(
|
||||
&encKey, &firstNonce, rs.DecodedBlockSize())
|
||||
|
@ -92,12 +92,12 @@ func (t *transformedReader) Close() error {
|
||||
}
|
||||
|
||||
type transformedRanger struct {
|
||||
rr ranger.Ranger
|
||||
rr ranger.RangeCloser
|
||||
t Transformer
|
||||
}
|
||||
|
||||
// Transform will apply a Transformer to a Ranger.
|
||||
func Transform(rr ranger.Ranger, t Transformer) (ranger.Ranger, error) {
|
||||
func Transform(rr ranger.RangeCloser, t Transformer) (ranger.RangeCloser, error) {
|
||||
if rr.Size()%int64(t.InBlockSize()) != 0 {
|
||||
return nil, Error.New("invalid transformer and range reader combination." +
|
||||
"the range reader size is not a multiple of the block size")
|
||||
@ -159,3 +159,7 @@ func (t *transformedRanger) Range(ctx context.Context, offset, length int64) (io
|
||||
// the range might have been too long. only return what was requested
|
||||
return readcloser.LimitReadCloser(tr, length), nil
|
||||
}
|
||||
|
||||
func (t *transformedRanger) Close() error {
|
||||
return t.rr.Close()
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ func TestCalcEncompassingBlocks(t *testing.T) {
|
||||
|
||||
func TestCRC(t *testing.T) {
|
||||
const blocks = 3
|
||||
rr, err := addCRC(ranger.ByteRanger(bytes.Repeat([]byte{0}, blocks*64)),
|
||||
rr, err := addCRC(ranger.ByteRangeCloser(bytes.Repeat([]byte{0}, blocks*64)),
|
||||
crc32.IEEETable)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected: %v", err)
|
||||
@ -87,7 +87,7 @@ func TestCRC(t *testing.T) {
|
||||
t.Fatalf("unexpected: %v", err)
|
||||
}
|
||||
|
||||
rr, err = checkCRC(ranger.ByteRanger(data), crc32.IEEETable)
|
||||
rr, err = checkCRC(ranger.ByteRangeCloser(data), crc32.IEEETable)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected: %v", err)
|
||||
}
|
||||
@ -113,7 +113,7 @@ func TestCRC(t *testing.T) {
|
||||
func TestCRCSubranges(t *testing.T) {
|
||||
const blocks = 3
|
||||
data := bytes.Repeat([]byte{0, 1, 2}, blocks*64)
|
||||
internal, err := addCRC(ranger.ByteRanger(data), crc32.IEEETable)
|
||||
internal, err := addCRC(ranger.ByteRangeCloser(data), crc32.IEEETable)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected: %v", err)
|
||||
}
|
||||
|
@ -59,6 +59,11 @@ func (b ByteRanger) Range(ctx context.Context, offset, length int64) (io.ReadClo
|
||||
return ioutil.NopCloser(bytes.NewReader(b[offset : offset+length])), nil
|
||||
}
|
||||
|
||||
// ByteRangeCloser turns a byte slice into a RangeCloser
|
||||
func ByteRangeCloser(data []byte) RangeCloser {
|
||||
return NopCloser(ByteRanger(data))
|
||||
}
|
||||
|
||||
type concatReader struct {
|
||||
r1 Ranger
|
||||
r2 Ranger
|
||||
@ -107,12 +112,12 @@ func Concat(r ...Ranger) Ranger {
|
||||
}
|
||||
|
||||
type subrange struct {
|
||||
r Ranger
|
||||
r RangeCloser
|
||||
offset, length int64
|
||||
}
|
||||
|
||||
// Subrange returns a subset of a Ranger.
|
||||
func Subrange(data Ranger, offset, length int64) (Ranger, error) {
|
||||
func Subrange(data RangeCloser, offset, length int64) (RangeCloser, error) {
|
||||
dSize := data.Size()
|
||||
if offset < 0 || offset > dSize {
|
||||
return nil, Error.New("invalid offset")
|
||||
@ -130,3 +135,7 @@ func (s *subrange) Size() int64 {
|
||||
func (s *subrange) Range(ctx context.Context, offset, length int64) (io.ReadCloser, error) {
|
||||
return s.r.Range(ctx, offset+s.offset, length)
|
||||
}
|
||||
|
||||
func (s *subrange) Close() error {
|
||||
return s.r.Close()
|
||||
}
|
||||
|
@ -115,7 +115,7 @@ func TestSubranger(t *testing.T) {
|
||||
{"abcdefghijkl", 8, 4, 0, 3, "ijk"},
|
||||
{"abcdefghijkl", 8, 4, 1, 3, "jkl"},
|
||||
} {
|
||||
rr, err := Subrange(ByteRanger([]byte(example.data)),
|
||||
rr, err := Subrange(ByteRangeCloser([]byte(example.data)),
|
||||
example.offset1, example.length1)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
@ -148,7 +148,7 @@ func TestSubrangerError(t *testing.T) {
|
||||
{name: "Length and offset is bigger than DataSize", data: "abcd", offset: 4, length: 1},
|
||||
} {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
rr, err := Subrange(ByteRanger([]byte(tt.data)), tt.offset, tt.length)
|
||||
rr, err := Subrange(ByteRangeCloser([]byte(tt.data)), tt.offset, tt.length)
|
||||
assert.Nil(t, rr)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
@ -6,6 +6,7 @@ package ecclient
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
@ -66,7 +67,8 @@ func (ec *ecClient) Put(ctx context.Context, nodes []*proto.Node, rs eestream.Re
|
||||
if !unique(nodes) {
|
||||
return Error.New("duplicated nodes are not allowed")
|
||||
}
|
||||
readers, err := eestream.EncodeReader(ctx, data, rs, ec.mbm)
|
||||
padded := eestream.PadReader(ioutil.NopCloser(data), rs.DecodedBlockSize())
|
||||
readers, err := eestream.EncodeReader(ctx, padded, rs, ec.mbm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -113,6 +115,8 @@ func (ec *ecClient) Get(ctx context.Context, nodes []*proto.Node, es eestream.Er
|
||||
if len(nodes) != es.TotalCount() {
|
||||
return nil, Error.New("number of nodes do not match total count of erasure scheme")
|
||||
}
|
||||
paddedSize := calcPadded(size, es.DecodedBlockSize())
|
||||
pieceSize := paddedSize / int64(es.RequiredCount())
|
||||
rrs := map[int]ranger.RangeCloser{}
|
||||
type rangerInfo struct {
|
||||
i int
|
||||
@ -135,7 +139,7 @@ func (ec *ecClient) Get(ctx context.Context, nodes []*proto.Node, es eestream.Er
|
||||
ch <- rangerInfo{i: i, rr: nil, err: err}
|
||||
return
|
||||
}
|
||||
rr, err := ps.Get(ctx, derivedPieceID, size)
|
||||
rr, err := ps.Get(ctx, derivedPieceID, pieceSize)
|
||||
// no ps.CloseConn() here, the connection will be closed by
|
||||
// the caller using RangeCloser.Close
|
||||
if err != nil {
|
||||
@ -151,7 +155,11 @@ func (ec *ecClient) Get(ctx context.Context, nodes []*proto.Node, es eestream.Er
|
||||
rrs[rri.i] = rri.rr
|
||||
}
|
||||
}
|
||||
return eestream.Decode(rrs, es, ec.mbm)
|
||||
rr, err = eestream.Decode(rrs, es, ec.mbm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return eestream.Unpad(rr, int(paddedSize-size))
|
||||
}
|
||||
|
||||
func (ec *ecClient) Delete(ctx context.Context, nodes []*proto.Node, pieceID client.PieceID) (err error) {
|
||||
@ -228,3 +236,11 @@ func unique(nodes []*proto.Node) bool {
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func calcPadded(size int64, blockSize int) int64 {
|
||||
mod := size % int64(blockSize)
|
||||
if mod == 0 {
|
||||
return size
|
||||
}
|
||||
return size + int64(blockSize) - mod
|
||||
}
|
||||
|
@ -106,6 +106,15 @@ func TestPut(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
size := 32 * 1024
|
||||
k := 2
|
||||
n := 4
|
||||
fc, err := infectious.NewFEC(k, n)
|
||||
if !assert.NoError(t, err) {
|
||||
return
|
||||
}
|
||||
es := eestream.NewRSScheme(fc, size/n)
|
||||
|
||||
TestLoop:
|
||||
for i, tt := range []struct {
|
||||
nodes []*proto.Node
|
||||
@ -140,7 +149,6 @@ TestLoop:
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
|
||||
id := client.NewPieceID()
|
||||
size := 32 * 1024
|
||||
ttl := time.Now()
|
||||
|
||||
errs := make(map[*proto.Node]error, len(tt.nodes))
|
||||
@ -163,12 +171,6 @@ TestLoop:
|
||||
m[n] = ps
|
||||
}
|
||||
}
|
||||
|
||||
fc, err := infectious.NewFEC(2, 4)
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue
|
||||
}
|
||||
es := eestream.NewRSScheme(fc, size/4)
|
||||
rs, err := eestream.NewRedundancyStrategy(es, tt.min, 0)
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue
|
||||
@ -190,6 +192,15 @@ func TestGet(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
size := 32 * 1024
|
||||
k := 2
|
||||
n := 4
|
||||
fc, err := infectious.NewFEC(k, n)
|
||||
if !assert.NoError(t, err) {
|
||||
return
|
||||
}
|
||||
es := eestream.NewRSScheme(fc, size/n)
|
||||
|
||||
TestLoop:
|
||||
for i, tt := range []struct {
|
||||
nodes []*proto.Node
|
||||
@ -218,7 +229,6 @@ TestLoop:
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
|
||||
id := client.NewPieceID()
|
||||
size := 32 * 1024
|
||||
|
||||
errs := make(map[*proto.Node]error, len(tt.nodes))
|
||||
for i, n := range tt.nodes {
|
||||
@ -233,17 +243,11 @@ TestLoop:
|
||||
continue TestLoop
|
||||
}
|
||||
ps := NewMockPSClient(ctrl)
|
||||
ps.EXPECT().Get(gomock.Any(), derivedID, int64(size)).Return(
|
||||
ps.EXPECT().Get(gomock.Any(), derivedID, int64(size/k)).Return(
|
||||
ranger.NopCloser(ranger.ByteRanger(nil)), errs[n])
|
||||
m[n] = ps
|
||||
}
|
||||
}
|
||||
|
||||
fc, err := infectious.NewFEC(2, 4)
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue
|
||||
}
|
||||
es := eestream.NewRSScheme(fc, size/4)
|
||||
ec := ecClient{d: &mockDialer{m: m}, mbm: tt.mbm}
|
||||
rr, err := ec.Get(ctx, tt.nodes, es, id, int64(size))
|
||||
|
||||
|
@ -196,7 +196,7 @@ func (s *segmentStore) Get(ctx context.Context, path paths.Path) (
|
||||
return nil, Meta{}, Error.Wrap(err)
|
||||
}
|
||||
} else {
|
||||
rr = ranger.NopCloser(ranger.ByteRanger(pr.InlineSegment))
|
||||
rr = ranger.ByteRangeCloser(pr.InlineSegment)
|
||||
}
|
||||
|
||||
return rr, convertMeta(pr), nil
|
||||
|
Loading…
Reference in New Issue
Block a user