libuplink changes for public usage (#1568)

Co-authored-by: paul cannon <thepaul@users.noreply.github.com>
Co-authored-by: Kaloyan Raev <kaloyan-raev@users.noreply.github.com>
This commit is contained in:
JT Olio 2019-04-03 02:46:21 -06:00 committed by Kaloyan Raev
parent 34a439a99c
commit 9af4f26d43
18 changed files with 785 additions and 495 deletions

View File

@ -1,91 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package uplink
import (
"context"
"storj.io/storj/pkg/storj"
)
// Access holds a reference to Uplink and a set of permissions for actions on a bucket.
type Access struct {
// Permissions carries the credentials used to authorize bucket operations.
Permissions Permissions
// Uplink is the parent connection this Access was created from.
Uplink *Uplink
}
// A Macaroon represents an access credential to certain resources.
type Macaroon interface {
// Serialize encodes the macaroon to bytes for transport or storage.
Serialize() ([]byte, error)
// Restrict returns a new Macaroon attenuated by the given caveats.
Restrict(caveats ...Caveat) Macaroon
}
// Permissions are parsed by Uplink and used to construct an Access.
type Permissions struct {
Macaroon Macaroon
APIKey string
}
// Caveat could be a read-only restriction, a time-bound
// restriction, a bucket-specific restriction, a path-prefix restriction, a
// full path restriction, etc.
type Caveat interface {
}
// CreateBucketOptions holds the options for creating a bucket (currently
// only the bucket's encryption settings).
type CreateBucketOptions struct {
Encryption Encryption
}
// CreateBucket makes a new bucket with the supplied options and returns its
// metadata.
func (a *Access) CreateBucket(ctx context.Context, bucket string, opts CreateBucketOptions) (storj.Bucket, error) {
	db, _, err := a.Uplink.config.GetMetainfo(ctx, a.Uplink.id)
	if err != nil {
		return storj.Bucket{}, Error.Wrap(err)
	}
	spec := &storj.Bucket{PathCipher: opts.Encryption.PathCipher}
	return db.CreateBucket(ctx, bucket, spec)
}
// DeleteBucket removes the named bucket, provided the caller is authorized.
func (a *Access) DeleteBucket(ctx context.Context, bucket string) error {
	db, _, err := a.Uplink.config.GetMetainfo(ctx, a.Uplink.id)
	if err != nil {
		return Error.Wrap(err)
	}
	return db.DeleteBucket(ctx, bucket)
}
// ListBuckets returns the buckets this Access is authorized to see.
func (a *Access) ListBuckets(ctx context.Context, opts storj.BucketListOptions) (storj.BucketList, error) {
	db, _, err := a.Uplink.config.GetMetainfo(ctx, a.Uplink.id)
	if err != nil {
		return storj.BucketList{}, Error.Wrap(err)
	}
	return db.ListBuckets(ctx, opts)
}
// GetBucketInfo looks up and returns metadata for the named bucket, if the
// caller is authorized.
func (a *Access) GetBucketInfo(ctx context.Context, bucket string) (storj.Bucket, error) {
	db, _, err := a.Uplink.config.GetMetainfo(ctx, a.Uplink.id)
	if err != nil {
		return storj.Bucket{}, Error.Wrap(err)
	}
	return db.GetBucket(ctx, bucket)
}
// GetBucket constructs a Bucket handle bound to this Access using the given
// Encryption settings. It performs no network calls.
func (a *Access) GetBucket(ctx context.Context, bucket string, encryption *Encryption) *Bucket {
	info := storj.Bucket{
		Name:       bucket,
		PathCipher: encryption.PathCipher,
	}
	return &Bucket{Access: a, Enc: encryption, Bucket: info}
}

19
lib/uplink/apikey.go Normal file
View File

@ -0,0 +1,19 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package uplink
// APIKey represents an access credential to certain resources.
type APIKey struct {
	key string
}

// Serialize returns the string form of the API Key.
func (a APIKey) Serialize() string {
	return a.key
}

// ParseAPIKey converts a serialized key string back into an APIKey.
func ParseAPIKey(val string) (APIKey, error) {
	return APIKey{key: val}, nil
}

147
lib/uplink/bucket.go Normal file
View File

@ -0,0 +1,147 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package uplink
import (
"context"
"io"
"time"
"github.com/zeebo/errs"
"storj.io/storj/pkg/metainfo/kvmetainfo"
"storj.io/storj/pkg/storage/streams"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/stream"
)
// Bucket represents operations you can perform on a bucket. It embeds the
// bucket's storj.Bucket metadata and carries the stores needed for object
// operations.
type Bucket struct {
storj.Bucket
// Config holds the configuration this bucket was opened with.
Config BucketConfig
// metainfo is the object-metadata database handle for this bucket.
metainfo *kvmetainfo.DB
// streams reads and writes object data streams.
streams streams.Store
// pathCipher is the cipher used for object path encryption.
pathCipher storj.Cipher
}
// OpenObject returns an Object handle, if authorized.
func (b *Bucket) OpenObject(ctx context.Context, path storj.Path) (o *Object, err error) {
defer mon.Task()(&ctx)(&err)
// b.Name comes from the embedded storj.Bucket.
info, err := b.metainfo.GetObject(ctx, b.Name, path)
if err != nil {
return nil, err
}
// Translate the internal storj.Object record into the public ObjectMeta
// shape, and hand the Object the stores it needs for later downloads.
return &Object{
Meta: ObjectMeta{
Bucket: info.Bucket.Name,
Path: info.Path,
IsPrefix: info.IsPrefix,
ContentType: info.ContentType,
Metadata: info.Metadata,
Created: info.Created,
Modified: info.Modified,
Expires: info.Expires,
Size: info.Size,
Checksum: info.Checksum,
Volatile: struct {
EncryptionParameters storj.EncryptionParameters
RedundancyScheme storj.RedundancyScheme
}{
EncryptionParameters: info.ToEncryptionParameters(),
RedundancyScheme: info.RedundancyScheme,
},
},
metainfo: b.metainfo,
streams: b.streams,
}, nil
}
// UploadOptions controls options about uploading a new Object, if authorized.
type UploadOptions struct {
// ContentType, if set, gives a MIME content-type for the Object.
ContentType string
// Metadata contains additional information about an Object. It can
// hold arbitrary textual fields and can be retrieved together with the
// Object. Field names can be at most 1024 bytes long. Field values are
// not individually limited in size, but the total of all metadata
// (fields and values) can not exceed 4 kiB.
Metadata map[string]string
// Expires is the time at which the new Object can expire (be deleted
// automatically from storage nodes). The zero value presumably means
// "never expires" — TODO confirm against the satellite behavior.
Expires time.Time
// Volatile groups config values that are likely to change semantics
// or go away entirely between releases. Be careful when using them!
Volatile struct {
// EncryptionParameters determines the cipher suite to use for
// the Object's data encryption. If not set, the Bucket's
// defaults will be used.
EncryptionParameters storj.EncryptionParameters
// RedundancyScheme determines the Reed-Solomon and/or Forward
// Error Correction encoding parameters to be used for this
// Object.
RedundancyScheme storj.RedundancyScheme
}
}
// UploadObject uploads a new object, if authorized.
func (b *Bucket) UploadObject(ctx context.Context, path storj.Path, data io.Reader, opts *UploadOptions) (err error) {
	defer mon.Task()(&ctx)(&err)
	// A nil opts means "all defaults".
	if opts == nil {
		opts = &UploadOptions{}
	}
	spec := storj.CreateObject{
		ContentType:      opts.ContentType,
		Metadata:         opts.Metadata,
		Expires:          opts.Expires,
		RedundancyScheme: opts.Volatile.RedundancyScheme,
		EncryptionScheme: opts.Volatile.EncryptionParameters.ToEncryptionScheme(),
	}
	obj, err := b.metainfo.CreateObject(ctx, b.Name, path, &spec)
	if err != nil {
		return err
	}
	ms, err := obj.CreateStream(ctx)
	if err != nil {
		return err
	}
	writer := stream.NewUpload(ctx, ms, b.streams)
	_, err = io.Copy(writer, data)
	// Close runs even when the copy failed, so the stream is not leaked;
	// both errors are surfaced together.
	return errs.Combine(err, writer.Close())
}
// DeleteObject removes an object, if authorized.
func (b *Bucket) DeleteObject(ctx context.Context, path storj.Path) (err error) {
defer mon.Task()(&ctx)(&err)
return b.metainfo.DeleteObject(ctx, b.Bucket.Name, path)
}
// ListOptions controls options for the ListObjects() call.
type ListOptions = storj.ListOptions

// ListObjects lists objects a user is authorized to see. A nil cfg means
// "use defaults": list from the bucket root, non-recursive, with the
// server-side default limit.
func (b *Bucket) ListObjects(ctx context.Context, cfg *ListOptions) (list storj.ObjectList, err error) {
	defer mon.Task()(&ctx)(&err)
	if cfg == nil {
		// Use the package-local alias for consistency with the signature.
		cfg = &ListOptions{}
	}
	return b.metainfo.ListObjects(ctx, b.Bucket.Name, *cfg)
}
// Close closes the Bucket session. It is currently a no-op, but callers
// should still invoke it for forward compatibility.
func (b *Bucket) Close() error {
return nil
}

View File

@ -1,133 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package uplink
import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
"github.com/zeebo/errs"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/stream"
)
// Encryption holds the cipher, path, key, and enc. scheme for each bucket since they
// can be different for each
type Encryption struct {
// PathCipher is the cipher used for object path encryption.
PathCipher storj.Cipher
// EncPathPrefix is the encrypted path prefix for this bucket.
EncPathPrefix storj.Path
// Key is the encryption key for this bucket's data.
Key storj.Key
// EncryptionScheme holds the data-encryption parameters.
EncryptionScheme storj.EncryptionScheme
}
// Bucket is a struct that allows operations on a Bucket after a user provides Permissions
type Bucket struct {
Access *Access
Enc *Encryption
Bucket storj.Bucket
}
// GetObject looks up and returns the metadata for the object at path.
func (b *Bucket) GetObject(ctx context.Context, path storj.Path) (storj.Object, error) {
	db, _, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id)
	if err != nil {
		return storj.Object{}, Error.Wrap(err)
	}
	return db.GetObject(ctx, b.Bucket.Name, path)
}
// List returns a page of objects in the bucket, per cfg.
func (b *Bucket) List(ctx context.Context, cfg ListObjectsConfig) (items storj.ObjectList, err error) {
	db, _, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id)
	if err != nil {
		return storj.ObjectList{}, Error.Wrap(err)
	}
	// Translate the package-local config into storj list options.
	opts := storj.ListOptions{
		Prefix:    cfg.Prefix,
		Cursor:    cfg.Cursor,
		Recursive: cfg.Recursive,
		Direction: cfg.Direction,
		Limit:     cfg.Limit,
	}
	return db.ListObjects(ctx, b.Bucket.Name, opts)
}
// Upload puts an object in a bucket. The object's content type is sniffed
// from the data via http.DetectContentType.
func (b *Bucket) Upload(ctx context.Context, path storj.Path, data []byte, opts UploadOpts) error {
	metainfo, streams, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id)
	if err != nil {
		return Error.Wrap(err)
	}
	encScheme := b.Access.Uplink.config.GetEncryptionScheme()
	redScheme := b.Access.Uplink.config.GetRedundancyScheme()
	contentType := http.DetectContentType(data)
	create := storj.CreateObject{
		RedundancyScheme: redScheme,
		EncryptionScheme: encScheme,
		ContentType:      contentType,
	}
	obj, err := metainfo.CreateObject(ctx, b.Bucket.Name, path, &create)
	if err != nil {
		return Error.Wrap(err)
	}
	reader := bytes.NewReader(data)
	mutableStream, err := obj.CreateStream(ctx)
	if err != nil {
		return Error.Wrap(err)
	}
	upload := stream.NewUpload(ctx, mutableStream, streams)
	_, err = io.Copy(upload, reader)
	// BUGFIX: always close the upload stream, even when the copy failed —
	// the previous early return leaked the stream on copy errors. Both the
	// copy error and the close error are surfaced together.
	return Error.Wrap(errs.Combine(err, upload.Close()))
}
// Download fetches the full contents of the object at path.
func (b *Bucket) Download(ctx context.Context, path storj.Path) (data []byte, err error) {
	metainfo, streams, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id)
	if err != nil {
		return nil, Error.Wrap(err)
	}
	readStream, err := metainfo.GetObjectStream(ctx, b.Bucket.Name, path)
	if err != nil {
		return nil, Error.Wrap(err)
	}
	// BUGFIX: the result parameters are now named — the previous version
	// assigned the deferred Close error to a local variable, so Close
	// failures were silently dropped. Renaming the local also stops it
	// shadowing the stream package.
	download := stream.NewDownload(ctx, readStream, streams)
	defer func() { err = errs.Combine(err, download.Close()) }()
	data, err = ioutil.ReadAll(download)
	if err != nil {
		return nil, Error.Wrap(err)
	}
	return data, nil
}
// Delete removes an object from a bucket and returns an error if there was an issue
func (b *Bucket) Delete(ctx context.Context, path storj.Path) error {
	db, _, err := b.Access.Uplink.config.GetMetainfo(ctx, b.Access.Uplink.id)
	if err != nil {
		return Error.Wrap(err)
	}
	return db.DeleteObject(ctx, b.Bucket.Name, path)
}

