diff --git a/examples/eestream/serve-collected/main.go b/examples/eestream/serve-collected/main.go new file mode 100644 index 000000000..c3cd6ea24 --- /dev/null +++ b/examples/eestream/serve-collected/main.go @@ -0,0 +1,74 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "crypto/sha256" + "flag" + "fmt" + "net/http" + "os" + "time" + + "github.com/vivint/infectious" + + "storj.io/storj/pkg/eestream" + "storj.io/storj/pkg/ranger" +) + +var ( + addr = flag.String("addr", "localhost:8080", "address to serve from") + pieceBlockSize = flag.Int("piece_block_size", 4*1024, "block size of pieces") + key = flag.String("key", "a key", "the secret key") + rsk = flag.Int("required", 20, "rs required") + rsn = flag.Int("total", 40, "rs total") +) + +func main() { + err := Main() + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } +} + +func Main() error { + encKey := sha256.Sum256([]byte(*key)) + fc, err := infectious.NewFEC(*rsk, *rsn) + if err != nil { + return err + } + es := eestream.NewRSScheme(fc, *pieceBlockSize) + var firstNonce [24]byte + decrypter, err := eestream.NewSecretboxDecrypter( + &encKey, &firstNonce, es.DecodedBlockSize()) + if err != nil { + return err + } + rrs := map[int]ranger.Ranger{} + for i := 0; i < 40; i++ { + url := fmt.Sprintf("http://localhost:%d", 10000+i) + rrs[i], err = ranger.HTTPRanger(url) + if err != nil { + return err + } + } + rr, err := eestream.Decode(rrs, es) + if err != nil { + return err + } + rr, err = eestream.Transform(rr, decrypter) + if err != nil { + return err + } + rr, err = eestream.UnpadSlow(rr) + if err != nil { + return err + } + + return http.ListenAndServe(*addr, http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + ranger.ServeContent(w, r, flag.Arg(0), time.Time{}, rr) + })) +} diff --git a/examples/eestream/serve-pieces/main.go b/examples/eestream/serve-pieces/main.go new file mode 100644 index 
000000000..e4903793c --- /dev/null +++ b/examples/eestream/serve-pieces/main.go @@ -0,0 +1,49 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "flag" + "fmt" + "io/ioutil" + "net/http" + "os" + "path/filepath" + "strconv" + "strings" +) + +func main() { + flag.Parse() + if flag.Arg(0) == "" { + fmt.Printf("usage: %s <piece-dir>\n", os.Args[0]) + os.Exit(1) + } + err := Main() + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } +} + +func Main() error { + pieces, err := ioutil.ReadDir(flag.Arg(0)) + if err != nil { + return err + } + for _, piece := range pieces { + pieceNum, err := strconv.Atoi(strings.TrimSuffix(piece.Name(), ".piece")) + if err != nil { + return err + } + pieceAddr := "localhost:" + strconv.Itoa(10000+pieceNum) + piecePath := filepath.Join(flag.Arg(0), piece.Name()) + go http.ListenAndServe(pieceAddr, http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + http.ServeFile(w, r, piecePath) + })) + } + + select {} // sleep forever +} diff --git a/examples/eestream/serve/main.go b/examples/eestream/serve/main.go new file mode 100644 index 000000000..052518daf --- /dev/null +++ b/examples/eestream/serve/main.go @@ -0,0 +1,96 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information.
+ +package main + +import ( + "crypto/sha256" + "flag" + "fmt" + "io/ioutil" + "net/http" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/vivint/infectious" + + "storj.io/storj/pkg/eestream" + "storj.io/storj/pkg/ranger" +) + +var ( + addr = flag.String("addr", "localhost:8080", "address to serve from") + pieceBlockSize = flag.Int("piece_block_size", 4*1024, "block size of pieces") + key = flag.String("key", "a key", "the secret key") + rsk = flag.Int("required", 20, "rs required") + rsn = flag.Int("total", 40, "rs total") +) + +func main() { + flag.Parse() + if flag.Arg(0) == "" { + fmt.Printf("usage: %s <piece-dir>\n", os.Args[0]) + os.Exit(1) + } + err := Main() + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } +} + +func Main() error { + encKey := sha256.Sum256([]byte(*key)) + fc, err := infectious.NewFEC(*rsk, *rsn) + if err != nil { + return err + } + es := eestream.NewRSScheme(fc, *pieceBlockSize) + var firstNonce [24]byte + decrypter, err := eestream.NewSecretboxDecrypter( + &encKey, &firstNonce, es.DecodedBlockSize()) + if err != nil { + return err + } + pieces, err := ioutil.ReadDir(flag.Arg(0)) + if err != nil { + return err + } + rrs := map[int]ranger.Ranger{} + for _, piece := range pieces { + piecenum, err := strconv.Atoi(strings.TrimSuffix(piece.Name(), ".piece")) + if err != nil { + return err + } + fh, err := os.Open(filepath.Join(flag.Arg(0), piece.Name())) + if err != nil { + return err + } + defer fh.Close() + fs, err := fh.Stat() + if err != nil { + return err + } + rrs[piecenum] = ranger.ReaderAtRanger(fh, fs.Size()) + } + rr, err := eestream.Decode(rrs, es) + if err != nil { + return err + } + rr, err = eestream.Transform(rr, decrypter) + if err != nil { + return err + } + rr, err = eestream.UnpadSlow(rr) + if err != nil { + return err + } + + return http.ListenAndServe(*addr, http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + ranger.ServeContent(w, r, flag.Arg(0), time.Time{},
rr) + })) +} diff --git a/examples/eestream/store/main.go b/examples/eestream/store/main.go new file mode 100644 index 000000000..aad406915 --- /dev/null +++ b/examples/eestream/store/main.go @@ -0,0 +1,79 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package main + +import ( + "crypto/sha256" + "flag" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/vivint/infectious" + + "storj.io/storj/pkg/eestream" +) + +var ( + pieceBlockSize = flag.Int("piece_block_size", 4*1024, "block size of pieces") + key = flag.String("key", "a key", "the secret key") + rsk = flag.Int("required", 20, "rs required") + rsn = flag.Int("total", 40, "rs total") +) + +func main() { + flag.Parse() + if flag.Arg(0) == "" { + fmt.Printf("usage: cat data | %s <piece-dir>\n", os.Args[0]) + os.Exit(1) + } + err := Main() + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } +} + +func Main() error { + err := os.MkdirAll(flag.Arg(0), 0755) + if err != nil { + return err + } + fc, err := infectious.NewFEC(*rsk, *rsn) + if err != nil { + return err + } + es := eestream.NewRSScheme(fc, *pieceBlockSize) + encKey := sha256.Sum256([]byte(*key)) + var firstNonce [24]byte + encrypter, err := eestream.NewSecretboxEncrypter( + &encKey, &firstNonce, es.DecodedBlockSize()) + if err != nil { + return err + } + readers := eestream.EncodeReader(eestream.TransformReader( + eestream.PadReader(os.Stdin, encrypter.InBlockSize()), encrypter, 0), es) + errs := make(chan error, len(readers)) + for i := range readers { + go func(i int) { + fh, err := os.Create( + filepath.Join(flag.Arg(0), fmt.Sprintf("%d.piece", i))) + if err != nil { + errs <- err + return + } + defer fh.Close() + _, err = io.Copy(fh, readers[i]) + errs <- err + }(i) + } + for range readers { + err := <-errs + if err != nil { + return err + } + } + return nil +} diff --git a/internal/pkg/readcloser/fatal.go b/internal/pkg/readcloser/fatal.go new file mode 100644 index 000000000..559069d89 ---
/dev/null +++ b/internal/pkg/readcloser/fatal.go @@ -0,0 +1,23 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package readcloser + +import "io" + +// FatalReadCloser returns a ReadCloser that always fails with err. +func FatalReadCloser(err error) io.ReadCloser { + return &fatalReadCloser{Err: err} +} + +type fatalReadCloser struct { + Err error +} + +func (f *fatalReadCloser) Read(p []byte) (n int, err error) { + return 0, f.Err +} + +func (f *fatalReadCloser) Close() error { + return nil +} diff --git a/internal/pkg/readcloser/lazy.go b/internal/pkg/readcloser/lazy.go new file mode 100644 index 000000000..bff482b62 --- /dev/null +++ b/internal/pkg/readcloser/lazy.go @@ -0,0 +1,32 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package readcloser + +import "io" + +// LazyReadCloser returns an ReadCloser that doesn't initialize the backing +// Reader until the first Read. +func LazyReadCloser(reader func() io.ReadCloser) io.ReadCloser { + return &lazyReadCloser{fn: reader} +} + +type lazyReadCloser struct { + fn func() io.ReadCloser + r io.ReadCloser +} + +func (l *lazyReadCloser) Read(p []byte) (n int, err error) { + if l.r == nil { + l.r = l.fn() + l.fn = nil + } + return l.r.Read(p) +} + +func (l *lazyReadCloser) Close() error { + if l.r != nil { + return l.r.Close() + } + return nil +} diff --git a/internal/pkg/readcloser/limit.go b/internal/pkg/readcloser/limit.go new file mode 100644 index 000000000..0eb5bfddb --- /dev/null +++ b/internal/pkg/readcloser/limit.go @@ -0,0 +1,25 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package readcloser + +import "io" + +// LimitReadCloser is a LimitReader extension that returns a ReadCloser +// that reads from r but stops with EOF after n bytes. 
+func LimitReadCloser(r io.ReadCloser, n int64) io.ReadCloser { + return &limitedReadCloser{io.LimitReader(r, n), r} +} + +type limitedReadCloser struct { + R io.Reader + C io.Closer +} + +func (l *limitedReadCloser) Read(p []byte) (n int, err error) { + return l.R.Read(p) +} + +func (l *limitedReadCloser) Close() error { + return l.C.Close() +} diff --git a/internal/pkg/readcloser/multi.go b/internal/pkg/readcloser/multi.go new file mode 100644 index 000000000..1aabf9796 --- /dev/null +++ b/internal/pkg/readcloser/multi.go @@ -0,0 +1,43 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package readcloser + +import "io" + +// MultiReadCloser is a MultiReader extension that returns a ReaderCloser +// that's the logical concatenation of the provided input readers. +// They're read sequentially. Once all inputs have returned EOF, +// Read will return EOF. If any of the readers return a non-nil, +// non-EOF error, Read will return that error. +func MultiReadCloser(readers ...io.ReadCloser) io.ReadCloser { + r := make([]io.Reader, len(readers)) + for i := range readers { + r[i] = readers[i] + } + c := make([]io.Closer, len(readers)) + for i := range readers { + c[i] = readers[i] + } + return &multiReadCloser{io.MultiReader(r...), c} +} + +type multiReadCloser struct { + multireader io.Reader + closers []io.Closer +} + +func (l *multiReadCloser) Read(p []byte) (n int, err error) { + return l.multireader.Read(p) +} + +func (l *multiReadCloser) Close() error { + var firstErr error + for _, c := range l.closers { + err := c.Close() + if err != nil && firstErr == nil { + firstErr = err + } + } + return firstErr +} diff --git a/pkg/eestream/decode.go b/pkg/eestream/decode.go index 0fb695903..e1c3c0719 100644 --- a/pkg/eestream/decode.go +++ b/pkg/eestream/decode.go @@ -7,11 +7,12 @@ import ( "io" "io/ioutil" + "storj.io/storj/internal/pkg/readcloser" "storj.io/storj/pkg/ranger" ) type decodedReader struct { - rs map[int]io.Reader + rs 
map[int]io.ReadCloser es ErasureScheme inbufs map[int][]byte outbuf []byte @@ -21,7 +22,7 @@ type decodedReader struct { // DecodeReaders takes a map of readers and an ErasureScheme returning a // combined Reader. The map, 'rs', must be a mapping of erasure piece numbers // to erasure piece streams. -func DecodeReaders(rs map[int]io.Reader, es ErasureScheme) io.Reader { +func DecodeReaders(rs map[int]io.ReadCloser, es ErasureScheme) io.ReadCloser { dr := &decodedReader{ rs: rs, es: es, @@ -79,6 +80,17 @@ func (dr *decodedReader) Read(p []byte) (n int, err error) { return n, nil } +func (dr *decodedReader) Close() error { + var firstErr error + for _, c := range dr.rs { + err := c.Close() + if err != nil && firstErr == nil { + firstErr = err + } + } + return firstErr +} + type decodedRanger struct { es ErasureScheme rrs map[int]ranger.Ranger @@ -123,14 +135,14 @@ func (dr *decodedRanger) Size() int64 { return blocks * int64(dr.es.DecodedBlockSize()) } -func (dr *decodedRanger) Range(offset, length int64) io.Reader { +func (dr *decodedRanger) Range(offset, length int64) io.ReadCloser { // offset and length might not be block-aligned. 
figure out which // blocks contain this request firstBlock, blockCount := calcEncompassingBlocks( offset, length, dr.es.DecodedBlockSize()) // go ask for ranges for all those block boundaries - readers := make(map[int]io.Reader, len(dr.rrs)) + readers := make(map[int]io.ReadCloser, len(dr.rrs)) for i, rr := range dr.rrs { readers[i] = rr.Range( firstBlock*int64(dr.es.EncodedBlockSize()), @@ -142,8 +154,8 @@ func (dr *decodedRanger) Range(offset, length int64) io.Reader { _, err := io.CopyN(ioutil.Discard, r, offset-firstBlock*int64(dr.es.DecodedBlockSize())) if err != nil { - return ranger.FatalReader(Error.Wrap(err)) + return readcloser.FatalReadCloser(Error.Wrap(err)) } // length might not have included all of the blocks, limit what we return - return io.LimitReader(r, length) + return readcloser.LimitReadCloser(r, length) } diff --git a/pkg/eestream/pad.go b/pkg/eestream/pad.go index ecf6b8d28..4c67f7bdf 100644 --- a/pkg/eestream/pad.go +++ b/pkg/eestream/pad.go @@ -7,7 +7,9 @@ import ( "bytes" "encoding/binary" "io" + "io/ioutil" + "storj.io/storj/internal/pkg/readcloser" "storj.io/storj/pkg/ranger" ) @@ -55,19 +57,20 @@ func UnpadSlow(data ranger.Ranger) (ranger.Ranger, error) { } // PadReader is like Pad but works on a basic Reader instead of a Ranger. 
-func PadReader(data io.Reader, blockSize int) io.Reader { +func PadReader(data io.ReadCloser, blockSize int) io.ReadCloser { cr := newCountingReader(data) - return io.MultiReader(cr, ranger.LazyReader(func() io.Reader { - return bytes.NewReader(makePadding(cr.N, blockSize)) - })) + return readcloser.MultiReadCloser(cr, + readcloser.LazyReadCloser(func() io.ReadCloser { + return ioutil.NopCloser(bytes.NewReader(makePadding(cr.N, blockSize))) + })) } type countingReader struct { - R io.Reader + R io.ReadCloser N int64 } -func newCountingReader(r io.Reader) *countingReader { +func newCountingReader(r io.ReadCloser) *countingReader { return &countingReader{R: r} } @@ -76,3 +79,7 @@ func (cr *countingReader) Read(p []byte) (n int, err error) { cr.N += int64(n) return n, err } + +func (cr *countingReader) Close() error { + return cr.R.Close() +} diff --git a/pkg/eestream/rs_test.go b/pkg/eestream/rs_test.go index 721886f0a..b5b5d6f9f 100644 --- a/pkg/eestream/rs_test.go +++ b/pkg/eestream/rs_test.go @@ -20,9 +20,9 @@ func TestRS(t *testing.T) { } rs := NewRSScheme(fc, 8*1024) readers := EncodeReader(bytes.NewReader(data), rs) - readerMap := make(map[int]io.Reader, len(readers)) + readerMap := make(map[int]io.ReadCloser, len(readers)) for i, reader := range readers { - readerMap[i] = reader + readerMap[i] = ioutil.NopCloser(reader) } data2, err := ioutil.ReadAll(DecodeReaders(readerMap, rs)) if err != nil { diff --git a/pkg/eestream/secretbox_test.go b/pkg/eestream/secretbox_test.go index fc7d67b2a..cc2afe759 100644 --- a/pkg/eestream/secretbox_test.go +++ b/pkg/eestream/secretbox_test.go @@ -29,8 +29,8 @@ func TestSecretbox(t *testing.T) { t.Fatal(err) } data := randData(encrypter.InBlockSize() * 10) - encrypted := TransformReader(bytes.NewReader(data), - encrypter, 0) + encrypted := TransformReader( + ioutil.NopCloser(bytes.NewReader(data)), encrypter, 0) decrypter, err := NewSecretboxDecrypter(&key, &firstNonce, 4*1024) if err != nil { t.Fatal(err) diff --git 
a/pkg/eestream/transform.go b/pkg/eestream/transform.go index 66a565cf6..2a347b634 100644 --- a/pkg/eestream/transform.go +++ b/pkg/eestream/transform.go @@ -8,6 +8,7 @@ import ( "io" "io/ioutil" + "storj.io/storj/internal/pkg/readcloser" "storj.io/storj/pkg/ranger" ) @@ -20,7 +21,7 @@ type Transformer interface { } type transformedReader struct { - r io.Reader + r io.ReadCloser t Transformer blockNum int64 inbuf []byte @@ -29,8 +30,8 @@ type transformedReader struct { // TransformReader applies a Transformer to a Reader. startingBlockNum should // probably be 0 unless you know you're already starting at a block offset. -func TransformReader(r io.Reader, t Transformer, - startingBlockNum int64) io.Reader { +func TransformReader(r io.ReadCloser, t Transformer, + startingBlockNum int64) io.ReadCloser { return &transformedReader{ r: r, t: t, @@ -64,6 +65,10 @@ func (t *transformedReader) Read(p []byte) (n int, err error) { return n, nil } +func (t *transformedReader) Close() error { + return t.r.Close() +} + type transformedRanger struct { rr ranger.Ranger t Transformer @@ -99,7 +104,7 @@ func calcEncompassingBlocks(offset, length int64, blockSize int) ( return firstBlock, 1 + lastBlock - firstBlock } -func (t *transformedRanger) Range(offset, length int64) io.Reader { +func (t *transformedRanger) Range(offset, length int64) io.ReadCloser { // Range may not have been called for block-aligned offsets and lengths, so // let's figure out which blocks encompass the request firstBlock, blockCount := calcEncompassingBlocks( @@ -117,10 +122,10 @@ func (t *transformedRanger) Range(offset, length int64) io.Reader { offset-firstBlock*int64(t.t.OutBlockSize())) if err != nil { if err == io.EOF { - return bytes.NewReader(nil) + return ioutil.NopCloser(bytes.NewReader(nil)) } - return ranger.FatalReader(Error.Wrap(err)) + return readcloser.FatalReadCloser(Error.Wrap(err)) } // the range might have been too long. 
only return what was requested - return io.LimitReader(r, length) + return readcloser.LimitReadCloser(r, length) } diff --git a/pkg/ranger/content.go b/pkg/ranger/content.go index 35f4d00d8..a1ffdebaf 100644 --- a/pkg/ranger/content.go +++ b/pkg/ranger/content.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "mime" "mime/multipart" "net/http" @@ -47,7 +48,9 @@ func ServeContent(w http.ResponseWriter, r *http.Request, name string, amount = sniffLen } // TODO: cache this somewhere so we don't have to pull it out again - n, _ := io.ReadFull(content.Range(0, amount), buf[:]) + r := content.Range(0, amount) + defer r.Close() + n, _ := io.ReadFull(r, buf[:]) ctype = http.DetectContentType(buf[:n]) } w.Header().Set("Content-Type", ctype) @@ -64,7 +67,7 @@ func ServeContent(w http.ResponseWriter, r *http.Request, name string, // handle Content-Range header. sendSize := size - sendContent := func() io.Reader { + sendContent := func() io.ReadCloser { return content.Range(0, size) } @@ -97,7 +100,7 @@ func ServeContent(w http.ResponseWriter, r *http.Request, name string, // A response to a request for a single range MUST NOT // be sent using the multipart/byteranges media type." ra := ranges[0] - sendContent = func() io.Reader { return content.Range(ra.start, ra.length) } + sendContent = func() io.ReadCloser { return content.Range(ra.start, ra.length) } sendSize = ra.length code = http.StatusPartialContent w.Header().Set("Content-Range", ra.contentRange(size)) @@ -109,7 +112,7 @@ func ServeContent(w http.ResponseWriter, r *http.Request, name string, mw := multipart.NewWriter(pw) w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary()) - sendContent = func() io.Reader { return pr } + sendContent = func() io.ReadCloser { return ioutil.NopCloser(pr) } // cause writing goroutine to fail and exit if CopyN doesn't finish. 
defer pr.Close() go func() { @@ -120,6 +123,7 @@ func ServeContent(w http.ResponseWriter, r *http.Request, name string, return } partReader := content.Range(ra.start, ra.length) + defer partReader.Close() if _, err := io.Copy(part, partReader); err != nil { pw.CloseWithError(err) return @@ -138,7 +142,9 @@ func ServeContent(w http.ResponseWriter, r *http.Request, name string, w.WriteHeader(code) if r.Method != "HEAD" { - io.CopyN(w, sendContent(), sendSize) + r := sendContent() + defer r.Close() + io.CopyN(w, r, sendSize) } } diff --git a/pkg/ranger/http.go b/pkg/ranger/http.go new file mode 100644 index 000000000..a17e2095e --- /dev/null +++ b/pkg/ranger/http.go @@ -0,0 +1,80 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package ranger + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "net/http" + "strconv" + + "storj.io/storj/internal/pkg/readcloser" +) + +type httpRanger struct { + URL string + size int64 +} + +// HTTPRanger turns an HTTP URL into a Ranger +func HTTPRanger(URL string) (Ranger, error) { + resp, err := http.Head(URL) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, Error.New("unexpected status code: %d (expected %d)", + resp.StatusCode, http.StatusOK) + } + contentLength := resp.Header.Get("Content-Length") + size, err := strconv.Atoi(contentLength) + if err != nil { + return nil, err + } + return &httpRanger{ + URL: URL, + size: int64(size), + }, nil +} + +// Size implements Ranger.Size +func (r *httpRanger) Size() int64 { + return r.size +} + +// Range implements Ranger.Range +func (r *httpRanger) Range(offset, length int64) io.ReadCloser { + if offset < 0 { + return readcloser.FatalReadCloser(Error.New("negative offset")) + } + if length < 0 { + return readcloser.FatalReadCloser(Error.New("negative length")) + } + if offset+length > r.size { + return readcloser.FatalReadCloser(Error.New("range beyond end")) + } + if length == 0 { + 
return ioutil.NopCloser(bytes.NewReader([]byte{})) + } + client := &http.Client{} + req, err := http.NewRequest("GET", r.URL, nil) + if err != nil { + return readcloser.FatalReadCloser(err) + } + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)) + resp, err := client.Do(req) + if err != nil { + return readcloser.FatalReadCloser(err) + } + if resp.StatusCode != http.StatusPartialContent { + resp.Body.Close() + return readcloser.FatalReadCloser( + Error.New("unexpected status code: %d (expected %d)", + resp.StatusCode, http.StatusPartialContent)) + } + return resp.Body +} diff --git a/pkg/ranger/http_test.go b/pkg/ranger/http_test.go new file mode 100644 index 000000000..23e58aee9 --- /dev/null +++ b/pkg/ranger/http_test.go @@ -0,0 +1,58 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package ranger + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestHTTPRanger(t *testing.T) { + var content string + ts := httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + http.ServeContent(w, r, "test", time.Now(), strings.NewReader(content)) + })) + defer ts.Close() + + for i, tt := range []struct { + data string + size, offset, length int64 + substr string + errString string + }{ + {"", 0, 0, 0, "", ""}, + {"abcdef", 6, 0, 0, "", ""}, + {"abcdef", 6, 3, 0, "", ""}, + {"abcdef", 6, 0, 6, "abcdef", ""}, + {"abcdef", 6, 0, 5, "abcde", ""}, + {"abcdef", 6, 0, 4, "abcd", ""}, + {"abcdef", 6, 1, 4, "bcde", ""}, + {"abcdef", 6, 2, 4, "cdef", ""}, + {"abcdefg", 7, 1, 4, "bcde", ""}, + {"abcdef", 6, 0, 7, "abcdef", "ranger error: range beyond end"}, + {"abcdef", 6, -1, 7, "abcde", "ranger error: negative offset"}, + {"abcdef", 6, 0, -1, "abcde", "ranger error: negative length"}, + } { + errTag := fmt.Sprintf("Test case #%d", i) + content = tt.data + r, err := HTTPRanger(ts.URL) + 
assert.Equal(t, tt.size, r.Size(), errTag) + data, err := ioutil.ReadAll(r.Range(tt.offset, tt.length)) + if tt.errString != "" { + assert.EqualError(t, err, tt.errString, errTag) + continue + } + if assert.NoError(t, err, errTag) { + assert.Equal(t, data, []byte(tt.substr), errTag) + } + } +} diff --git a/pkg/ranger/reader.go b/pkg/ranger/reader.go index 9b0401691..6d6968997 100644 --- a/pkg/ranger/reader.go +++ b/pkg/ranger/reader.go @@ -6,6 +6,9 @@ package ranger import ( "bytes" "io" + "io/ioutil" + + "storj.io/storj/internal/pkg/readcloser" ) // A Ranger is a flexible data stream type that allows for more effective @@ -13,20 +16,7 @@ import ( // any subranges. type Ranger interface { Size() int64 - Range(offset, length int64) io.Reader -} - -// FatalReader returns a Reader that always fails with err. -func FatalReader(err error) io.Reader { - return &fatalReader{Err: err} -} - -type fatalReader struct { - Err error -} - -func (f *fatalReader) Read(p []byte) (n int, err error) { - return 0, f.Err + Range(offset, length int64) io.ReadCloser } // ByteRanger turns a byte slice into a Ranger @@ -36,15 +26,18 @@ type ByteRanger []byte func (b ByteRanger) Size() int64 { return int64(len(b)) } // Range implements Ranger.Range -func (b ByteRanger) Range(offset, length int64) io.Reader { +func (b ByteRanger) Range(offset, length int64) io.ReadCloser { if offset < 0 { - return FatalReader(Error.New("negative offset")) + return readcloser.FatalReadCloser(Error.New("negative offset")) + } + if length < 0 { + return readcloser.FatalReadCloser(Error.New("negative length")) } if offset+length > int64(len(b)) { - return FatalReader(Error.New("buffer runoff")) + return readcloser.FatalReadCloser(Error.New("buffer runoff")) } - return bytes.NewReader(b[offset : offset+length]) + return ioutil.NopCloser(bytes.NewReader(b[offset : offset+length])) } type concatReader struct { @@ -56,7 +49,7 @@ func (c *concatReader) Size() int64 { return c.r1.Size() + c.r2.Size() } -func (c 
*concatReader) Range(offset, length int64) io.Reader { +func (c *concatReader) Range(offset, length int64) io.ReadCloser { r1Size := c.r1.Size() if offset+length <= r1Size { return c.r1.Range(offset, length) @@ -64,9 +57,9 @@ func (c *concatReader) Range(offset, length int64) io.Reader { if offset >= r1Size { return c.r2.Range(offset-r1Size, length) } - return io.MultiReader( + return readcloser.MultiReadCloser( c.r1.Range(offset, r1Size-offset), - LazyReader(func() io.Reader { + readcloser.LazyReadCloser(func() io.ReadCloser { return c.r2.Range(0, length-(r1Size-offset)) })) } @@ -90,25 +83,6 @@ func Concat(r ...Ranger) Ranger { } } -type lazyReader struct { - fn func() io.Reader - r io.Reader -} - -// LazyReader returns an Reader that doesn't initialize the backing Reader -// until the first Read. -func LazyReader(reader func() io.Reader) io.Reader { - return &lazyReader{fn: reader} -} - -func (l *lazyReader) Read(p []byte) (n int, err error) { - if l.r == nil { - l.r = l.fn() - l.fn = nil - } - return l.r.Read(p) -} - type subrange struct { r Ranger offset, length int64 @@ -130,6 +104,6 @@ func (s *subrange) Size() int64 { return s.length } -func (s *subrange) Range(offset, length int64) io.Reader { +func (s *subrange) Range(offset, length int64) io.ReadCloser { return s.r.Range(offset+s.offset, length) } diff --git a/pkg/ranger/reader_test.go b/pkg/ranger/reader_test.go index fb5d449b1..625375b03 100644 --- a/pkg/ranger/reader_test.go +++ b/pkg/ranger/reader_test.go @@ -16,6 +16,9 @@ func TestByteRanger(t *testing.T) { substr string fail bool }{ + {"", 0, 0, 0, "", false}, + {"abcdef", 6, 0, 0, "", false}, + {"abcdef", 6, 3, 0, "", false}, {"abcdef", 6, 0, 6, "abcdef", false}, {"abcdef", 6, 0, 5, "abcde", false}, {"abcdef", 6, 0, 4, "abcd", false}, @@ -24,6 +27,7 @@ func TestByteRanger(t *testing.T) { {"abcdefg", 7, 1, 4, "bcde", false}, {"abcdef", 6, 0, 7, "", true}, {"abcdef", 6, -1, 7, "abcde", true}, + {"abcdef", 6, 0, -1, "abcde", true}, } { r := 
ByteRanger([]byte(example.data)) if r.Size() != example.size { diff --git a/pkg/ranger/readerat.go b/pkg/ranger/readerat.go index 0e9c1ad11..a09c8fe3a 100644 --- a/pkg/ranger/readerat.go +++ b/pkg/ranger/readerat.go @@ -5,6 +5,8 @@ package ranger import ( "io" + + "storj.io/storj/internal/pkg/readcloser" ) type readerAtRanger struct { @@ -29,12 +31,15 @@ type readerAtReader struct { offset, length int64 } -func (r *readerAtRanger) Range(offset, length int64) io.Reader { +func (r *readerAtRanger) Range(offset, length int64) io.ReadCloser { if offset < 0 { - return FatalReader(Error.New("negative offset")) + return readcloser.FatalReadCloser(Error.New("negative offset")) + } + if length < 0 { + return readcloser.FatalReadCloser(Error.New("negative length")) } if offset+length > r.size { - return FatalReader(Error.New("buffer runoff")) + return readcloser.FatalReadCloser(Error.New("buffer runoff")) } return &readerAtReader{r: r.r, offset: offset, length: length} } @@ -51,3 +56,7 @@ func (r *readerAtReader) Read(p []byte) (n int, err error) { r.length -= int64(n) return n, err } + +func (r *readerAtReader) Close() error { + return nil +}