Jeff Wendling 59f81a4a0d groupcancel/ec delete: add a timeout based on completion times
We used to do something similar for puts, but that ended up hurting
more than it helped. Since deletes are best effort, we can do it
here to kill long tails or unresponsive nodes.

Change-Id: I89fd2d9dcf519d76c78ddad70bc419d1868d2df1
2019-10-30 16:18:39 -06:00

502 lines
14 KiB

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package ecclient
import (
monkit ""
// mon is the package-level value returned by monkit.Package(); the functions
// in this file instrument themselves through mon.Task().
var mon = monkit.Package()
// Client defines an interface for storing erasure coded data to piece store nodes
type Client interface {
Put(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error)
Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, size int64) (ranger.Ranger, error)
Delete(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey) error
WithForceErrorDetection(force bool) Client
// PutPiece is not intended to be used by normal uplinks directly, but is exported to support storagenode graceful exit transfers.
PutPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, data io.ReadCloser) (hash *pb.PieceHash, id *identity.PeerIdentity, err error)
// dialPiecestoreFunc opens a piecestore client for the given node.
// ecClient.dialPiecestore has this signature; Get stores it in every
// lazyPieceRanger so that the connection can be made later.
type dialPiecestoreFunc func(context.Context, *pb.Node) (*piecestore.Client, error)
type ecClient struct {
log *zap.Logger
dialer rpc.Dialer
memoryLimit int
forceErrorDetection bool
// NewClient from the given identity and max buffer memory
func NewClient(log *zap.Logger, dialer rpc.Dialer, memoryLimit int) Client {
return &ecClient{
log: log,
dialer: dialer,
memoryLimit: memoryLimit,
func (ec *ecClient) WithForceErrorDetection(force bool) Client {
ec.forceErrorDetection = force
return ec
func (ec *ecClient) dialPiecestore(ctx context.Context, n *pb.Node) (*piecestore.Client, error) {
logger := ec.log.Named(n.Id.String())
return piecestore.Dial(ctx, ec.dialer, n, logger, piecestore.DefaultConfig)
func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
defer mon.Task()(&ctx)(&err)
pieceCount := len(limits)
if pieceCount != rs.TotalCount() {
return nil, nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", pieceCount, rs.TotalCount())
nonNilLimits := nonNilCount(limits)
if nonNilLimits <= rs.RepairThreshold() && nonNilLimits < rs.OptimalThreshold() {
return nil, nil, Error.New("number of non-nil limits (%d) is less than or equal to the repair threshold (%d) of erasure scheme", nonNilLimits, rs.RepairThreshold())
if !unique(limits) {
return nil, nil, Error.New("duplicated nodes are not allowed")
ec.log.Debug("Uploading to storage nodes",
zap.Int("Erasure Share Size", rs.ErasureShareSize()),
zap.Int("Stripe Size", rs.StripeSize()),
zap.Int("Repair Threshold", rs.RepairThreshold()),
zap.Int("Optimal Threshold", rs.OptimalThreshold()),
padded := eestream.PadReader(ioutil.NopCloser(data), rs.StripeSize())
readers, err := eestream.EncodeReader(ctx, ec.log, padded, rs)
if err != nil {
return nil, nil, err
type info struct {
i int
err error
hash *pb.PieceHash
infos := make(chan info, pieceCount)
psCtx, cancel := context.WithCancel(ctx)
defer cancel()
for i, addressedLimit := range limits {
go func(i int, addressedLimit *pb.AddressedOrderLimit) {
hash, _, err := ec.PutPiece(psCtx, ctx, addressedLimit, privateKey, readers[i])
infos <- info{i: i, err: err, hash: hash}
}(i, addressedLimit)
successfulNodes = make([]*pb.Node, pieceCount)
successfulHashes = make([]*pb.PieceHash, pieceCount)
var successfulCount, failureCount, cancellationCount int32
for range limits {
info := <-infos
if limits[info.i] == nil {
if info.err != nil {
if !errs2.IsCanceled(info.err) {
} else {
ec.log.Debug("Upload to storage node failed",
zap.String("NodeID", limits[info.i].GetLimit().StorageNodeId.String()),
successfulNodes[info.i] = &pb.Node{
Id: limits[info.i].GetLimit().StorageNodeId,
Address: limits[info.i].GetStorageNodeAddress(),
successfulHashes[info.i] = info.hash
if int(successfulCount) >= rs.OptimalThreshold() {
ec.log.Debug("Success threshold reached. Cancelling remaining uploads.",
zap.Int("Optimal Threshold", rs.OptimalThreshold()),
defer func() {
select {
case <-ctx.Done():
err = Error.New("upload cancelled by user")
// TODO: clean up the partially uploaded segment's pieces
// ec.Delete(context.Background(), nodes, pieceID, pba.SatelliteId),
if int(successfulCount) <= rs.RepairThreshold() && int(successfulCount) < rs.OptimalThreshold() {
return nil, nil, Error.New("successful puts (%d) less than or equal to repair threshold (%d)", successfulCount, rs.RepairThreshold())
if int(successfulCount) < rs.OptimalThreshold() {
return nil, nil, Error.New("successful puts (%d) less than success threshold (%d)", successfulCount, rs.OptimalThreshold())
return successfulNodes, successfulHashes, nil
func (ec *ecClient) PutPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, data io.ReadCloser) (hash *pb.PieceHash, peerID *identity.PeerIdentity, err error) {
nodeName := "nil"
if limit != nil {
nodeName = limit.GetLimit().StorageNodeId.String()[0:8]
defer mon.Task()(&ctx, "node: "+nodeName)(&err)
defer func() { err = errs.Combine(err, data.Close()) }()
if limit == nil {
_, _ = io.Copy(ioutil.Discard, data)
return nil, nil, nil
storageNodeID := limit.GetLimit().StorageNodeId
pieceID := limit.GetLimit().PieceId
ps, err := ec.dialPiecestore(ctx, &pb.Node{
Id: storageNodeID,
Address: limit.GetStorageNodeAddress(),
if err != nil {
ec.log.Debug("Failed dialing for putting piece to node",
zap.String("PieceID", pieceID.String()),
zap.String("NodeID", storageNodeID.String()),
return nil, nil, err
defer func() { err = errs.Combine(err, ps.Close()) }()
peerID, err = ps.GetPeerIdentity()
if err != nil {
ec.log.Debug("Failed getting peer identity from node connection",
zap.String("NodeID", storageNodeID.String()),
return nil, nil, err
upload, err := ps.Upload(ctx, limit.GetLimit(), privateKey)
if err != nil {
ec.log.Debug("Failed requesting upload of pieces to node",
zap.String("PieceID", pieceID.String()),
zap.String("NodeID", storageNodeID.String()),
return nil, nil, err
defer func() {
if err != nil {
err = errs.Combine(err, upload.Cancel(ctx))
hash, err = upload.Commit(ctx)
_, err = sync2.Copy(ctx, upload, data)
// Canceled context means the piece upload was interrupted by user or due
// to slow connection. No error logging for this case.
if err != nil {
if errs2.IsCanceled(err) {
if parent.Err() == context.Canceled {
ec.log.Info("Upload to node canceled by user", zap.Stringer("NodeID", storageNodeID))
} else {
ec.log.Debug("Node cut from upload due to slow connection", zap.Stringer("NodeID", storageNodeID))
} else {
nodeAddress := ""
if limit.GetStorageNodeAddress() != nil {
nodeAddress = limit.GetStorageNodeAddress().GetAddress()
ec.log.Debug("Failed uploading piece to node",
zap.Stringer("PieceID", pieceID),
zap.Stringer("NodeID", storageNodeID),
zap.String("Node Address", nodeAddress),
return nil, nil, err
return nil, peerID, nil
func (ec *ecClient) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, size int64) (rr ranger.Ranger, err error) {
defer mon.Task()(&ctx)(&err)
if len(limits) != es.TotalCount() {
return nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), es.TotalCount())
if nonNilCount(limits) < es.RequiredCount() {
return nil, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount())
paddedSize := calcPadded(size, es.StripeSize())
pieceSize := paddedSize / int64(es.RequiredCount())
rrs := map[int]ranger.Ranger{}
for i, addressedLimit := range limits {
if addressedLimit == nil {
rrs[i] = &lazyPieceRanger{
dialPiecestore: ec.dialPiecestore,
limit: addressedLimit,
privateKey: privateKey,
size: pieceSize,
rr, err = eestream.Decode(ec.log, rrs, es, ec.memoryLimit, ec.forceErrorDetection)
if err != nil {
return nil, Error.Wrap(err)
ranger, err := eestream.Unpad(rr, int(paddedSize-size))
return ranger, Error.Wrap(err)
func (ec *ecClient) Delete(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey) (err error) {
defer mon.Task()(&ctx)(&err)
setLimits := 0
for _, addressedLimit := range limits {
if addressedLimit != nil {
gctx, cancel := groupcancel.NewContext(ctx, setLimits, .75, 2)
defer cancel()
errch := make(chan error, setLimits)
for _, addressedLimit := range limits {
if addressedLimit == nil {
go func(addressedLimit *pb.AddressedOrderLimit) {
limit := addressedLimit.GetLimit()
ps, err := ec.dialPiecestore(gctx, &pb.Node{
Id: limit.StorageNodeId,
Address: addressedLimit.GetStorageNodeAddress(),
if err != nil {
ec.log.Debug("Failed dialing for deleting piece from node",
zap.String("PieceID", limit.PieceId.String()),
zap.String("NodeID", limit.StorageNodeId.String()),
errch <- err
err = ps.Delete(gctx, limit, privateKey)
err = errs.Combine(err, ps.Close())
if err != nil {
ec.log.Debug("Failed deleting piece from node",
zap.String("PieceID", limit.PieceId.String()),
zap.String("NodeID", limit.StorageNodeId.String()),
errch <- err
var anySuccess bool
var lastErr error
for i := 0; i < setLimits; i++ {
if err := <-errch; err == nil {
anySuccess = true
} else {
lastErr = err
if anySuccess {
return nil
return lastErr
func unique(limits []*pb.AddressedOrderLimit) bool {
if len(limits) < 2 {
return true
ids := make(storj.NodeIDList, len(limits))
for i, addressedLimit := range limits {
if addressedLimit != nil {
ids[i] = addressedLimit.GetLimit().StorageNodeId
// sort the ids and check for identical neighbors
// sort.Slice(ids, func(i, k int) bool { return ids[i].Less(ids[k]) })
for i := 1; i < len(ids); i++ {
if ids[i] != (storj.NodeID{}) && ids[i] == ids[i-1] {
return false
return true
// calcPadded rounds size up to the next multiple of blockSize. A size that
// is already a multiple is returned unchanged.
//
// A non-positive blockSize cannot define a block boundary, so size is
// returned as is instead of panicking on a modulo by zero.
func calcPadded(size int64, blockSize int) int64 {
	if blockSize <= 0 {
		return size
	}

	mod := size % int64(blockSize)
	if mod == 0 {
		return size
	}
	return size + int64(blockSize) - mod
}
type lazyPieceRanger struct {
dialPiecestore dialPiecestoreFunc
limit *pb.AddressedOrderLimit
privateKey storj.PiecePrivateKey
size int64
// Size implements Ranger.Size
func (lr *lazyPieceRanger) Size() int64 {
return lr.size
// Range implements Ranger.Range to be lazily connected
func (lr *lazyPieceRanger) Range(ctx context.Context, offset, length int64) (_ io.ReadCloser, err error) {
defer mon.Task()(&ctx)(&err)
return &lazyPieceReader{
ranger: lr,
ctx: ctx,
offset: offset,
length: length,
}, nil
type lazyPieceReader struct {
ranger *lazyPieceRanger
ctx context.Context
offset int64
length int64
mu sync.Mutex
isClosed bool
client *piecestore.Client
func (lr *lazyPieceReader) Read(data []byte) (_ int, err error) {
if lr.isClosed {
return 0, io.EOF
if lr.Downloader == nil {
client, downloader, err := lr.ranger.dial(lr.ctx, lr.offset, lr.length)
if err != nil {
return 0, err
lr.Downloader = downloader
lr.client = client
return lr.Downloader.Read(data)
func (lr *lazyPieceRanger) dial(ctx context.Context, offset, length int64) (_ *piecestore.Client, _ piecestore.Downloader, err error) {
defer mon.Task()(&ctx)(&err)
ps, err := lr.dialPiecestore(ctx, &pb.Node{
Id: lr.limit.GetLimit().StorageNodeId,
Address: lr.limit.GetStorageNodeAddress(),
if err != nil {
return nil, nil, err
download, err := ps.Download(ctx, lr.limit.GetLimit(), lr.privateKey, offset, length)
if err != nil {
return nil, nil, errs.Combine(err, ps.Close())
return ps, download, nil
func (lr *lazyPieceReader) Close() (err error) {
if lr.isClosed {
return nil
lr.isClosed = true
if lr.Downloader != nil {
err = errs.Combine(err, lr.Downloader.Close())
if lr.client != nil {
err = errs.Combine(err, lr.client.Close())
return err
func nonNilCount(limits []*pb.AddressedOrderLimit) int {
total := 0
for _, limit := range limits {
if limit != nil {
return total