View File

@ -1,9 +1,16 @@
// Copyright (C) 2018 Storj Labs, Inc.
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package uplink
// APIKey is an interface for authenticating with the Satellite
type APIKey interface {
Serialize() ([]byte, error)
}
import (
"github.com/zeebo/errs"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
)
var (
mon = monkit.Package()
// Error is the toplevel class of errors for the uplink library.
Error = errs.Class("libuplink")
)

5
lib/uplink/doc.go Normal file
View File

@ -0,0 +1,5 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
// Package uplink is the main entrypoint to the Storj V3 network.
package uplink

23
lib/uplink/encryption.go Normal file
View File

@ -0,0 +1,23 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package uplink
import (
"storj.io/storj/pkg/storj"
)
const (
// defaultCipher is used whenever a cipher suite is left unspecified.
defaultCipher = storj.EncAESGCM
)
// EncryptionAccess specifies the encryption details needed to encrypt or
// decrypt objects.
type EncryptionAccess struct {
// Key is the base encryption key to be used for decrypting objects.
Key storj.Key
// EncryptedPathPrefix is the (possibly empty) encrypted version of the
// path from the top of the storage Bucket to this point. This is
// necessary to have in order to derive further encryption keys.
EncryptedPathPrefix storj.Path
}

View File

@ -1,121 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package uplink
import (
"testing"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/pkg/cfgstruct"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite"
ul "storj.io/storj/uplink"
)
// TestUplink exercises the old Access API end-to-end against a testplanet:
// bucket create/list/get/delete, then upload, download, and list of objects.
func TestUplink(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
satelliteAddr := satellite.Addr() // get address
cfg := getConfig(satellite, planet)
uplink := NewUplink(planet.Uplinks[0].Identity, satelliteAddr, cfg)
// Empty permissions: Access currently ignores them (see Uplink.Access TODO).
permissions := Permissions{}
access := uplink.Access(ctx, permissions)
opts := CreateBucketOptions{}
bucket, err := access.CreateBucket(ctx, "testbucket", opts)
assert.NoError(t, err)
assert.NotNil(t, bucket)
bucketListOptions := storj.BucketListOptions{
Limit: 1000,
Direction: storj.ListDirection(1),
}
buckets, err := access.ListBuckets(ctx, bucketListOptions)
assert.NoError(t, err)
assert.NotNil(t, buckets)
storjBucket, err := access.GetBucketInfo(ctx, "testbucket")
assert.NoError(t, err)
assert.NotNil(t, storjBucket)
assert.Equal(t, storjBucket.Name, "testbucket")
encOpts := &Encryption{}
getbucket := access.GetBucket(ctx, "testbucket", encOpts)
// NOTE(review): GetBucket returns no error; the NoError below re-checks
// the stale err from GetBucketInfo and asserts nothing new.
assert.NoError(t, err)
assert.NotNil(t, getbucket)
err = access.DeleteBucket(ctx, "testbucket")
assert.NoError(t, err)
// Round-trip a small payload through a fresh bucket.
uploadtest, err := access.CreateBucket(ctx, "uploadtest", opts)
assert.NoError(t, err)
assert.NotNil(t, uploadtest)
assert.Equal(t, uploadtest.Name, "uploadtest")
uploadBucket := access.GetBucket(ctx, "uploadtest", encOpts)
assert.NotNil(t, uploadBucket)
list, err := uploadBucket.List(ctx, ListObjectsConfig{
Direction: storj.ListDirection(1),
Limit: 100,
})
assert.NoError(t, err)
assert.NotNil(t, list)
assert.Equal(t, len(list.Items), 0)
testdata := []byte{1, 1, 1, 1, 1}
uploadOpts := UploadOpts{}
err = uploadBucket.Upload(ctx, "testpath", testdata, uploadOpts)
assert.NoError(t, err)
downloadedData, err := uploadBucket.Download(ctx, "testpath")
assert.NotNil(t, downloadedData)
assert.NoError(t, err)
assert.Equal(t, testdata, downloadedData)
list2, err := uploadBucket.List(ctx, ListObjectsConfig{
Direction: storj.ListDirection(1),
Limit: 100,
})
assert.NotNil(t, list2)
assert.NoError(t, err)
assert.NotNil(t, list2.Items)
assert.Equal(t, len(list2.Items), 1)
})
}
// getConfig builds an uplink config pointed at the given satellite, with RS
// thresholds scaled to the planet's storage-node count and test-friendly TLS
// settings (no whitelist, no revocation checks).
func getConfig(satellite *satellite.Peer, planet *testplanet.Planet) ul.Config {
	cfg := getDefaultConfig()
	cfg.Client.SatelliteAddr = satellite.Addr()
	cfg.Client.APIKey = planet.Uplinks[0].APIKey[satellite.ID()]
	nodes := len(planet.StorageNodes)
	cfg.RS.MinThreshold = 1 * nodes / 5
	cfg.RS.RepairThreshold = 2 * nodes / 5
	cfg.RS.SuccessThreshold = 3 * nodes / 5
	cfg.RS.MaxThreshold = 4 * nodes / 5
	cfg.TLS.UsePeerCAWhitelist = false
	cfg.TLS.Extensions.Revocation = false
	cfg.TLS.Extensions.WhitelistSignedLeaf = false
	return cfg
}
// getDefaultConfig returns an uplink config populated with the defaults
// declared via cfgstruct tags.
func getDefaultConfig() ul.Config {
	var cfg ul.Config
	cfgstruct.Bind(&pflag.FlagSet{}, &cfg, true)
	return cfg
}

