Merge pull request #1 from jtolds/master

merge eestream
Kaloyan Raev 2018-04-11 22:32:06 +03:00 committed by GitHub
commit 6c2c4b92e4
12 changed files with 1048 additions and 0 deletions

pkg/eestream/common.go Normal file

@@ -0,0 +1,11 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package eestream

import (
    "github.com/zeebo/errs"
)

// Error is the default eestream errs class
var Error = errs.Class("eestream error")

pkg/eestream/crc_test.go Normal file

@@ -0,0 +1,89 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package eestream

import (
    "encoding/binary"
    "hash/crc32"

    "storj.io/storj/pkg/ranger"
)

const (
    crcBlockSize = 64 // this could literally be whatever
    uint64Size   = 8
)

// crcAdder is a Transformer that is going to add a block number and a crc to
// the end of each block
type crcAdder struct {
    Table *crc32.Table
}

func newCRCAdder(t *crc32.Table) *crcAdder {
    return &crcAdder{Table: t}
}

func (c *crcAdder) InBlockSize() int { return crcBlockSize }

func (c *crcAdder) OutBlockSize() int {
    return crcBlockSize + uint32Size + uint64Size
}

func (c *crcAdder) Transform(out, in []byte, blockOffset int64) (
    []byte, error) {
    // we're just going to take the input data, then add the block number,
    // big-endian encoded, then add the big-endian crc of the input + block
    // number.
    out = append(out, in...)
    var buf [uint64Size]byte
    binary.BigEndian.PutUint64(buf[:], uint64(blockOffset))
    out = append(out, buf[:]...)
    binary.BigEndian.PutUint32(buf[:uint32Size], crc32.Checksum(out, c.Table))
    out = append(out, buf[:uint32Size]...)
    return out, nil
}

// crcChecker is a Transformer that validates a given CRC and compares the
// block number, then removes them from the input, returning the original
// unchecked input.
type crcChecker struct {
    Table *crc32.Table
}

func newCRCChecker(t *crc32.Table) *crcChecker {
    return &crcChecker{Table: t}
}

func (c *crcChecker) InBlockSize() int {
    return crcBlockSize + uint32Size + uint64Size
}

func (c *crcChecker) OutBlockSize() int { return crcBlockSize }

func (c *crcChecker) Transform(out, in []byte, blockOffset int64) (
    []byte, error) {
    bs := c.OutBlockSize()
    // first check the crc
    if binary.BigEndian.Uint32(in[bs+uint64Size:bs+uint64Size+uint32Size]) !=
        crc32.Checksum(in[:bs+uint64Size], c.Table) {
        return nil, Error.New("crc check mismatch")
    }
    // then check the block offset
    if binary.BigEndian.Uint64(in[bs:bs+uint64Size]) != uint64(blockOffset) {
        return nil, Error.New("block offset mismatch")
    }
    return append(out, in[:bs]...), nil
}

// addCRC is a Ranger constructor, given a specific crc table and an existing
// un-crced Ranger
func addCRC(data ranger.Ranger, tab *crc32.Table) (ranger.Ranger, error) {
    return Transform(data, newCRCAdder(tab))
}

// checkCRC is a Ranger constructor, given a specific crc table and an existing
// crced Ranger
func checkCRC(data ranger.Ranger, tab *crc32.Table) (ranger.Ranger, error) {
    return Transform(data, newCRCChecker(tab))
}
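
Note on the framing: each 64-byte input block is followed by its 8-byte big-endian block offset and a 4-byte CRC computed over those 72 bytes, so blocks grow from 64 to 76 bytes. A minimal in-package round-trip sketch; exampleCRCRoundTrip is a hypothetical helper for illustration only, assuming this file's imports:

// a hypothetical in-package helper, for illustration only
func exampleCRCRoundTrip() error {
    input := make([]byte, 2*crcBlockSize) // two zeroed 64-byte blocks
    framed, err := addCRC(ranger.ByteRanger(input), crc32.IEEETable)
    if err != nil {
        return err
    }
    // framed.Size() == 2*(64+8+4): each block gained an offset and a crc
    checked, err := checkCRC(framed, crc32.IEEETable)
    if err != nil {
        return err
    }
    // reading back verifies every block; a flipped bit would surface as
    // "crc check mismatch"
    out, err := ioutil.ReadAll(checked.Range(0, checked.Size()))
    if err != nil {
        return err
    }
    _ = out // out now equals input
    return nil
}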

pkg/eestream/decode.go Normal file

@@ -0,0 +1,149 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package eestream

import (
    "io"
    "io/ioutil"

    "storj.io/storj/pkg/ranger"
)

type decodedReader struct {
    rs     map[int]io.Reader
    es     ErasureScheme
    inbufs map[int][]byte
    outbuf []byte
    err    error
}

// DecodeReaders takes a map of readers and an ErasureScheme returning a
// combined Reader. The map, 'rs', must be a mapping of erasure piece numbers
// to erasure piece streams.
func DecodeReaders(rs map[int]io.Reader, es ErasureScheme) io.Reader {
    dr := &decodedReader{
        rs:     rs,
        es:     es,
        inbufs: make(map[int][]byte, len(rs)),
        outbuf: make([]byte, 0, es.DecodedBlockSize()),
    }
    for i := range rs {
        dr.inbufs[i] = make([]byte, es.EncodedBlockSize())
    }
    return dr
}

func (dr *decodedReader) Read(p []byte) (n int, err error) {
    if len(dr.outbuf) <= 0 {
        // if the output buffer is empty, let's fill it again
        // if we've already had an error, fail
        if dr.err != nil {
            return 0, dr.err
        }
        // we're going to kick off a bunch of goroutines. make a
        // channel to catch those goroutine errors. importantly,
        // the channel has a buffer size to contain all the errors
        // even if we read none, so we can return without receiving
        // every channel value
        errs := make(chan error, len(dr.rs))
        for i := range dr.rs {
            go func(i int) {
                // fill the buffer from the piece input
                _, err := io.ReadFull(dr.rs[i], dr.inbufs[i])
                errs <- err
            }(i)
        }
        // catch all the errors
        for range dr.rs {
            err := <-errs
            if err != nil {
                // remember and return the first failure
                dr.err = err
                return 0, err
            }
        }
        // we have all the input buffers, fill the decoded output buffer
        dr.outbuf, err = dr.es.Decode(dr.outbuf, dr.inbufs)
        if err != nil {
            return 0, err
        }
    }
    // copy what data we have to the output
    n = copy(p, dr.outbuf)
    // slide the remaining bytes to the beginning
    copy(dr.outbuf, dr.outbuf[n:])
    // shrink the remaining buffer
    dr.outbuf = dr.outbuf[:len(dr.outbuf)-n]
    return n, nil
}

type decodedRanger struct {
    es     ErasureScheme
    rrs    map[int]ranger.Ranger
    inSize int64
}

// Decode takes a map of Rangers and an ErasureScheme and returns a combined
// Ranger. The map, 'rrs', must be a mapping of erasure piece numbers
// to erasure piece rangers.
func Decode(rrs map[int]ranger.Ranger, es ErasureScheme) (
    ranger.Ranger, error) {
    size := int64(-1)
    for _, rr := range rrs {
        if size == -1 {
            size = rr.Size()
        } else if size != rr.Size() {
            return nil, Error.New("decode failure: range reader sizes don't " +
                "all match")
        }
    }
    if size == -1 {
        return ranger.ByteRanger(nil), nil
    }
    if size%int64(es.EncodedBlockSize()) != 0 {
        return nil, Error.New("invalid erasure decoder and range reader combo. " +
            "range reader size must be a multiple of erasure encoder block size")
    }
    if len(rrs) < es.RequiredCount() {
        return nil, Error.New("not enough readers to reconstruct data!")
    }
    return &decodedRanger{
        es:     es,
        rrs:    rrs,
        inSize: size,
    }, nil
}

func (dr *decodedRanger) Size() int64 {
    blocks := dr.inSize / int64(dr.es.EncodedBlockSize())
    return blocks * int64(dr.es.DecodedBlockSize())
}

func (dr *decodedRanger) Range(offset, length int64) io.Reader {
    // offset and length might not be block-aligned. figure out which
    // blocks contain this request
    firstBlock, blockCount := calcEncompassingBlocks(
        offset, length, dr.es.DecodedBlockSize())
    // go ask for ranges for all those block boundaries
    readers := make(map[int]io.Reader, len(dr.rrs))
    for i, rr := range dr.rrs {
        readers[i] = rr.Range(
            firstBlock*int64(dr.es.EncodedBlockSize()),
            blockCount*int64(dr.es.EncodedBlockSize()))
    }
    // decode from all those ranges
    r := DecodeReaders(readers, dr.es)
    // offset might start a few bytes in, so potentially discard the initial bytes
    _, err := io.CopyN(ioutil.Discard, r,
        offset-firstBlock*int64(dr.es.DecodedBlockSize()))
    if err != nil {
        return ranger.FatalReader(Error.Wrap(err))
    }
    // length might not have included all of the blocks, limit what we return
    return io.LimitReader(r, length)
}
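
For a sense of the failure tolerance this enables: with a 2-of-4 Reed-Solomon scheme, any two piece streams reconstruct the data. A hedged, self-contained sketch (note the "lost" pieces still have to be drained, because EncodeReader's pieces share one buffer-refill cycle):

package main

import (
    "bytes"
    "fmt"
    "io"
    "io/ioutil"

    "github.com/vivint/infectious"

    "storj.io/storj/pkg/eestream"
)

func main() {
    fc, err := infectious.NewFEC(2, 4) // 2 required, 4 total
    if err != nil {
        panic(err)
    }
    es := eestream.NewRSScheme(fc, 1024)
    data := bytes.Repeat([]byte{42}, es.DecodedBlockSize()*4)
    readers := eestream.EncodeReader(bytes.NewReader(data), es)
    // pretend pieces 1 and 2 were lost, but drain them anyway: all pieces
    // share one refill cycle and must be consumed for the others to progress
    go io.Copy(ioutil.Discard, readers[1])
    go io.Copy(ioutil.Discard, readers[2])
    surviving := map[int]io.Reader{0: readers[0], 3: readers[3]}
    out, err := ioutil.ReadAll(eestream.DecodeReaders(surviving, es))
    if err != nil {
        panic(err)
    }
    fmt.Println(bytes.Equal(out, data)) // true: any 2 of 4 pieces suffice
}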

pkg/eestream/encode.go Normal file

@@ -0,0 +1,192 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package eestream

import (
    "io"
    "io/ioutil"
    "sync"

    "storj.io/storj/pkg/ranger"
)

// ErasureScheme represents the general format of any erasure scheme algorithm.
// If this interface can be implemented, the rest of this library will work
// with it.
type ErasureScheme interface {
    // Encode will take 'in' and call 'out' with erasure coded pieces.
    Encode(in []byte, out func(num int, data []byte)) error

    // Decode will take a mapping of available erasure coded piece num -> data,
    // 'in', and append the combined data to 'out', returning it.
    Decode(out []byte, in map[int][]byte) ([]byte, error)

    // EncodedBlockSize is the size the erasure coded pieces should be that
    // come from Encode and are passed to Decode.
    EncodedBlockSize() int

    // DecodedBlockSize is the size of the combined file blocks that should be
    // passed in to Encode and will come from Decode.
    DecodedBlockSize() int

    // Encode will generate this many pieces.
    TotalCount() int

    // Decode requires at least this many pieces.
    RequiredCount() int
}

type encodedReader struct {
    r               io.Reader
    es              ErasureScheme
    cv              *sync.Cond
    inbuf           []byte
    outbufs         [][]byte
    piecesRemaining int
    err             error
}

// EncodeReader will take a Reader and an ErasureScheme and return a slice of
// Readers.
func EncodeReader(r io.Reader, es ErasureScheme) []io.Reader {
    er := &encodedReader{
        r:       r,
        es:      es,
        cv:      sync.NewCond(&sync.Mutex{}),
        inbuf:   make([]byte, es.DecodedBlockSize()),
        outbufs: make([][]byte, es.TotalCount()),
    }
    readers := make([]io.Reader, 0, es.TotalCount())
    for i := 0; i < es.TotalCount(); i++ {
        er.outbufs[i] = make([]byte, 0, es.EncodedBlockSize())
        readers = append(readers, &encodedPiece{
            er: er,
            i:  i,
        })
    }
    return readers
}

func (er *encodedReader) wait() (err error) {
    // have we already failed? just return that
    if er.err != nil {
        return er.err
    }
    // are other pieces still using the buffers? wait on a condition variable
    // for the last remaining piece to fill all the buffers.
    if er.piecesRemaining > 0 {
        er.cv.Wait()
        // whoever broadcast a wakeup either set an error or filled the buffers.
        // er.err might be nil, which means the buffers are filled.
        return er.err
    }
    // we are going to set an error or fill the buffers
    defer er.cv.Broadcast()
    defer func() {
        // at the end of this function, if we're returning an error, set er.err
        if err != nil {
            er.err = err
        }
    }()
    _, err = io.ReadFull(er.r, er.inbuf)
    if err != nil {
        return err
    }
    err = er.es.Encode(er.inbuf, func(num int, data []byte) {
        er.outbufs[num] = append(er.outbufs[num], data...)
    })
    if err != nil {
        return err
    }
    // reset piecesRemaining
    er.piecesRemaining = er.es.TotalCount()
    return nil
}

type encodedPiece struct {
    er *encodedReader
    i  int
}

func (ep *encodedPiece) Read(p []byte) (n int, err error) {
    // lock! thread safety matters here
    ep.er.cv.L.Lock()
    defer ep.er.cv.L.Unlock()
    outbufs, i := ep.er.outbufs, ep.i
    if len(outbufs[i]) <= 0 {
        // if we don't have any buffered result yet, wait until we do
        err := ep.er.wait()
        if err != nil {
            return 0, err
        }
    }
    // we have some buffer remaining for this piece. write it to the output
    n = copy(p, outbufs[i])
    // slide the unused (if any) bytes to the beginning of the buffer
    copy(outbufs[i], outbufs[i][n:])
    // and shrink the buffer
    outbufs[i] = outbufs[i][:len(outbufs[i])-n]
    // if there's nothing left, decrement the number of pieces still holding data
    if len(outbufs[i]) <= 0 {
        ep.er.piecesRemaining--
    }
    return n, nil
}

// EncodedRanger will take an existing Ranger and provide a means to get
// multiple Ranged sub-Readers. EncodedRanger does not match the normal Ranger
// interface.
type EncodedRanger struct {
    es ErasureScheme
    rr ranger.Ranger
}

// NewEncodedRanger creates an EncodedRanger.
func NewEncodedRanger(rr ranger.Ranger, es ErasureScheme) (*EncodedRanger,
    error) {
    if rr.Size()%int64(es.DecodedBlockSize()) != 0 {
        return nil, Error.New("invalid erasure encoder and range reader combo. " +
            "range reader size must be a multiple of erasure encoder block size")
    }
    return &EncodedRanger{
        es: es,
        rr: rr,
    }, nil
}

// OutputSize is like Ranger.Size but returns the Size of the erasure encoded
// pieces that come out.
func (er *EncodedRanger) OutputSize() int64 {
    blocks := er.rr.Size() / int64(er.es.DecodedBlockSize())
    return blocks * int64(er.es.EncodedBlockSize())
}

// Range is like Ranger.Range, but returns a slice of Readers.
func (er *EncodedRanger) Range(offset, length int64) ([]io.Reader, error) {
    // the offset and length given may not be block-aligned, so let's figure
    // out which blocks contain the request.
    firstBlock, blockCount := calcEncompassingBlocks(
        offset, length, er.es.EncodedBlockSize())
    // okay, now let's encode the reader for the range containing the blocks
    readers := EncodeReader(er.rr.Range(
        firstBlock*int64(er.es.DecodedBlockSize()),
        blockCount*int64(er.es.DecodedBlockSize())), er.es)
    for i, r := range readers {
        // the offset might start a few bytes in, so we potentially have to
        // discard the beginning bytes
        _, err := io.CopyN(ioutil.Discard, r,
            offset-firstBlock*int64(er.es.EncodedBlockSize()))
        if err != nil {
            return nil, Error.Wrap(err)
        }
        // the length might be shorter than a multiple of the block size, so
        // limit it
        readers[i] = io.LimitReader(r, length)
    }
    return readers, nil
}
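
A sketch of EncodedRanger end to end, under the same 2-of-4 assumption as above. OutputSize is the per-piece size, and all pieces must be consumed concurrently because they share the underlying encoder:

package main

import (
    "bytes"
    "fmt"
    "io"
    "io/ioutil"
    "sync"

    "github.com/vivint/infectious"

    "storj.io/storj/pkg/eestream"
    "storj.io/storj/pkg/ranger"
)

func main() {
    fc, err := infectious.NewFEC(2, 4)
    if err != nil {
        panic(err)
    }
    es := eestream.NewRSScheme(fc, 1024)
    // the source must be a multiple of DecodedBlockSize (2048 here)
    rr := ranger.ByteRanger(bytes.Repeat([]byte{7}, es.DecodedBlockSize()*8))
    er, err := eestream.NewEncodedRanger(rr, es)
    if err != nil {
        panic(err)
    }
    readers, err := er.Range(0, er.OutputSize()) // every piece, full length
    if err != nil {
        panic(err)
    }
    pieces := make([][]byte, len(readers))
    var wg sync.WaitGroup
    for i, r := range readers {
        wg.Add(1)
        go func(i int, r io.Reader) { // error handling elided in this sketch
            defer wg.Done()
            pieces[i], _ = ioutil.ReadAll(r)
        }(i, r)
    }
    wg.Wait()
    fmt.Println(len(pieces), len(pieces[0])) // 4 pieces, 8192 bytes each
}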

pkg/eestream/pad.go Normal file

@@ -0,0 +1,78 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package eestream

import (
    "bytes"
    "encoding/binary"
    "io"

    "storj.io/storj/pkg/ranger"
)

const (
    uint32Size = 4
)

func makePadding(dataLen int64, blockSize int) []byte {
    amount := dataLen + uint32Size
    r := amount % int64(blockSize)
    padding := uint32Size
    if r > 0 {
        padding += blockSize - int(r)
    }
    paddingBytes := bytes.Repeat([]byte{0}, padding)
    binary.BigEndian.PutUint32(paddingBytes[padding-uint32Size:], uint32(padding))
    return paddingBytes
}

// Pad takes a Ranger and returns another Ranger that is a multiple of
// blockSize in length. The return value padding is a convenience to report how
// much padding was added.
func Pad(data ranger.Ranger, blockSize int) (
    rr ranger.Ranger, padding int) {
    paddingBytes := makePadding(data.Size(), blockSize)
    return ranger.Concat(data, ranger.ByteRanger(paddingBytes)), len(paddingBytes)
}

// Unpad takes a previously padded Ranger data source and returns an unpadded
// ranger, given the amount of padding. This is preferable to UnpadSlow if you
// can swing it.
func Unpad(data ranger.Ranger, padding int) (ranger.Ranger, error) {
    return ranger.Subrange(data, 0, data.Size()-int64(padding))
}

// UnpadSlow is like Unpad, but does not require the amount of padding.
// UnpadSlow will have to do extra work to make up for this missing information.
func UnpadSlow(data ranger.Ranger) (ranger.Ranger, error) {
    var p [uint32Size]byte
    _, err := io.ReadFull(data.Range(data.Size()-uint32Size, uint32Size), p[:])
    if err != nil {
        return nil, Error.Wrap(err)
    }
    return Unpad(data, int(binary.BigEndian.Uint32(p[:])))
}

// PadReader is like Pad but works on a basic Reader instead of a Ranger.
func PadReader(data io.Reader, blockSize int) io.Reader {
    cr := newCountingReader(data)
    return io.MultiReader(cr, ranger.LazyReader(func() io.Reader {
        return bytes.NewReader(makePadding(cr.N, blockSize))
    }))
}

type countingReader struct {
    R io.Reader
    N int64
}

func newCountingReader(r io.Reader) *countingReader {
    return &countingReader{R: r}
}

func (cr *countingReader) Read(p []byte) (n int, err error) {
    n, err = cr.R.Read(p)
    cr.N += int64(n)
    return n, err
}
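
To make the arithmetic concrete: for dataLen = 10 and blockSize = 8, amount = 14, r = 6, so padding = 4 + (8 - 6) = 6 and the padded total is 16, a multiple of 8. The last four bytes of the padding always record the padding length, which is exactly what UnpadSlow reads back. A hypothetical in-package test sketch, for illustration only:

// a hypothetical in-package test, for illustration only
func TestMakePaddingWorkedExample(t *testing.T) {
    p := makePadding(10, 8) // amount = 14, r = 6, padding = 4 + (8-6) = 6
    if len(p) != 6 {
        t.Fatalf("expected 6 padding bytes, got %d", len(p))
    }
    // the trailing 4 bytes record the total padding length, so UnpadSlow
    // can recover it without being told
    if binary.BigEndian.Uint32(p[len(p)-uint32Size:]) != 6 {
        t.Fatal("padding length not recorded in trailer")
    }
}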

pkg/eestream/pad_test.go Normal file

@@ -0,0 +1,63 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package eestream

import (
    "bytes"
    "io/ioutil"
    "strings"
    "testing"

    "storj.io/storj/pkg/ranger"
)

func TestPad(t *testing.T) {
    for examplenum, example := range []struct {
        data      string
        blockSize int
        padding   int
    }{
        {"abcdef", 24, 24 - 6},
        {"abcdef", 6, 6},
        {"abcdef", 7, 8},
        {"abcdef", 8, 10},
        {"abcdef", 9, 12},
        {"abcdef", 10, 4},
        {"abcdef", 11, 5},
        {"abcde", 7, 9},
        {"abcdefg", 7, 7},
        {"abcdef", 512, 506},
        {"abcdef", 32 * 1024, 32*1024 - 6},
        {"", 32 * 1024, 32 * 1024},
        {strings.Repeat("\x00", 16*1024), 32 * 1024, 16 * 1024},
        {strings.Repeat("\x00", 32*1024+1), 32 * 1024, 32*1024 - 1},
    } {
        padded, padding := Pad(ranger.ByteRanger([]byte(example.data)),
            example.blockSize)
        if padding != example.padding {
            t.Fatalf("invalid padding: %d, %v != %v", examplenum,
                padding, example.padding)
        }
        if int64(padding+len(example.data)) != padded.Size() {
            t.Fatalf("invalid padding")
        }
        unpadded, err := Unpad(padded, padding)
        if err != nil {
            t.Fatalf("unexpected error: %v", err)
        }
        data, err := ioutil.ReadAll(unpadded.Range(0, unpadded.Size()))
        if err != nil {
            t.Fatalf("unexpected error: %v", err)
        }
        if !bytes.Equal(data, []byte(example.data)) {
            t.Fatalf("mismatch")
        }
        unpadded, err = UnpadSlow(padded)
        if err != nil {
            t.Fatalf("unexpected error: %v", err)
        }
        data, err = ioutil.ReadAll(unpadded.Range(0, unpadded.Size()))
        if err != nil {
            t.Fatalf("unexpected error: %v", err)
        }
        if !bytes.Equal(data, []byte(example.data)) {
            t.Fatalf("mismatch")
        }
    }
}

pkg/eestream/rs.go Normal file

@@ -0,0 +1,49 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package eestream

import (
    "github.com/vivint/infectious"
)

type rsScheme struct {
    fc        *infectious.FEC
    blockSize int
}

// NewRSScheme returns a Reed-Solomon-based ErasureScheme.
func NewRSScheme(fc *infectious.FEC, blockSize int) ErasureScheme {
    return &rsScheme{fc: fc, blockSize: blockSize}
}

func (s *rsScheme) Encode(input []byte, output func(num int, data []byte)) (
    err error) {
    return s.fc.Encode(input, func(s infectious.Share) {
        output(s.Number, s.Data)
    })
}

func (s *rsScheme) Decode(out []byte, in map[int][]byte) ([]byte, error) {
    shares := make([]infectious.Share, 0, len(in))
    for num, data := range in {
        shares = append(shares, infectious.Share{Number: num, Data: data})
    }
    return s.fc.Decode(out, shares)
}

func (s *rsScheme) EncodedBlockSize() int {
    return s.blockSize
}

func (s *rsScheme) DecodedBlockSize() int {
    return s.blockSize * s.fc.Required()
}

func (s *rsScheme) TotalCount() int {
    return s.fc.Total()
}

func (s *rsScheme) RequiredCount() int {
    return s.fc.Required()
}
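
The size relationship worth keeping in mind: EncodedBlockSize is each piece's share of a block, and DecodedBlockSize is blockSize times Required, so a k-of-n scheme stores n/k times the original bytes. A small sketch, assuming a 2-of-4 FEC:

package main

import (
    "fmt"

    "github.com/vivint/infectious"

    "storj.io/storj/pkg/eestream"
)

func main() {
    fc, err := infectious.NewFEC(2, 4) // 2 required, 4 total
    if err != nil {
        panic(err)
    }
    rs := eestream.NewRSScheme(fc, 1024)
    fmt.Println(rs.DecodedBlockSize()) // 2048: blockSize * RequiredCount
    fmt.Println(rs.EncodedBlockSize()) // 1024: each piece's share of a block
    fmt.Println(rs.TotalCount(), rs.RequiredCount()) // 4 2
    // per 2048-byte block, 4 pieces of 1024 bytes are stored: 2x expansion
}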

pkg/eestream/rs_test.go Normal file

@@ -0,0 +1,34 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package eestream

import (
    "bytes"
    "io"
    "io/ioutil"
    "testing"

    "github.com/vivint/infectious"
)

func TestRS(t *testing.T) {
    data := randData(32 * 1024)
    fc, err := infectious.NewFEC(2, 4)
    if err != nil {
        t.Fatal(err)
    }
    rs := NewRSScheme(fc, 8*1024)
    readers := EncodeReader(bytes.NewReader(data), rs)
    readerMap := make(map[int]io.Reader, len(readers))
    for i, reader := range readers {
        readerMap[i] = reader
    }
    data2, err := ioutil.ReadAll(DecodeReaders(readerMap, rs))
    if err != nil {
        t.Fatal(err)
    }
    if !bytes.Equal(data, data2) {
        t.Fatalf("rs encode/decode failed")
    }
}

pkg/eestream/secretbox.go Normal file

@@ -0,0 +1,88 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package eestream

import (
    "encoding/binary"

    "golang.org/x/crypto/nacl/secretbox"
)

type secretboxEncrypter struct {
    blockSize int
    key       [32]byte
}

func setKey(dst *[32]byte, key []byte) error {
    if len((*dst)[:]) != len(key) {
        return Error.New("invalid key length, expected %d", len((*dst)[:]))
    }
    copy((*dst)[:], key)
    return nil
}

// NewSecretboxEncrypter returns a Transformer that encrypts the data passing
// through with key.
func NewSecretboxEncrypter(key []byte, encryptedBlockSize int) (
    Transformer, error) {
    if encryptedBlockSize <= secretbox.Overhead {
        return nil, Error.New("block size too small")
    }
    rv := &secretboxEncrypter{blockSize: encryptedBlockSize - secretbox.Overhead}
    return rv, setKey(&rv.key, key)
}

func (s *secretboxEncrypter) InBlockSize() int {
    return s.blockSize
}

func (s *secretboxEncrypter) OutBlockSize() int {
    return s.blockSize + secretbox.Overhead
}

func calcNonce(blockNum int64) *[24]byte {
    var buf [uint32Size]byte
    binary.BigEndian.PutUint32(buf[:], uint32(blockNum))
    var nonce [24]byte
    copy(nonce[:], buf[1:])
    return &nonce
}

func (s *secretboxEncrypter) Transform(out, in []byte, blockNum int64) (
    []byte, error) {
    return secretbox.Seal(out, in, calcNonce(blockNum), &s.key), nil
}

type secretboxDecrypter struct {
    blockSize int
    key       [32]byte
}

// NewSecretboxDecrypter returns a Transformer that decrypts the data passing
// through with key.
func NewSecretboxDecrypter(key []byte, encryptedBlockSize int) (
    Transformer, error) {
    if encryptedBlockSize <= secretbox.Overhead {
        return nil, Error.New("block size too small")
    }
    rv := &secretboxDecrypter{blockSize: encryptedBlockSize - secretbox.Overhead}
    return rv, setKey(&rv.key, key)
}

func (s *secretboxDecrypter) InBlockSize() int {
    return s.blockSize + secretbox.Overhead
}

func (s *secretboxDecrypter) OutBlockSize() int {
    return s.blockSize
}

func (s *secretboxDecrypter) Transform(out, in []byte, blockNum int64) (
    []byte, error) {
    rv, success := secretbox.Open(out, in, calcNonce(blockNum), &s.key)
    if !success {
        return nil, Error.New("failed decrypting")
    }
    return rv, nil
}
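
Since calcNonce derives the nonce from the block number, a sealed block only decrypts at the index it was sealed with. A hedged sketch, using a zeroed demo key for illustration only:

package main

import (
    "fmt"

    "storj.io/storj/pkg/eestream"
)

func main() {
    key := make([]byte, 32) // demo key; a real caller would use random bytes
    enc, err := eestream.NewSecretboxEncrypter(key, 4*1024)
    if err != nil {
        panic(err)
    }
    block := make([]byte, enc.InBlockSize())
    sealed, err := enc.Transform(nil, block, 0) // block number 0 feeds the nonce
    if err != nil {
        panic(err)
    }
    dec, err := eestream.NewSecretboxDecrypter(key, 4*1024)
    if err != nil {
        panic(err)
    }
    if _, err := dec.Transform(nil, sealed, 1); err != nil {
        fmt.Println(err) // wrong block number, wrong nonce: decryption fails
    }
    plain, err := dec.Transform(nil, sealed, 0)
    if err != nil {
        panic(err)
    }
    fmt.Println(len(plain)) // 4080: the matching block number round-trips
}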

pkg/eestream/secretbox_test.go Normal file

@@ -0,0 +1,43 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package eestream

import (
    "bytes"
    "crypto/rand"
    "io/ioutil"
    "testing"
)

func randData(amount int) []byte {
    buf := make([]byte, amount)
    _, err := rand.Read(buf)
    if err != nil {
        panic(err)
    }
    return buf
}

func TestSecretbox(t *testing.T) {
    key := randData(32)
    encrypter, err := NewSecretboxEncrypter(key, 4*1024)
    if err != nil {
        t.Fatal(err)
    }
    data := randData(encrypter.InBlockSize() * 10)
    encrypted := TransformReader(bytes.NewReader(data),
        encrypter, 0)
    decrypter, err := NewSecretboxDecrypter(key, 4*1024)
    if err != nil {
        t.Fatal(err)
    }
    decrypted := TransformReader(encrypted, decrypter, 0)
    data2, err := ioutil.ReadAll(decrypted)
    if err != nil {
        t.Fatal(err)
    }
    if !bytes.Equal(data, data2) {
        t.Fatalf("encryption/decryption failed")
    }
}

pkg/eestream/transform.go Normal file

@@ -0,0 +1,126 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package eestream

import (
    "bytes"
    "io"
    "io/ioutil"

    "storj.io/storj/pkg/ranger"
)

// A Transformer is a data transformation that may change the size of the
// blocks of data it operates on in a deterministic fashion.
type Transformer interface {
    InBlockSize() int  // The block size prior to transformation
    OutBlockSize() int // The block size after transformation
    Transform(out, in []byte, blockNum int64) ([]byte, error)
}

type transformedReader struct {
    r        io.Reader
    t        Transformer
    blockNum int64
    inbuf    []byte
    outbuf   []byte
}

// TransformReader applies a Transformer to a Reader. startingBlockNum should
// probably be 0 unless you know you're already starting at a block offset.
func TransformReader(r io.Reader, t Transformer,
    startingBlockNum int64) io.Reader {
    return &transformedReader{
        r:        r,
        t:        t,
        blockNum: startingBlockNum,
        inbuf:    make([]byte, t.InBlockSize()),
        outbuf:   make([]byte, 0, t.OutBlockSize()),
    }
}

func (t *transformedReader) Read(p []byte) (n int, err error) {
    if len(t.outbuf) <= 0 {
        // If there's no more buffered data left, let's fill the buffer with
        // the next block
        _, err = io.ReadFull(t.r, t.inbuf)
        if err != nil {
            return 0, err
        }
        t.outbuf, err = t.t.Transform(t.outbuf, t.inbuf, t.blockNum)
        if err != nil {
            return 0, Error.Wrap(err)
        }
        t.blockNum++
    }
    // return as much as we can from the current buffered block
    n = copy(p, t.outbuf)
    // slide the uncopied data to the beginning of the buffer
    copy(t.outbuf, t.outbuf[n:])
    // resize the buffer
    t.outbuf = t.outbuf[:len(t.outbuf)-n]
    return n, nil
}

type transformedRanger struct {
    rr ranger.Ranger
    t  Transformer
}

// Transform will apply a Transformer to a Ranger.
func Transform(rr ranger.Ranger, t Transformer) (ranger.Ranger, error) {
    if rr.Size()%int64(t.InBlockSize()) != 0 {
        return nil, Error.New("invalid transformer and range reader combination. " +
            "the range reader size is not a multiple of the block size")
    }
    return &transformedRanger{rr: rr, t: t}, nil
}

func (t *transformedRanger) Size() int64 {
    blocks := t.rr.Size() / int64(t.t.InBlockSize())
    return blocks * int64(t.t.OutBlockSize())
}

// calcEncompassingBlocks is a useful helper function that, given an offset,
// length, and blockSize, will tell you which blocks contain the requested
// offset and length
func calcEncompassingBlocks(offset, length int64, blockSize int) (
    firstBlock, blockCount int64) {
    firstBlock = offset / int64(blockSize)
    if length <= 0 {
        return firstBlock, 0
    }
    lastBlock := (offset + length) / int64(blockSize)
    if (offset+length)%int64(blockSize) == 0 {
        return firstBlock, lastBlock - firstBlock
    }
    return firstBlock, 1 + lastBlock - firstBlock
}

func (t *transformedRanger) Range(offset, length int64) io.Reader {
    // Range may not have been called for block-aligned offsets and lengths, so
    // let's figure out which blocks encompass the request
    firstBlock, blockCount := calcEncompassingBlocks(
        offset, length, t.t.OutBlockSize())
    // okay, now let's get the range on the underlying ranger for those blocks
    // and then Transform it.
    r := TransformReader(
        t.rr.Range(
            firstBlock*int64(t.t.InBlockSize()),
            blockCount*int64(t.t.InBlockSize())), t.t, firstBlock)
    // the range we got potentially includes more than we wanted. if the
    // offset started past the beginning of the first block, we need to
    // swallow the first few bytes
    _, err := io.CopyN(ioutil.Discard, r,
        offset-firstBlock*int64(t.t.OutBlockSize()))
    if err != nil {
        if err == io.EOF {
            return bytes.NewReader(nil)
        }
        return ranger.FatalReader(Error.Wrap(err))
    }
    // the range might have been too long. only return what was requested
    return io.LimitReader(r, length)
}
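
Because Transformers compose with Rangers, the pieces in this package chain naturally: pad the source up to the encrypter's input block size, then Transform. A hedged end-to-end sketch with a zeroed demo key, for illustration only:

package main

import (
    "fmt"

    "storj.io/storj/pkg/eestream"
    "storj.io/storj/pkg/ranger"
)

func main() {
    key := make([]byte, 32) // demo key; a real caller would use random bytes
    enc, err := eestream.NewSecretboxEncrypter(key, 4*1024)
    if err != nil {
        panic(err)
    }
    // pad "hello eestream" up to the encrypter's 4080-byte input block size
    padded, _ := eestream.Pad(ranger.ByteRanger([]byte("hello eestream")),
        enc.InBlockSize())
    encrypted, err := eestream.Transform(padded, enc)
    if err != nil {
        panic(err)
    }
    // one 4080-byte input block becomes one 4096-byte sealed block
    fmt.Println(padded.Size(), encrypted.Size()) // 4080 4096
}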

pkg/eestream/transform_test.go Normal file

@@ -0,0 +1,126 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package eestream

import (
    "bytes"
    "hash/crc32"
    "io/ioutil"
    "testing"

    "storj.io/storj/pkg/ranger"
)

func TestCalcEncompassingBlocks(t *testing.T) {
    for _, example := range []struct {
        blockSize                              int
        offset, length, firstBlock, blockCount int64
    }{
        {2, 3, 4, 1, 3},
        {4, 0, 0, 0, 0},
        {4, 0, 1, 0, 1},
        {4, 0, 2, 0, 1},
        {4, 0, 3, 0, 1},
        {4, 0, 4, 0, 1},
        {4, 0, 5, 0, 2},
        {4, 0, 6, 0, 2},
        {4, 1, 0, 0, 0},
        {4, 1, 1, 0, 1},
        {4, 1, 2, 0, 1},
        {4, 1, 3, 0, 1},
        {4, 1, 4, 0, 2},
        {4, 1, 5, 0, 2},
        {4, 1, 6, 0, 2},
        {4, 2, 0, 0, 0},
        {4, 2, 1, 0, 1},
        {4, 2, 2, 0, 1},
        {4, 2, 3, 0, 2},
        {4, 2, 4, 0, 2},
        {4, 2, 5, 0, 2},
        {4, 2, 6, 0, 2},
        {4, 3, 0, 0, 0},
        {4, 3, 1, 0, 1},
        {4, 3, 2, 0, 2},
        {4, 3, 3, 0, 2},
        {4, 3, 4, 0, 2},
        {4, 3, 5, 0, 2},
        {4, 3, 6, 0, 3},
        {4, 4, 0, 1, 0},
        {4, 4, 1, 1, 1},
        {4, 4, 2, 1, 1},
        {4, 4, 3, 1, 1},
        {4, 4, 4, 1, 1},
        {4, 4, 5, 1, 2},
        {4, 4, 6, 1, 2},
    } {
        first, count := calcEncompassingBlocks(
            example.offset, example.length, example.blockSize)
        if first != example.firstBlock || count != example.blockCount {
            t.Fatalf("invalid calculation for %#v: %v %v", example, first, count)
        }
    }
}

func TestCRC(t *testing.T) {
    const blocks = 3
    rr, err := addCRC(ranger.ByteRanger(bytes.Repeat([]byte{0}, blocks*64)),
        crc32.IEEETable)
    if err != nil {
        t.Fatalf("unexpected: %v", err)
    }
    if rr.Size() != blocks*(64+4+8) {
        t.Fatalf("invalid crc padded size")
    }
    data, err := ioutil.ReadAll(rr.Range(0, rr.Size()))
    if err != nil || int64(len(data)) != rr.Size() {
        t.Fatalf("unexpected: %v", err)
    }
    rr, err = checkCRC(ranger.ByteRanger(data), crc32.IEEETable)
    if err != nil {
        t.Fatalf("unexpected: %v", err)
    }
    if rr.Size() != blocks*64 {
        t.Fatalf("invalid crc padded size")
    }
    data, err = ioutil.ReadAll(rr.Range(0, rr.Size()))
    if err != nil {
        t.Fatalf("unexpected: %v", err)
    }
    if !bytes.Equal(data, bytes.Repeat([]byte{0}, blocks*64)) {
        t.Fatalf("invalid data")
    }
}

func TestCRCSubranges(t *testing.T) {
    const blocks = 3
    data := bytes.Repeat([]byte{0, 1, 2}, blocks*64)
    internal, err := addCRC(ranger.ByteRanger(data), crc32.IEEETable)
    if err != nil {
        t.Fatalf("unexpected: %v", err)
    }
    external, err := checkCRC(internal, crc32.IEEETable)
    if err != nil {
        t.Fatalf("unexpected: %v", err)
    }
    if external.Size() != int64(len(data)) {
        t.Fatalf("wrong size")
    }
    for i := 0; i < len(data); i++ {
        for j := i; j < len(data); j++ {
            read, err := ioutil.ReadAll(external.Range(int64(i), int64(j-i)))
            if err != nil {
                t.Fatalf("unexpected: %v", err)
            }
            if !bytes.Equal(read, data[i:j]) {
                t.Fatalf("bad subrange")
            }
        }
    }
}
}