Add Repair method to ECClient (#1509)

Kaloyan Raev 2019-03-19 15:14:59 +02:00 committed by GitHub
parent 26497df0e2
commit d057efb05e
3 changed files with 119 additions and 10 deletions

View File

@ -19,7 +19,8 @@ import (
// Config contains configurable values for repairer
type Config struct {
MaxRepair int `help:"maximum segments that can be repaired concurrently" default:"100"`
Interval time.Duration `help:"how frequently checker should audit segments" default:"3600s"`
Interval time.Duration `help:"how frequently checker should audit segments" default:"1h0m0s"`
Timeout time.Duration `help:"time limit for uploading repaired pieces to new storage nodes" default:"1m0s"`
MaxBufferMem memory.Size `help:"maximum buffer memory (in bytes) to be allocated for read buffers" default:"4M"`
}
@ -29,5 +30,5 @@ func (c Config) GetSegmentRepairer(ctx context.Context, tc transport.Client, poi
ec := ecclient.NewClient(tc, c.MaxBufferMem.Int())
return segments.NewSegmentRepairer(pointerdb, allocation, cache, ec, identity, selectionPreferences), nil
return segments.NewSegmentRepairer(pointerdb, allocation, cache, ec, identity, selectionPreferences, c.Timeout), nil
}
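
For orientation, here is a minimal sketch of the repairer configuration that these struct tags describe, populated with the defaults shown above. The import paths, the repairer package name and the memory.MiB constant are assumptions for illustration and are not part of the patch.

package main

import (
	"fmt"
	"time"

	"storj.io/storj/internal/memory"         // assumed location of the memory.Size type
	"storj.io/storj/pkg/datarepair/repairer" // assumed location of the Config struct above
)

func main() {
	// Mirrors the default struct tags: MaxRepair 100, Interval "1h0m0s",
	// the new Timeout "1m0s" and MaxBufferMem "4M".
	cfg := repairer.Config{
		MaxRepair:    100,
		Interval:     time.Hour,
		Timeout:      time.Minute,
		MaxBufferMem: 4 * memory.MiB, // stands in for the "4M" default
	}
	fmt.Println("repair upload timeout:", cfg.Timeout)
}

GetSegmentRepairer then threads c.Timeout through to NewSegmentRepairer, as the hunk above shows.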

View File

@ -22,7 +22,6 @@ import (
"storj.io/storj/pkg/ranger"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
"storj.io/storj/pkg/utils"
"storj.io/storj/uplink/piecestore"
)
@ -31,6 +30,7 @@ var mon = monkit.Package()
// Client defines an interface for storing erasure coded data to piece store nodes
type Client interface {
Put(ctx context.Context, limits []*pb.AddressedOrderLimit, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error)
Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time, timeout time.Duration) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error)
Get(ctx context.Context, limits []*pb.AddressedOrderLimit, es eestream.ErasureScheme, size int64) (ranger.Ranger, error)
Delete(ctx context.Context, limits []*pb.AddressedOrderLimit) error
}
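
Repair deliberately mirrors Put and only adds a hard timeout for cutting off the long tail. A hedged sketch of a call site, assuming the caller already holds per-piece order limits (nil entries marking piece slots that need no upload) and the reconstructed segment data; everything except the Repair call shape is a placeholder, and imports are as in the surrounding file.

// repairSegment is a hypothetical wrapper; only the ec.Repair signature comes from the interface above.
func repairSegment(
	ctx context.Context,
	ec Client, // the erasure-coding client interface defined above
	limits []*pb.AddressedOrderLimit, // nil entries: piece slots that are not uploaded
	rs eestream.RedundancyStrategy,
	data io.Reader,
	expiration time.Time,
	timeout time.Duration, // e.g. the repairer config's new Timeout value (1m0s by default)
) ([]*pb.Node, []*pb.PieceHash, error) {
	return ec.Repair(ctx, limits, rs, data, expiration, timeout)
}

Passing the timeout explicitly, rather than folding it into the context, leaves Put untouched while letting the repairer bound how long it waits for slow nodes.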
@ -66,6 +66,7 @@ func (ec *ecClient) newPSClient(ctx context.Context, n *pb.Node) (*piecestore.Cl
func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
defer mon.Task()(&ctx)(&err)
if len(limits) != rs.TotalCount() {
return nil, nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), rs.TotalCount())
}
@ -110,6 +111,11 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, r
for range limits {
info := <-infos
if limits[info.i] == nil {
continue
}
if info.err != nil {
zap.S().Debugf("Upload to storage node %s failed: %v", limits[info.i].GetLimit().StorageNodeId, info.err)
continue
@ -149,13 +155,13 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, r
timer.Stop()
}
/* clean up the partially uploaded segment's pieces */
defer func() {
select {
case <-ctx.Done():
err = utils.CombineErrors(
err = errs.Combine(
Error.New("upload cancelled by user"),
// ec.Delete(context.Background(), nodes, pieceID, pba.SatelliteId), //TODO
// TODO: clean up the partially uploaded segment's pieces
// ec.Delete(context.Background(), nodes, pieceID, pba.SatelliteId),
)
default:
}
@ -168,12 +174,111 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, r
return successfulNodes, successfulHashes, nil
}
func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time, timeout time.Duration) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
defer mon.Task()(&ctx)(&err)
if len(limits) != rs.TotalCount() {
return nil, nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), rs.TotalCount())
}
if !unique(limits) {
return nil, nil, Error.New("duplicated nodes are not allowed")
}
padded := eestream.PadReader(ioutil.NopCloser(data), rs.StripeSize())
readers, err := eestream.EncodeReader(ctx, padded, rs)
if err != nil {
return nil, nil, err
}
type info struct {
i int
err error
hash *pb.PieceHash
}
infos := make(chan info, len(limits))
psCtx, cancel := context.WithCancel(ctx)
defer cancel()
for i, addressedLimit := range limits {
go func(i int, addressedLimit *pb.AddressedOrderLimit) {
hash, err := ec.putPiece(psCtx, ctx, addressedLimit, readers[i], expiration)
infos <- info{i: i, err: err, hash: hash}
}(i, addressedLimit)
}
successfulNodes = make([]*pb.Node, len(limits))
successfulHashes = make([]*pb.PieceHash, len(limits))
var successfulCount int32
// how many nodes must be repaired to reach the success threshold: o - (n - r)
optimalCount := rs.OptimalThreshold() - (rs.TotalCount() - nonNilCount(limits))
zap.S().Infof("Starting a timer for %s for repairing to %d nodes to reach the success threshold (%d nodes)...",
timeout, optimalCount, rs.OptimalThreshold())
timer := time.AfterFunc(timeout, func() {
if ctx.Err() != context.Canceled {
zap.S().Infof("Timer expired. Successfully repaired to %d nodes. Canceling the long tail...", atomic.LoadInt32(&successfulCount))
cancel()
}
})
for range limits {
info := <-infos
if limits[info.i] == nil {
continue
}
if info.err != nil {
zap.S().Debugf("Repair to storage node %s failed: %v", limits[info.i].GetLimit().StorageNodeId, info.err)
continue
}
successfulNodes[info.i] = &pb.Node{
Id: limits[info.i].GetLimit().StorageNodeId,
Address: limits[info.i].GetStorageNodeAddress(),
Type: pb.NodeType_STORAGE,
}
successfulHashes[info.i] = info.hash
if int(atomic.AddInt32(&successfulCount, 1)) == optimalCount {
zap.S().Infof("Success threshold (%d nodes) reached by repairing to %d nodes. Canceling the long tail...",
rs.OptimalThreshold(), optimalCount)
timer.Stop()
cancel()
}
}
// Ensure timer is stopped in the case the success threshold is not reached
// due to errors instead of slowness.
if timer != nil {
timer.Stop()
}
// TODO: clean up the partially uploaded segment's pieces
defer func() {
select {
case <-ctx.Done():
err = errs.Combine(
Error.New("repair cancelled"),
// ec.Delete(context.Background(), nodes, pieceID, pba.SatelliteId), //TODO
)
default:
}
}()
return successfulNodes, successfulHashes, nil
}
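
The success-threshold arithmetic above is the main place Repair diverges from Put: because nil limits stand for pieces that do not need to be re-uploaded, only o - (n - r) uploads have to succeed before the long tail is cancelled. A worked example with assumed numbers:

package main

import "fmt"

func main() {
	// Assumed illustration: an erasure scheme with n = 40 total pieces and a
	// success (optimal) threshold of o = 35, repairing a segment with 10 lost
	// pieces, i.e. r = 10 non-nil limits and n-r = 30 pieces still healthy.
	n, o, r := 40, 35, 10
	optimalCount := o - (n - r)
	fmt.Println(optimalCount) // 5 successful uploads reach the success threshold
}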
func (ec *ecClient) putPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, data io.ReadCloser, expiration time.Time) (hash *pb.PieceHash, err error) {
defer func() { err = errs.Combine(err, data.Close()) }()
if limit == nil {
_, err = io.Copy(ioutil.Discard, data)
return nil, err
_, _ = io.Copy(ioutil.Discard, data)
return nil, nil
}
storageNodeID := limit.GetLimit().StorageNodeId
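
In the nil-limit branch of putPiece above, the first Copy/return pair is the removed code and the second pair is its replacement: the unused reader is still drained and closed, but its error is no longer propagated, since nothing is stored for a nil limit. A hedged restatement of the new branch (the helper name is illustrative and the drain rationale is an assumption):

// discardUnusedPiece mirrors the new nil-limit behaviour; imports as in the surrounding file.
func discardUnusedPiece(data io.ReadCloser) (*pb.PieceHash, error) {
	defer func() { _ = data.Close() }()
	_, _ = io.Copy(ioutil.Discard, data) // drain so the erasure-encoded reader is consumed (assumed rationale)
	return nil, nil                      // no hash and no error for a piece that is not uploaded
}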

View File

@ -5,6 +5,7 @@ package segments
import (
"context"
"time"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/zeebo/errs"
@ -28,10 +29,11 @@ type Repairer struct {
selectionPreferences *overlay.NodeSelectionConfig
signer signing.Signer
identity *identity.FullIdentity
timeout time.Duration
}
// NewSegmentRepairer creates a new instance of SegmentRepairer
func NewSegmentRepairer(pointerdb *pointerdb.Service, allocation *pointerdb.AllocationSigner, cache *overlay.Cache, ec ecclient.Client, identity *identity.FullIdentity, selectionPreferences *overlay.NodeSelectionConfig) *Repairer {
func NewSegmentRepairer(pointerdb *pointerdb.Service, allocation *pointerdb.AllocationSigner, cache *overlay.Cache, ec ecclient.Client, identity *identity.FullIdentity, selectionPreferences *overlay.NodeSelectionConfig, timeout time.Duration) *Repairer {
return &Repairer{
pointerdb: pointerdb,
allocation: allocation,
@ -40,6 +42,7 @@ func NewSegmentRepairer(pointerdb *pointerdb.Service, allocation *pointerdb.Allo
identity: identity,
signer: signing.SignerFromFullIdentity(identity),
selectionPreferences: selectionPreferences,
timeout: timeout,
}
}
@ -160,7 +163,7 @@ func (repairer *Repairer) Repair(ctx context.Context, path storj.Path, lostPiece
defer func() { err = errs.Combine(err, r.Close()) }()
// Upload the repaired pieces
successfulNodes, hashes, err := repairer.ec.Put(ctx, putLimits, redundancy, r, convertTime(expiration))
successfulNodes, hashes, err := repairer.ec.Repair(ctx, putLimits, redundancy, r, convertTime(expiration), repairer.timeout)
if err != nil {
return Error.Wrap(err)
}