View File

@ -1,37 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package uplink
import (
"context"
"storj.io/storj/pkg/identity"
ul "storj.io/storj/uplink"
)
// Uplink represents the main entrypoint to Storj V3. An Uplink connects to
// a specific Satellite and caches connections and resources, allowing one to
// create sessions delineated by specific access controls.
type Uplink struct {
// id is the identity used for all connections made by this Uplink.
id *identity.FullIdentity
// satelliteAddr is the address of the Satellite this Uplink talks to.
satelliteAddr string
// config carries client, RS, and TLS settings.
config ul.Config
}
// Access returns an Access handle for bucket operations under the given
// permissions.
func (u *Uplink) Access(ctx context.Context, permissions Permissions) *Access {
	// TODO (dylan): Parse permissions here
	return &Access{Uplink: u}
}
// NewUplink constructs an Uplink bound to the given identity, satellite
// address, and config.
func NewUplink(identity *identity.FullIdentity, satelliteAddr string, cfg ul.Config) *Uplink {
	u := &Uplink{
		id:            identity,
		satelliteAddr: satelliteAddr,
		config:        cfg,
	}
	return u
}

101
lib/uplink/object.go Normal file
View File

@ -0,0 +1,101 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package uplink
import (
"context"
"io"
"time"
"storj.io/storj/internal/readcloser"
"storj.io/storj/pkg/metainfo/kvmetainfo"
"storj.io/storj/pkg/storage/streams"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/stream"
)
// ObjectMeta contains metadata about a specific Object
type ObjectMeta struct {
// Bucket gives the name of the bucket in which an Object is placed.
Bucket string
// Path is the path of the Object within the Bucket. Path components are
// forward-slash-separated, like Unix file paths ("one/two/three").
Path storj.Path
// IsPrefix is true if this ObjectMeta does not refer to a specific
// Object, but to some arbitrary point in the path hierarchy. This would
// be called a "folder" or "directory" in a typical filesystem.
IsPrefix bool
// ContentType, if set, gives a MIME content-type for the Object, as
// set when the object was created.
ContentType string
// Metadata contains the additional information about an Object that was
// set when the object was created. See UploadOptions.Metadata for more
// information.
Metadata map[string]string
// Created is the time at which the Object was created.
Created time.Time
// Modified is the time at which the Object was last modified.
Modified time.Time
// Expires is the time at which the Object expires (after which it will
// be automatically deleted from storage nodes).
Expires time.Time
// Size gives the size of the Object in bytes.
Size int64
// Checksum gives a checksum of the contents of the Object.
// NOTE(review): the checksum algorithm is not specified here — confirm
// with the metainfo layer before relying on a particular one.
Checksum []byte
// Volatile groups config values that are likely to change semantics
// or go away entirely between releases. Be careful when using them!
Volatile struct {
// EncryptionParameters gives the encryption parameters being
// used for the Object's data encryption.
EncryptionParameters storj.EncryptionParameters
// RedundancyScheme determines the Reed-Solomon and/or Forward
// Error Correction encoding parameters to be used for this
// Object.
RedundancyScheme storj.RedundancyScheme
}
}
// An Object is a sequence of bytes with associated metadata, stored in the
// Storj network (or being prepared for such storage). It belongs to a specific
// bucket, and has a path and a size. It is comparable to a "file" in a
// conventional filesystem.
type Object struct {
// Meta holds the metainfo associated with the Object.
Meta ObjectMeta
// metainfo is the object-metadata database handle used for downloads.
metainfo *kvmetainfo.DB
// streams reads the object's data stream.
streams streams.Store
}
// DownloadRange returns an Object's data. A length of -1 will mean
// (Object.Size - offset).
func (o *Object) DownloadRange(ctx context.Context, offset, length int64) (io.ReadCloser, error) {
	src, err := o.metainfo.GetObjectStream(ctx, o.Meta.Bucket, o.Meta.Path)
	if err != nil {
		return nil, err
	}
	download := stream.NewDownload(ctx, src, o.streams)
	if _, err = download.Seek(offset, io.SeekStart); err != nil {
		return nil, err
	}
	// A length of -1 means "to the end": hand back the raw stream.
	if length == -1 {
		return download, nil
	}
	return readcloser.LimitReadCloser(download, length), nil
}
// Close closes the Object. It is currently a no-op, but callers should still
// invoke it for forward compatibility.
func (o *Object) Close() error {
return nil
}

