storj/pkg/storage/ec/client.go
Kaloyan Raev 5fa7a4a7c6 Ensure ECClient upload timer is stopped when no more status is expected (#1397)
This change ensures that the upload timer of ECClient is always stopped after no more status is expected from uploaded pieces. It also ensures that the "Timer expired" message will be logged only if the context is not already cancelled.

This avoids confusing logs where a "Timer expired" message is logged significantly later and gets mixed with similar messages from the upload of the next file segments.
2019-03-04 14:48:13 +01:00


// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package ecclient
import (
"context"
"io"
"io/ioutil"
"sort"
"sync/atomic"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/eestream"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/piecestore/psclient"
"storj.io/storj/pkg/ranger"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
"storj.io/storj/pkg/utils"
)
var mon = monkit.Package()
// Client defines an interface for storing erasure coded data to piece store nodes
type Client interface {
Put(ctx context.Context, nodes []*pb.Node, rs eestream.RedundancyStrategy, pieceID psclient.PieceID, data io.Reader, expiration time.Time, pba *pb.OrderLimit) (successfulNodes []*pb.Node, successfulHashes []*pb.SignedHash, err error)
Get(ctx context.Context, nodes []*pb.Node, es eestream.ErasureScheme, pieceID psclient.PieceID, size int64, pba *pb.OrderLimit) (ranger.Ranger, error)
Delete(ctx context.Context, nodes []*pb.Node, pieceID psclient.PieceID, satelliteID storj.NodeID) error
}
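// psClientFunc matches the signature of psclient.NewPSClient; keeping the
// constructor as a field allows it to be swapped out (e.g. in tests).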
type psClientFunc func(context.Context, transport.Client, *pb.Node, int) (psclient.Client, error)
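// psClientHelper dials a piece store client for a single node; ecClient.newPSClient satisfies it.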
type psClientHelper func(context.Context, *pb.Node) (psclient.Client, error)
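// ecClient is the Client implementation that stores and retrieves erasure-coded data on piece store nodes.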
type ecClient struct {
transport transport.Client
memoryLimit int
newPSClientFunc psClientFunc
}
// NewClient creates a Client using the given transport client and max buffer memory
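// Illustrative usage (a sketch only; assumes tc is an already-configured
// transport.Client and that rs, pieceID, data, expiration and pba are
// prepared by the caller):
//
//   ec := NewClient(tc, 4*1024*1024)
//   nodes, hashes, err := ec.Put(ctx, targetNodes, rs, pieceID, data, expiration, pba)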
func NewClient(tc transport.Client, memoryLimit int) Client {
return &ecClient{
transport: tc,
memoryLimit: memoryLimit,
newPSClientFunc: psclient.NewPSClient,
}
}
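// newPSClient dials a piece store client for node n over the ecClient's transport.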
func (ec *ecClient) newPSClient(ctx context.Context, n *pb.Node) (psclient.Client, error) {
n.Type.DPanicOnInvalid("new ps client")
return ec.newPSClientFunc(ctx, ec.transport, n, 0)
}
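// Put uploads the erasure-coded pieces of data to the given nodes concurrently.
// It cancels the slowest uploads (the long tail) once enough pieces are stored,
// and returns the successfully used nodes and their signed piece hashes at the
// same indices as in the nodes slice.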
func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.RedundancyStrategy, pieceID psclient.PieceID, data io.Reader, expiration time.Time, pba *pb.OrderLimit) (successfulNodes []*pb.Node, successfulHashes []*pb.SignedHash, err error) {
defer mon.Task()(&ctx)(&err)
if len(nodes) != rs.TotalCount() {
return nil, nil, Error.New("size of nodes slice (%d) does not match total count (%d) of erasure scheme", len(nodes), rs.TotalCount())
}
if nonNilCount(nodes) < rs.RepairThreshold() {
return nil, nil, Error.New("number of non-nil nodes (%d) is less than repair threshold (%d) of erasure scheme", nonNilCount(nodes), rs.RepairThreshold())
}
if !unique(nodes) {
return nil, nil, Error.New("duplicated nodes are not allowed")
}
padded := eestream.PadReader(ioutil.NopCloser(data), rs.StripeSize())
readers, err := eestream.EncodeReader(ctx, padded, rs)
if err != nil {
return nil, nil, err
}
type info struct {
i int
err error
hash *pb.SignedHash
}
infos := make(chan info, len(nodes))
psCtx, cancel := context.WithCancel(ctx)
defer cancel()
start := time.Now()
for i, node := range nodes {
if node != nil {
node.Type.DPanicOnInvalid("ec client Put")
}
go func(i int, node *pb.Node) {
hash, err := ec.putPiece(psCtx, ctx, node, pieceID, readers[i], expiration, pba)
infos <- info{i: i, err: err, hash: hash}
}(i, node)
}
successfulNodes = make([]*pb.Node, len(nodes))
successfulHashes = make([]*pb.SignedHash, len(nodes))
var successfulCount int32
var timer *time.Timer
for range nodes {
info := <-infos
if info.err == nil {
successfulNodes[info.i] = nodes[info.i]
successfulHashes[info.i] = info.hash
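// Long-tail cancellation: once the repair threshold is reached, start a timer
// for half again the elapsed time; when it fires, or when the success
// (optimal) threshold is reached, cancel the remaining slow uploads.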
switch int(atomic.AddInt32(&successfulCount, 1)) {
case rs.RepairThreshold():
elapsed := time.Since(start)
more := elapsed * 3 / 2
zap.S().Infof("Repair threshold (%d nodes) reached in %.2f s. Starting a timer for %.2f s for reaching the success threshold (%d nodes)...",
rs.RepairThreshold(), elapsed.Seconds(), more.Seconds(), rs.OptimalThreshold())
timer = time.AfterFunc(more, func() {
if ctx.Err() != context.Canceled {
zap.S().Infof("Timer expired. Successfully uploaded to %d nodes. Canceling the long tail...", atomic.LoadInt32(&successfulCount))
cancel()
}
})
case rs.OptimalThreshold():
zap.S().Infof("Success threshold (%d nodes) reached. Canceling the long tail...", rs.OptimalThreshold())
timer.Stop()
cancel()
}
}
}
// Ensure the timer is stopped in the case where the repair threshold is
// reached but the success threshold is not, due to errors rather than slowness.
if timer != nil {
timer.Stop()
}
// Clean up the partially uploaded segment's pieces.
defer func() {
select {
case <-ctx.Done():
err = utils.CombineErrors(
Error.New("upload cancelled by user"),
ec.Delete(context.Background(), nodes, pieceID, pba.SatelliteId),
)
default:
}
}()
if int(atomic.LoadInt32(&successfulCount)) < rs.RepairThreshold() {
return nil, nil, Error.New("successful puts (%d) less than repair threshold (%d)", successfulCount, rs.RepairThreshold())
}
return successfulNodes, successfulHashes, nil
}
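// putPiece uploads a single piece to node. ctx is the per-upload context that
// the long-tail logic in Put may cancel, while parent is the original request
// context, used to tell a user cancellation apart from a long-tail cut.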
func (ec *ecClient) putPiece(ctx, parent context.Context, node *pb.Node, pieceID psclient.PieceID, data io.ReadCloser, expiration time.Time, pba *pb.OrderLimit) (hash *pb.SignedHash, err error) {
defer func() { err = errs.Combine(err, data.Close()) }()
if node == nil {
_, err = io.Copy(ioutil.Discard, data)
return nil, err
}
derivedPieceID, err := pieceID.Derive(node.Id.Bytes())
if err != nil {
zap.S().Errorf("Failed deriving piece id for %s: %v", pieceID, err)
return nil, err
}
ps, err := ec.newPSClient(ctx, node)
if err != nil {
zap.S().Errorf("Failed dialing for putting piece %s -> %s to node %s: %v",
pieceID, derivedPieceID, node.Id, err)
return nil, err
}
hash, err = ps.Put(ctx, derivedPieceID, data, expiration, pba)
defer func() { err = errs.Combine(err, ps.Close()) }()
// A canceled context means the piece upload was interrupted by the user or
// cut due to a slow connection. No error logging in this case.
if ctx.Err() == context.Canceled {
if parent.Err() == context.Canceled {
zap.S().Infof("Upload to node %s canceled by user.", node.Id)
} else {
zap.S().Infof("Node %s cut from upload due to slow connection.", node.Id)
}
err = context.Canceled
} else if err != nil {
nodeAddress := "nil"
if node.Address != nil {
nodeAddress = node.Address.Address
}
zap.S().Errorf("Failed putting piece %s -> %s to node %s (%+v): %v",
pieceID, derivedPieceID, node.Id, nodeAddress, err)
}
return hash, err
}
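// Get returns a Ranger that reconstructs the data of pieceID from the given
// nodes, dialing each node lazily on the first Range call.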
func (ec *ecClient) Get(ctx context.Context, nodes []*pb.Node, es eestream.ErasureScheme,
pieceID psclient.PieceID, size int64, pba *pb.OrderLimit) (rr ranger.Ranger, err error) {
defer mon.Task()(&ctx)(&err)
if len(nodes) != es.TotalCount() {
return nil, Error.New("size of nodes slice (%d) does not match total count (%d) of erasure scheme", len(nodes), es.TotalCount())
}
if nonNilCount(nodes) < es.RequiredCount() {
return nil, Error.New("number of non-nil nodes (%d) is less than required count (%d) of erasure scheme", nonNilCount(nodes), es.RequiredCount())
}
paddedSize := calcPadded(size, es.StripeSize())
pieceSize := paddedSize / int64(es.RequiredCount())
rrs := map[int]ranger.Ranger{}
type rangerInfo struct {
i int
rr ranger.Ranger
err error
}
ch := make(chan rangerInfo, len(nodes))
for i, n := range nodes {
if n != nil {
n.Type.DPanicOnInvalid("ec client Get")
}
if n == nil {
ch <- rangerInfo{i: i, rr: nil, err: nil}
continue
}
go func(i int, n *pb.Node) {
derivedPieceID, err := pieceID.Derive(n.Id.Bytes())
if err != nil {
zap.S().Errorf("Failed deriving piece id for %s: %v", pieceID, err)
ch <- rangerInfo{i: i, rr: nil, err: err}
return
}
rr := &lazyPieceRanger{
newPSClientHelper: ec.newPSClient,
node: n,
id: derivedPieceID,
size: pieceSize,
pba: pba,
}
ch <- rangerInfo{i: i, rr: rr, err: nil}
}(i, n)
}
for range nodes {
rri := <-ch
if rri.err == nil && rri.rr != nil {
rrs[rri.i] = rri.rr
}
}
rr, err = eestream.Decode(rrs, es, ec.memoryLimit)
if err != nil {
return nil, err
}
return eestream.Unpad(rr, int(paddedSize-size))
}
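// Delete removes the pieces derived from pieceID from all given nodes
// concurrently. An error is returned only if deletion failed on every node.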
func (ec *ecClient) Delete(ctx context.Context, nodes []*pb.Node, pieceID psclient.PieceID, satelliteID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
errch := make(chan error, len(nodes))
for _, v := range nodes {
if v != nil {
v.Type.DPanicOnInvalid("ec client delete")
}
}
for _, n := range nodes {
if n == nil {
errch <- nil
continue
}
go func(n *pb.Node) {
derivedPieceID, err := pieceID.Derive(n.Id.Bytes())
if err != nil {
zap.S().Errorf("Failed deriving piece id for %s: %v", pieceID, err)
errch <- err
return
}
ps, err := ec.newPSClient(ctx, n)
if err != nil {
zap.S().Errorf("Failed dialing for deleting piece %s -> %s from node %s: %v",
pieceID, derivedPieceID, n.Id, err)
errch <- err
return
}
err = ps.Delete(ctx, derivedPieceID, satelliteID)
// Normally the call below should be deferred, but doing so randomly fails
// the unit tests.
err = errs.Combine(err, ps.Close())
if err != nil {
zap.S().Errorf("Failed deleting piece %s -> %s from node %s: %v",
pieceID, derivedPieceID, n.Id, err)
}
errch <- err
}(n)
}
allerrs := collectErrors(errch, len(nodes))
for _, v := range nodes {
if v != nil {
v.Type.DPanicOnInvalid("ec client delete 2")
}
}
if len(allerrs) > 0 && len(allerrs) == len(nodes) {
return allerrs[0]
}
return nil
}
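// collectErrors receives size results from the channel and returns the non-nil errors.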
func collectErrors(errs <-chan error, size int) []error {
var result []error
for i := 0; i < size; i++ {
err := <-errs
if err != nil {
result = append(result, err)
}
}
return result
}
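// unique reports whether all non-nil nodes in the slice have distinct IDs.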
func unique(nodes []*pb.Node) bool {
if len(nodes) < 2 {
return true
}
ids := make(storj.NodeIDList, len(nodes))
for i, n := range nodes {
if n != nil {
ids[i] = n.Id
n.Type.DPanicOnInvalid("ec client unique")
}
}
// sort the ids and check for identical neighbors
sort.Sort(ids)
// sort.Slice(ids, func(i, k int) bool { return ids[i].Less(ids[k]) })
for i := 1; i < len(ids); i++ {
if ids[i] != (storj.NodeID{}) && ids[i] == ids[i-1] {
return false
}
}
return true
}
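// calcPadded rounds size up to the next multiple of blockSize.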
func calcPadded(size int64, blockSize int) int64 {
mod := size % int64(blockSize)
if mod == 0 {
return size
}
return size + int64(blockSize) - mod
}
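// lazyPieceRanger implements ranger.Ranger but defers dialing the node and
// requesting the piece until the first Range call.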
type lazyPieceRanger struct {
ranger ranger.Ranger
newPSClientHelper psClientHelper
node *pb.Node
id psclient.PieceID
size int64
pba *pb.OrderLimit
}
// Size implements Ranger.Size
func (lr *lazyPieceRanger) Size() int64 {
return lr.size
}
// Range implements Ranger.Range, connecting to the node lazily on the first call
func (lr *lazyPieceRanger) Range(ctx context.Context, offset, length int64) (io.ReadCloser, error) {
lr.node.Type.DPanicOnInvalid("Range")
if lr.ranger == nil {
ps, err := lr.newPSClientHelper(ctx, lr.node)
if err != nil {
return nil, err
}
ranger, err := ps.Get(ctx, lr.id, lr.size, lr.pba)
if err != nil {
return nil, err
}
lr.ranger = ranger
}
return lr.ranger.Range(ctx, offset, length)
}
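// nonNilCount returns the number of non-nil nodes in the slice.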
func nonNilCount(nodes []*pb.Node) int {
total := 0
for _, node := range nodes {
if node != nil {
total++
node.Type.DPanicOnInvalid("nonNilCount")
}
}
return total
}