storj/private/testplanet/uplink.go
Michal Niewrzal d7b5df70d3 cmd/uplink: remove unused flag
New API has limited number of options to configure at the moment. We
should remove unused flags from Uplink CLI and add if needed in the
future.

Change-Id: Icf3f3dadd43cb61a3b408b02d0762aef34425dbf
2020-03-09 13:44:46 +00:00

466 lines
14 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information
package testplanet
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"strconv"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/spf13/pflag"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/identity"
"storj.io/common/macaroon"
"storj.io/common/pb"
"storj.io/common/peertls/tlsopts"
"storj.io/common/rpc"
"storj.io/common/storj"
libuplink "storj.io/storj/lib/uplink"
"storj.io/storj/pkg/cfgstruct"
"storj.io/storj/satellite/console"
"storj.io/uplink/private/metainfo"
"storj.io/uplink/private/piecestore"
)
// Uplink is a general purpose test client. It bundles the identity, dialer
// and per-satellite credentials that the helper methods in this file use to
// talk to the satellites and storage nodes of a testplanet.
type Uplink struct {
// Log is the logger for this uplink; newUplink names it after the uplink.
Log *zap.Logger
// Info is the node description of this uplink; newUplink sets its Id to
// the identity's ID and leaves the address string empty.
Info pb.Node
// Identity is the full identity generated for this uplink.
Identity *identity.FullIdentity
// Dialer is used by DialMetainfo and DialPiecestore to open connections.
Dialer rpc.Dialer
// StorageNodeCount is the number of storage nodes GetConfig uses to derive
// the RS thresholds.
StorageNodeCount int
// APIKey holds the root API key created for this uplink, keyed by
// satellite ID.
APIKey map[storj.NodeID]*macaroon.APIKey
// ProjectID holds the ID of the project created for this uplink, keyed by
// satellite ID.
ProjectID map[storj.NodeID]uuid.UUID
}
// newUplinks creates and initializes count uplinks. Each uplink is named
// prefix followed by its zero-based index. The first failure aborts the
// whole operation and is returned as-is.
//
// Requires the planet to have at least one satellite.
func (planet *Planet) newUplinks(prefix string, count, storageNodeCount int) ([]*Uplink, error) {
	var uplinks []*Uplink
	for index := 0; index < count; index++ {
		name := prefix + strconv.Itoa(index)
		created, err := planet.newUplink(name, storageNodeCount)
		if err != nil {
			return nil, err
		}
		uplinks = append(uplinks, created)
	}
	return uplinks, nil
}
// newUplink creates a new uplink called name with a freshly generated
// identity, registers a project and a root API key for it in the console
// database of every satellite of the planet, and appends the uplink to
// planet.uplinks.
func (planet *Planet) newUplink(name string, storageNodeCount int) (*Uplink, error) {
	// The same secret is used for the macaroon and for the stored key info.
	const secret = "testSecret"

	ident, err := planet.NewIdentity()
	if err != nil {
		return nil, err
	}

	peerIDVersions := strconv.Itoa(int(planet.config.IdentityVersion.Number))
	tlsOptions, err := tlsopts.NewOptions(ident, tlsopts.Config{PeerIDVersions: peerIDVersions}, nil)
	if err != nil {
		return nil, err
	}

	log := planet.log.Named(name)
	log.Debug("id=" + ident.ID.String())

	created := &Uplink{
		Log:      log,
		Identity: ident,
		Dialer:   rpc.NewDefaultDialer(tlsOptions),
		Info: pb.Node{
			Id: ident.ID,
			Address: &pb.NodeAddress{
				Transport: pb.NodeTransport_TCP_TLS_GRPC,
				Address:   "",
			},
		},
		StorageNodeCount: storageNodeCount,
		APIKey:           map[storj.NodeID]*macaroon.APIKey{},
		ProjectID:        map[storj.NodeID]uuid.UUID{},
	}

	// TODO: find a nicer way to do this
	// populate satellites console with example
	// project and API key and pass that to uplinks
	ctx := context.Background()
	for index, satellite := range planet.Satellites {
		consoleDB := satellite.DB.Console()
		projectName := fmt.Sprintf("%s_%d", name, index)

		apiKey, err := macaroon.NewAPIKey([]byte(secret))
		if err != nil {
			return nil, err
		}

		project, err := consoleDB.Projects().Insert(ctx, &console.Project{Name: projectName})
		if err != nil {
			return nil, err
		}

		keyInfo := console.APIKeyInfo{
			Name:      "root",
			ProjectID: project.ID,
			Secret:    []byte(secret),
		}
		if _, err = consoleDB.APIKeys().Create(ctx, apiKey.Head(), keyInfo); err != nil {
			return nil, err
		}

		created.APIKey[satellite.ID()] = apiKey
		created.ProjectID[satellite.ID()] = project.ID
	}

	planet.uplinks = append(planet.uplinks, created)
	return created, nil
}
// ID returns the node ID stored in the uplink's node info.
func (client *Uplink) ID() storj.NodeID {
	return client.Info.Id
}
// Addr returns the address string stored in the uplink's node info.
func (client *Uplink) Addr() string {
	return client.Info.Address.Address
}
// Local returns the uplink's node info by value.
func (client *Uplink) Local() pb.Node {
	return client.Info
}
// Shutdown shuts down all uplink dependencies. There is currently nothing to
// release, so it always returns nil.
func (client *Uplink) Shutdown() error {
	return nil
}
// DialMetainfo dials the address of destination using the uplink's dialer
// and the given apikey, and returns the resulting metainfo client.
func (client *Uplink) DialMetainfo(ctx context.Context, destination Peer, apikey *macaroon.APIKey) (*metainfo.Client, error) {
	const userAgent = "Test/1.0"

	address := destination.Addr()
	return metainfo.Dial(ctx, client.Dialer, address, apikey, userAgent)
}
// DialPiecestore dials the destination storage node with the uplink's dialer
// and the default piecestore configuration, and returns a piecestore client.
func (client *Uplink) DialPiecestore(ctx context.Context, destination Peer) (*piecestore.Client, error) {
	target := destination.Local()
	log := client.Log.Named("uplink>piecestore")
	return piecestore.Dial(ctx, client.Dialer, &target.Node, log, piecestore.DefaultConfig)
}
// Upload stores data at path in bucket on the given satellite. It delegates
// to UploadWithExpiration with the zero time as the expiration.
func (client *Uplink) Upload(ctx context.Context, satellite *SatelliteSystem, bucket string, path storj.Path, data []byte) error {
	var noExpiration time.Time
	return client.UploadWithExpiration(ctx, satellite, bucket, path, data, noExpiration)
}
// UploadWithExpiration uploads data to path in bucketName on the given
// satellite, using the uplink's default configuration for that satellite and
// the supplied expiration time.
//
// The bucket is created first when it does not exist (see
// GetProjectAndBucket). The returned error combines the upload error with any
// error from closing the bucket and the project.
func (client *Uplink) UploadWithExpiration(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path, data []byte, expiration time.Time) (err error) {
	config := client.GetConfig(satellite)
	project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, config)
	if err != nil {
		return err
	}
	// err is a named result so that Close failures recorded here actually
	// reach the caller instead of being written to a discarded local.
	defer func() { err = errs.Combine(err, bucket.Close(), project.Close()) }()

	opts := &libuplink.UploadOptions{}
	opts.Expires = expiration
	opts.Volatile.RedundancyScheme = config.GetRedundancyScheme()
	opts.Volatile.EncryptionParameters = config.GetEncryptionParameters()

	return bucket.UploadObject(ctx, path, bytes.NewReader(data), opts)
}
// UploadWithClientConfig uploads data to path in bucketName on the given
// satellite, taking the redundancy scheme and encryption parameters from
// clientConfig. Errors from closing the bucket and the project are combined
// into the returned error.
func (client *Uplink) UploadWithClientConfig(ctx context.Context, satellite *SatelliteSystem, clientConfig UplinkConfig, bucketName string, path storj.Path, data []byte) (err error) {
	project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, clientConfig)
	if err != nil {
		return err
	}
	defer func() {
		err = errs.Combine(err, bucket.Close(), project.Close())
	}()

	var uploadOpts libuplink.UploadOptions
	uploadOpts.Volatile.RedundancyScheme = clientConfig.GetRedundancyScheme()
	uploadOpts.Volatile.EncryptionParameters = clientConfig.GetEncryptionParameters()

	payload := bytes.NewReader(data)
	return bucket.UploadObject(ctx, path, payload, &uploadOpts)
}
// Download reads the whole object at path in bucketName from the given
// satellite and returns its contents.
//
// The bucket is created first when it does not exist (see
// GetProjectAndBucket). On any failure the returned data is nil. The returned
// error combines the download error with any error from closing the reader,
// the bucket and the project.
func (client *Uplink) Download(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path) (_ []byte, err error) {
	project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))
	if err != nil {
		return nil, err
	}
	// err is a named result so that Close failures recorded by the deferred
	// functions reach the caller. The bucket is closed before the project,
	// matching the upload helpers.
	defer func() { err = errs.Combine(err, bucket.Close(), project.Close()) }()

	object, err := bucket.OpenObject(ctx, path)
	if err != nil {
		return nil, err
	}

	// Request the full object: offset 0, length equal to the recorded size.
	rc, err := object.DownloadRange(ctx, 0, object.Meta.Size)
	if err != nil {
		return nil, err
	}
	defer func() { err = errs.Combine(err, rc.Close()) }()

	data, err := ioutil.ReadAll(rc)
	if err != nil {
		return nil, err
	}
	return data, nil
}
// DownloadStream opens a download stream for the object at path in
// bucketName on the given satellite.
//
// Alongside the stream it returns a cleanup function that closes the project
// and the bucket; cleanup is returned even when opening the stream fails, in
// which case its result also includes that failure. Only when the project or
// bucket cannot be obtained are both the stream and cleanup nil.
func (client *Uplink) DownloadStream(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path) (_ io.ReadCloser, cleanup func() error, err error) {
	config := client.GetConfig(satellite)
	project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, config)
	if err != nil {
		return nil, nil, err
	}

	cleanup = func() error {
		// Accumulates into the function's named error result.
		err = errs.Combine(err, project.Close(), bucket.Close())
		return err
	}

	stream, err := bucket.Download(ctx, path)
	return stream, cleanup, err
}
// DownloadStreamRange opens a download stream for a range of the object at
// path in bucketName on the given satellite; start and limit are passed
// through to the bucket's DownloadRange.
//
// Alongside the stream it returns a cleanup function that closes the project
// and the bucket; cleanup is returned even when opening the stream fails, in
// which case its result also includes that failure. Only when the project or
// bucket cannot be obtained are both the stream and cleanup nil.
func (client *Uplink) DownloadStreamRange(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path, start, limit int64) (_ io.ReadCloser, cleanup func() error, err error) {
	config := client.GetConfig(satellite)
	project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, config)
	if err != nil {
		return nil, nil, err
	}

	cleanup = func() error {
		// Accumulates into the function's named error result.
		err = errs.Combine(err, project.Close(), bucket.Close())
		return err
	}

	stream, err := bucket.DownloadRange(ctx, path, start, limit)
	return stream, cleanup, err
}
// DeleteObject deletes the object at path in bucketName on the given
// satellite.
//
// The bucket is obtained through GetProjectAndBucket, which creates it when
// it does not exist. The returned error combines the delete error with any
// error from closing the bucket and the project.
func (client *Uplink) DeleteObject(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path) (err error) {
	project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))
	if err != nil {
		return err
	}
	// err is a named result so that Close failures recorded here reach the
	// caller. The bucket is closed before the project, matching the upload
	// helpers.
	defer func() { err = errs.Combine(err, bucket.Close(), project.Close()) }()

	return bucket.DeleteObject(ctx, path)
}
// CreateBucket creates a bucket called bucketName on the given satellite,
// configured from the uplink's default configuration for that satellite
// (path cipher, encryption parameters, redundancy scheme and segment size).
//
// The returned error combines the creation error with any error from closing
// the project.
func (client *Uplink) CreateBucket(ctx context.Context, satellite *SatelliteSystem, bucketName string) (err error) {
	project, err := client.GetProject(ctx, satellite)
	if err != nil {
		return err
	}
	// err is a named result so that a Close failure recorded here reaches the
	// caller instead of being written to a discarded local.
	defer func() { err = errs.Combine(err, project.Close()) }()

	clientCfg := client.GetConfig(satellite)
	bucketCfg := &libuplink.BucketConfig{}
	bucketCfg.PathCipher = clientCfg.GetPathCipherSuite()
	bucketCfg.EncryptionParameters = clientCfg.GetEncryptionParameters()
	bucketCfg.Volatile.RedundancyScheme = clientCfg.GetRedundancyScheme()
	bucketCfg.Volatile.SegmentsSize = clientCfg.GetSegmentSize()

	_, err = project.CreateBucket(ctx, bucketName, bucketCfg)
	return err
}
// DeleteBucket deletes the bucket called bucketName on the given satellite.
//
// The returned error combines the deletion error with any error from closing
// the project.
func (client *Uplink) DeleteBucket(ctx context.Context, satellite *SatelliteSystem, bucketName string) (err error) {
	project, err := client.GetProject(ctx, satellite)
	if err != nil {
		return err
	}
	// err is a named result so that a Close failure recorded here reaches the
	// caller instead of being written to a discarded local.
	defer func() { err = errs.Combine(err, project.Close()) }()

	return project.DeleteBucket(ctx, bucketName)
}
// GetConfig returns the default uplink configuration adjusted for the given
// satellite: it embeds a serialized access (satellite URL, the uplink's API
// key for that satellite, and an encryption access with a zero default key
// and AES-GCM path cipher), fills the legacy client fields, sets a 10 second
// dial timeout, derives the RS thresholds from StorageNodeCount and turns off
// the peer CA whitelist and TLS extensions.
//
// It panics when the API key cannot be parsed or the access cannot be
// serialized.
func (client *Uplink) GetConfig(satellite *SatelliteSystem) UplinkConfig {
	config := getDefaultConfig()

	// client.APIKey[satellite.ID()] is a *macaroon.APIKey, but we want a
	// *libuplink.APIKey, so, serialize and parse for now
	serializedKey := client.APIKey[satellite.ID()].Serialize()
	apiKey, err := libuplink.ParseAPIKey(serializedKey)
	if err != nil {
		panic(err)
	}

	encAccess := libuplink.NewEncryptionAccess()
	encAccess.SetDefaultKey(storj.Key{})
	encAccess.SetDefaultPathCipher(storj.EncAESGCM)

	scope := &libuplink.Scope{
		SatelliteAddr:    satellite.URL().String(),
		APIKey:           apiKey,
		EncryptionAccess: encAccess,
	}
	accessData, err := scope.Serialize()
	if err != nil {
		panic(err)
	}
	config.Access = accessData

	// Support some legacy stuff
	config.Legacy.Client.APIKey = apiKey.Serialize()
	config.Legacy.Client.SatelliteAddr = satellite.Addr()

	config.Client.DialTimeout = 10 * time.Second

	// Every threshold is a number of fifths of the storage node count
	// (integer division), but never less than one.
	fifths := func(numerator int) int {
		return atLeastOne(client.StorageNodeCount * numerator / 5)
	}
	config.RS.MinThreshold = fifths(1)     // 20% of storage nodes
	config.RS.RepairThreshold = fifths(2)  // 40% of storage nodes
	config.RS.SuccessThreshold = fifths(3) // 60% of storage nodes
	config.RS.MaxThreshold = fifths(4)     // 80% of storage nodes

	config.TLS.UsePeerCAWhitelist = false
	config.TLS.Extensions.Revocation = false
	config.TLS.Extensions.WhitelistSignedLeaf = false

	return config
}
// getDefaultConfig returns an UplinkConfig that has been bound to a
// throwaway flag set with cfgstruct's dev defaults option.
func getDefaultConfig() UplinkConfig {
	var config UplinkConfig
	flags := &pflag.FlagSet{}
	cfgstruct.Bind(flags, &config, cfgstruct.UseDevDefaults())
	return config
}
// atLeastOne clamps value to a minimum of 1: values of 1 or more are
// returned unchanged, anything smaller yields 1.
func atLeastOne(value int) int {
	if value >= 1 {
		return value
	}
	return 1
}
// NewLibuplink creates a libuplink.Uplink whose volatile settings (inline
// size, memory limit, peer ID version, CA whitelist and dial timeout) are
// copied from the default testplanet uplink config, and which logs through
// the uplink's logger.
func (client *Uplink) NewLibuplink(ctx context.Context) (*libuplink.Uplink, error) {
	defaults := getDefaultConfig()

	var cfg libuplink.Config
	volatile := &cfg.Volatile
	volatile.Log = client.Log
	volatile.MaxInlineSize = defaults.Client.MaxInlineSize
	volatile.MaxMemory = defaults.RS.MaxBufferMem
	volatile.PeerIDVersion = defaults.TLS.PeerIDVersions
	// The libuplink option is the inverse of the config flag.
	volatile.TLS.SkipPeerCAWhitelist = !defaults.TLS.UsePeerCAWhitelist
	volatile.TLS.PeerCAWhitelistPath = defaults.TLS.PeerCAWhitelistPath
	volatile.DialTimeout = defaults.Client.DialTimeout

	return libuplink.NewUplink(ctx, &cfg)
}
// GetProject returns a libuplink.Project which allows interactions with the
// uplink's project on the given satellite. The caller is responsible for
// closing the returned project.
//
// A temporary libuplink.Uplink is created to open the project and is closed
// again before returning. If closing it fails, that error is returned; a
// project that was already opened is closed as well and nil is returned in
// its place, so no project is handed out together with an error.
func (client *Uplink) GetProject(ctx context.Context, satellite *SatelliteSystem) (project *libuplink.Project, err error) {
	testLibuplink, err := client.NewLibuplink(ctx)
	if err != nil {
		return nil, err
	}
	// project and err are named results so that this deferred function can
	// report a Close failure; with unnamed results it would be lost.
	// NOTE(review): the temporary uplink is closed while the project stays in
	// use; presumably the project does not depend on it — confirm.
	defer func() {
		closeErr := testLibuplink.Close()
		if closeErr == nil {
			return
		}
		if project != nil {
			closeErr = errs.Combine(closeErr, project.Close())
			project = nil
		}
		err = errs.Combine(err, closeErr)
	}()

	access, err := client.GetConfig(satellite).GetAccess()
	if err != nil {
		return nil, err
	}

	project, err = testLibuplink.OpenProject(ctx, access.SatelliteAddr, access.APIKey)
	if err != nil {
		return nil, err
	}
	return project, nil
}
// GetProjectAndBucket opens the uplink's project on the given satellite and
// the bucket called bucketName inside it, creating the bucket from clientCfg
// when it does not exist yet. The caller is responsible for closing both.
//
// On any failure the project is closed again and nil is returned for both
// the project and the bucket.
func (client *Uplink) GetProjectAndBucket(ctx context.Context, satellite *SatelliteSystem, bucketName string, clientCfg UplinkConfig) (_ *libuplink.Project, _ *libuplink.Bucket, err error) {
	project, err := client.GetProject(ctx, satellite)
	if err != nil {
		return nil, nil, err
	}
	defer func() {
		if err == nil {
			return
		}
		// Do not leak the project when a later step failed.
		err = errs.Combine(err, project.Close())
	}()

	access, err := client.GetConfig(satellite).GetAccess()
	if err != nil {
		return nil, nil, err
	}

	// Check if the bucket exists, if not then create it
	_, _, err = project.GetBucketInfo(ctx, bucketName)
	switch {
	case err == nil:
		// The bucket is already there.
	case storj.ErrBucketNotFound.Has(err):
		if err = createBucket(ctx, clientCfg, *project, bucketName); err != nil {
			return nil, nil, err
		}
	default:
		return nil, nil, err
	}

	bucket, err := project.OpenBucket(ctx, bucketName, access.EncryptionAccess)
	if err != nil {
		return nil, nil, err
	}
	return project, bucket, nil
}
// createBucket creates bucketName in project, taking the path cipher,
// encryption parameters, redundancy scheme and segment size from config.
func createBucket(ctx context.Context, config UplinkConfig, project libuplink.Project, bucketName string) error {
	var bucketCfg libuplink.BucketConfig
	bucketCfg.PathCipher = config.GetPathCipherSuite()
	bucketCfg.EncryptionParameters = config.GetEncryptionParameters()
	bucketCfg.Volatile.RedundancyScheme = config.GetRedundancyScheme()
	bucketCfg.Volatile.SegmentsSize = config.GetSegmentSize()

	_, err := project.CreateBucket(ctx, bucketName, &bucketCfg)
	return err
}