// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package eestream

import (
	"bytes"
	"context"
	"io"
	"io/ioutil"

	"storj.io/storj/internal/pkg/readcloser"
	"storj.io/storj/pkg/ranger"
)
// A Transformer is a data transformation that may change the size of the blocks
// of data it operates on in a deterministic fashion.
type Transformer interface {
	InBlockSize() int  // The block size prior to transformation
	OutBlockSize() int // The block size after transformation
	// Transform applies the transformation to the input block with index
	// blockNum, appending the result to out and returning the appended slice.
	Transform(out, in []byte, blockNum int64) ([]byte, error)
}
type transformedReader struct {
r io.ReadCloser
t Transformer
blockNum int64
inbuf []byte
outbuf []byte
expectedSize int64
bytesRead int
2018-04-11 14:41:50 +01:00
}
// NoopTransformer is a pass-through Transformer: it forwards every byte
// unchanged and operates on 1-byte blocks.
type NoopTransformer struct{}

// InBlockSize implements Transformer; the input block size is 1.
func (t *NoopTransformer) InBlockSize() int { return 1 }

// OutBlockSize implements Transformer; the output block size is 1.
func (t *NoopTransformer) OutBlockSize() int { return 1 }

// Transform implements Transformer by appending in to out verbatim.
func (t *NoopTransformer) Transform(out, in []byte, blockNum int64) ([]byte, error) {
	return append(out, in...), nil
}
2018-04-11 14:41:50 +01:00
// TransformReader applies a Transformer to a Reader. startingBlockNum should
// probably be 0 unless you know you're already starting at a block offset.
func TransformReader(r io.ReadCloser, t Transformer,
startingBlockNum int64) io.ReadCloser {
2018-04-11 14:41:50 +01:00
return &transformedReader{
r: r,
t: t,
blockNum: startingBlockNum,
inbuf: make([]byte, t.InBlockSize()),
outbuf: make([]byte, 0, t.OutBlockSize()),
}
}
// TransformReaderSize creates a TransformReader with expected size,
// i.e. the number of bytes that is expected to be read from this reader.
// If less than the expected bytes are read, the reader will return
// io.ErrUnexpectedEOF instead of io.EOF.
func TransformReaderSize(r io.ReadCloser, t Transformer,
	startingBlockNum int64, expectedSize int64) io.ReadCloser {
	tr := &transformedReader{
		r:            r,
		t:            t,
		blockNum:     startingBlockNum,
		expectedSize: expectedSize,
	}
	// One full input block of scratch space; the output buffer starts empty
	// but with capacity for a whole transformed block.
	tr.inbuf = make([]byte, t.InBlockSize())
	tr.outbuf = make([]byte, 0, t.OutBlockSize())
	return tr
}
2018-04-11 14:41:50 +01:00
func (t *transformedReader) Read(p []byte) (n int, err error) {
if len(t.outbuf) <= 0 {
// If there's no more buffered data left, let's fill the buffer with
// the next block
b, err := io.ReadFull(t.r, t.inbuf)
t.bytesRead += b
if err == io.EOF && int64(t.bytesRead) < t.expectedSize {
return 0, io.ErrUnexpectedEOF
} else if err != nil {
2018-04-11 14:41:50 +01:00
return 0, err
}
t.outbuf, err = t.t.Transform(t.outbuf, t.inbuf, t.blockNum)
if err != nil {
return 0, Error.Wrap(err)
}
t.blockNum++
}
// return as much as we can from the current buffered block
n = copy(p, t.outbuf)
// slide the uncopied data to the beginning of the buffer
copy(t.outbuf, t.outbuf[n:])
// resize the buffer
t.outbuf = t.outbuf[:len(t.outbuf)-n]
return n, nil
}
// Close closes the underlying reader.
func (t *transformedReader) Close() error {
	return t.r.Close()
}
2018-04-11 14:41:50 +01:00
type transformedRanger struct {
rr ranger.Ranger
2018-04-11 14:41:50 +01:00
t Transformer
}
// Transform will apply a Transformer to a Ranger.
func Transform(rr ranger.Ranger, t Transformer) (ranger.Ranger, error) {
2018-04-11 14:41:50 +01:00
if rr.Size()%int64(t.InBlockSize()) != 0 {
return nil, Error.New("invalid transformer and range reader combination." +
"the range reader size is not a multiple of the block size")
}
return &transformedRanger{rr: rr, t: t}, nil
}
// Size reports the transformed size: the number of whole input blocks in
// the underlying ranger multiplied by the output block size.
func (t *transformedRanger) Size() int64 {
	inBlocks := t.rr.Size() / int64(t.t.InBlockSize())
	return inBlocks * int64(t.t.OutBlockSize())
}
// calcEncompassingBlocks is a useful helper function that, given an offset,
// length, and blockSize, will tell you which blocks contain the requested
// offset and length
func calcEncompassingBlocks(offset, length int64, blockSize int) (
	firstBlock, blockCount int64) {
	bs := int64(blockSize)
	firstBlock = offset / bs
	// An empty (or negative) request touches no blocks at all.
	if length <= 0 {
		return firstBlock, 0
	}
	end := offset + length
	blockCount = end/bs - firstBlock
	// If the range does not end exactly on a block boundary, one more
	// block is needed to cover the tail.
	if end%bs != 0 {
		blockCount++
	}
	return firstBlock, blockCount
}
func (t *transformedRanger) Range(ctx context.Context, offset, length int64) (io.ReadCloser, error) {
2018-04-11 14:41:50 +01:00
// Range may not have been called for block-aligned offsets and lengths, so
// let's figure out which blocks encompass the request
firstBlock, blockCount := calcEncompassingBlocks(
offset, length, t.t.OutBlockSize())
// If block count is 0, there is nothing to transform, so return a dumb
// reader that will just return io.EOF on read
if blockCount == 0 {
return ioutil.NopCloser(bytes.NewReader(nil)), nil
}
2018-04-11 14:41:50 +01:00
// okay, now let's get the range on the underlying ranger for those blocks
// and then Transform it.
r, err := t.rr.Range(ctx,
firstBlock*int64(t.t.InBlockSize()),
blockCount*int64(t.t.InBlockSize()))
if err != nil {
return nil, err
}
tr := TransformReaderSize(r, t.t, firstBlock, blockCount*int64(t.t.InBlockSize()))
2018-04-11 14:41:50 +01:00
// the range we got potentially includes more than we wanted. if the
// offset started past the beginning of the first block, we need to
// swallow the first few bytes
_, err = io.CopyN(ioutil.Discard, tr,
2018-04-11 14:41:50 +01:00
offset-firstBlock*int64(t.t.OutBlockSize()))
if err != nil {
if err == io.EOF {
return nil, io.ErrUnexpectedEOF
2018-04-11 14:41:50 +01:00
}
return nil, Error.Wrap(err)
2018-04-11 14:41:50 +01:00
}
// the range might have been too long. only return what was requested
return readcloser.LimitReadCloser(tr, length), nil
2018-04-11 14:41:50 +01:00
}