storj/pkg/storage/ec/client.go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package ecclient

import (
	"context"
	"io"
	"io/ioutil"
	"sort"
	"sync/atomic"
	"time"

"github.com/zeebo/errs"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/sync2"
"storj.io/storj/pkg/eestream"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/ranger"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
"storj.io/storj/uplink/piecestore"
)
var mon = monkit.Package()
// Client defines an interface for storing erasure coded data to piece store nodes
type Client interface {
	Put(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error)
	Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time, timeout time.Duration, path storj.Path) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error)
	Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, size int64) (ranger.Ranger, error)
	Delete(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey) error
	WithForceErrorDetection(force bool) Client
}
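
// dialPiecestoreFunc dials a piecestore client for the given storage node.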
type dialPiecestoreFunc func(context.Context, *pb.Node) (*piecestore.Client, error)
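
// ecClient implements Client by dialing piecestore clients over the configured transport.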
type ecClient struct {
	log                 *zap.Logger
	transport           transport.Client
	memoryLimit         int
	forceErrorDetection bool
}

// NewClient creates a new erasure coding client, using the given transport client and max buffer memory.
func NewClient(log *zap.Logger, tc transport.Client, memoryLimit int) Client {
	return &ecClient{
		log:         log,
		transport:   tc,
		memoryLimit: memoryLimit,
	}
}
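
// WithForceErrorDetection sets whether erasure decoding in Get should always run error detection and returns the client.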
func (ec *ecClient) WithForceErrorDetection(force bool) Client {
	ec.forceErrorDetection = force
	return ec
}

func (ec *ecClient) dialPiecestore(ctx context.Context, n *pb.Node) (*piecestore.Client, error) {
	logger := ec.log.Named(n.Id.String())
	return piecestore.Dial(ctx, ec.transport, n, logger, piecestore.DefaultConfig)
}
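
// Put erasure encodes the data and uploads one piece to each storage node
// specified by the given order limits. Once the optimal threshold of the
// redundancy strategy is reached, the remaining (long tail) uploads are
// cancelled. It returns the nodes and piece hashes of the successful uploads,
// indexed by the position of their order limit.
//
// A minimal usage sketch (illustrative only; limits, privateKey, rs, data and
// expiration are assumed to be obtained elsewhere, e.g. from the satellite):
//
//	nodes, hashes, err := ec.Put(ctx, limits, privateKey, rs, data, expiration)
//	if err != nil {
//		return err // not enough pieces reached the success threshold
//	}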
func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
	defer mon.Task()(&ctx)(&err)

	if len(limits) != rs.TotalCount() {
		return nil, nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), rs.TotalCount())
	}

	nonNilLimits := nonNilCount(limits)
	if nonNilLimits <= rs.RepairThreshold() && nonNilLimits < rs.OptimalThreshold() {
		return nil, nil, Error.New("number of non-nil limits (%d) is less than or equal to the repair threshold (%d) of erasure scheme", nonNilLimits, rs.RepairThreshold())
	}

	if !unique(limits) {
		return nil, nil, Error.New("duplicated nodes are not allowed")
	}

	ec.log.Sugar().Debugf("Uploading to storage nodes using ErasureShareSize: %d StripeSize: %d RepairThreshold: %d OptimalThreshold: %d",
		rs.ErasureShareSize(), rs.StripeSize(), rs.RepairThreshold(), rs.OptimalThreshold())

	padded := eestream.PadReader(ioutil.NopCloser(data), rs.StripeSize())
	readers, err := eestream.EncodeReader(ctx, padded, rs)
	if err != nil {
		return nil, nil, err
	}

	type info struct {
		i    int
		err  error
		hash *pb.PieceHash
	}
	infos := make(chan info, len(limits))

	psCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	for i, addressedLimit := range limits {
		go func(i int, addressedLimit *pb.AddressedOrderLimit) {
			hash, err := ec.putPiece(psCtx, ctx, addressedLimit, privateKey, readers[i], expiration)
			infos <- info{i: i, err: err, hash: hash}
		}(i, addressedLimit)
	}

	successfulNodes = make([]*pb.Node, len(limits))
	successfulHashes = make([]*pb.PieceHash, len(limits))
	var successfulCount int32

	for range limits {
		info := <-infos
		if limits[info.i] == nil {
			continue
		}

		if info.err != nil {
			ec.log.Sugar().Debugf("Upload to storage node %s failed: %v", limits[info.i].GetLimit().StorageNodeId, info.err)
			continue
		}

		successfulNodes[info.i] = &pb.Node{
			Id:      limits[info.i].GetLimit().StorageNodeId,
			Address: limits[info.i].GetStorageNodeAddress(),
		}
		successfulHashes[info.i] = info.hash

		atomic.AddInt32(&successfulCount, 1)
		if int(successfulCount) >= rs.OptimalThreshold() {
			ec.log.Sugar().Infof("Success threshold (%d nodes) reached. Cancelling remaining uploads.", rs.OptimalThreshold())
			cancel()
		}
	}

	defer func() {
		select {
		case <-ctx.Done():
			err = errs.Combine(
				Error.New("upload cancelled by user"),
				// TODO: clean up the partially uploaded segment's pieces
				// ec.Delete(context.Background(), nodes, pieceID, pba.SatelliteId),
			)
		default:
		}
	}()

	successes := int(atomic.LoadInt32(&successfulCount))
	if successes <= rs.RepairThreshold() && successes < rs.OptimalThreshold() {
		return nil, nil, Error.New("successful puts (%d) less than or equal to repair threshold (%d)", successes, rs.RepairThreshold())
	}

	if successes < rs.OptimalThreshold() {
		return nil, nil, Error.New("successful puts (%d) less than success threshold (%d)", successes, rs.OptimalThreshold())
	}

	return successfulNodes, successfulHashes, nil
}
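
// Repair re-uploads erasure coded pieces of an injured segment to the nodes
// specified by the given order limits. When the timeout expires, the long tail
// of remaining uploads is cancelled and the pieces repaired so far are
// returned; it fails only if no piece could be repaired.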
func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time, timeout time.Duration, path storj.Path) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
	defer mon.Task()(&ctx)(&err)

	if len(limits) != rs.TotalCount() {
		return nil, nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), rs.TotalCount())
	}

	if !unique(limits) {
		return nil, nil, Error.New("duplicated nodes are not allowed")
	}

	padded := eestream.PadReader(ioutil.NopCloser(data), rs.StripeSize())
	readers, err := eestream.EncodeReader(ctx, padded, rs)
	if err != nil {
		return nil, nil, err
	}

	type info struct {
		i    int
		err  error
		hash *pb.PieceHash
	}
	infos := make(chan info, len(limits))

	psCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	for i, addressedLimit := range limits {
		go func(i int, addressedLimit *pb.AddressedOrderLimit) {
			hash, err := ec.putPiece(psCtx, ctx, addressedLimit, privateKey, readers[i], expiration)
			infos <- info{i: i, err: err, hash: hash}
		}(i, addressedLimit)
	}

ec.log.Sugar().Infof("Starting a timer for %s for repairing %s up to %d nodes to try to have a number of pieces around the successful threshold (%d)",
timeout, path, nonNilCount(limits), rs.OptimalThreshold())

	var successfulCount int32
	timer := time.AfterFunc(timeout, func() {
		if ctx.Err() != context.Canceled {
			ec.log.Sugar().Infof("Timer expired. Successfully repaired %s to %d nodes. Canceling the long tail...", path, atomic.LoadInt32(&successfulCount))
			cancel()
		}
	})

	successfulNodes = make([]*pb.Node, len(limits))
	successfulHashes = make([]*pb.PieceHash, len(limits))
	for range limits {
		info := <-infos

		if limits[info.i] == nil {
			continue
		}

		if info.err != nil {
			ec.log.Sugar().Debugf("Repair %s to storage node %s failed: %v", path, limits[info.i].GetLimit().StorageNodeId, info.err)
			continue
		}

		successfulNodes[info.i] = &pb.Node{
			Id:      limits[info.i].GetLimit().StorageNodeId,
			Address: limits[info.i].GetStorageNodeAddress(),
		}
		successfulHashes[info.i] = info.hash

		atomic.AddInt32(&successfulCount, 1)
	}

	// Ensure timer is stopped
	_ = timer.Stop()

	// TODO: clean up the partially uploaded segment's pieces
	defer func() {
		select {
		case <-ctx.Done():
			err = errs.Combine(
				Error.New("repair cancelled"),
				// ec.Delete(context.Background(), nodes, pieceID, pba.SatelliteId), //TODO
			)
		default:
		}
	}()

	if atomic.LoadInt32(&successfulCount) == 0 {
		return nil, nil, Error.New("repair %v to all nodes failed", path)
	}

	ec.log.Sugar().Infof("Successfully repaired %s to %d nodes.", path, atomic.LoadInt32(&successfulCount))

	return successfulNodes, successfulHashes, nil
}
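
// putPiece uploads a single piece to the storage node addressed by the given
// order limit. A nil limit drains the reader and returns without uploading.
// On success it returns the piece hash produced by the storage node.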
func (ec *ecClient) putPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, data io.ReadCloser, expiration time.Time) (hash *pb.PieceHash, err error) {
	nodeName := "nil"
	if limit != nil {
		nodeName = limit.GetLimit().StorageNodeId.String()[0:8]
	}
	defer mon.Task()(&ctx, "node: "+nodeName)(&err)
	defer func() { err = errs.Combine(err, data.Close()) }()

	if limit == nil {
		_, _ = io.Copy(ioutil.Discard, data)
		return nil, nil
	}

	storageNodeID := limit.GetLimit().StorageNodeId
	pieceID := limit.GetLimit().PieceId
	ps, err := ec.dialPiecestore(ctx, &pb.Node{
		Id:      storageNodeID,
		Address: limit.GetStorageNodeAddress(),
	})
	if err != nil {
		ec.log.Sugar().Debugf("Failed dialing for putting piece %s to node %s: %v", pieceID, storageNodeID, err)
		return nil, err
	}
	defer func() { err = errs.Combine(err, ps.Close()) }()

	upload, err := ps.Upload(ctx, limit.GetLimit(), privateKey)
	if err != nil {
		ec.log.Sugar().Debugf("Failed requesting upload of piece %s to node %s: %v", pieceID, storageNodeID, err)
		return nil, err
	}

	defer func() {
		if ctx.Err() != nil || err != nil {
			hash = nil
			err = errs.Combine(err, upload.Cancel(ctx))
			return
		}

		h, closeErr := upload.Commit(ctx)
		hash = h
		err = errs.Combine(err, closeErr)
	}()

	_, err = sync2.Copy(ctx, upload, data)
	// Canceled context means the piece upload was interrupted by user or due
	// to slow connection. No error logging for this case.
	if ctx.Err() == context.Canceled {
		if parent.Err() == context.Canceled {
			ec.log.Sugar().Infof("Upload to node %s canceled by user.", storageNodeID)
		} else {
			ec.log.Sugar().Debugf("Node %s cut from upload due to slow connection.", storageNodeID)
		}
		err = context.Canceled
	} else if err != nil {
		nodeAddress := "nil"
		if limit.GetStorageNodeAddress() != nil {
			nodeAddress = limit.GetStorageNodeAddress().GetAddress()
		}
		ec.log.Sugar().Debugf("Failed uploading piece %s to node %s (%+v): %v", pieceID, storageNodeID, nodeAddress, err)
	}

	return hash, err
}
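
// Get returns a ranger for downloading and erasure decoding the piece data
// referenced by the given order limits. Connections to the storage nodes are
// established lazily, only when a range is actually requested.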
func (ec *ecClient) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, size int64) (rr ranger.Ranger, err error) {
	defer mon.Task()(&ctx)(&err)

	if len(limits) != es.TotalCount() {
		return nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), es.TotalCount())
	}

	if nonNilCount(limits) < es.RequiredCount() {
		return nil, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount())
	}

	paddedSize := calcPadded(size, es.StripeSize())
	pieceSize := paddedSize / int64(es.RequiredCount())

	rrs := map[int]ranger.Ranger{}
	for i, addressedLimit := range limits {
		if addressedLimit == nil {
			continue
		}

		rrs[i] = &lazyPieceRanger{
			dialPiecestore: ec.dialPiecestore,
			limit:          addressedLimit,
			privateKey:     privateKey,
			size:           pieceSize,
		}
	}

	rr, err = eestream.Decode(rrs, es, ec.memoryLimit, ec.forceErrorDetection)
	if err != nil {
		return nil, err
	}

	return eestream.Unpad(rr, int(paddedSize-size))
}
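
// Delete requests deletion of the pieces referenced by the given order limits
// from their storage nodes. It returns an error only if deletion failed for
// all limits.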
func (ec *ecClient) Delete(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey) (err error) {
	defer mon.Task()(&ctx)(&err)

	errch := make(chan error, len(limits))
	for _, addressedLimit := range limits {
		if addressedLimit == nil {
			errch <- nil
			continue
		}

		go func(addressedLimit *pb.AddressedOrderLimit) {
			limit := addressedLimit.GetLimit()
			ps, err := ec.dialPiecestore(ctx, &pb.Node{
				Id:      limit.StorageNodeId,
				Address: addressedLimit.GetStorageNodeAddress(),
			})
			if err != nil {
				ec.log.Sugar().Errorf("Failed dialing for deleting piece %s from node %s: %v", limit.PieceId, limit.StorageNodeId, err)
				errch <- err
				return
			}

			err = ps.Delete(ctx, limit, privateKey)
			err = errs.Combine(err, ps.Close())
			if err != nil {
				ec.log.Sugar().Errorf("Failed deleting piece %s from node %s: %v", limit.PieceId, limit.StorageNodeId, err)
			}
			errch <- err
		}(addressedLimit)
	}

	allerrs := collectErrors(errch, len(limits))
	if len(allerrs) > 0 && len(allerrs) == len(limits) {
		return allerrs[0]
	}

	return nil
}
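
// collectErrors reads size values from the error channel and returns the non-nil ones.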
func collectErrors(errs <-chan error, size int) []error {
	var result []error
	for i := 0; i < size; i++ {
		err := <-errs
		if err != nil {
			result = append(result, err)
		}
	}
	return result
}
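
// unique reports whether all non-nil order limits reference distinct storage nodes.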
func unique(limits []*pb.AddressedOrderLimit) bool {
	if len(limits) < 2 {
		return true
	}

	ids := make(storj.NodeIDList, len(limits))
	for i, addressedLimit := range limits {
		if addressedLimit != nil {
			ids[i] = addressedLimit.GetLimit().StorageNodeId
		}
	}

	// sort the ids and check for identical neighbors
	sort.Sort(ids)
	// sort.Slice(ids, func(i, k int) bool { return ids[i].Less(ids[k]) })
	for i := 1; i < len(ids); i++ {
		if ids[i] != (storj.NodeID{}) && ids[i] == ids[i-1] {
			return false
		}
	}

	return true
}
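
// calcPadded rounds size up to the next multiple of blockSize.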
func calcPadded(size int64, blockSize int) int64 {
	mod := size % int64(blockSize)
	if mod == 0 {
		return size
	}
	return size + int64(blockSize) - mod
}
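
// lazyPieceRanger is a ranger.Ranger that dials the storage node and starts
// the piece download only when a range is requested.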
type lazyPieceRanger struct {
	dialPiecestore dialPiecestoreFunc
	limit          *pb.AddressedOrderLimit
	privateKey     storj.PiecePrivateKey
	size           int64
}

// Size implements Ranger.Size
func (lr *lazyPieceRanger) Size() int64 {
	return lr.size
}

// Range implements Ranger.Range to be lazily connected
func (lr *lazyPieceRanger) Range(ctx context.Context, offset, length int64) (_ io.ReadCloser, err error) {
	defer mon.Task()(&ctx)(&err)

	ps, err := lr.dialPiecestore(ctx, &pb.Node{
		Id:      lr.limit.GetLimit().StorageNodeId,
		Address: lr.limit.GetStorageNodeAddress(),
	})
	if err != nil {
		return nil, err
	}

	download, err := ps.Download(ctx, lr.limit.GetLimit(), lr.privateKey, offset, length)
	if err != nil {
		return nil, errs.Combine(err, ps.Close())
	}

	return &clientCloser{download, ps}, nil
}
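
// clientCloser couples a piece download with its piecestore client so that
// both are closed together.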
type clientCloser struct {
	piecestore.Downloader
	client *piecestore.Client
}
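
// Close closes the piece download and the underlying piecestore client.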
func (client *clientCloser) Close() error {
	return errs.Combine(
		client.Downloader.Close(),
		client.client.Close(),
	)
}
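
// nonNilCount returns the number of non-nil order limits in the slice.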
func nonNilCount(limits []*pb.AddressedOrderLimit) int {
	total := 0
	for _, limit := range limits {
		if limit != nil {
			total++
		}
	}
	return total
}