View File

@ -1,84 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package uplink
import (
"context"
"io"
"time"
"storj.io/storj/pkg/storj"
)
// Object holds the information for a given object
type Object struct {
// Meta holds the object's metadata.
Meta ObjectMeta
}
// ObjectMeta represents metadata about a specific Object
type ObjectMeta struct {
// Bucket is the name of the bucket containing the object.
Bucket string
// Path is the object's path within the bucket.
Path storj.Path
// IsPrefix is true when this entry names a path prefix rather than an object.
IsPrefix bool
// Metadata holds user-defined key/value pairs.
Metadata map[string]string
// Created is the object's creation time.
Created time.Time
// Modified is the object's last-modification time.
Modified time.Time
// Expires is the object's expiration time.
Expires time.Time
// Size is the object's size in bytes.
Size int64
// Checksum is a checksum of the object contents (algorithm unspecified here).
Checksum []byte
}
// UploadOpts controls options about uploading a new Object, if authorized.
type UploadOpts struct {
// Metadata holds user-defined key/value pairs to store with the object.
Metadata map[string]string
// Expires is the time at which the object should expire.
Expires time.Time
// Encryption, if set, overrides the bucket's encryption settings.
Encryption *Encryption
}
// ListObjectsField numbers the fields of list objects
type ListObjectsField int
const (
// ListObjectsMetaNone opts (no optional fields; value 0)
ListObjectsMetaNone ListObjectsField = 0
// ListObjectsMetaModified opts (bit value 2)
ListObjectsMetaModified ListObjectsField = 1 << iota
// ListObjectsMetaExpiration opts (bit value 4)
ListObjectsMetaExpiration ListObjectsField = 1 << iota
// ListObjectsMetaSize opts (bit value 8)
ListObjectsMetaSize ListObjectsField = 1 << iota
// ListObjectsMetaChecksum opts (bit value 16)
ListObjectsMetaChecksum ListObjectsField = 1 << iota
// ListObjectsMetaUserDefined opts (bit value 32)
ListObjectsMetaUserDefined ListObjectsField = 1 << iota
// ListObjectsMetaAll opts (bit value 64)
// NOTE(review): this is its own distinct bit, not the union of the other
// flags — confirm whether callers expect "all fields" to be the OR of the
// individual bits instead.
ListObjectsMetaAll ListObjectsField = 1 << iota
)
// ListObjectsConfig holds params for listing objects with the Gateway
type ListObjectsConfig struct {
// this differs from storj.ListOptions by removing the Delimiter field
// (ours is hardcoded as "/"), and adding the Fields field to optionally
// support efficient listing that doesn't require looking outside of the
// path index in pointerdb.
Prefix storj.Path
Cursor storj.Path
Recursive bool
Direction storj.ListDirection
Limit int
Fields ListObjectsFields
}
// ListObjectsFields is an interface that I haven't figured out yet
type ListObjectsFields interface{}
// Range returns an objects data
// NOTE: not yet implemented; always panics.
func (o *Object) Range(ctx context.Context, offset, length int64) (io.ReadCloser, error) {
panic("TODO")
}

