From 6bf46e80ee593ed8630e365010b050dacae23ab3 Mon Sep 17 00:00:00 2001 From: Dylan Lott Date: Wed, 20 Mar 2019 09:43:53 -0600 Subject: [PATCH] Adds Libuplink (#1452) * Merge in upstream * Some initial wireup * Added common.go file, more misc. work * WIP adding identity in * Get FullIdentity combined into Uplink * Structure libuplink a little better * Update some types and add some comments * WIP uplink stuff * Get uplink types and configs figured out * add initial setup for tests, happy path is working * Remove dependency from miniogw * Adds miniogw code and wires it up correctly * WIP working on getting test suite setup * Uplink client now returns successfully and passes some initial happy path tets * WIP trying to get v2 draft ready * WIP * WIP wiring up bucket methods and adjusting to some review feedback * Getting closer to v2 libuplink draft * CreateBucket now works and has tests to prove it * Bucket tests are passing now * removing some code * Updates error handling and linter fixes * Removes main_test * Uploads and downloads are now working * Rename BucketOpts to Encryption * updates * added test file back to git that was being ignored for some reason * more test conditions * changes Checksum in ObjectMeta struct to be type []byte * linter fix * Updates how encryption is passed through to bucket opts * Updates encryption handling at bucket and access level * Fixes imports * Updates per code review --- lib/uplink/access.go | 91 ++++++++++++++++++++++++ lib/uplink/buckets.go | 132 +++++++++++++++++++++++++++++++++++ lib/uplink/common.go | 9 +++ lib/uplink/libuplink_test.go | 131 ++++++++++++++++++++++++++++++++++ lib/uplink/main.go | 37 ++++++++++ lib/uplink/objects.go | 84 ++++++++++++++++++++++ lib/uplink/uplink.go | 13 ++++ 7 files changed, 497 insertions(+) create mode 100644 lib/uplink/access.go create mode 100644 lib/uplink/buckets.go create mode 100644 lib/uplink/common.go create mode 100644 lib/uplink/libuplink_test.go create mode 100644 lib/uplink/main.go create mode 100644 lib/uplink/objects.go create mode 100644 lib/uplink/uplink.go diff --git a/lib/uplink/access.go b/lib/uplink/access.go new file mode 100644 index 000000000..e8e7e716e --- /dev/null +++ b/lib/uplink/access.go @@ -0,0 +1,91 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package uplink + +import ( + "context" + + "storj.io/storj/pkg/storj" +) + +// Access holds a reference to Uplink and a set of permissions for actions on a bucket. +type Access struct { + Permissions Permissions + Uplink *Uplink +} + +// A Macaroon represents an access credential to certain resources +type Macaroon interface { + Serialize() ([]byte, error) + Restrict(caveats ...Caveat) Macaroon +} + +// Permissions are parsed by Uplink and return an Access struct +type Permissions struct { + Macaroon Macaroon + APIKey string +} + +// Caveat could be a read-only restriction, a time-bound +// restriction, a bucket-specific restriction, a path-prefix restriction, a +// full path restriction, etc. +type Caveat interface { +} + +// CreateBucketOptions holds the bucket opts +type CreateBucketOptions struct { + Encryption Encryption +} + +// CreateBucket creates a bucket from the passed opts +func (a *Access) CreateBucket(ctx context.Context, bucket string, opts CreateBucketOptions) (storj.Bucket, error) { + metainfo, _, err := a.Uplink.config.GetMetainfo(ctx, a.Uplink.id) + if err != nil { + return storj.Bucket{}, Error.Wrap(err) + } + + return metainfo.CreateBucket(ctx, bucket, &storj.Bucket{PathCipher: opts.Encryption.PathCipher}) +} + +// DeleteBucket deletes a bucket if authorized +func (a *Access) DeleteBucket(ctx context.Context, bucket string) error { + metainfo, _, err := a.Uplink.config.GetMetainfo(ctx, a.Uplink.id) + if err != nil { + return Error.Wrap(err) + } + + return metainfo.DeleteBucket(ctx, bucket) +} + +// ListBuckets will list authorized buckets +func (a *Access) ListBuckets(ctx context.Context, opts storj.BucketListOptions) (storj.BucketList, error) { + metainfo, _, err := a.Uplink.config.GetMetainfo(ctx, a.Uplink.id) + if err != nil { + return storj.BucketList{}, Error.Wrap(err) + } + + return metainfo.ListBuckets(ctx, opts) +} + +// GetBucketInfo returns info about the requested bucket if authorized +func (a *Access) GetBucketInfo(ctx context.Context, bucket string) (storj.Bucket, error) { + metainfo, _, err := a.Uplink.config.GetMetainfo(ctx, a.Uplink.id) + if err != nil { + return storj.Bucket{}, Error.Wrap(err) + } + + return metainfo.GetBucket(ctx, bucket) +} + +// GetBucket returns a Bucket with the given Encryption information +func (a *Access) GetBucket(ctx context.Context, bucket string, encryption *Encryption) *Bucket { + return &Bucket{ + Access: a, + Enc: encryption, + Bucket: storj.Bucket{ + Name: bucket, + PathCipher: encryption.PathCipher, + }, + } +} diff --git a/lib/uplink/buckets.go b/lib/uplink/buckets.go new file mode 100644 index 000000000..4ac57707e --- /dev/null +++ b/lib/uplink/buckets.go @@ -0,0 +1,132 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package uplink + +import ( + "bytes" + "context" + "io" + "io/ioutil" + "net/http" + + "storj.io/storj/pkg/storj" + "storj.io/storj/pkg/stream" + "storj.io/storj/pkg/utils" +) + +// Encryption holds the cipher, path, key, and enc. scheme for each bucket since they +// can be different for each +type Encryption struct { + PathCipher storj.Cipher + EncPathPrefix storj.Path + Key storj.Key + EncryptionScheme storj.EncryptionScheme +} + +// Bucket is a struct that allows operations on a Bucket after a user providers Permissions +type Bucket struct { + Access *Access + Enc *Encryption + Bucket storj.Bucket +} + +// GetObject returns the info for a given object +func (b *Bucket) GetObject(ctx context.Context, path storj.Path) (storj.Object, error) { + metainfo, _, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id) + if err != nil { + return storj.Object{}, Error.Wrap(err) + } + + return metainfo.GetObject(ctx, b.Bucket.Name, path) +} + +// List returns a list of objects in a bucket +func (b *Bucket) List(ctx context.Context, cfg ListObjectsConfig) (items storj.ObjectList, err error) { + metainfo, _, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id) + if err != nil { + return storj.ObjectList{}, Error.Wrap(err) + } + + listOpts := storj.ListOptions{ + Prefix: cfg.Prefix, + Cursor: cfg.Cursor, + Recursive: cfg.Recursive, + Direction: cfg.Direction, + Limit: cfg.Limit, + } + + return metainfo.ListObjects(ctx, b.Bucket.Name, listOpts) +} + +// Upload puts an object in a bucket +func (b *Bucket) Upload(ctx context.Context, path storj.Path, data []byte, opts UploadOpts) error { + metainfo, streams, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id) + if err != nil { + return Error.Wrap(err) + } + + encScheme := b.Access.Uplink.config.GetEncryptionScheme() + redScheme := b.Access.Uplink.config.GetRedundancyScheme() + contentType := http.DetectContentType(data) + + create := storj.CreateObject{ + RedundancyScheme: redScheme, + EncryptionScheme: encScheme, + ContentType: contentType, + } + + obj, err := metainfo.CreateObject(ctx, b.Bucket.Name, path, &create) + if err != nil { + return Error.Wrap(err) + } + + reader := bytes.NewReader(data) + mutableStream, err := obj.CreateStream(ctx) + if err != nil { + return Error.Wrap(err) + } + + upload := stream.NewUpload(ctx, mutableStream, streams) + + _, err = io.Copy(upload, reader) + if err != nil { + return Error.Wrap(err) + } + + return utils.CombineErrors(err, upload.Close()) +} + +// Download downloads an object from a bucket +func (b *Bucket) Download(ctx context.Context, path storj.Path) ([]byte, error) { + metainfo, streams, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id) + if err != nil { + return nil, Error.Wrap(err) + } + + readStream, err := metainfo.GetObjectStream(ctx, b.Bucket.Name, path) + if err != nil { + return nil, Error.Wrap(err) + } + + stream := stream.NewDownload(ctx, readStream, streams) + + defer func() { err = utils.CombineErrors(err, stream.Close()) }() + + data, err := ioutil.ReadAll(stream) + if err != nil { + return nil, Error.Wrap(err) + } + + return data, nil +} + +// Delete removes an object from a bucket and returns an error if there was an issue +func (b *Bucket) Delete(ctx context.Context, path storj.Path) error { + metainfo, _, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id) + if err != nil { + return Error.Wrap(err) + } + + return metainfo.DeleteObject(ctx, b.Bucket.Name, path) +} diff --git a/lib/uplink/common.go b/lib/uplink/common.go new file mode 100644 index 000000000..68b3bfa0e --- /dev/null +++ b/lib/uplink/common.go @@ -0,0 +1,9 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package uplink + +// APIKey is an interface for authenticating with the Satellite +type APIKey interface { + Serialize() ([]byte, error) +} diff --git a/lib/uplink/libuplink_test.go b/lib/uplink/libuplink_test.go new file mode 100644 index 000000000..5db468cde --- /dev/null +++ b/lib/uplink/libuplink_test.go @@ -0,0 +1,131 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package uplink + +import ( + "testing" + + "github.com/spf13/pflag" + "github.com/stretchr/testify/assert" + + "storj.io/storj/internal/testcontext" + "storj.io/storj/internal/testplanet" + "storj.io/storj/pkg/cfgstruct" + "storj.io/storj/pkg/identity" + "storj.io/storj/pkg/storj" + "storj.io/storj/satellite" + ul "storj.io/storj/uplink" +) + +func TestUplink(t *testing.T) { + // Planet Config for Uplink + testplanetConfig := testplanet.Config{ + SatelliteCount: 1, + StorageNodeCount: 20, + UplinkCount: 1, + } + + // Run Tests + testplanet.Run(t, testplanetConfig, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + satellite := planet.Satellites[0] + identity, err := identity.NewFullIdentity(ctx, 12, 4) + assert.NoError(t, err) + satelliteAddr := satellite.Addr() // get address + cfg := getConfig(satellite, planet) + uplink := NewUplink(identity, satelliteAddr, cfg) + + permissions := Permissions{} + access := uplink.Access(ctx, permissions) + assert.NoError(t, err) + + opts := CreateBucketOptions{} + bucket, err := access.CreateBucket(ctx, "testbucket", opts) + assert.NoError(t, err) + assert.NotNil(t, bucket) + + bucketListOptions := storj.BucketListOptions{ + Limit: 1000, + Direction: storj.ListDirection(1), + } + buckets, err := access.ListBuckets(ctx, bucketListOptions) + assert.NoError(t, err) + assert.NotNil(t, buckets) + + storjBucket, err := access.GetBucketInfo(ctx, "testbucket") + assert.NoError(t, err) + assert.NotNil(t, storjBucket) + assert.Equal(t, storjBucket.Name, "testbucket") + assert.IsType(t, storj.Bucket{}, storjBucket) + + encOpts := &Encryption{} + getbucket := access.GetBucket(ctx, "testbucket", encOpts) + assert.NoError(t, err) + assert.NotNil(t, getbucket) + + err = access.DeleteBucket(ctx, "testbucket") + assert.NoError(t, err) + + uploadtest, err := access.CreateBucket(ctx, "uploadtest", opts) + assert.NoError(t, err) + assert.NotNil(t, uploadtest) + assert.Equal(t, uploadtest.Name, "uploadtest") + + uploadBucket := access.GetBucket(ctx, "uploadtest", encOpts) + assert.NotNil(t, uploadBucket) + + list, err := uploadBucket.List(ctx, ListObjectsConfig{ + Direction: storj.ListDirection(1), + Limit: 100, + }) + + assert.NoError(t, err) + assert.NotNil(t, list) + assert.Equal(t, len(list.Items), 0) + + testdata := []byte{1, 1, 1, 1, 1} + uploadOpts := UploadOpts{} + + err = uploadBucket.Upload(ctx, "testpath", testdata, uploadOpts) + assert.NoError(t, err) + + downloadedData, err := uploadBucket.Download(ctx, "testpath") + assert.NotNil(t, downloadedData) + assert.NoError(t, err) + assert.Equal(t, testdata, downloadedData) + + list2, err := uploadBucket.List(ctx, ListObjectsConfig{ + Direction: storj.ListDirection(1), + Limit: 100, + }) + + assert.NotNil(t, list2) + assert.NoError(t, err) + assert.NotNil(t, list2.Items) + assert.Equal(t, len(list2.Items), 1) + }) +} + +func getConfig(satellite *satellite.Peer, planet *testplanet.Planet) ul.Config { + config := getDefaultConfig() + config.Client.OverlayAddr = satellite.Addr() + config.Client.PointerDBAddr = satellite.Addr() + config.Client.APIKey = planet.Uplinks[0].APIKey[satellite.ID()] + + config.RS.MinThreshold = 1 * len(planet.StorageNodes) / 5 + config.RS.RepairThreshold = 2 * len(planet.StorageNodes) / 5 + config.RS.SuccessThreshold = 3 * len(planet.StorageNodes) / 5 + config.RS.MaxThreshold = 4 * len(planet.StorageNodes) / 5 + + config.TLS.UsePeerCAWhitelist = false + config.TLS.Extensions.Revocation = false + config.TLS.Extensions.WhitelistSignedLeaf = false + + return config +} + +func getDefaultConfig() ul.Config { + cfg := ul.Config{} + cfgstruct.Bind(&pflag.FlagSet{}, &cfg, true) + return cfg +} diff --git a/lib/uplink/main.go b/lib/uplink/main.go new file mode 100644 index 000000000..0c31247d8 --- /dev/null +++ b/lib/uplink/main.go @@ -0,0 +1,37 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package uplink + +import ( + "context" + + "storj.io/storj/pkg/identity" + ul "storj.io/storj/uplink" +) + +// Uplink represents the main entrypoint to Storj V3. An Uplink connects to +// a specific Satellite and caches connections and resources, allowing one to +// create sessions delineated by specific access controls. +type Uplink struct { + id *identity.FullIdentity + satelliteAddr string + config ul.Config +} + +// Access returns a pointer to an Access for bucket operations to occur on +func (u *Uplink) Access(ctx context.Context, permissions Permissions) *Access { + // TODO (dylan): Parse permissions here + return &Access{ + Uplink: u, + } +} + +// NewUplink returns a pointer to a new Uplink or an error +func NewUplink(identity *identity.FullIdentity, satelliteAddr string, cfg ul.Config) *Uplink { + return &Uplink{ + id: identity, + satelliteAddr: satelliteAddr, + config: cfg, + } +} diff --git a/lib/uplink/objects.go b/lib/uplink/objects.go new file mode 100644 index 000000000..b53991f54 --- /dev/null +++ b/lib/uplink/objects.go @@ -0,0 +1,84 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package uplink + +import ( + "context" + "io" + "time" + + "storj.io/storj/pkg/storj" +) + +// Object holds the information for a given object +type Object struct { + Meta ObjectMeta +} + +// ObjectMeta represents metadata about a specific Object +type ObjectMeta struct { + Bucket string + Path storj.Path + IsPrefix bool + + Metadata map[string]string + + Created time.Time + Modified time.Time + Expires time.Time + + Size int64 + Checksum []byte +} + +// UploadOpts controls options about uploading a new Object, if authorized. +type UploadOpts struct { + Metadata map[string]string + Expires time.Time + + Encryption *Encryption +} + +// ListObjectsField numbers the fields of list objects +type ListObjectsField int + +const ( + // ListObjectsMetaNone opts + ListObjectsMetaNone ListObjectsField = 0 + // ListObjectsMetaModified opts + ListObjectsMetaModified ListObjectsField = 1 << iota + // ListObjectsMetaExpiration opts + ListObjectsMetaExpiration ListObjectsField = 1 << iota + // ListObjectsMetaSize opts + ListObjectsMetaSize ListObjectsField = 1 << iota + // ListObjectsMetaChecksum opts + ListObjectsMetaChecksum ListObjectsField = 1 << iota + // ListObjectsMetaUserDefined opts + ListObjectsMetaUserDefined ListObjectsField = 1 << iota + // ListObjectsMetaAll opts + ListObjectsMetaAll ListObjectsField = 1 << iota +) + +// ListObjectsConfig holds params for listing objects with the Gateway +type ListObjectsConfig struct { + // this differs from storj.ListOptions by removing the Delimiter field + // (ours is hardcoded as "/"), and adding the Fields field to optionally + // support efficient listing that doesn't require looking outside of the + // path index in pointerdb. + + Prefix storj.Path + Cursor storj.Path + Recursive bool + Direction storj.ListDirection + Limit int + Fields ListObjectsFields +} + +// ListObjectsFields is an interface that I haven't figured out yet +type ListObjectsFields interface{} + +// Range returns an objects data +func (o *Object) Range(ctx context.Context, offset, length int64) (io.ReadCloser, error) { + panic("TODO") +} diff --git a/lib/uplink/uplink.go b/lib/uplink/uplink.go new file mode 100644 index 000000000..0ca9f29c6 --- /dev/null +++ b/lib/uplink/uplink.go @@ -0,0 +1,13 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package uplink + +import ( + "github.com/zeebo/errs" +) + +var ( + // Error is the errs class of standard End User Client errors + Error = errs.Class("libuplink error") +)