Adds Libuplink (#1452)
* Merge in upstream * Some initial wireup * Added common.go file, more misc. work * WIP adding identity in * Get FullIdentity combined into Uplink * Structure libuplink a little better * Update some types and add some comments * WIP uplink stuff * Get uplink types and configs figured out * add initial setup for tests, happy path is working * Remove dependency from miniogw * Adds miniogw code and wires it up correctly * WIP working on getting test suite setup * Uplink client now returns successfully and passes some initial happy path tets * WIP trying to get v2 draft ready * WIP * WIP wiring up bucket methods and adjusting to some review feedback * Getting closer to v2 libuplink draft * CreateBucket now works and has tests to prove it * Bucket tests are passing now * removing some code * Updates error handling and linter fixes * Removes main_test * Uploads and downloads are now working * Rename BucketOpts to Encryption * updates * added test file back to git that was being ignored for some reason * more test conditions * changes Checksum in ObjectMeta struct to be type []byte * linter fix * Updates how encryption is passed through to bucket opts * Updates encryption handling at bucket and access level * Fixes imports * Updates per code review
This commit is contained in:
parent
1d78ddc3df
commit
6bf46e80ee
91
lib/uplink/access.go
Normal file
91
lib/uplink/access.go
Normal file
@ -0,0 +1,91 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package uplink
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
// Access holds a reference to Uplink and a set of permissions for actions on a bucket.
|
||||
type Access struct {
|
||||
Permissions Permissions
|
||||
Uplink *Uplink
|
||||
}
|
||||
|
||||
// A Macaroon represents an access credential to certain resources
|
||||
type Macaroon interface {
|
||||
Serialize() ([]byte, error)
|
||||
Restrict(caveats ...Caveat) Macaroon
|
||||
}
|
||||
|
||||
// Permissions are parsed by Uplink and return an Access struct
|
||||
type Permissions struct {
|
||||
Macaroon Macaroon
|
||||
APIKey string
|
||||
}
|
||||
|
||||
// Caveat could be a read-only restriction, a time-bound
|
||||
// restriction, a bucket-specific restriction, a path-prefix restriction, a
|
||||
// full path restriction, etc.
|
||||
type Caveat interface {
|
||||
}
|
||||
|
||||
// CreateBucketOptions holds the bucket opts
|
||||
type CreateBucketOptions struct {
|
||||
Encryption Encryption
|
||||
}
|
||||
|
||||
// CreateBucket creates a bucket from the passed opts
|
||||
func (a *Access) CreateBucket(ctx context.Context, bucket string, opts CreateBucketOptions) (storj.Bucket, error) {
|
||||
metainfo, _, err := a.Uplink.config.GetMetainfo(ctx, a.Uplink.id)
|
||||
if err != nil {
|
||||
return storj.Bucket{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return metainfo.CreateBucket(ctx, bucket, &storj.Bucket{PathCipher: opts.Encryption.PathCipher})
|
||||
}
|
||||
|
||||
// DeleteBucket deletes a bucket if authorized
|
||||
func (a *Access) DeleteBucket(ctx context.Context, bucket string) error {
|
||||
metainfo, _, err := a.Uplink.config.GetMetainfo(ctx, a.Uplink.id)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
return metainfo.DeleteBucket(ctx, bucket)
|
||||
}
|
||||
|
||||
// ListBuckets will list authorized buckets
|
||||
func (a *Access) ListBuckets(ctx context.Context, opts storj.BucketListOptions) (storj.BucketList, error) {
|
||||
metainfo, _, err := a.Uplink.config.GetMetainfo(ctx, a.Uplink.id)
|
||||
if err != nil {
|
||||
return storj.BucketList{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return metainfo.ListBuckets(ctx, opts)
|
||||
}
|
||||
|
||||
// GetBucketInfo returns info about the requested bucket if authorized
|
||||
func (a *Access) GetBucketInfo(ctx context.Context, bucket string) (storj.Bucket, error) {
|
||||
metainfo, _, err := a.Uplink.config.GetMetainfo(ctx, a.Uplink.id)
|
||||
if err != nil {
|
||||
return storj.Bucket{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return metainfo.GetBucket(ctx, bucket)
|
||||
}
|
||||
|
||||
// GetBucket returns a Bucket with the given Encryption information
|
||||
func (a *Access) GetBucket(ctx context.Context, bucket string, encryption *Encryption) *Bucket {
|
||||
return &Bucket{
|
||||
Access: a,
|
||||
Enc: encryption,
|
||||
Bucket: storj.Bucket{
|
||||
Name: bucket,
|
||||
PathCipher: encryption.PathCipher,
|
||||
},
|
||||
}
|
||||
}
|
132
lib/uplink/buckets.go
Normal file
132
lib/uplink/buckets.go
Normal file
@ -0,0 +1,132 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package uplink
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/stream"
|
||||
"storj.io/storj/pkg/utils"
|
||||
)
|
||||
|
||||
// Encryption holds the cipher, path, key, and enc. scheme for each bucket since they
|
||||
// can be different for each
|
||||
type Encryption struct {
|
||||
PathCipher storj.Cipher
|
||||
EncPathPrefix storj.Path
|
||||
Key storj.Key
|
||||
EncryptionScheme storj.EncryptionScheme
|
||||
}
|
||||
|
||||
// Bucket is a struct that allows operations on a Bucket after a user providers Permissions
|
||||
type Bucket struct {
|
||||
Access *Access
|
||||
Enc *Encryption
|
||||
Bucket storj.Bucket
|
||||
}
|
||||
|
||||
// GetObject returns the info for a given object
|
||||
func (b *Bucket) GetObject(ctx context.Context, path storj.Path) (storj.Object, error) {
|
||||
metainfo, _, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id)
|
||||
if err != nil {
|
||||
return storj.Object{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return metainfo.GetObject(ctx, b.Bucket.Name, path)
|
||||
}
|
||||
|
||||
// List returns a list of objects in a bucket
|
||||
func (b *Bucket) List(ctx context.Context, cfg ListObjectsConfig) (items storj.ObjectList, err error) {
|
||||
metainfo, _, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id)
|
||||
if err != nil {
|
||||
return storj.ObjectList{}, Error.Wrap(err)
|
||||
}
|
||||
|
||||
listOpts := storj.ListOptions{
|
||||
Prefix: cfg.Prefix,
|
||||
Cursor: cfg.Cursor,
|
||||
Recursive: cfg.Recursive,
|
||||
Direction: cfg.Direction,
|
||||
Limit: cfg.Limit,
|
||||
}
|
||||
|
||||
return metainfo.ListObjects(ctx, b.Bucket.Name, listOpts)
|
||||
}
|
||||
|
||||
// Upload puts an object in a bucket
|
||||
func (b *Bucket) Upload(ctx context.Context, path storj.Path, data []byte, opts UploadOpts) error {
|
||||
metainfo, streams, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
encScheme := b.Access.Uplink.config.GetEncryptionScheme()
|
||||
redScheme := b.Access.Uplink.config.GetRedundancyScheme()
|
||||
contentType := http.DetectContentType(data)
|
||||
|
||||
create := storj.CreateObject{
|
||||
RedundancyScheme: redScheme,
|
||||
EncryptionScheme: encScheme,
|
||||
ContentType: contentType,
|
||||
}
|
||||
|
||||
obj, err := metainfo.CreateObject(ctx, b.Bucket.Name, path, &create)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
reader := bytes.NewReader(data)
|
||||
mutableStream, err := obj.CreateStream(ctx)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
upload := stream.NewUpload(ctx, mutableStream, streams)
|
||||
|
||||
_, err = io.Copy(upload, reader)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
return utils.CombineErrors(err, upload.Close())
|
||||
}
|
||||
|
||||
// Download downloads an object from a bucket
|
||||
func (b *Bucket) Download(ctx context.Context, path storj.Path) ([]byte, error) {
|
||||
metainfo, streams, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
readStream, err := metainfo.GetObjectStream(ctx, b.Bucket.Name, path)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
stream := stream.NewDownload(ctx, readStream, streams)
|
||||
|
||||
defer func() { err = utils.CombineErrors(err, stream.Close()) }()
|
||||
|
||||
data, err := ioutil.ReadAll(stream)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// Delete removes an object from a bucket and returns an error if there was an issue
|
||||
func (b *Bucket) Delete(ctx context.Context, path storj.Path) error {
|
||||
metainfo, _, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
return metainfo.DeleteObject(ctx, b.Bucket.Name, path)
|
||||
}
|
9
lib/uplink/common.go
Normal file
9
lib/uplink/common.go
Normal file
@ -0,0 +1,9 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package uplink
|
||||
|
||||
// APIKey is an interface for authenticating with the Satellite
|
||||
type APIKey interface {
|
||||
Serialize() ([]byte, error)
|
||||
}
|
131
lib/uplink/libuplink_test.go
Normal file
131
lib/uplink/libuplink_test.go
Normal file
@ -0,0 +1,131 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package uplink
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testplanet"
|
||||
"storj.io/storj/pkg/cfgstruct"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/satellite"
|
||||
ul "storj.io/storj/uplink"
|
||||
)
|
||||
|
||||
func TestUplink(t *testing.T) {
|
||||
// Planet Config for Uplink
|
||||
testplanetConfig := testplanet.Config{
|
||||
SatelliteCount: 1,
|
||||
StorageNodeCount: 20,
|
||||
UplinkCount: 1,
|
||||
}
|
||||
|
||||
// Run Tests
|
||||
testplanet.Run(t, testplanetConfig, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
satellite := planet.Satellites[0]
|
||||
identity, err := identity.NewFullIdentity(ctx, 12, 4)
|
||||
assert.NoError(t, err)
|
||||
satelliteAddr := satellite.Addr() // get address
|
||||
cfg := getConfig(satellite, planet)
|
||||
uplink := NewUplink(identity, satelliteAddr, cfg)
|
||||
|
||||
permissions := Permissions{}
|
||||
access := uplink.Access(ctx, permissions)
|
||||
assert.NoError(t, err)
|
||||
|
||||
opts := CreateBucketOptions{}
|
||||
bucket, err := access.CreateBucket(ctx, "testbucket", opts)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, bucket)
|
||||
|
||||
bucketListOptions := storj.BucketListOptions{
|
||||
Limit: 1000,
|
||||
Direction: storj.ListDirection(1),
|
||||
}
|
||||
buckets, err := access.ListBuckets(ctx, bucketListOptions)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, buckets)
|
||||
|
||||
storjBucket, err := access.GetBucketInfo(ctx, "testbucket")
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, storjBucket)
|
||||
assert.Equal(t, storjBucket.Name, "testbucket")
|
||||
assert.IsType(t, storj.Bucket{}, storjBucket)
|
||||
|
||||
encOpts := &Encryption{}
|
||||
getbucket := access.GetBucket(ctx, "testbucket", encOpts)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, getbucket)
|
||||
|
||||
err = access.DeleteBucket(ctx, "testbucket")
|
||||
assert.NoError(t, err)
|
||||
|
||||
uploadtest, err := access.CreateBucket(ctx, "uploadtest", opts)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, uploadtest)
|
||||
assert.Equal(t, uploadtest.Name, "uploadtest")
|
||||
|
||||
uploadBucket := access.GetBucket(ctx, "uploadtest", encOpts)
|
||||
assert.NotNil(t, uploadBucket)
|
||||
|
||||
list, err := uploadBucket.List(ctx, ListObjectsConfig{
|
||||
Direction: storj.ListDirection(1),
|
||||
Limit: 100,
|
||||
})
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, list)
|
||||
assert.Equal(t, len(list.Items), 0)
|
||||
|
||||
testdata := []byte{1, 1, 1, 1, 1}
|
||||
uploadOpts := UploadOpts{}
|
||||
|
||||
err = uploadBucket.Upload(ctx, "testpath", testdata, uploadOpts)
|
||||
assert.NoError(t, err)
|
||||
|
||||
downloadedData, err := uploadBucket.Download(ctx, "testpath")
|
||||
assert.NotNil(t, downloadedData)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, testdata, downloadedData)
|
||||
|
||||
list2, err := uploadBucket.List(ctx, ListObjectsConfig{
|
||||
Direction: storj.ListDirection(1),
|
||||
Limit: 100,
|
||||
})
|
||||
|
||||
assert.NotNil(t, list2)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, list2.Items)
|
||||
assert.Equal(t, len(list2.Items), 1)
|
||||
})
|
||||
}
|
||||
|
||||
func getConfig(satellite *satellite.Peer, planet *testplanet.Planet) ul.Config {
|
||||
config := getDefaultConfig()
|
||||
config.Client.OverlayAddr = satellite.Addr()
|
||||
config.Client.PointerDBAddr = satellite.Addr()
|
||||
config.Client.APIKey = planet.Uplinks[0].APIKey[satellite.ID()]
|
||||
|
||||
config.RS.MinThreshold = 1 * len(planet.StorageNodes) / 5
|
||||
config.RS.RepairThreshold = 2 * len(planet.StorageNodes) / 5
|
||||
config.RS.SuccessThreshold = 3 * len(planet.StorageNodes) / 5
|
||||
config.RS.MaxThreshold = 4 * len(planet.StorageNodes) / 5
|
||||
|
||||
config.TLS.UsePeerCAWhitelist = false
|
||||
config.TLS.Extensions.Revocation = false
|
||||
config.TLS.Extensions.WhitelistSignedLeaf = false
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
func getDefaultConfig() ul.Config {
|
||||
cfg := ul.Config{}
|
||||
cfgstruct.Bind(&pflag.FlagSet{}, &cfg, true)
|
||||
return cfg
|
||||
}
|
37
lib/uplink/main.go
Normal file
37
lib/uplink/main.go
Normal file
@ -0,0 +1,37 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package uplink
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"storj.io/storj/pkg/identity"
|
||||
ul "storj.io/storj/uplink"
|
||||
)
|
||||
|
||||
// Uplink represents the main entrypoint to Storj V3. An Uplink connects to
|
||||
// a specific Satellite and caches connections and resources, allowing one to
|
||||
// create sessions delineated by specific access controls.
|
||||
type Uplink struct {
|
||||
id *identity.FullIdentity
|
||||
satelliteAddr string
|
||||
config ul.Config
|
||||
}
|
||||
|
||||
// Access returns a pointer to an Access for bucket operations to occur on
|
||||
func (u *Uplink) Access(ctx context.Context, permissions Permissions) *Access {
|
||||
// TODO (dylan): Parse permissions here
|
||||
return &Access{
|
||||
Uplink: u,
|
||||
}
|
||||
}
|
||||
|
||||
// NewUplink returns a pointer to a new Uplink or an error
|
||||
func NewUplink(identity *identity.FullIdentity, satelliteAddr string, cfg ul.Config) *Uplink {
|
||||
return &Uplink{
|
||||
id: identity,
|
||||
satelliteAddr: satelliteAddr,
|
||||
config: cfg,
|
||||
}
|
||||
}
|
84
lib/uplink/objects.go
Normal file
84
lib/uplink/objects.go
Normal file
@ -0,0 +1,84 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package uplink
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
// Object holds the information for a given object
|
||||
type Object struct {
|
||||
Meta ObjectMeta
|
||||
}
|
||||
|
||||
// ObjectMeta represents metadata about a specific Object
|
||||
type ObjectMeta struct {
|
||||
Bucket string
|
||||
Path storj.Path
|
||||
IsPrefix bool
|
||||
|
||||
Metadata map[string]string
|
||||
|
||||
Created time.Time
|
||||
Modified time.Time
|
||||
Expires time.Time
|
||||
|
||||
Size int64
|
||||
Checksum []byte
|
||||
}
|
||||
|
||||
// UploadOpts controls options about uploading a new Object, if authorized.
|
||||
type UploadOpts struct {
|
||||
Metadata map[string]string
|
||||
Expires time.Time
|
||||
|
||||
Encryption *Encryption
|
||||
}
|
||||
|
||||
// ListObjectsField numbers the fields of list objects
|
||||
type ListObjectsField int
|
||||
|
||||
const (
|
||||
// ListObjectsMetaNone opts
|
||||
ListObjectsMetaNone ListObjectsField = 0
|
||||
// ListObjectsMetaModified opts
|
||||
ListObjectsMetaModified ListObjectsField = 1 << iota
|
||||
// ListObjectsMetaExpiration opts
|
||||
ListObjectsMetaExpiration ListObjectsField = 1 << iota
|
||||
// ListObjectsMetaSize opts
|
||||
ListObjectsMetaSize ListObjectsField = 1 << iota
|
||||
// ListObjectsMetaChecksum opts
|
||||
ListObjectsMetaChecksum ListObjectsField = 1 << iota
|
||||
// ListObjectsMetaUserDefined opts
|
||||
ListObjectsMetaUserDefined ListObjectsField = 1 << iota
|
||||
// ListObjectsMetaAll opts
|
||||
ListObjectsMetaAll ListObjectsField = 1 << iota
|
||||
)
|
||||
|
||||
// ListObjectsConfig holds params for listing objects with the Gateway
|
||||
type ListObjectsConfig struct {
|
||||
// this differs from storj.ListOptions by removing the Delimiter field
|
||||
// (ours is hardcoded as "/"), and adding the Fields field to optionally
|
||||
// support efficient listing that doesn't require looking outside of the
|
||||
// path index in pointerdb.
|
||||
|
||||
Prefix storj.Path
|
||||
Cursor storj.Path
|
||||
Recursive bool
|
||||
Direction storj.ListDirection
|
||||
Limit int
|
||||
Fields ListObjectsFields
|
||||
}
|
||||
|
||||
// ListObjectsFields is an interface that I haven't figured out yet
|
||||
type ListObjectsFields interface{}
|
||||
|
||||
// Range returns an objects data
|
||||
func (o *Object) Range(ctx context.Context, offset, length int64) (io.ReadCloser, error) {
|
||||
panic("TODO")
|
||||
}
|
13
lib/uplink/uplink.go
Normal file
13
lib/uplink/uplink.go
Normal file
@ -0,0 +1,13 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package uplink
|
||||
|
||||
import (
|
||||
"github.com/zeebo/errs"
|
||||
)
|
||||
|
||||
var (
|
||||
// Error is the errs class of standard End User Client errors
|
||||
Error = errs.Class("libuplink error")
|
||||
)
|
Loading…
Reference in New Issue
Block a user