204
lib/uplink/project.go Normal file
View File

@ -0,0 +1,204 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package uplink
import (
"context"
"github.com/vivint/infectious"
"storj.io/storj/internal/memory"
"storj.io/storj/pkg/eestream"
"storj.io/storj/pkg/encryption"
"storj.io/storj/pkg/metainfo/kvmetainfo"
"storj.io/storj/pkg/storage/buckets"
ecclient "storj.io/storj/pkg/storage/ec"
"storj.io/storj/pkg/storage/segments"
"storj.io/storj/pkg/storage/streams"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
"storj.io/storj/uplink/metainfo"
)
// Project represents a specific project access session.
type Project struct {
// tc is the transport client used for node connections.
tc transport.Client
// metainfo is the satellite metainfo client for this project.
metainfo metainfo.Client
// project is the bucket-level metadata handle.
project *kvmetainfo.Project
// maxInlineSize is the threshold at or below which objects are marked
// for inline storage.
maxInlineSize memory.Size
}
// BucketConfig holds information about a bucket's configuration. This is
// filled in by the caller for use with CreateBucket(), or filled in by the
// library as Bucket.Config when a bucket is returned from OpenBucket().
type BucketConfig struct {
// PathCipher indicates which cipher suite is to be used for path
// encryption within the new Bucket. If not set, AES-GCM encryption
// will be used.
PathCipher storj.CipherSuite
// EncryptionParameters specifies the default encryption parameters to
// be used for data encryption of new Objects in this bucket.
EncryptionParameters storj.EncryptionParameters
// Volatile groups config values that are likely to change semantics
// or go away entirely between releases. Be careful when using them!
Volatile struct {
// RedundancyScheme defines the default Reed-Solomon and/or
// Forward Error Correction encoding parameters to be used by
// objects in this Bucket.
RedundancyScheme storj.RedundancyScheme
// SegmentSize is the default segment size to use for new
// objects in this Bucket.
SegmentSize memory.Size
}
}
// setDefaults fills in library defaults for any zero-valued field: AES-GCM
// ciphers, 1 KiB block/share sizes, a 29/35/80/95 Reed-Solomon scheme, and
// 64 MiB segments.
func (c *BucketConfig) setDefaults() {
if c.PathCipher == storj.EncUnspecified {
c.PathCipher = defaultCipher
}
if c.EncryptionParameters.CipherSuite == storj.EncUnspecified {
c.EncryptionParameters.CipherSuite = defaultCipher
}
if c.EncryptionParameters.BlockSize == 0 {
c.EncryptionParameters.BlockSize = (1 * memory.KiB).Int32()
}
if c.Volatile.RedundancyScheme.RequiredShares == 0 {
c.Volatile.RedundancyScheme.RequiredShares = 29
}
if c.Volatile.RedundancyScheme.RepairShares == 0 {
c.Volatile.RedundancyScheme.RepairShares = 35
}
if c.Volatile.RedundancyScheme.OptimalShares == 0 {
c.Volatile.RedundancyScheme.OptimalShares = 80
}
if c.Volatile.RedundancyScheme.TotalShares == 0 {
c.Volatile.RedundancyScheme.TotalShares = 95
}
if c.Volatile.RedundancyScheme.ShareSize == 0 {
c.Volatile.RedundancyScheme.ShareSize = (1 * memory.KiB).Int32()
}
if c.Volatile.SegmentSize.Int() == 0 {
c.Volatile.SegmentSize = 64 * memory.MiB
}
}
// CreateBucket creates a new bucket if authorized.
func (p *Project) CreateBucket(ctx context.Context, name string, cfg *BucketConfig) (b storj.Bucket, err error) {
defer mon.Task()(&ctx)(&err)
// A nil cfg means "all defaults".
if cfg == nil {
cfg = &BucketConfig{}
}
cfg.setDefaults()
// The encryption block size must evenly divide an RS stripe
// (ShareSize * RequiredShares) for segment encryption to line up.
if cfg.Volatile.RedundancyScheme.ShareSize*int32(cfg.Volatile.RedundancyScheme.RequiredShares)%cfg.EncryptionParameters.BlockSize != 0 {
return b, Error.New("EncryptionParameters.BlockSize must be a multiple of RS ShareSize * RS RequiredShares")
}
pathCipher := cfg.PathCipher.ToCipher()
return p.project.CreateBucket(ctx, name, &storj.Bucket{PathCipher: pathCipher})
}
// DeleteBucket deletes a bucket if authorized. If the bucket contains any
// Objects at the time of deletion, they may be lost permanently.
func (p *Project) DeleteBucket(ctx context.Context, bucket string) (err error) {
defer mon.Task()(&ctx)(&err)
return p.project.DeleteBucket(ctx, bucket)
}
// BucketListOptions controls options to the ListBuckets() call.
type BucketListOptions = storj.BucketListOptions

