From fa65f449eda31b8b91953730b992d361895fb4c0 Mon Sep 17 00:00:00 2001 From: aligeti <34487396+aligeti@users.noreply.github.com> Date: Fri, 1 Jun 2018 08:51:13 -0400 Subject: [PATCH] light client upload file support -V3 gateway (#57) * integrated eestream's serve-pieces functionality * added ref to http request function * created dummy bucket list * Initialized the buckets with files with hardcoded sample data * supports upload Object(s) * uploads to corresponding folders * code cleanup for review * updated based on code review comments * updates based on missed code review comments * updated with review comments * implemented review comments * merged latest and tested * added filepath.Join() * updates based on the comments * fixes the eestreamer parameter due to merge --- pkg/miniogw/gateway-storj.go | 252 ++++++++++++++++++++++++++++++----- 1 file changed, 221 insertions(+), 31 deletions(-) diff --git a/pkg/miniogw/gateway-storj.go b/pkg/miniogw/gateway-storj.go index b9cfb75c7..670e35ff6 100644 --- a/pkg/miniogw/gateway-storj.go +++ b/pkg/miniogw/gateway-storj.go @@ -5,18 +5,34 @@ package storj import ( "context" + "crypto/sha256" + "flag" + "fmt" "io" + "os" + "path" + "path/filepath" + "strconv" "time" "github.com/minio/cli" - - "github.com/minio/minio/cmd" + minio "github.com/minio/minio/cmd" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/hash" + "github.com/vivint/infectious" + + "storj.io/storj/pkg/eestream" +) + +var ( + pieceBlockSize = flag.Int("piece_block_size", 4*1024, "block size of pieces") + key = flag.String("key", "a key", "the secret key") + rsk = flag.Int("required", 20, "rs required") + rsn = flag.Int("total", 40, "rs total") ) func init() { - cmd.RegisterGatewayCommand(cli.Command{ + minio.RegisterGatewayCommand(cli.Command{ Name: "storj", Usage: "Storj", Action: storjGatewayMain, @@ -24,12 +40,108 @@ func init() { }) } +// getBuckets returns the buckets list +func (s *storjObjects) getBuckets() (buckets []minio.BucketInfo, err error) { + buckets = make([]minio.BucketInfo, len(s.storj.bucketlist)) + for i, bi := range s.storj.bucketlist { + buckets[i] = minio.BucketInfo{ + Name: bi.bucket.Name, + Created: bi.bucket.Created, + } + } + return buckets, nil +} + +// uploadFile function handles to add the uploaded file to the bucket's file list structure +func (s *storjObjects) uploadFile(bucket, object string, filesize int64, metadata map[string]string) (result minio.ListObjectsInfo, err error) { + var fl []minio.ObjectInfo + for i, v := range s.storj.bucketlist { + // bucket string comparision + if v.bucket.Name == bucket { + /* append the file to the filelist */ + s.storj.bucketlist[i].filelist.file.Objects = append( + s.storj.bucketlist[i].filelist.file.Objects, + minio.ObjectInfo{ + Bucket: bucket, + Name: object, + ModTime: time.Now(), + Size: filesize, + IsDir: false, + ContentType: "application/octet-stream", + }, + ) + /* populate the filelist */ + f := make([]minio.ObjectInfo, len(s.storj.bucketlist[i].filelist.file.Objects)) + for j, fi := range s.storj.bucketlist[i].filelist.file.Objects { + f[j] = minio.ObjectInfo{ + Bucket: v.bucket.Name, + Name: fi.Name, + ModTime: fi.ModTime, + Size: fi.Size, + IsDir: fi.IsDir, + ContentType: fi.ContentType, + } + } + fl = f + break + } + } + result = minio.ListObjectsInfo{ + IsTruncated: false, + Objects: fl, + } + return result, nil +} + +// getFiles returns the files list for a bucket +func (s *storjObjects) getFiles(bucket string) (result minio.ListObjectsInfo, err error) { + var fl []minio.ObjectInfo + for i, v := range s.storj.bucketlist { + if v.bucket.Name == bucket { + /* populate the filelist */ + f := make([]minio.ObjectInfo, len(s.storj.bucketlist[i].filelist.file.Objects)) + for j, fi := range s.storj.bucketlist[i].filelist.file.Objects { + f[j] = minio.ObjectInfo{ + Bucket: v.bucket.Name, + Name: fi.Name, + ModTime: fi.ModTime, + Size: fi.Size, + IsDir: fi.IsDir, + ContentType: fi.ContentType, + } + } + fl = f + break + } + } + result = minio.ListObjectsInfo{ + IsTruncated: false, + Objects: fl, + } + return result, nil +} + func storjGatewayMain(ctx *cli.Context) { - cmd.StartGateway(ctx, &Storj{}) + s := &Storj{} + s.createSampleBucketList() + minio.StartGateway(ctx, s) +} + +//S3Bucket structure +type S3Bucket struct { + bucket minio.BucketInfo + filelist S3FileList +} + +//S3FileList structure +type S3FileList struct { + file minio.ListObjectsInfo } // Storj is the implementation of a minio cmd.Gateway -type Storj struct{} +type Storj struct { + bucketlist []S3Bucket +} // Name implements cmd.Gateway func (s *Storj) Name() string { @@ -38,8 +150,8 @@ func (s *Storj) Name() string { // NewGatewayLayer implements cmd.Gateway func (s *Storj) NewGatewayLayer(creds auth.Credentials) ( - cmd.ObjectLayer, error) { - return &storjObjects{}, nil + minio.ObjectLayer, error) { + return &storjObjects{storj: s}, nil } // Production implements cmd.Gateway @@ -47,8 +159,28 @@ func (s *Storj) Production() bool { return false } +//createSampleBucketList function initializes sample buckets and files in each bucket +func (s *Storj) createSampleBucketList() { + s.bucketlist = make([]S3Bucket, 10) + for i := range s.bucketlist { + s.bucketlist[i].bucket.Name = "TestBucket" + strconv.Itoa(i+1) + s.bucketlist[i].bucket.Created = time.Now() + s.bucketlist[i].filelist.file.IsTruncated = false + s.bucketlist[i].filelist.file.Objects = make([]minio.ObjectInfo, 0x0A) + for j := range s.bucketlist[i].filelist.file.Objects { + s.bucketlist[i].filelist.file.Objects[j].Bucket = s.bucketlist[i].bucket.Name + s.bucketlist[i].filelist.file.Objects[j].Name = s.bucketlist[i].bucket.Name + "file" + strconv.Itoa(j+1) + s.bucketlist[i].filelist.file.Objects[j].ModTime = time.Now() + s.bucketlist[i].filelist.file.Objects[j].Size = 100 + s.bucketlist[i].filelist.file.Objects[j].ContentType = "application/octet-stream" + } + } +} + type storjObjects struct { - cmd.GatewayUnsupported + minio.GatewayUnsupported + TempDir string // Temporary storage location for file transfers. + storj *Storj } func (s *storjObjects) DeleteBucket(ctx context.Context, bucket string) error { @@ -61,7 +193,7 @@ func (s *storjObjects) DeleteObject(ctx context.Context, bucket, } func (s *storjObjects) GetBucketInfo(ctx context.Context, bucket string) ( - bucketInfo cmd.BucketInfo, err error) { + bucketInfo minio.BucketInfo, err error) { panic("TODO") } @@ -72,31 +204,18 @@ func (s *storjObjects) GetObject(ctx context.Context, bucket, object string, } func (s *storjObjects) GetObjectInfo(ctx context.Context, bucket, - object string) (objInfo cmd.ObjectInfo, err error) { + object string) (objInfo minio.ObjectInfo, err error) { panic("TODO") } func (s *storjObjects) ListBuckets(ctx context.Context) ( - buckets []cmd.BucketInfo, err error) { - return []cmd.BucketInfo{{ - Name: "test-bucket", - Created: time.Now(), - }}, nil + buckets []minio.BucketInfo, err error) { + return s.getBuckets() } func (s *storjObjects) ListObjects(ctx context.Context, bucket, prefix, marker, - delimiter string, maxKeys int) (result cmd.ListObjectsInfo, err error) { - return cmd.ListObjectsInfo{ - IsTruncated: false, - Objects: []cmd.ObjectInfo{{ - Bucket: "test-bucket", - Name: "test-file", - ModTime: time.Now(), - Size: 0, - IsDir: false, - ContentType: "application/octet-stream", - }}, - }, nil + delimiter string, maxKeys int) (result minio.ListObjectsInfo, err error) { + return s.getFiles(bucket) } func (s *storjObjects) MakeBucketWithLocation(ctx context.Context, @@ -104,16 +223,87 @@ func (s *storjObjects) MakeBucketWithLocation(ctx context.Context, panic("TODO") } +//encryptFile encrypts the uploaded files +func encryptFile(data io.ReadCloser, blockSize uint, bucket, object string) error { + dir := os.TempDir() + dir = filepath.Join(dir, "gateway", bucket, object) + err := os.MkdirAll(dir, 0755) + if err != nil { + return err + } + fc, err := infectious.NewFEC(*rsk, *rsn) + if err != nil { + return err + } + es := eestream.NewRSScheme(fc, *pieceBlockSize) + encKey := sha256.Sum256([]byte(*key)) + var firstNonce [12]byte + encrypter, err := eestream.NewAESGCMEncrypter( + &encKey, &firstNonce, es.DecodedBlockSize()) + if err != nil { + return err + } + readers, err := eestream.EncodeReader(context.Background(), eestream.TransformReader( + eestream.PadReader(data, encrypter.InBlockSize()), encrypter, 0), + es, 0, 0, 4*1024*1024) + if err != nil { + return err + } + errs := make(chan error, len(readers)) + for i := range readers { + go func(i int) { + fh, err := os.Create( + filepath.Join(dir, fmt.Sprintf("%d.piece", i))) + if err != nil { + errs <- err + return + } + defer fh.Close() + _, err = io.Copy(fh, readers[i]) + errs <- err + }(i) + } + for range readers { + err := <-errs + if err != nil { + return err + } + } + return nil +} + func (s *storjObjects) PutObject(ctx context.Context, bucket, object string, - data *hash.Reader, metadata map[string]string) (objInfo cmd.ObjectInfo, + data *hash.Reader, metadata map[string]string) (objInfo minio.ObjectInfo, err error) { - panic("TODO") + srcFile := path.Join(s.TempDir, minio.MustGetUUID()) + writer, err := os.Create(srcFile) + if err != nil { + return objInfo, err + } + + wsize, err := io.CopyN(writer, data, data.Size()) + if err != nil { + os.Remove(srcFile) + return objInfo, err + } + + err = encryptFile(writer, uint(wsize), bucket, object) + if err == nil { + s.uploadFile(bucket, object, wsize, metadata) + } + return minio.ObjectInfo{ + Name: object, + Bucket: bucket, + ModTime: time.Now(), + Size: wsize, + ETag: minio.GenETag(), + }, err } func (s *storjObjects) Shutdown(context.Context) error { panic("TODO") } -func (s *storjObjects) StorageInfo(context.Context) cmd.StorageInfo { - panic("TODO") +func (s *storjObjects) StorageInfo(context.Context) minio.StorageInfo { + return minio.StorageInfo{} }