Moved filepiece into storj (#66)

* Moved filepiece into storj

* Fix linter errors

* Seek comment for linter

* gofmt/golinter accidentally removed import

* Fix small typos

* Use the weird iota. P cool dude ✌️

* Do things the cool way

* Changes requested by kaloyan

* didn't need test main
Authored by Alexander Leitner on 2018-05-31 15:42:45 -04:00, committed by Kaloyan Raev
Parent: 4c4cf3c34e
Commit: 5e97cf7a2e
5 changed files with 439 additions and 30 deletions

go.mod (54 changed lines)

@@ -1,30 +1,30 @@
module "storj.io/storj"
module storj.io/storj
require (
"github.com/aleitner/FilePiece" v0.0.0-20180516062859-b020bc25bf96
"github.com/fsnotify/fsnotify" v1.4.7
"github.com/go-redis/redis" v0.0.0-20180417061816-9ccc23344a52
"github.com/gogo/protobuf" v1.0.0
"github.com/golang/protobuf" v1.0.0
"github.com/hashicorp/hcl" v0.0.0-20180404174102-ef8a98b0bbce
"github.com/magiconair/properties" v1.7.6
"github.com/mitchellh/mapstructure" v0.0.0-20180220230111-00c29f56e238
"github.com/pelletier/go-toml" v1.1.0
"github.com/spacemonkeygo/errors" v0.0.0-20171212215202-9064522e9fd1
"github.com/spf13/afero" v1.1.0
"github.com/spf13/cast" v1.2.0
"github.com/spf13/jWalterWeatherman" v0.0.0-20180109140146-7c0cea34c8ec
"github.com/spf13/pflag" v1.0.1
"github.com/spf13/viper" v1.0.2
"github.com/tyler-smith/go-bip39" v0.0.0-20160629163856-8e7a99b3e716
"github.com/urfave/cli" v1.20.0
"github.com/vivint/infectious" v0.0.0-20180418194855-57d6abddc3d4
"github.com/zeebo/errs" v0.1.0
"golang.org/x/crypto" v0.0.0-20180410182641-f70185d77e82
"golang.org/x/net" v0.0.0-20180420171651-5f9ae10d9af5
"golang.org/x/sys" v0.0.0-20180430173509-4adea008a5e5
"golang.org/x/text" v0.3.0
"google.golang.org/genproto" v0.0.0-20180427144745-86e600f69ee4
"google.golang.org/grpc" v1.11.3
"gopkg.in/yaml.v2" v2.2.1
github.com/fsnotify/fsnotify v1.4.7
github.com/go-redis/redis v0.0.0-20180417061816-9ccc23344a52
github.com/gogo/protobuf v1.0.0
github.com/golang/protobuf v1.0.0
github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce
github.com/magiconair/properties v1.7.6
github.com/mitchellh/mapstructure v0.0.0-20180220230111-00c29f56e238
github.com/pelletier/go-toml v1.1.0
github.com/spacemonkeygo/errors v0.0.0-20171212215202-9064522e9fd1
github.com/spf13/afero v1.1.0
github.com/spf13/cast v1.2.0
github.com/spf13/jWalterWeatherman v0.0.0-20180109140146-7c0cea34c8ec
github.com/spf13/pflag v1.0.1
github.com/spf13/viper v1.0.2
github.com/stretchr/testify v1.2.1
github.com/tyler-smith/go-bip39 v0.0.0-20160629163856-8e7a99b3e716
github.com/urfave/cli v1.20.0
github.com/vivint/infectious v0.0.0-20180418194855-57d6abddc3d4
github.com/zeebo/errs v0.1.0
golang.org/x/crypto v0.0.0-20180410182641-f70185d77e82
golang.org/x/net v0.0.0-20180420171651-5f9ae10d9af5
golang.org/x/sys v0.0.0-20180430173509-4adea008a5e5
golang.org/x/text v0.3.0
google.golang.org/genproto v0.0.0-20180427144745-86e600f69ee4
google.golang.org/grpc v1.11.3
gopkg.in/yaml.v2 v2.2.1
)

pkg/filepiece/README.md (new file, 67 lines)

@@ -0,0 +1,67 @@
# FilePiece
Concurrently read and write sections of a file
## Installation
```BASH
go get storj.io/storj/pkg/filepiece
```
## Usage
```Golang
import "storj.io/storj/pkg/filepiece"
```
### Chunk struct
```Golang
type Chunk struct {
	file       *os.File
	start      int64
	final      int64
	currentPos int64
}
```
* Chunk.file - the os.File being read from or written to
* Chunk.start - offset of the start of the section within the file
* Chunk.final - offset just past the end of the section (start + length)
* Chunk.currentPos - the position used by the next Read or Write
### NewChunk
Create a Chunk that exposes `length` bytes of `file` starting at `offset`
```Golang
func NewChunk(file *os.File, offset int64, length int64) (*Chunk, error)
```
### Read
Read from the Chunk; separate Chunks over the same file can be read concurrently
```Golang
func (f *Chunk) Read(b []byte) (n int, err error)
```
```Golang
func (f *Chunk) ReadAt(p []byte, off int64) (n int, err error)
```
### Write
Write to the Chunk; separate Chunks over the same file can be written concurrently
```Golang
func (f *Chunk) Write(b []byte) (n int, err error)
```
```Golang
func (f *Chunk) WriteAt(p []byte, off int64) (n int, err error)
```
### Other
Get the size of the Chunk
```Golang
func (f *Chunk) Size() int64
```
Close the file backing the Chunk
```Golang
func (f *Chunk) Close() error
```
Seek to a position within the Chunk
```Golang
func (f *Chunk) Seek(offset int64, whence int) (int64, error)
```
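For orientation, here is a minimal end-to-end sketch of the API documented above. It is not part of the commit, and `example.txt` is just a placeholder path; note that the package identifier is `fpiece` even though the directory is `filepiece`.
```Golang
package main

import (
	"fmt"
	"io"
	"log"
	"os"

	"storj.io/storj/pkg/filepiece"
)

func main() {
	// "example.txt" is a placeholder path; any readable file works.
	file, err := os.Open("example.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// Expose bytes 10..29 of the file as an independent reader.
	chunk, err := fpiece.NewChunk(file, 10, 20)
	if err != nil {
		log.Fatal(err)
	}

	buf := make([]byte, 20)
	n, err := chunk.Read(buf)
	if err != nil && err != io.EOF {
		log.Fatal(err)
	}
	fmt.Printf("read %d bytes: %q\n", n, buf[:n])
}
```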

pkg/filepiece/fpiece.go (new file, 136 lines)

@@ -0,0 +1,136 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package fpiece

import (
	"errors"
	"io"
	"os"
)

// Chunk - Section of data to be concurrently read or written
type Chunk struct {
	file       *os.File
	start      int64
	final      int64
	currentPos int64
}

// NewChunk - Create Chunk
func NewChunk(file *os.File, offset int64, length int64) (*Chunk, error) {
	if length < 0 {
		return nil, errors.New("Invalid Length")
	}

	return &Chunk{file, offset, length + offset, offset}, nil
}

// Size - Get size of Chunk
func (f *Chunk) Size() int64 {
	return f.final - f.start
}

// Close - Close file associated with chunk
func (f *Chunk) Close() error {
	return f.file.Close()
}

// Read - Concurrently read from Chunk
func (f *Chunk) Read(b []byte) (n int, err error) {
	if f.currentPos >= f.final {
		return 0, io.EOF
	}

	var readLen int64 // starts at 0
	if f.final-f.currentPos > int64(len(b)) {
		readLen = int64(len(b))
	} else {
		readLen = f.final - f.currentPos
	}

	n, err = f.file.ReadAt(b[:readLen], f.currentPos)
	f.currentPos += int64(n)
	return n, err
}

// ReadAt - Concurrently read from Chunk at specific offset
func (f *Chunk) ReadAt(p []byte, off int64) (n int, err error) {
	if off < 0 || off >= f.final-f.start {
		return 0, io.EOF
	}

	off += f.start
	if max := f.final - off; int64(len(p)) > max {
		p = p[:max]
		n, err = f.file.ReadAt(p, off)
		if err == nil {
			err = io.EOF
		}
		return n, err
	}

	return f.file.ReadAt(p, off)
}

// Write - Concurrently write to Chunk
func (f *Chunk) Write(b []byte) (n int, err error) {
	if f.currentPos >= f.final {
		return 0, io.EOF
	}

	var writeLen int64 // starts at 0
	if f.final-f.currentPos > int64(len(b)) {
		writeLen = int64(len(b))
	} else {
		writeLen = f.final - f.currentPos
	}

	n, err = f.file.WriteAt(b[:writeLen], f.currentPos)
	f.currentPos += int64(n)
	return n, err
}

// WriteAt - Concurrently write to Chunk at specific offset
func (f *Chunk) WriteAt(p []byte, off int64) (n int, err error) {
	if off < 0 || off >= f.final-f.start {
		return 0, io.EOF
	}

	off += f.start
	if max := f.final - off; int64(len(p)) > max {
		p = p[:max]
		n, err = f.file.WriteAt(p, off)
		if err == nil {
			err = io.EOF
		}
		return n, err
	}

	return f.file.WriteAt(p, off)
}

var errWhence = errors.New("Seek: invalid whence")
var errOffset = errors.New("Seek: invalid offset")

// Seek - Set the position within the Chunk used by the next Read or Write
func (f *Chunk) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	default:
		return 0, errWhence
	case io.SeekStart:
		offset += f.start
	case io.SeekCurrent:
		offset += f.currentPos
	case io.SeekEnd:
		offset += f.final
	}

	// Do not seek to a position outside the chunk
	if offset < f.start || offset > f.final {
		return 0, errOffset
	}

	f.currentPos = offset
	return offset - f.start, nil
}
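To make the concurrency claim in the doc comments concrete, here is a rough sketch (not part of the commit) in which two goroutines write disjoint ranges of the same file through separate Chunks. The temp-file setup and the 5-byte split are purely illustrative; the pattern works because Chunk.Write is implemented with WriteAt, which never moves the shared file offset.
```Golang
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"sync"

	"storj.io/storj/pkg/filepiece"
)

func main() {
	// Illustrative setup: a throwaway temp file.
	file, err := ioutil.TempFile("", "fpiece_demo")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(file.Name())
	defer file.Close()

	// Two Chunks covering disjoint halves of a 10-byte region.
	first, err := fpiece.NewChunk(file, 0, 5)
	if err != nil {
		log.Fatal(err)
	}
	second, err := fpiece.NewChunk(file, 5, 5)
	if err != nil {
		log.Fatal(err)
	}

	// Each goroutine writes only within its own Chunk's range.
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); first.Write([]byte("hello")) }()
	go func() { defer wg.Done(); second.Write([]byte("world")) }()
	wg.Wait()

	out, err := ioutil.ReadFile(file.Name())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%q\n", out) // "helloworld"
}
```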


@@ -0,0 +1,205 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package fpiece

import (
	"io/ioutil"
	"log"
	"os"
	"testing"
)

var readTests = []struct {
	in     string
	offset int64
	len    int64
	out    string
}{
	{"butts", 0, 5, "butts"},
	{"butts", 0, 2, "bu"},
	{"butts", 3, 2, "ts"},
	{"butts", 0, 10, "butts"},  // length longer than the file reads what is available
	{"butts", 1, 1100, "utts"}, // reading past EOF returns only the remaining bytes
}

var readAtTests = []struct {
	in     string
	offset int64
	out    string
}{
	{"butts", 0, "butts"},
	{"butts", 1, "utts"},
	{"butts", 2, "tts"},
	{"butts", 100, ""}, // offset outside the chunk reads nothing
	{"butts", -1, ""},
}

var writeTests = []struct {
	in     string
	offset int64
	len    int64
	out    string
}{
	{"butts", 0, 5, "butts"},
	{"butts", 0, 2, "bu"},
	{"butts", 3, 2, "\x00\x00\x00bu"}, // writes start at the chunk offset and clamp to its length, so earlier bytes stay zero
	{"butts", 0, 10, "butts"},
	{"butts", 1, 1100, "\x00butts"},
}

var writeAtTests = []struct {
	in     string
	offset int64
	out    string
}{
	{"butts", 0, "butts"},
	{"butts", 1, "\x00butt"}, // clamped to the 5-byte chunk, so only "butt" fits at offset 1
	{"butts", 3, "\x00\x00\x00bu"},
	{"butts", 1000, ""}, // offset outside the chunk writes nothing
	{"butts", -11, ""},
}

func TestRead(t *testing.T) {
	for _, tt := range readTests {
		t.Run("Reads data properly", func(t *testing.T) {
			tmpfilePtr, err := ioutil.TempFile("", "read_test")
			if err != nil {
				log.Fatal(err)
			}
			defer os.Remove(tmpfilePtr.Name()) // clean up

			if _, err := tmpfilePtr.Write([]byte(tt.in)); err != nil {
				log.Fatal(err)
			}

			chunk, err := NewChunk(tmpfilePtr, tt.offset, tt.len)
			if err != nil {
				log.Fatal(err)
			}

			buffer := make([]byte, 100)
			n, _ := chunk.Read(buffer)

			if err := tmpfilePtr.Close(); err != nil {
				log.Fatal(err)
			}

			if string(buffer[:n]) != tt.out {
				t.Errorf("got %q, want %q", string(buffer[:n]), tt.out)
			}
		})
	}
}

func TestReadAt(t *testing.T) {
	for _, tt := range readAtTests {
		t.Run("Reads data properly using ReadAt", func(t *testing.T) {
			tmpfilePtr, err := ioutil.TempFile("", "readAt_test")
			if err != nil {
				log.Fatal(err)
			}
			defer os.Remove(tmpfilePtr.Name()) // clean up

			if _, err := tmpfilePtr.Write([]byte(tt.in)); err != nil {
				log.Fatal(err)
			}

			chunk, err := NewChunk(tmpfilePtr, 0, int64(len(tt.in)))
			if err != nil {
				log.Fatal(err)
			}

			buffer := make([]byte, 100)
			n, _ := chunk.ReadAt(buffer, tt.offset)

			if err := tmpfilePtr.Close(); err != nil {
				log.Fatal(err)
			}

			if string(buffer[:n]) != tt.out {
				t.Errorf("got %q, want %q", string(buffer[:n]), tt.out)
			}
		})
	}
}

func TestWrite(t *testing.T) {
	for _, tt := range writeTests {
		t.Run("Writes data properly", func(t *testing.T) {
			tmpfilePtr, err := ioutil.TempFile("", "write_test")
			if err != nil {
				log.Fatal(err)
			}
			defer os.Remove(tmpfilePtr.Name()) // clean up

			chunk, err := NewChunk(tmpfilePtr, tt.offset, tt.len)
			if err != nil {
				log.Fatal(err)
			}

			chunk.Write([]byte(tt.in))

			buffer := make([]byte, 100)
			n, err := tmpfilePtr.Read(buffer)

			if err := tmpfilePtr.Close(); err != nil {
				log.Fatal(err)
			}

			if string(buffer[:n]) != tt.out {
				t.Errorf("got %q, want %q", string(buffer[:n]), tt.out)
			}
		})
	}
}

func TestWriteAt(t *testing.T) {
	for _, tt := range writeAtTests {
		t.Run("Writes data properly", func(t *testing.T) {
			tmpfilePtr, err := ioutil.TempFile("", "writeAt_test")
			if err != nil {
				log.Fatal(err)
			}
			defer os.Remove(tmpfilePtr.Name()) // clean up

			chunk, err := NewChunk(tmpfilePtr, 0, int64(len(tt.in)))
			if err != nil {
				log.Fatal(err)
			}

			chunk.WriteAt([]byte(tt.in), tt.offset)

			buffer := make([]byte, 100)
			n, err := tmpfilePtr.Read(buffer)

			if err := tmpfilePtr.Close(); err != nil {
				log.Fatal(err)
			}

			if string(buffer[:n]) != tt.out {
				t.Errorf("got %q, want %q", string(buffer[:n]), tt.out)
			}
		})
	}
}
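One observation on the tests above: log.Fatal inside a subtest exits the whole test binary instead of failing just that case. A hedged sketch of a setup helper that keeps the same flow but reports through the testing package follows; the helper name and shape are illustrative and not part of the commit.
```Golang
package fpiece

import (
	"io/ioutil"
	"os"
	"testing"
)

// newTestChunk is an illustrative helper: it creates a temp file with the
// given contents, wraps it in a Chunk, and fails only the calling (sub)test
// on error rather than killing the whole test binary.
func newTestChunk(t *testing.T, contents string, offset, length int64) (*Chunk, func()) {
	t.Helper()

	file, err := ioutil.TempFile("", "fpiece_test")
	if err != nil {
		t.Fatal(err)
	}
	if _, err := file.Write([]byte(contents)); err != nil {
		t.Fatal(err)
	}

	chunk, err := NewChunk(file, offset, length)
	if err != nil {
		t.Fatal(err)
	}

	// The returned cleanup closes and removes the temp file.
	cleanup := func() {
		file.Close()
		os.Remove(file.Name())
	}
	return chunk, cleanup
}
```
Each subtest would then call newTestChunk and defer the returned cleanup.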


@@ -9,8 +9,9 @@ import (
 	"path"
 	"path/filepath"
-	"github.com/aleitner/FilePiece"
 	"github.com/zeebo/errs"
+	"storj.io/storj/pkg/filepiece"
 )
// Errors
@@ -66,7 +67,7 @@ func Store(hash string, r io.Reader, length int64, psFileOffset int64, dir strin
 		return 0, err
 	}

-	dataFileChunk := fpiece.NewChunk(dataFile, psFileOffset, length)
+	dataFileChunk, _ := fpiece.NewChunk(dataFile, psFileOffset, length)

 	// Close when finished
 	defer dataFile.Close()
@@ -119,7 +120,7 @@ func Retrieve(hash string, w io.Writer, length int64, readPosOffset int64, dir s
 	defer dataFile.Close()

 	// Created a section reader so that we can concurrently retrieve the same file.
-	dataFileChunk := fpiece.NewChunk(dataFile, readPosOffset, length)
+	dataFileChunk, _ := fpiece.NewChunk(dataFile, readPosOffset, length)

 	total, err := io.CopyN(w, dataFileChunk, length)
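Both updated call sites discard the error that NewChunk now returns. Today it can only fail on a negative length, but handling it keeps the callers robust if that changes. A hedged sketch is below; the helper, its signature, and the file path are hypothetical, and only fpiece.NewChunk and io.CopyN mirror the diff above.
```Golang
package main

import (
	"io"
	"log"
	"os"

	"storj.io/storj/pkg/filepiece"
)

// copyChunk is a hypothetical helper showing the NewChunk error being
// handled rather than discarded with `_`.
func copyChunk(w io.Writer, file *os.File, offset, length int64) (int64, error) {
	chunk, err := fpiece.NewChunk(file, offset, length)
	if err != nil {
		return 0, err
	}
	return io.CopyN(w, chunk, length)
}

func main() {
	file, err := os.Open("example.dat") // placeholder path
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	if _, err := copyChunk(os.Stdout, file, 0, 16); err != nil && err != io.EOF {
		log.Fatal(err)
	}
}
```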