// ListBuckets will list authorized buckets. A nil opts falls back to the
// server-side defaults.
func (p *Project) ListBuckets(ctx context.Context, opts *BucketListOptions) (bl storj.BucketList, err error) {
	defer mon.Task()(&ctx)(&err)
	listOpts := BucketListOptions{}
	if opts != nil {
		listOpts = *opts
	}
	return p.project.ListBuckets(ctx, listOpts)
}
// GetBucketInfo returns info about the requested bucket if authorized.
func (p *Project) GetBucketInfo(ctx context.Context, bucket string) (b storj.Bucket, bi *BucketConfig, err error) {
defer mon.Task()(&ctx)(&err)
b, err = p.project.GetBucket(ctx, bucket)
if err != nil {
return b, nil, err
}
// TODO(paul): fill in once info is plumbed
// The returned config is currently always empty.
cfg := &BucketConfig{}
return b, cfg, nil
}
// OpenBucket returns a Bucket handle with the given EncryptionAccess
// information.
//
// maxMem is the default maximum amount of memory to be allocated for read
// buffers while performing decodes of objects in this Bucket. If set to a
// negative value, the system will use the smallest amount of memory it can. If
// set to zero, the library default amount of memory will be used.
func (p *Project) OpenBucket(ctx context.Context, bucket string, access *EncryptionAccess, maxMem memory.Size) (b *Bucket, err error) {
	defer mon.Task()(&ctx)(&err)
	bucketInfo, cfg, err := p.GetBucketInfo(ctx, bucket)
	if err != nil {
		return nil, err
	}
	if access == nil || access.Key == (storj.Key{}) {
		return nil, Error.New("No encryption key chosen")
	}
	// FIX: removed a stale `if err != nil` check that followed ToCipher() —
	// ToCipher does not return an error and err was provably nil there.
	pathCipher := cfg.PathCipher.ToCipher()
	encryptionScheme := cfg.EncryptionParameters.ToEncryptionScheme()
	// Translate the maxMem convention (0 => library default, <0 => minimal)
	// into a concrete buffer size for the erasure-coding client.
	if maxMem.Int() == 0 {
		maxMem = 4 * memory.MiB
	} else if maxMem.Int() < 0 {
		maxMem = 0
	}
	ec := ecclient.NewClient(p.tc, maxMem.Int())
	fc, err := infectious.NewFEC(int(cfg.Volatile.RedundancyScheme.RequiredShares), int(cfg.Volatile.RedundancyScheme.TotalShares))
	if err != nil {
		return nil, err
	}
	rs, err := eestream.NewRedundancyStrategy(
		eestream.NewRSScheme(fc, int(cfg.Volatile.RedundancyScheme.ShareSize)),
		int(cfg.Volatile.RedundancyScheme.RepairShares),
		int(cfg.Volatile.RedundancyScheme.OptimalShares))
	if err != nil {
		return nil, err
	}
	maxEncryptedSegmentSize, err := encryption.CalcEncryptedSize(cfg.Volatile.SegmentSize.Int64(),
		cfg.EncryptionParameters.ToEncryptionScheme())
	if err != nil {
		return nil, err
	}
	segments := segments.NewSegmentStore(p.metainfo, ec, rs, p.maxInlineSize.Int(), maxEncryptedSegmentSize)
	// Copy the caller's key so later mutations of access.Key cannot affect
	// the open bucket.
	key := new(storj.Key)
	copy(key[:], access.Key[:])
	streams, err := streams.NewStreamStore(segments, cfg.Volatile.SegmentSize.Int64(), key, int(encryptionScheme.BlockSize), encryptionScheme.Cipher)
	if err != nil {
		return nil, err
	}
	buckets := buckets.NewStore(streams)
	return &Bucket{
		Bucket:     bucketInfo,
		Config:     *cfg,
		metainfo:   kvmetainfo.New(p.metainfo, buckets, streams, segments, key),
		streams:    streams,
		pathCipher: pathCipher,
	}, nil
}
// Close closes the Project. It is currently a no-op, but callers should still
// invoke it for forward compatibility.
func (p *Project) Close() error {
return nil
}

View File

