946ec201e2
What: we move api keys out of the grpc connection-level metadata on the client side and into the request protobufs directly. the server side still supports both mechanisms for backwards compatibility. Why: dRPC won't support connection-level metadata. the only thing we currently use connection-level metadata for is api keys. we need to move all information needed by a request into the request protobuf itself for drpc support. check out the .proto changes for the main details. One fun side-fact: Did you know that protobuf fields 1-15 are special and only use one byte for both the field number and type? Additionally did you know we don't use field 15 anywhere yet? So the new request header will use field 15, and should use field 15 on all protobufs going forward. Please describe the tests: all existing tests should pass Please describe the performance impact: none
481 lines
15 KiB
Go
481 lines
15 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information
|
|
|
|
package testplanet
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/skyrings/skyring-common/tools/uuid"
|
|
"github.com/spf13/pflag"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
|
|
libuplink "storj.io/storj/lib/uplink"
|
|
"storj.io/storj/pkg/cfgstruct"
|
|
"storj.io/storj/pkg/identity"
|
|
"storj.io/storj/pkg/macaroon"
|
|
"storj.io/storj/pkg/pb"
|
|
"storj.io/storj/pkg/peertls/tlsopts"
|
|
"storj.io/storj/pkg/storj"
|
|
"storj.io/storj/pkg/transport"
|
|
"storj.io/storj/satellite/console"
|
|
"storj.io/storj/uplink"
|
|
"storj.io/storj/uplink/metainfo"
|
|
"storj.io/storj/uplink/piecestore"
|
|
)
|
|
|
|
// Uplink is a general purpose
|
|
type Uplink struct {
|
|
Log *zap.Logger
|
|
Info pb.Node
|
|
Identity *identity.FullIdentity
|
|
Transport transport.Client
|
|
StorageNodeCount int
|
|
|
|
APIKey map[storj.NodeID]*macaroon.APIKey
|
|
ProjectID map[storj.NodeID]uuid.UUID
|
|
}
|
|
|
|
// newUplinks creates initializes uplinks, requires peer to have at least one satellite
|
|
func (planet *Planet) newUplinks(prefix string, count, storageNodeCount int) ([]*Uplink, error) {
|
|
var xs []*Uplink
|
|
for i := 0; i < count; i++ {
|
|
uplink, err := planet.newUplink(prefix+strconv.Itoa(i), storageNodeCount)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
xs = append(xs, uplink)
|
|
}
|
|
|
|
return xs, nil
|
|
}
|
|
|
|
// newUplink creates a new uplink
|
|
func (planet *Planet) newUplink(name string, storageNodeCount int) (*Uplink, error) {
|
|
identity, err := planet.NewIdentity()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tlsOpts, err := tlsopts.NewOptions(identity, tlsopts.Config{
|
|
PeerIDVersions: strconv.Itoa(int(planet.config.IdentityVersion.Number)),
|
|
}, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
uplink := &Uplink{
|
|
Log: planet.log.Named(name),
|
|
Identity: identity,
|
|
StorageNodeCount: storageNodeCount,
|
|
APIKey: map[storj.NodeID]*macaroon.APIKey{},
|
|
ProjectID: map[storj.NodeID]uuid.UUID{},
|
|
}
|
|
|
|
uplink.Log.Debug("id=" + identity.ID.String())
|
|
|
|
uplink.Transport = transport.NewClient(tlsOpts)
|
|
|
|
uplink.Info = pb.Node{
|
|
Id: uplink.Identity.ID,
|
|
Address: &pb.NodeAddress{
|
|
Transport: pb.NodeTransport_TCP_TLS_GRPC,
|
|
Address: "",
|
|
},
|
|
}
|
|
|
|
for j, satellite := range planet.Satellites {
|
|
// TODO: find a nicer way to do this
|
|
// populate satellites console with example
|
|
// project and API key and pass that to uplinks
|
|
consoleDB := satellite.DB.Console()
|
|
|
|
projectName := fmt.Sprintf("%s_%d", name, j)
|
|
key, err := macaroon.NewAPIKey([]byte("testSecret"))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
project, err := consoleDB.Projects().Insert(
|
|
context.Background(),
|
|
&console.Project{
|
|
Name: projectName,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, err = consoleDB.APIKeys().Create(
|
|
context.Background(),
|
|
key.Head(),
|
|
console.APIKeyInfo{
|
|
Name: "root",
|
|
ProjectID: project.ID,
|
|
Secret: []byte("testSecret"),
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
uplink.APIKey[satellite.ID()] = key
|
|
uplink.ProjectID[satellite.ID()] = project.ID
|
|
}
|
|
|
|
planet.uplinks = append(planet.uplinks, uplink)
|
|
|
|
return uplink, nil
|
|
}
|
|
|
|
// ID returns uplink id
|
|
func (client *Uplink) ID() storj.NodeID { return client.Info.Id }
|
|
|
|
// Addr returns uplink address
|
|
func (client *Uplink) Addr() string { return client.Info.Address.Address }
|
|
|
|
// Local returns uplink info
|
|
func (client *Uplink) Local() pb.Node { return client.Info }
|
|
|
|
// Shutdown shuts down all uplink dependencies
|
|
func (client *Uplink) Shutdown() error { return nil }
|
|
|
|
// DialMetainfo dials destination with apikey and returns metainfo Client
|
|
func (client *Uplink) DialMetainfo(ctx context.Context, destination Peer, apikey *macaroon.APIKey) (*metainfo.Client, error) {
|
|
return metainfo.Dial(ctx, client.Transport, destination.Addr(), apikey)
|
|
}
|
|
|
|
// DialPiecestore dials destination storagenode and returns a piecestore client.
|
|
func (client *Uplink) DialPiecestore(ctx context.Context, destination Peer) (*piecestore.Client, error) {
|
|
node := destination.Local()
|
|
return piecestore.Dial(ctx, client.Transport, &node.Node, client.Log.Named("uplink>piecestore"), piecestore.DefaultConfig)
|
|
}
|
|
|
|
// Upload data to specific satellite
|
|
func (client *Uplink) Upload(ctx context.Context, satellite *SatelliteSystem, bucket string, path storj.Path, data []byte) error {
|
|
return client.UploadWithExpiration(ctx, satellite, bucket, path, data, time.Time{})
|
|
}
|
|
|
|
// UploadWithExpiration data to specific satellite and expiration time
|
|
func (client *Uplink) UploadWithExpiration(ctx context.Context, satellite *SatelliteSystem, bucket string, path storj.Path, data []byte, expiration time.Time) error {
|
|
return client.UploadWithExpirationAndConfig(ctx, satellite, nil, bucket, path, data, expiration)
|
|
}
|
|
|
|
// UploadWithConfig uploads data to specific satellite with configured values
|
|
func (client *Uplink) UploadWithConfig(ctx context.Context, satellite *SatelliteSystem, redundancy *uplink.RSConfig, bucket string, path storj.Path, data []byte) error {
|
|
return client.UploadWithExpirationAndConfig(ctx, satellite, redundancy, bucket, path, data, time.Time{})
|
|
}
|
|
|
|
// UploadWithExpirationAndConfig uploads data to specific satellite with configured values and expiration time
|
|
func (client *Uplink) UploadWithExpirationAndConfig(ctx context.Context, satellite *SatelliteSystem, redundancy *uplink.RSConfig, bucketName string, path storj.Path, data []byte, expiration time.Time) (err error) {
|
|
config := client.GetConfig(satellite)
|
|
if redundancy != nil {
|
|
if redundancy.MinThreshold > 0 {
|
|
config.RS.MinThreshold = redundancy.MinThreshold
|
|
}
|
|
if redundancy.RepairThreshold > 0 {
|
|
config.RS.RepairThreshold = redundancy.RepairThreshold
|
|
}
|
|
if redundancy.SuccessThreshold > 0 {
|
|
config.RS.SuccessThreshold = redundancy.SuccessThreshold
|
|
}
|
|
if redundancy.MaxThreshold > 0 {
|
|
config.RS.MaxThreshold = redundancy.MaxThreshold
|
|
}
|
|
if redundancy.ErasureShareSize > 0 {
|
|
config.RS.ErasureShareSize = redundancy.ErasureShareSize
|
|
}
|
|
}
|
|
|
|
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { err = errs.Combine(err, bucket.Close(), project.Close()) }()
|
|
|
|
opts := &libuplink.UploadOptions{}
|
|
opts.Expires = expiration
|
|
opts.Volatile.RedundancyScheme = config.GetRedundancyScheme()
|
|
opts.Volatile.EncryptionParameters = config.GetEncryptionParameters()
|
|
|
|
reader := bytes.NewReader(data)
|
|
if err := bucket.UploadObject(ctx, path, reader, opts); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UploadWithClientConfig uploads data to specific satellite with custom client configuration
|
|
func (client *Uplink) UploadWithClientConfig(ctx context.Context, satellite *SatelliteSystem, clientConfig uplink.Config, bucketName string, path storj.Path, data []byte) (err error) {
|
|
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, clientConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { err = errs.Combine(err, bucket.Close(), project.Close()) }()
|
|
|
|
opts := &libuplink.UploadOptions{}
|
|
opts.Volatile.RedundancyScheme = clientConfig.GetRedundancyScheme()
|
|
opts.Volatile.EncryptionParameters = clientConfig.GetEncryptionParameters()
|
|
|
|
reader := bytes.NewReader(data)
|
|
if err := bucket.UploadObject(ctx, path, reader, opts); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Download data from specific satellite
|
|
func (client *Uplink) Download(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path) ([]byte, error) {
|
|
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() { err = errs.Combine(err, project.Close(), bucket.Close()) }()
|
|
|
|
object, err := bucket.OpenObject(ctx, path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rc, err := object.DownloadRange(ctx, 0, object.Meta.Size)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() { err = errs.Combine(err, rc.Close()) }()
|
|
|
|
data, err := ioutil.ReadAll(rc)
|
|
if err != nil {
|
|
return []byte{}, err
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
// DownloadStream returns stream for downloading data
|
|
func (client *Uplink) DownloadStream(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path) (_ io.ReadCloser, cleanup func() error, err error) {
|
|
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
cleanup = func() error {
|
|
err = errs.Combine(err,
|
|
project.Close(),
|
|
bucket.Close(),
|
|
)
|
|
return err
|
|
}
|
|
|
|
downloader, err := bucket.Download(ctx, path)
|
|
return downloader, cleanup, err
|
|
}
|
|
|
|
// DownloadStreamRange returns stream for downloading data
|
|
func (client *Uplink) DownloadStreamRange(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path, start, limit int64) (_ io.ReadCloser, cleanup func() error, err error) {
|
|
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
cleanup = func() error {
|
|
err = errs.Combine(err,
|
|
project.Close(),
|
|
bucket.Close(),
|
|
)
|
|
return err
|
|
}
|
|
|
|
downloader, err := bucket.DownloadRange(ctx, path, start, limit)
|
|
return downloader, cleanup, err
|
|
}
|
|
|
|
// Delete deletes an object at the path in a bucket
|
|
func (client *Uplink) Delete(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path) error {
|
|
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { err = errs.Combine(err, project.Close(), bucket.Close()) }()
|
|
|
|
err = bucket.DeleteObject(ctx, path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CreateBucket creates a new bucket
|
|
func (client *Uplink) CreateBucket(ctx context.Context, satellite *SatelliteSystem, bucketName string) error {
|
|
project, err := client.GetProject(ctx, satellite)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { err = errs.Combine(err, project.Close()) }()
|
|
|
|
clientCfg := client.GetConfig(satellite)
|
|
bucketCfg := &libuplink.BucketConfig{}
|
|
bucketCfg.PathCipher = clientCfg.GetPathCipherSuite()
|
|
bucketCfg.EncryptionParameters = clientCfg.GetEncryptionParameters()
|
|
bucketCfg.Volatile.RedundancyScheme = clientCfg.GetRedundancyScheme()
|
|
bucketCfg.Volatile.SegmentsSize = clientCfg.GetSegmentSize()
|
|
|
|
_, err = project.CreateBucket(ctx, bucketName, bucketCfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetConfig returns a default config for a given satellite.
|
|
func (client *Uplink) GetConfig(satellite *SatelliteSystem) uplink.Config {
|
|
config := getDefaultConfig()
|
|
|
|
// client.APIKey[satellite.ID()] is a *macaroon.APIKey, but we want a
|
|
// *libuplink.APIKey, so, serialize and parse for now
|
|
apiKey, err := libuplink.ParseAPIKey(client.APIKey[satellite.ID()].Serialize())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
encAccess := libuplink.NewEncryptionAccess()
|
|
encAccess.SetDefaultKey(storj.Key{})
|
|
|
|
scopeData, err := (&libuplink.Scope{
|
|
SatelliteAddr: satellite.Addr(),
|
|
APIKey: apiKey,
|
|
EncryptionAccess: encAccess,
|
|
}).Serialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
config.Scope = scopeData
|
|
|
|
// Support some legacy stuff
|
|
config.Legacy.Client.APIKey = apiKey.Serialize()
|
|
config.Legacy.Client.SatelliteAddr = satellite.Addr()
|
|
|
|
config.Client.RequestTimeout = 10 * time.Second
|
|
config.Client.DialTimeout = 10 * time.Second
|
|
|
|
config.RS.MinThreshold = atLeastOne(client.StorageNodeCount * 1 / 5) // 20% of storage nodes
|
|
config.RS.RepairThreshold = atLeastOne(client.StorageNodeCount * 2 / 5) // 40% of storage nodes
|
|
config.RS.SuccessThreshold = atLeastOne(client.StorageNodeCount * 3 / 5) // 60% of storage nodes
|
|
config.RS.MaxThreshold = atLeastOne(client.StorageNodeCount * 4 / 5) // 80% of storage nodes
|
|
|
|
config.TLS.UsePeerCAWhitelist = false
|
|
config.TLS.Extensions.Revocation = false
|
|
config.TLS.Extensions.WhitelistSignedLeaf = false
|
|
|
|
return config
|
|
}
|
|
|
|
func getDefaultConfig() uplink.Config {
|
|
config := uplink.Config{}
|
|
cfgstruct.Bind(&pflag.FlagSet{}, &config, cfgstruct.UseDevDefaults())
|
|
return config
|
|
}
|
|
|
|
// atLeastOne clamps value to a minimum of 1: values of 1 or more are returned
// unchanged, anything smaller yields 1.
func atLeastOne(value int) int {
	if value >= 1 {
		return value
	}
	return 1
}
|
|
|
|
// NewLibuplink creates a libuplink.Uplink object with the testplanet Uplink config
|
|
func (client *Uplink) NewLibuplink(ctx context.Context) (*libuplink.Uplink, error) {
|
|
config := getDefaultConfig()
|
|
libuplinkCfg := &libuplink.Config{}
|
|
libuplinkCfg.Volatile.Log = client.Log
|
|
libuplinkCfg.Volatile.MaxInlineSize = config.Client.MaxInlineSize
|
|
libuplinkCfg.Volatile.MaxMemory = config.RS.MaxBufferMem
|
|
libuplinkCfg.Volatile.PeerIDVersion = config.TLS.PeerIDVersions
|
|
libuplinkCfg.Volatile.TLS.SkipPeerCAWhitelist = !config.TLS.UsePeerCAWhitelist
|
|
libuplinkCfg.Volatile.TLS.PeerCAWhitelistPath = config.TLS.PeerCAWhitelistPath
|
|
libuplinkCfg.Volatile.DialTimeout = config.Client.DialTimeout
|
|
libuplinkCfg.Volatile.RequestTimeout = config.Client.RequestTimeout
|
|
|
|
return libuplink.NewUplink(ctx, libuplinkCfg)
|
|
}
|
|
|
|
// GetProject returns a libuplink.Project which allows interactions with a specific project
|
|
func (client *Uplink) GetProject(ctx context.Context, satellite *SatelliteSystem) (*libuplink.Project, error) {
|
|
testLibuplink, err := client.NewLibuplink(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() { err = errs.Combine(err, testLibuplink.Close()) }()
|
|
|
|
scope, err := client.GetConfig(satellite).GetScope()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
project, err := testLibuplink.OpenProject(ctx, scope.SatelliteAddr, scope.APIKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return project, nil
|
|
}
|
|
|
|
// GetProjectAndBucket returns a libuplink.Project and Bucket which allows interactions with a specific project and its buckets
|
|
func (client *Uplink) GetProjectAndBucket(ctx context.Context, satellite *SatelliteSystem, bucketName string, clientCfg uplink.Config) (_ *libuplink.Project, _ *libuplink.Bucket, err error) {
|
|
project, err := client.GetProject(ctx, satellite)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
err = errs.Combine(err, project.Close())
|
|
}
|
|
}()
|
|
|
|
scope, err := client.GetConfig(satellite).GetScope()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Check if the bucket exists, if not then create it
|
|
_, _, err = project.GetBucketInfo(ctx, bucketName)
|
|
if err != nil {
|
|
if storj.ErrBucketNotFound.Has(err) {
|
|
err := createBucket(ctx, clientCfg, *project, bucketName)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
} else {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
bucket, err := project.OpenBucket(ctx, bucketName, scope.EncryptionAccess)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return project, bucket, nil
|
|
}
|
|
|
|
func createBucket(ctx context.Context, config uplink.Config, project libuplink.Project, bucketName string) error {
|
|
bucketCfg := &libuplink.BucketConfig{}
|
|
bucketCfg.PathCipher = config.GetPathCipherSuite()
|
|
bucketCfg.EncryptionParameters = config.GetEncryptionParameters()
|
|
bucketCfg.Volatile.RedundancyScheme = config.GetRedundancyScheme()
|
|
bucketCfg.Volatile.SegmentsSize = config.GetSegmentSize()
|
|
|
|
_, err := project.CreateBucket(ctx, bucketName, bucketCfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|