Fix repair order limit creation (#1650)
* fix repair order limit creation (use piece size instead of share size) * fix counter in CreateGetRepairOrderLimits
This commit is contained in:
parent
9af4f26d43
commit
6028d8c3de
@ -392,9 +392,12 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, auditor *ide
|
|||||||
// CreateGetRepairOrderLimits creates the order limits for downloading the healthy pieces of pointer as the source for repair.
|
// CreateGetRepairOrderLimits creates the order limits for downloading the healthy pieces of pointer as the source for repair.
|
||||||
func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, repairer *identity.PeerIdentity, bucketID []byte, pointer *pb.Pointer, healthy []*pb.RemotePiece) (_ []*pb.AddressedOrderLimit, err error) {
|
func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, repairer *identity.PeerIdentity, bucketID []byte, pointer *pb.Pointer, healthy []*pb.RemotePiece) (_ []*pb.AddressedOrderLimit, err error) {
|
||||||
rootPieceID := pointer.GetRemote().RootPieceId
|
rootPieceID := pointer.GetRemote().RootPieceId
|
||||||
redundancy := pointer.GetRemote().GetRedundancy()
|
redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
|
||||||
shareSize := redundancy.GetErasureShareSize()
|
if err != nil {
|
||||||
totalPieces := redundancy.GetTotal()
|
return nil, Error.Wrap(err)
|
||||||
|
}
|
||||||
|
pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
|
||||||
|
totalPieces := redundancy.TotalCount()
|
||||||
expiration := pointer.ExpirationDate
|
expiration := pointer.ExpirationDate
|
||||||
|
|
||||||
// convert orderExpiration from duration to timestamp
|
// convert orderExpiration from duration to timestamp
|
||||||
@ -410,7 +413,7 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, repairer
|
|||||||
}
|
}
|
||||||
|
|
||||||
var combinedErrs error
|
var combinedErrs error
|
||||||
var limitsCount int32
|
var limitsCount int
|
||||||
limits := make([]*pb.AddressedOrderLimit, totalPieces)
|
limits := make([]*pb.AddressedOrderLimit, totalPieces)
|
||||||
for _, piece := range healthy {
|
for _, piece := range healthy {
|
||||||
node, err := service.cache.Get(ctx, piece.NodeId)
|
node, err := service.cache.Get(ctx, piece.NodeId)
|
||||||
@ -437,7 +440,7 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, repairer
|
|||||||
StorageNodeId: piece.NodeId,
|
StorageNodeId: piece.NodeId,
|
||||||
PieceId: rootPieceID.Derive(piece.NodeId),
|
PieceId: rootPieceID.Derive(piece.NodeId),
|
||||||
Action: pb.PieceAction_GET_REPAIR,
|
Action: pb.PieceAction_GET_REPAIR,
|
||||||
Limit: int64(shareSize),
|
Limit: pieceSize,
|
||||||
PieceExpiration: expiration,
|
PieceExpiration: expiration,
|
||||||
OrderExpiration: orderExpiration,
|
OrderExpiration: orderExpiration,
|
||||||
})
|
})
|
||||||
@ -449,10 +452,11 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, repairer
|
|||||||
Limit: orderLimit,
|
Limit: orderLimit,
|
||||||
StorageNodeAddress: node.Address,
|
StorageNodeAddress: node.Address,
|
||||||
}
|
}
|
||||||
|
limitsCount++
|
||||||
}
|
}
|
||||||
|
|
||||||
if limitsCount < redundancy.GetMinReq() {
|
if limitsCount < redundancy.RequiredCount() {
|
||||||
err = Error.New("not enough nodes available: got %d, required %d", limitsCount, redundancy.GetMinReq())
|
err = Error.New("not enough nodes available: got %d, required %d", limitsCount, redundancy.RequiredCount())
|
||||||
return nil, errs.Combine(err, combinedErrs)
|
return nil, errs.Combine(err, combinedErrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -471,8 +475,12 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, repairer
|
|||||||
// CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of pointer to newNodes.
|
// CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of pointer to newNodes.
|
||||||
func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, repairer *identity.PeerIdentity, bucketID []byte, pointer *pb.Pointer, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*pb.Node) (_ []*pb.AddressedOrderLimit, err error) {
|
func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, repairer *identity.PeerIdentity, bucketID []byte, pointer *pb.Pointer, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*pb.Node) (_ []*pb.AddressedOrderLimit, err error) {
|
||||||
rootPieceID := pointer.GetRemote().RootPieceId
|
rootPieceID := pointer.GetRemote().RootPieceId
|
||||||
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
|
redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
|
||||||
totalPieces := pointer.GetRemote().GetRedundancy().GetTotal()
|
if err != nil {
|
||||||
|
return nil, Error.Wrap(err)
|
||||||
|
}
|
||||||
|
pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
|
||||||
|
totalPieces := redundancy.TotalCount()
|
||||||
expiration := pointer.ExpirationDate
|
expiration := pointer.ExpirationDate
|
||||||
|
|
||||||
// convert orderExpiration from duration to timestamp
|
// convert orderExpiration from duration to timestamp
|
||||||
@ -488,7 +496,7 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, repairer
|
|||||||
}
|
}
|
||||||
|
|
||||||
limits := make([]*pb.AddressedOrderLimit, totalPieces)
|
limits := make([]*pb.AddressedOrderLimit, totalPieces)
|
||||||
var pieceNum int32
|
var pieceNum int
|
||||||
for _, node := range newNodes {
|
for _, node := range newNodes {
|
||||||
if node != nil {
|
if node != nil {
|
||||||
node.Type.DPanicOnInvalid("order service put repair order limits")
|
node.Type.DPanicOnInvalid("order service put repair order limits")
|
||||||
@ -509,7 +517,7 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, repairer
|
|||||||
StorageNodeId: node.Id,
|
StorageNodeId: node.Id,
|
||||||
PieceId: rootPieceID.Derive(node.Id),
|
PieceId: rootPieceID.Derive(node.Id),
|
||||||
Action: pb.PieceAction_PUT_REPAIR,
|
Action: pb.PieceAction_PUT_REPAIR,
|
||||||
Limit: int64(shareSize),
|
Limit: pieceSize,
|
||||||
PieceExpiration: expiration,
|
PieceExpiration: expiration,
|
||||||
OrderExpiration: orderExpiration,
|
OrderExpiration: orderExpiration,
|
||||||
})
|
})
|
||||||
|
Loading…
Reference in New Issue
Block a user