@ -4,10 +4,131 @@
package uplink
import (
"github.com/zeebo/errs"
"context"
"storj.io/storj/internal/memory"
"storj.io/storj/pkg/eestream"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/metainfo/kvmetainfo"
"storj.io/storj/pkg/peertls/tlsopts"
"storj.io/storj/pkg/storage/buckets"
"storj.io/storj/pkg/storage/segments"
"storj.io/storj/pkg/storage/streams"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
"storj.io/storj/uplink/metainfo"
)
var (
// Error is the errs class of standard End User Client errors
Error = errs.Class("libuplink error")
// maxBucketMetaSize bounds the segment/stream stores that OpenProject
// creates purely for bucket metadata operations.
maxBucketMetaSize = 10 * memory.MiB
)
// Config represents configuration options for an Uplink
type Config struct {
// Volatile groups config values that are likely to change semantics
// or go away entirely between releases. Be careful when using them!
Volatile struct {
// TLS defines options that affect TLS negotiation for outbound
// connections initiated by this uplink.
TLS struct {
// SkipPeerCAWhitelist determines whether to require all
// remote hosts to have identity certificates signed by
// Certificate Authorities in the default whitelist. If
// set to true, the whitelist will be ignored.
SkipPeerCAWhitelist bool
// PeerCAWhitelistPath gives the path to a CA cert
// whitelist file. It is ignored if SkipPeerCAWhitelist
// is set. If empty, the internal default peer whitelist
// is used.
PeerCAWhitelistPath string
}
// UseIdentity specifies the identity to be used by the uplink.
// If nil, a new identity will be generated.
UseIdentity *identity.FullIdentity
// MaxInlineSize determines whether the uplink will attempt to
// store a new object in the satellite's pointerDB. Objects at
// or below this size will be marked for inline storage, and
// objects above this size will not. (The satellite may reject
// the inline storage and require remote storage, still.)
MaxInlineSize memory.Size
}
}
// setDefaults fills in a generated identity and a 4 KiB inline-size
// threshold when the caller left those fields unset.
func (c *Config) setDefaults(ctx context.Context) error {
	if c.Volatile.UseIdentity == nil {
		ident, err := identity.NewFullIdentity(ctx, 0, 1)
		if err != nil {
			return err
		}
		c.Volatile.UseIdentity = ident
	}
	if c.Volatile.MaxInlineSize.Int() == 0 {
		c.Volatile.MaxInlineSize = 4 * memory.KiB
	}
	return nil
}
// Uplink represents the main entrypoint to Storj V3. An Uplink connects to
// a specific Satellite and caches connections and resources, allowing one to
// create sessions delineated by specific access controls.
type Uplink struct {
// tc is the transport client used for all outbound connections.
tc transport.Client
// cfg is the (defaulted) configuration this Uplink was created with.
cfg *Config
}
// NewUplink creates a new Uplink. A nil cfg is treated as an empty config,
// and missing config values are filled in with library defaults.
func NewUplink(ctx context.Context, cfg *Config) (*Uplink, error) {
	if cfg == nil {
		cfg = &Config{}
	}
	if err := cfg.setDefaults(ctx); err != nil {
		return nil, err
	}
	// Outbound TLS honors the peer CA whitelist unless explicitly skipped.
	opts, err := tlsopts.NewOptions(cfg.Volatile.UseIdentity, tlsopts.Config{
		UsePeerCAWhitelist:  !cfg.Volatile.TLS.SkipPeerCAWhitelist,
		PeerCAWhitelistPath: cfg.Volatile.TLS.PeerCAWhitelistPath,
	})
	if err != nil {
		return nil, err
	}
	return &Uplink{
		tc:  transport.NewClient(opts),
		cfg: cfg,
	}, nil
}
// OpenProject returns a Project handle with the given APIKey
func (u *Uplink) OpenProject(ctx context.Context, satelliteAddr string, apiKey APIKey) (p *Project, err error) {
defer mon.Task()(&ctx)(&err)
metainfo, err := metainfo.NewClient(ctx, u.tc, satelliteAddr, apiKey.key)
if err != nil {
return nil, err
}
// TODO: we shouldn't need segment or stream stores to manage buckets
// These stores are unencrypted (storj.Unencrypted, nil key) and sized by
// maxBucketMetaSize; they exist only to back bucket metadata operations.
segments := segments.NewSegmentStore(metainfo, nil, eestream.RedundancyStrategy{}, maxBucketMetaSize.Int(), maxBucketMetaSize.Int64())
streams, err := streams.NewStreamStore(segments, maxBucketMetaSize.Int64(), nil, 0, storj.Unencrypted)
if err != nil {
return nil, err
}
return &Project{
tc: u.tc,
metainfo: metainfo,
project: kvmetainfo.NewProject(buckets.NewStore(streams)),
maxInlineSize: u.cfg.Volatile.MaxInlineSize,
}, nil
}
// Close closes the Uplink. This may not do anything at present, but should
// still be called to allow forward compatibility.
func (u *Uplink) Close() error {
	// No resources are held yet, so there is nothing to release; always succeeds.
	return nil
}

View File

