storj/uplink/ecclient/client.go
Commit 098cbc9c67 (Jeff Wendling): all: use pkg/rpc instead of pkg/transport
All of the packages and tests work with both grpc and
drpc. We'll probably need to set up Jenkins pipelines
to run the tests with drpc as well.

Most of the changes come from cleaning up the
pkg/transport.Client API into an rpc.Dialer, in the
spirit of a net.Dialer. Now that we don't need
observers, we can pass stateless configuration around
rather than stateful objects that issue observations.
It also adds a DialAddressID for the case where we
don't have a pb.Node but do have an address and want
to assert a particular ID. This happened frequently,
and now there are no more awkward contortions creating
custom TLS options, etc.

A lot of the other changes are about being consistent
and using the abstractions in the rpc package for
rpc-style concerns such as finding peer information
or checking status codes.

Change-Id: Ief62875e21d80a21b3c56a5a37f45887679f9412
2019-09-25 15:37:06 -06:00
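
As a minimal sketch of how a caller might wire this client up after the refactor, assuming an rpc.Dialer has already been configured elsewhere (per pkg/rpc); the package name and the 64 MiB memory limit below are placeholders, and only the NewClient signature defined in this file is relied on:

package example // hypothetical caller package, not part of this repository

import (
	"go.uber.org/zap"

	"storj.io/storj/pkg/rpc"
	"storj.io/storj/uplink/ecclient"
)

// newECClient builds the erasure-coding client from an already-configured
// rpc.Dialer. 64<<20 (64 MiB) is an arbitrary example buffer limit.
func newECClient(log *zap.Logger, dialer rpc.Dialer) ecclient.Client {
	return ecclient.NewClient(log, dialer, 64<<20)
}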


// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package ecclient
import (
"context"
"io"
"io/ioutil"
"sort"
"sync"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/sync2"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/ranger"
"storj.io/storj/pkg/rpc"
"storj.io/storj/pkg/storj"
"storj.io/storj/uplink/eestream"
"storj.io/storj/uplink/piecestore"
)
var mon = monkit.Package()
// Client defines an interface for storing erasure coded data to piece store nodes
type Client interface {
Put(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error)
Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, size int64) (ranger.Ranger, error)
Delete(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey) error
WithForceErrorDetection(force bool) Client
}
type dialPiecestoreFunc func(context.Context, *pb.Node) (*piecestore.Client, error)
type ecClient struct {
log *zap.Logger
dialer rpc.Dialer
memoryLimit int
forceErrorDetection bool
}
// NewClient creates an erasure-coding client from the given logger, dialer, and max buffer memory
func NewClient(log *zap.Logger, dialer rpc.Dialer, memoryLimit int) Client {
return &ecClient{
log: log,
dialer: dialer,
memoryLimit: memoryLimit,
}
}
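// WithForceErrorDetection sets whether error detection is forced on downloads and
// returns the same (mutated) client.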
func (ec *ecClient) WithForceErrorDetection(force bool) Client {
ec.forceErrorDetection = force
return ec
}
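// dialPiecestore dials a piecestore client to the given node, naming the logger after the node ID.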
func (ec *ecClient) dialPiecestore(ctx context.Context, n *pb.Node) (*piecestore.Client, error) {
logger := ec.log.Named(n.Id.String())
return piecestore.Dial(ctx, ec.dialer, n, logger, piecestore.DefaultConfig)
}
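// Put erasure-encodes data from the reader and uploads the pieces to the storage nodes
// referenced by limits, cancelling the remaining uploads once the optimal threshold is
// reached. It returns the successful nodes and piece hashes, indexed by piece number.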
func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
defer mon.Task()(&ctx)(&err)
pieceCount := len(limits)
if pieceCount != rs.TotalCount() {
return nil, nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", pieceCount, rs.TotalCount())
}
nonNilLimits := nonNilCount(limits)
if nonNilLimits <= rs.RepairThreshold() && nonNilLimits < rs.OptimalThreshold() {
return nil, nil, Error.New("number of non-nil limits (%d) is less than or equal to the repair threshold (%d) of erasure scheme", nonNilLimits, rs.RepairThreshold())
}
if !unique(limits) {
return nil, nil, Error.New("duplicated nodes are not allowed")
}
ec.log.Debug("Uploading to storage nodes",
zap.Int("Erasure Share Size", rs.ErasureShareSize()),
zap.Int("Stripe Size", rs.StripeSize()),
zap.Int("Repair Threshold", rs.RepairThreshold()),
zap.Int("Optimal Threshold", rs.OptimalThreshold()),
)
padded := eestream.PadReader(ioutil.NopCloser(data), rs.StripeSize())
readers, err := eestream.EncodeReader(ctx, ec.log, padded, rs)
if err != nil {
return nil, nil, err
}
type info struct {
i int
err error
hash *pb.PieceHash
}
infos := make(chan info, pieceCount)
psCtx, cancel := context.WithCancel(ctx)
defer cancel()
for i, addressedLimit := range limits {
go func(i int, addressedLimit *pb.AddressedOrderLimit) {
hash, err := ec.putPiece(psCtx, ctx, addressedLimit, privateKey, readers[i], expiration)
infos <- info{i: i, err: err, hash: hash}
}(i, addressedLimit)
}
successfulNodes = make([]*pb.Node, pieceCount)
successfulHashes = make([]*pb.PieceHash, pieceCount)
var successfulCount, failureCount, cancellationCount int32
for range limits {
info := <-infos
if limits[info.i] == nil {
continue
}
if info.err != nil {
if !errs2.IsCanceled(info.err) {
failureCount++
} else {
cancellationCount++
}
ec.log.Debug("Upload to storage node failed",
zap.String("NodeID", limits[info.i].GetLimit().StorageNodeId.String()),
zap.Error(info.err),
)
continue
}
successfulNodes[info.i] = &pb.Node{
Id: limits[info.i].GetLimit().StorageNodeId,
Address: limits[info.i].GetStorageNodeAddress(),
}
successfulHashes[info.i] = info.hash
successfulCount++
if int(successfulCount) >= rs.OptimalThreshold() {
ec.log.Debug("Success threshold reached. Cancelling remaining uploads.",
zap.Int("Optimal Threshold", rs.OptimalThreshold()),
)
cancel()
}
}
defer func() {
select {
case <-ctx.Done():
err = Error.New("upload cancelled by user")
// TODO: clean up the partially uploaded segment's pieces
// ec.Delete(context.Background(), nodes, pieceID, pba.SatelliteId),
default:
}
}()
mon.IntVal("put_segment_pieces_total").Observe(int64(pieceCount))
mon.IntVal("put_segment_pieces_optimal").Observe(int64(rs.OptimalThreshold()))
mon.IntVal("put_segment_pieces_successful").Observe(int64(successfulCount))
mon.IntVal("put_segment_pieces_failed").Observe(int64(failureCount))
mon.IntVal("put_segment_pieces_canceled").Observe(int64(cancellationCount))
if int(successfulCount) <= rs.RepairThreshold() && int(successfulCount) < rs.OptimalThreshold() {
return nil, nil, Error.New("successful puts (%d) less than or equal to repair threshold (%d)", successfulCount, rs.RepairThreshold())
}
if int(successfulCount) < rs.OptimalThreshold() {
return nil, nil, Error.New("successful puts (%d) less than success threshold (%d)", successfulCount, rs.OptimalThreshold())
}
return successfulNodes, successfulHashes, nil
}
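// putPiece uploads a single piece to the node referenced by limit. ctx governs this
// upload (it is canceled once enough other pieces succeed), while parent is used to
// distinguish a user-initiated cancellation from that long-tail cancellation.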
func (ec *ecClient) putPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, data io.ReadCloser, expiration time.Time) (hash *pb.PieceHash, err error) {
nodeName := "nil"
if limit != nil {
nodeName = limit.GetLimit().StorageNodeId.String()[0:8]
}
defer mon.Task()(&ctx, "node: "+nodeName)(&err)
defer func() { err = errs.Combine(err, data.Close()) }()
if limit == nil {
_, _ = io.Copy(ioutil.Discard, data)
return nil, nil
}
storageNodeID := limit.GetLimit().StorageNodeId
pieceID := limit.GetLimit().PieceId
ps, err := ec.dialPiecestore(ctx, &pb.Node{
Id: storageNodeID,
Address: limit.GetStorageNodeAddress(),
})
if err != nil {
ec.log.Debug("Failed dialing for putting piece to node",
zap.String("PieceID", pieceID.String()),
zap.String("NodeID", storageNodeID.String()),
zap.Error(err),
)
return nil, err
}
defer func() { err = errs.Combine(err, ps.Close()) }()
upload, err := ps.Upload(ctx, limit.GetLimit(), privateKey)
if err != nil {
ec.log.Debug("Failed requesting upload of pieces to node",
zap.String("PieceID", pieceID.String()),
zap.String("NodeID", storageNodeID.String()),
zap.Error(err),
)
return nil, err
}
defer func() {
if ctx.Err() != nil || err != nil {
hash = nil
err = errs.Combine(err, upload.Cancel(ctx))
return
}
h, closeErr := upload.Commit(ctx)
hash = h
err = errs.Combine(err, closeErr)
}()
_, err = sync2.Copy(ctx, upload, data)
// Canceled context means the piece upload was interrupted by user or due
// to slow connection. No error logging for this case.
if ctx.Err() == context.Canceled {
if parent.Err() == context.Canceled {
ec.log.Info("Upload to node canceled by user", zap.String("NodeID", storageNodeID.String()))
} else {
ec.log.Debug("Node cut from upload due to slow connection", zap.String("NodeID", storageNodeID.String()))
}
err = context.Canceled
} else if err != nil {
nodeAddress := "nil"
if limit.GetStorageNodeAddress() != nil {
nodeAddress = limit.GetStorageNodeAddress().GetAddress()
}
ec.log.Debug("Failed uploading piece to node",
zap.String("PieceID", pieceID.String()),
zap.String("NodeID", storageNodeID.String()),
zap.String("Node Address", nodeAddress),
zap.Error(err),
)
}
return hash, err
}
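// Get returns a Ranger that lazily dials the storage nodes referenced by limits,
// downloads their pieces, and decodes and unpads them back into data of the given size.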
func (ec *ecClient) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, size int64) (rr ranger.Ranger, err error) {
defer mon.Task()(&ctx)(&err)
if len(limits) != es.TotalCount() {
return nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), es.TotalCount())
}
if nonNilCount(limits) < es.RequiredCount() {
return nil, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount())
}
paddedSize := calcPadded(size, es.StripeSize())
pieceSize := paddedSize / int64(es.RequiredCount())
rrs := map[int]ranger.Ranger{}
for i, addressedLimit := range limits {
if addressedLimit == nil {
continue
}
rrs[i] = &lazyPieceRanger{
dialPiecestore: ec.dialPiecestore,
limit: addressedLimit,
privateKey: privateKey,
size: pieceSize,
}
}
rr, err = eestream.Decode(ec.log, rrs, es, ec.memoryLimit, ec.forceErrorDetection)
if err != nil {
return nil, Error.Wrap(err)
}
ranger, err := eestream.Unpad(rr, int(paddedSize-size))
return ranger, Error.Wrap(err)
}
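// Delete deletes, in parallel, the pieces referenced by the non-nil limits from their
// storage nodes. It returns an error only when every delete failed; partial failures
// are logged and otherwise ignored.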
func (ec *ecClient) Delete(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey) (err error) {
defer mon.Task()(&ctx)(&err)
errch := make(chan error, len(limits))
for _, addressedLimit := range limits {
if addressedLimit == nil {
errch <- nil
continue
}
go func(addressedLimit *pb.AddressedOrderLimit) {
limit := addressedLimit.GetLimit()
ps, err := ec.dialPiecestore(ctx, &pb.Node{
Id: limit.StorageNodeId,
Address: addressedLimit.GetStorageNodeAddress(),
})
if err != nil {
ec.log.Debug("Failed dialing for deleting piece from node",
zap.String("PieceID", limit.PieceId.String()),
zap.String("NodeID", limit.StorageNodeId.String()),
zap.Error(err),
)
errch <- err
return
}
err = ps.Delete(ctx, limit, privateKey)
err = errs.Combine(err, ps.Close())
if err != nil {
ec.log.Debug("Failed deleting piece from node",
zap.String("PieceID", limit.PieceId.String()),
zap.String("NodeID", limit.StorageNodeId.String()),
zap.Error(err),
)
}
errch <- err
}(addressedLimit)
}
allerrs := collectErrors(errch, len(limits))
if len(allerrs) > 0 && len(allerrs) == len(limits) {
return allerrs[0]
}
return nil
}
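// collectErrors receives size values from the channel and returns only the non-nil errors.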
func collectErrors(errs <-chan error, size int) []error {
var result []error
for i := 0; i < size; i++ {
err := <-errs
if err != nil {
result = append(result, err)
}
}
return result
}
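// unique reports whether the non-nil limits all reference distinct storage nodes.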
func unique(limits []*pb.AddressedOrderLimit) bool {
if len(limits) < 2 {
return true
}
ids := make(storj.NodeIDList, len(limits))
for i, addressedLimit := range limits {
if addressedLimit != nil {
ids[i] = addressedLimit.GetLimit().StorageNodeId
}
}
// sort the ids and check for identical neighbors
sort.Sort(ids)
// sort.Slice(ids, func(i, k int) bool { return ids[i].Less(ids[k]) })
for i := 1; i < len(ids); i++ {
if ids[i] != (storj.NodeID{}) && ids[i] == ids[i-1] {
return false
}
}
return true
}
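// calcPadded rounds size up to the next multiple of blockSize,
// e.g. calcPadded(10, 4) == 12.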
func calcPadded(size int64, blockSize int) int64 {
mod := size % int64(blockSize)
if mod == 0 {
return size
}
return size + int64(blockSize) - mod
}
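// lazyPieceRanger is a ranger.Ranger over a single remote piece that defers dialing
// the storage node until the range is actually read.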
type lazyPieceRanger struct {
dialPiecestore dialPiecestoreFunc
limit *pb.AddressedOrderLimit
privateKey storj.PiecePrivateKey
size int64
}
// Size implements Ranger.Size
func (lr *lazyPieceRanger) Size() int64 {
return lr.size
}
// Range implements Ranger.Range to be lazily connected
func (lr *lazyPieceRanger) Range(ctx context.Context, offset, length int64) (_ io.ReadCloser, err error) {
defer mon.Task()(&ctx)(&err)
return &lazyPieceReader{
ranger: lr,
ctx: ctx,
offset: offset,
length: length,
}, nil
}
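// lazyPieceReader is the io.ReadCloser returned by lazyPieceRanger.Range; it dials
// and starts the download on the first Read.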
type lazyPieceReader struct {
ranger *lazyPieceRanger
ctx context.Context
offset int64
length int64
mu sync.Mutex
isClosed bool
piecestore.Downloader
client *piecestore.Client
}
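// Read opens the download on first use and then reads from it.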
func (lr *lazyPieceReader) Read(data []byte) (_ int, err error) {
lr.mu.Lock()
defer lr.mu.Unlock()
if lr.isClosed {
return 0, io.EOF
}
if lr.Downloader == nil {
client, downloader, err := lr.ranger.dial(lr.ctx, lr.offset, lr.length)
if err != nil {
return 0, err
}
lr.Downloader = downloader
lr.client = client
}
return lr.Downloader.Read(data)
}
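// dial connects to the piece's storage node and starts a download for the requested range.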
func (lr *lazyPieceRanger) dial(ctx context.Context, offset, length int64) (_ *piecestore.Client, _ piecestore.Downloader, err error) {
defer mon.Task()(&ctx)(&err)
ps, err := lr.dialPiecestore(ctx, &pb.Node{
Id: lr.limit.GetLimit().StorageNodeId,
Address: lr.limit.GetStorageNodeAddress(),
})
if err != nil {
return nil, nil, err
}
download, err := ps.Download(ctx, lr.limit.GetLimit(), lr.privateKey, offset, length)
if err != nil {
return nil, nil, errs.Combine(err, ps.Close())
}
return ps, download, nil
}
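// Close closes the downloader and the dialed client, if they were opened; subsequent
// calls are no-ops.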
func (lr *lazyPieceReader) Close() (err error) {
lr.mu.Lock()
defer lr.mu.Unlock()
if lr.isClosed {
return nil
}
lr.isClosed = true
if lr.Downloader != nil {
err = errs.Combine(err, lr.Downloader.Close())
}
if lr.client != nil {
err = errs.Combine(err, lr.client.Close())
}
return err
}
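// nonNilCount returns the number of non-nil limits.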
func nonNilCount(limits []*pb.AddressedOrderLimit) int {
total := 0
for _, limit := range limits {
if limit != nil {
total++
}
}
return total
}