light client upload file support -V3 gateway (#57)
* integrated eestream's serve-pieces functionality * added ref to http request function * created dummy bucket list * Initialized the buckets with files with hardcoded sample data * supports upload Object(s) * uploads to corresponding folders * code cleanup for review * updated based on code review comments * updates based on missed code review comments * updated with review comments * implemented review comments * merged latest and tested * added filepath.Join() * updates based on the comments * fixes the eestreamer parameter due to merge
This commit is contained in:
parent
5e97cf7a2e
commit
fa65f449ed
@ -5,18 +5,34 @@ package storj
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/minio/cli"
|
||||
|
||||
"github.com/minio/minio/cmd"
|
||||
minio "github.com/minio/minio/cmd"
|
||||
"github.com/minio/minio/pkg/auth"
|
||||
"github.com/minio/minio/pkg/hash"
|
||||
"github.com/vivint/infectious"
|
||||
|
||||
"storj.io/storj/pkg/eestream"
|
||||
)
|
||||
|
||||
var (
|
||||
pieceBlockSize = flag.Int("piece_block_size", 4*1024, "block size of pieces")
|
||||
key = flag.String("key", "a key", "the secret key")
|
||||
rsk = flag.Int("required", 20, "rs required")
|
||||
rsn = flag.Int("total", 40, "rs total")
|
||||
)
|
||||
|
||||
func init() {
|
||||
cmd.RegisterGatewayCommand(cli.Command{
|
||||
minio.RegisterGatewayCommand(cli.Command{
|
||||
Name: "storj",
|
||||
Usage: "Storj",
|
||||
Action: storjGatewayMain,
|
||||
@ -24,12 +40,108 @@ func init() {
|
||||
})
|
||||
}
|
||||
|
||||
// getBuckets returns the buckets list
|
||||
func (s *storjObjects) getBuckets() (buckets []minio.BucketInfo, err error) {
|
||||
buckets = make([]minio.BucketInfo, len(s.storj.bucketlist))
|
||||
for i, bi := range s.storj.bucketlist {
|
||||
buckets[i] = minio.BucketInfo{
|
||||
Name: bi.bucket.Name,
|
||||
Created: bi.bucket.Created,
|
||||
}
|
||||
}
|
||||
return buckets, nil
|
||||
}
|
||||
|
||||
// uploadFile function handles to add the uploaded file to the bucket's file list structure
|
||||
func (s *storjObjects) uploadFile(bucket, object string, filesize int64, metadata map[string]string) (result minio.ListObjectsInfo, err error) {
|
||||
var fl []minio.ObjectInfo
|
||||
for i, v := range s.storj.bucketlist {
|
||||
// bucket string comparision
|
||||
if v.bucket.Name == bucket {
|
||||
/* append the file to the filelist */
|
||||
s.storj.bucketlist[i].filelist.file.Objects = append(
|
||||
s.storj.bucketlist[i].filelist.file.Objects,
|
||||
minio.ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Name: object,
|
||||
ModTime: time.Now(),
|
||||
Size: filesize,
|
||||
IsDir: false,
|
||||
ContentType: "application/octet-stream",
|
||||
},
|
||||
)
|
||||
/* populate the filelist */
|
||||
f := make([]minio.ObjectInfo, len(s.storj.bucketlist[i].filelist.file.Objects))
|
||||
for j, fi := range s.storj.bucketlist[i].filelist.file.Objects {
|
||||
f[j] = minio.ObjectInfo{
|
||||
Bucket: v.bucket.Name,
|
||||
Name: fi.Name,
|
||||
ModTime: fi.ModTime,
|
||||
Size: fi.Size,
|
||||
IsDir: fi.IsDir,
|
||||
ContentType: fi.ContentType,
|
||||
}
|
||||
}
|
||||
fl = f
|
||||
break
|
||||
}
|
||||
}
|
||||
result = minio.ListObjectsInfo{
|
||||
IsTruncated: false,
|
||||
Objects: fl,
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// getFiles returns the files list for a bucket
|
||||
func (s *storjObjects) getFiles(bucket string) (result minio.ListObjectsInfo, err error) {
|
||||
var fl []minio.ObjectInfo
|
||||
for i, v := range s.storj.bucketlist {
|
||||
if v.bucket.Name == bucket {
|
||||
/* populate the filelist */
|
||||
f := make([]minio.ObjectInfo, len(s.storj.bucketlist[i].filelist.file.Objects))
|
||||
for j, fi := range s.storj.bucketlist[i].filelist.file.Objects {
|
||||
f[j] = minio.ObjectInfo{
|
||||
Bucket: v.bucket.Name,
|
||||
Name: fi.Name,
|
||||
ModTime: fi.ModTime,
|
||||
Size: fi.Size,
|
||||
IsDir: fi.IsDir,
|
||||
ContentType: fi.ContentType,
|
||||
}
|
||||
}
|
||||
fl = f
|
||||
break
|
||||
}
|
||||
}
|
||||
result = minio.ListObjectsInfo{
|
||||
IsTruncated: false,
|
||||
Objects: fl,
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func storjGatewayMain(ctx *cli.Context) {
|
||||
cmd.StartGateway(ctx, &Storj{})
|
||||
s := &Storj{}
|
||||
s.createSampleBucketList()
|
||||
minio.StartGateway(ctx, s)
|
||||
}
|
||||
|
||||
//S3Bucket structure
|
||||
type S3Bucket struct {
|
||||
bucket minio.BucketInfo
|
||||
filelist S3FileList
|
||||
}
|
||||
|
||||
//S3FileList structure
|
||||
type S3FileList struct {
|
||||
file minio.ListObjectsInfo
|
||||
}
|
||||
|
||||
// Storj is the implementation of a minio cmd.Gateway
|
||||
type Storj struct{}
|
||||
type Storj struct {
|
||||
bucketlist []S3Bucket
|
||||
}
|
||||
|
||||
// Name implements cmd.Gateway
|
||||
func (s *Storj) Name() string {
|
||||
@ -38,8 +150,8 @@ func (s *Storj) Name() string {
|
||||
|
||||
// NewGatewayLayer implements cmd.Gateway
|
||||
func (s *Storj) NewGatewayLayer(creds auth.Credentials) (
|
||||
cmd.ObjectLayer, error) {
|
||||
return &storjObjects{}, nil
|
||||
minio.ObjectLayer, error) {
|
||||
return &storjObjects{storj: s}, nil
|
||||
}
|
||||
|
||||
// Production implements cmd.Gateway
|
||||
@ -47,8 +159,28 @@ func (s *Storj) Production() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
//createSampleBucketList function initializes sample buckets and files in each bucket
|
||||
func (s *Storj) createSampleBucketList() {
|
||||
s.bucketlist = make([]S3Bucket, 10)
|
||||
for i := range s.bucketlist {
|
||||
s.bucketlist[i].bucket.Name = "TestBucket" + strconv.Itoa(i+1)
|
||||
s.bucketlist[i].bucket.Created = time.Now()
|
||||
s.bucketlist[i].filelist.file.IsTruncated = false
|
||||
s.bucketlist[i].filelist.file.Objects = make([]minio.ObjectInfo, 0x0A)
|
||||
for j := range s.bucketlist[i].filelist.file.Objects {
|
||||
s.bucketlist[i].filelist.file.Objects[j].Bucket = s.bucketlist[i].bucket.Name
|
||||
s.bucketlist[i].filelist.file.Objects[j].Name = s.bucketlist[i].bucket.Name + "file" + strconv.Itoa(j+1)
|
||||
s.bucketlist[i].filelist.file.Objects[j].ModTime = time.Now()
|
||||
s.bucketlist[i].filelist.file.Objects[j].Size = 100
|
||||
s.bucketlist[i].filelist.file.Objects[j].ContentType = "application/octet-stream"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type storjObjects struct {
|
||||
cmd.GatewayUnsupported
|
||||
minio.GatewayUnsupported
|
||||
TempDir string // Temporary storage location for file transfers.
|
||||
storj *Storj
|
||||
}
|
||||
|
||||
func (s *storjObjects) DeleteBucket(ctx context.Context, bucket string) error {
|
||||
@ -61,7 +193,7 @@ func (s *storjObjects) DeleteObject(ctx context.Context, bucket,
|
||||
}
|
||||
|
||||
func (s *storjObjects) GetBucketInfo(ctx context.Context, bucket string) (
|
||||
bucketInfo cmd.BucketInfo, err error) {
|
||||
bucketInfo minio.BucketInfo, err error) {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
@ -72,31 +204,18 @@ func (s *storjObjects) GetObject(ctx context.Context, bucket, object string,
|
||||
}
|
||||
|
||||
func (s *storjObjects) GetObjectInfo(ctx context.Context, bucket,
|
||||
object string) (objInfo cmd.ObjectInfo, err error) {
|
||||
object string) (objInfo minio.ObjectInfo, err error) {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
func (s *storjObjects) ListBuckets(ctx context.Context) (
|
||||
buckets []cmd.BucketInfo, err error) {
|
||||
return []cmd.BucketInfo{{
|
||||
Name: "test-bucket",
|
||||
Created: time.Now(),
|
||||
}}, nil
|
||||
buckets []minio.BucketInfo, err error) {
|
||||
return s.getBuckets()
|
||||
}
|
||||
|
||||
func (s *storjObjects) ListObjects(ctx context.Context, bucket, prefix, marker,
|
||||
delimiter string, maxKeys int) (result cmd.ListObjectsInfo, err error) {
|
||||
return cmd.ListObjectsInfo{
|
||||
IsTruncated: false,
|
||||
Objects: []cmd.ObjectInfo{{
|
||||
Bucket: "test-bucket",
|
||||
Name: "test-file",
|
||||
ModTime: time.Now(),
|
||||
Size: 0,
|
||||
IsDir: false,
|
||||
ContentType: "application/octet-stream",
|
||||
}},
|
||||
}, nil
|
||||
delimiter string, maxKeys int) (result minio.ListObjectsInfo, err error) {
|
||||
return s.getFiles(bucket)
|
||||
}
|
||||
|
||||
func (s *storjObjects) MakeBucketWithLocation(ctx context.Context,
|
||||
@ -104,16 +223,87 @@ func (s *storjObjects) MakeBucketWithLocation(ctx context.Context,
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
//encryptFile encrypts the uploaded files
|
||||
func encryptFile(data io.ReadCloser, blockSize uint, bucket, object string) error {
|
||||
dir := os.TempDir()
|
||||
dir = filepath.Join(dir, "gateway", bucket, object)
|
||||
err := os.MkdirAll(dir, 0755)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fc, err := infectious.NewFEC(*rsk, *rsn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
es := eestream.NewRSScheme(fc, *pieceBlockSize)
|
||||
encKey := sha256.Sum256([]byte(*key))
|
||||
var firstNonce [12]byte
|
||||
encrypter, err := eestream.NewAESGCMEncrypter(
|
||||
&encKey, &firstNonce, es.DecodedBlockSize())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
readers, err := eestream.EncodeReader(context.Background(), eestream.TransformReader(
|
||||
eestream.PadReader(data, encrypter.InBlockSize()), encrypter, 0),
|
||||
es, 0, 0, 4*1024*1024)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
errs := make(chan error, len(readers))
|
||||
for i := range readers {
|
||||
go func(i int) {
|
||||
fh, err := os.Create(
|
||||
filepath.Join(dir, fmt.Sprintf("%d.piece", i)))
|
||||
if err != nil {
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
defer fh.Close()
|
||||
_, err = io.Copy(fh, readers[i])
|
||||
errs <- err
|
||||
}(i)
|
||||
}
|
||||
for range readers {
|
||||
err := <-errs
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *storjObjects) PutObject(ctx context.Context, bucket, object string,
|
||||
data *hash.Reader, metadata map[string]string) (objInfo cmd.ObjectInfo,
|
||||
data *hash.Reader, metadata map[string]string) (objInfo minio.ObjectInfo,
|
||||
err error) {
|
||||
panic("TODO")
|
||||
srcFile := path.Join(s.TempDir, minio.MustGetUUID())
|
||||
writer, err := os.Create(srcFile)
|
||||
if err != nil {
|
||||
return objInfo, err
|
||||
}
|
||||
|
||||
wsize, err := io.CopyN(writer, data, data.Size())
|
||||
if err != nil {
|
||||
os.Remove(srcFile)
|
||||
return objInfo, err
|
||||
}
|
||||
|
||||
err = encryptFile(writer, uint(wsize), bucket, object)
|
||||
if err == nil {
|
||||
s.uploadFile(bucket, object, wsize, metadata)
|
||||
}
|
||||
return minio.ObjectInfo{
|
||||
Name: object,
|
||||
Bucket: bucket,
|
||||
ModTime: time.Now(),
|
||||
Size: wsize,
|
||||
ETag: minio.GenETag(),
|
||||
}, err
|
||||
}
|
||||
|
||||
func (s *storjObjects) Shutdown(context.Context) error {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
func (s *storjObjects) StorageInfo(context.Context) cmd.StorageInfo {
|
||||
panic("TODO")
|
||||
func (s *storjObjects) StorageInfo(context.Context) minio.StorageInfo {
|
||||
return minio.StorageInfo{}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user