@ -11,50 +11,50 @@ import (
)
// CreateBucket creates a new bucket with the specified information
func (db *DB) CreateBucket(ctx context.Context, bucket string, info *storj.Bucket) (bucketInfo storj.Bucket, err error) {
func (db *Project) CreateBucket(ctx context.Context, bucketName string, info *storj.Bucket) (bucketInfo storj.Bucket, err error) {
defer mon.Task()(&ctx)(&err)
if bucket == "" {
if bucketName == "" {
return storj.Bucket{}, storj.ErrNoBucket.New("")
}
meta, err := db.buckets.Put(ctx, bucket, getPathCipher(info))
meta, err := db.buckets.Put(ctx, bucketName, getPathCipher(info))
if err != nil {
return storj.Bucket{}, err
}
return bucketFromMeta(bucket, meta), nil
return bucketFromMeta(bucketName, meta), nil
}
// DeleteBucket deletes bucket
func (db *DB) DeleteBucket(ctx context.Context, bucket string) (err error) {
func (db *Project) DeleteBucket(ctx context.Context, bucketName string) (err error) {
defer mon.Task()(&ctx)(&err)
if bucket == "" {
if bucketName == "" {
return storj.ErrNoBucket.New("")
}
return db.buckets.Delete(ctx, bucket)
return db.buckets.Delete(ctx, bucketName)
}
// GetBucket gets bucket information
func (db *DB) GetBucket(ctx context.Context, bucket string) (bucketInfo storj.Bucket, err error) {
func (db *Project) GetBucket(ctx context.Context, bucketName string) (bucketInfo storj.Bucket, err error) {
defer mon.Task()(&ctx)(&err)
if bucket == "" {
if bucketName == "" {
return storj.Bucket{}, storj.ErrNoBucket.New("")
}
meta, err := db.buckets.Get(ctx, bucket)
meta, err := db.buckets.Get(ctx, bucketName)
if err != nil {
return storj.Bucket{}, err
}
return bucketFromMeta(bucket, meta), nil
return bucketFromMeta(bucketName, meta), nil
}
// ListBuckets lists buckets
func (db *DB) ListBuckets(ctx context.Context, options storj.BucketListOptions) (list storj.BucketList, err error) {
func (db *Project) ListBuckets(ctx context.Context, options storj.BucketListOptions) (list storj.BucketList, err error) {
defer mon.Task()(&ctx)(&err)
var startAfter, endBefore string
@ -104,9 +104,9 @@ func getPathCipher(info *storj.Bucket) storj.Cipher {
return info.PathCipher
}
func bucketFromMeta(bucket string, meta buckets.Meta) storj.Bucket {
func bucketFromMeta(bucketName string, meta buckets.Meta) storj.Bucket {
return storj.Bucket{
Name: bucket,
Name: bucketName,
Created: meta.Created,
PathCipher: meta.PathEncryptionType,
SegmentsSize: meta.SegmentsSize,

View File

@ -26,8 +26,10 @@ var _ storj.Metainfo = (*DB)(nil)
// DB implements metainfo database
type DB struct {
*Project
metainfo metainfo.Client
buckets buckets.Store
streams streams.Store
segments segments.Store
@ -37,8 +39,8 @@ type DB struct {
// New creates a new metainfo database
func New(metainfo metainfo.Client, buckets buckets.Store, streams streams.Store, segments segments.Store, rootKey *storj.Key) *DB {
return &DB{
Project: NewProject(buckets),
metainfo: metainfo,
buckets: buckets,
streams: streams,
segments: segments,
rootKey: rootKey,

View File

@ -0,0 +1,16 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package kvmetainfo
import "storj.io/storj/pkg/storage/buckets"
// Project implements project management operations
type Project struct {
	// buckets is the store through which all bucket operations are performed.
	buckets buckets.Store
}
// NewProject constructs a *Project that manages buckets through the given store.
func NewProject(buckets buckets.Store) *Project {
	project := Project{buckets: buckets}
	return &project
}

View File

@ -3,9 +3,19 @@
package storj
// EncryptionScheme is the scheme and parameters used for encryption.
// Use the similar EncryptionParameters struct instead, if possible.
type EncryptionScheme struct {
	// Cipher specifies the cipher suite to be used for encryption.
	Cipher Cipher
	// BlockSize determines the unit size at which encryption is performed.
	// It is important to distinguish this from the block size used by the
	// cipher suite (probably 128 bits). There is some small overhead for
	// each encryption unit, so BlockSize should not be too small, but
	// smaller sizes yield shorter first-byte latency and better seek times.
	// Note that BlockSize itself is the size of data blocks _after_ they
	// have been encrypted and the authentication overhead has been added.
	// It is _not_ the size of the data blocks to _be_ encrypted.
	BlockSize int32
}
@ -14,16 +24,107 @@ func (scheme EncryptionScheme) IsZero() bool {
return scheme == (EncryptionScheme{})
}
// ToEncryptionParameters transforms an EncryptionScheme object into the
// equivalent EncryptionParameters object.
func (scheme EncryptionScheme) ToEncryptionParameters() EncryptionParameters {
	var params EncryptionParameters
	params.CipherSuite = scheme.Cipher.ToCipherSuite()
	params.BlockSize = scheme.BlockSize
	return params
}
// EncryptionParameters is the cipher suite and parameters used for encryption.
// It is like EncryptionScheme, but uses the CipherSuite type instead of Cipher.
// EncryptionParameters is preferred for new uses.
type EncryptionParameters struct {
	// CipherSuite specifies the cipher suite to be used for encryption.
	CipherSuite CipherSuite
	// BlockSize determines the unit size at which encryption is performed.
	// It is important to distinguish this from the block size used by the
	// cipher suite (probably 128 bits). There is some small overhead for
	// each encryption unit, so BlockSize should not be too small, but
	// smaller sizes yield shorter first-byte latency and better seek times.
	// Note that BlockSize itself is the size of data blocks _after_ they
	// have been encrypted and the authentication overhead has been added.
	// It is _not_ the size of the data blocks to _be_ encrypted.
	BlockSize int32
}
// IsZero returns true if no field in the struct is set to non-zero value
func (params EncryptionParameters) IsZero() bool {
	var zero EncryptionParameters
	return params == zero
}
// ToEncryptionScheme transforms an EncryptionParameters object into the
// equivalent EncryptionScheme object.
func (params EncryptionParameters) ToEncryptionScheme() EncryptionScheme {
	var scheme EncryptionScheme
	scheme.Cipher = params.CipherSuite.ToCipher()
	scheme.BlockSize = params.BlockSize
	return scheme
}
// Cipher specifies an encryption algorithm.
type Cipher byte

// List of supported encryption algorithms.
const (
	// Unencrypted indicates no encryption or decryption is to be performed.
	Unencrypted = Cipher(iota)
	// AESGCM indicates use of AES128-GCM encryption.
	AESGCM
	// SecretBox indicates use of XSalsa20-Poly1305 encryption, as provided by
	// the NaCl cryptography library under the name "Secretbox".
	SecretBox
	// Invalid indicates a Cipher value whose use is not valid. This may be
	// used as a replacement for "unspecified" in a pinch, although it is not
	// the zero value.
	Invalid
)
// ToCipherSuite converts a Cipher value to a CipherSuite value.
func (c Cipher) ToCipherSuite() CipherSuite {
	switch c {
	case Unencrypted:
		return EncNull
	case AESGCM:
		return EncAESGCM
	case SecretBox:
		return EncSecretBox
	default:
		// Any unrecognized cipher maps to the "unspecified" suite.
		return EncUnspecified
	}
}
// CipherSuite specifies one of the encryption suites supported by Storj
// libraries for encryption of in-network data.
type CipherSuite byte

// List of supported cipher suites.
const (
	// EncUnspecified indicates no encryption suite has been selected.
	EncUnspecified = CipherSuite(iota)
	// EncNull indicates use of the NULL cipher; that is, no encryption is
	// done. The ciphertext is equal to the plaintext.
	EncNull
	// EncAESGCM indicates use of AES128-GCM encryption.
	EncAESGCM
	// EncSecretBox indicates use of XSalsa20-Poly1305 encryption, as provided
	// by the NaCl cryptography library under the name "Secretbox".
	EncSecretBox
)
// ToCipher converts a CipherSuite value to a Cipher value.
func (cs CipherSuite) ToCipher() Cipher {
	switch cs {
	case EncNull:
		return Unencrypted
	case EncAESGCM:
		return AESGCM
	case EncSecretBox:
		return SecretBox
	default:
		// Any unrecognized suite maps to the Invalid cipher.
		return Invalid
	}
}
// Constant definitions for key and nonce sizes
const (
KeySize = 32

View File

@ -5,14 +5,24 @@ package storj
// RedundancyScheme specifies the parameters and the algorithm for redundancy
type RedundancyScheme struct {
	// Algorithm determines the algorithm to be used for redundancy.
	Algorithm RedundancyAlgorithm
	// ShareSize is the size to use for new redundancy shares.
	ShareSize int32
	// RequiredShares is the minimum number of shares required to recover a
	// segment.
	RequiredShares int16
	// RepairShares is the minimum number of safe shares that can remain
	// before a repair is triggered.
	RepairShares int16
	// OptimalShares is the desired total number of shares for a segment.
	OptimalShares int16
	// TotalShares is the number of shares to encode. If it is larger than
	// OptimalShares, slower uploads of the excess shares will be aborted in
	// order to improve performance.
	TotalShares int16
}
// IsZero returns true if no field in the struct is set to non-zero value