satellite/repair, uplink/ecclient: remove unused expiration arg from ec.Repair and ec.putPiece (#3416)
This commit is contained in:
parent
3ee0b89f8f
commit
5453886231
@ -239,7 +239,7 @@ func verifyOrderLimitSignature(ctx context.Context, satellite signing.Signee, li
|
|||||||
|
|
||||||
// Repair takes a provided segment, encodes it with the provided redundancy strategy,
|
// Repair takes a provided segment, encodes it with the provided redundancy strategy,
|
||||||
// and uploads the pieces in need of repair to new nodes provided by order limits.
|
// and uploads the pieces in need of repair to new nodes provided by order limits.
|
||||||
func (ec *ECRepairer) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time, timeout time.Duration, path storj.Path) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
|
func (ec *ECRepairer) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, timeout time.Duration, path storj.Path) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
pieceCount := len(limits)
|
pieceCount := len(limits)
|
||||||
@ -270,7 +270,7 @@ func (ec *ECRepairer) Repair(ctx context.Context, limits []*pb.AddressedOrderLim
|
|||||||
|
|
||||||
for i, addressedLimit := range limits {
|
for i, addressedLimit := range limits {
|
||||||
go func(i int, addressedLimit *pb.AddressedOrderLimit) {
|
go func(i int, addressedLimit *pb.AddressedOrderLimit) {
|
||||||
hash, err := ec.putPiece(psCtx, ctx, addressedLimit, privateKey, readers[i], expiration, path)
|
hash, err := ec.putPiece(psCtx, ctx, addressedLimit, privateKey, readers[i], path)
|
||||||
infos <- info{i: i, err: err, hash: hash}
|
infos <- info{i: i, err: err, hash: hash}
|
||||||
}(i, addressedLimit)
|
}(i, addressedLimit)
|
||||||
}
|
}
|
||||||
@ -353,7 +353,7 @@ func (ec *ECRepairer) Repair(ctx context.Context, limits []*pb.AddressedOrderLim
|
|||||||
return successfulNodes, successfulHashes, nil
|
return successfulNodes, successfulHashes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ec *ECRepairer) putPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, data io.ReadCloser, expiration time.Time, path storj.Path) (hash *pb.PieceHash, err error) {
|
func (ec *ECRepairer) putPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, data io.ReadCloser, path storj.Path) (hash *pb.PieceHash, err error) {
|
||||||
nodeName := "nil"
|
nodeName := "nil"
|
||||||
if limit != nil {
|
if limit != nil {
|
||||||
nodeName = limit.GetLimit().StorageNodeId.String()[0:8]
|
nodeName = limit.GetLimit().StorageNodeId.String()[0:8]
|
||||||
|
@ -100,7 +100,6 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
|
|||||||
}
|
}
|
||||||
|
|
||||||
pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
|
pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
|
||||||
expiration := pointer.GetExpirationDate()
|
|
||||||
|
|
||||||
var excludeNodeIDs storj.NodeIDList
|
var excludeNodeIDs storj.NodeIDList
|
||||||
var healthyPieces, unhealthyPieces []*pb.RemotePiece
|
var healthyPieces, unhealthyPieces []*pb.RemotePiece
|
||||||
@ -208,7 +207,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
|
|||||||
defer func() { err = errs.Combine(err, segmentReader.Close()) }()
|
defer func() { err = errs.Combine(err, segmentReader.Close()) }()
|
||||||
|
|
||||||
// Upload the repaired pieces
|
// Upload the repaired pieces
|
||||||
successfulNodes, hashes, err := repairer.ec.Repair(ctx, putLimits, putPrivateKey, redundancy, segmentReader, expiration, repairer.timeout, path)
|
successfulNodes, hashes, err := repairer.ec.Repair(ctx, putLimits, putPrivateKey, redundancy, segmentReader, repairer.timeout, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, Error.Wrap(err)
|
return false, Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
@ -160,8 +160,7 @@ func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.Trans
|
|||||||
putCtx, cancel := context.WithCancel(ctx)
|
putCtx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// TODO what's the typical expiration setting?
|
pieceHash, peerID, err := worker.ecclient.PutPiece(putCtx, ctx, addrLimit, pk, reader)
|
||||||
pieceHash, peerID, err := worker.ecclient.PutPiece(putCtx, ctx, addrLimit, pk, reader, time.Now().Add(time.Second*600))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if piecestore.ErrVerifyUntrusted.Has(err) {
|
if piecestore.ErrVerifyUntrusted.Has(err) {
|
||||||
worker.log.Error("failed hash verification.", zap.Stringer("satellite ID", worker.satelliteID), zap.Stringer("piece ID", pieceID), zap.Error(errs.Wrap(err)))
|
worker.log.Error("failed hash verification.", zap.Stringer("satellite ID", worker.satelliteID), zap.Stringer("piece ID", pieceID), zap.Error(errs.Wrap(err)))
|
||||||
|
@ -35,7 +35,7 @@ type Client interface {
|
|||||||
Delete(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey) error
|
Delete(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey) error
|
||||||
WithForceErrorDetection(force bool) Client
|
WithForceErrorDetection(force bool) Client
|
||||||
// PutPiece is not intended to be used by normal uplinks directly, but is exported to support storagenode graceful exit transfers.
|
// PutPiece is not intended to be used by normal uplinks directly, but is exported to support storagenode graceful exit transfers.
|
||||||
PutPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, data io.ReadCloser, expiration time.Time) (hash *pb.PieceHash, id *identity.PeerIdentity, err error)
|
PutPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, data io.ReadCloser) (hash *pb.PieceHash, id *identity.PeerIdentity, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type dialPiecestoreFunc func(context.Context, *pb.Node) (*piecestore.Client, error)
|
type dialPiecestoreFunc func(context.Context, *pb.Node) (*piecestore.Client, error)
|
||||||
@ -108,7 +108,7 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, p
|
|||||||
|
|
||||||
for i, addressedLimit := range limits {
|
for i, addressedLimit := range limits {
|
||||||
go func(i int, addressedLimit *pb.AddressedOrderLimit) {
|
go func(i int, addressedLimit *pb.AddressedOrderLimit) {
|
||||||
hash, _, err := ec.PutPiece(psCtx, ctx, addressedLimit, privateKey, readers[i], expiration)
|
hash, _, err := ec.PutPiece(psCtx, ctx, addressedLimit, privateKey, readers[i])
|
||||||
infos <- info{i: i, err: err, hash: hash}
|
infos <- info{i: i, err: err, hash: hash}
|
||||||
}(i, addressedLimit)
|
}(i, addressedLimit)
|
||||||
}
|
}
|
||||||
@ -178,7 +178,7 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, p
|
|||||||
return successfulNodes, successfulHashes, nil
|
return successfulNodes, successfulHashes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ec *ecClient) PutPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, data io.ReadCloser, expiration time.Time) (hash *pb.PieceHash, peerID *identity.PeerIdentity, err error) {
|
func (ec *ecClient) PutPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, data io.ReadCloser) (hash *pb.PieceHash, peerID *identity.PeerIdentity, err error) {
|
||||||
nodeName := "nil"
|
nodeName := "nil"
|
||||||
if limit != nil {
|
if limit != nil {
|
||||||
nodeName = limit.GetLimit().StorageNodeId.String()[0:8]
|
nodeName = limit.GetLimit().StorageNodeId.String()[0:8]
|
||||||
|
Loading…
Reference in New Issue
Block a user