Use RepairThreshold naming consistently (#385)
This commit is contained in:
parent
550858c5a7
commit
2019803c5e
@ -39,61 +39,61 @@ type ErasureScheme interface {
|
||||
RequiredCount() int
|
||||
}
|
||||
|
||||
// RedundancyStrategy is an ErasureScheme with a minimum and optimum thresholds
|
||||
// RedundancyStrategy is an ErasureScheme with a repair and optimal thresholds
|
||||
type RedundancyStrategy struct {
|
||||
ErasureScheme
|
||||
Min int
|
||||
Opt int
|
||||
repairThreshold int
|
||||
optimalThreshold int
|
||||
}
|
||||
|
||||
// NewRedundancyStrategy from the given ErasureScheme, minimum and optimum
|
||||
// thresholds
|
||||
// NewRedundancyStrategy from the given ErasureScheme, repair and optimal thresholds.
|
||||
//
|
||||
// Min is the minimum threshold. If set to 0, it will be reset to the
|
||||
// TotalCount of the ErasureScheme.
|
||||
// Opt is the optimum threshold. If set to 0, it will be reset to the
|
||||
// TotalCount of the ErasureScheme.
|
||||
func NewRedundancyStrategy(es ErasureScheme, Min, Opt int) (RedundancyStrategy, error) {
|
||||
if Min == 0 {
|
||||
Min = es.TotalCount()
|
||||
// repairThreshold is the minimum repair threshold.
|
||||
// If set to 0, it will be reset to the TotalCount of the ErasureScheme.
|
||||
// optimalThreshold is the optimal threshold.
|
||||
// If set to 0, it will be reset to the TotalCount of the ErasureScheme.
|
||||
func NewRedundancyStrategy(es ErasureScheme, repairThreshold, optimalThreshold int) (RedundancyStrategy, error) {
|
||||
if repairThreshold == 0 {
|
||||
repairThreshold = es.TotalCount()
|
||||
}
|
||||
if Opt == 0 {
|
||||
Opt = es.TotalCount()
|
||||
|
||||
if optimalThreshold == 0 {
|
||||
optimalThreshold = es.TotalCount()
|
||||
}
|
||||
if Min < 0 {
|
||||
return RedundancyStrategy{}, Error.New("negative minimum threshold")
|
||||
if repairThreshold < 0 {
|
||||
return RedundancyStrategy{}, Error.New("negative repair threshold")
|
||||
}
|
||||
if Min > 0 && Min < es.RequiredCount() {
|
||||
return RedundancyStrategy{}, Error.New("minimum threshold less than required count")
|
||||
if repairThreshold > 0 && repairThreshold < es.RequiredCount() {
|
||||
return RedundancyStrategy{}, Error.New("repair threshold less than required count")
|
||||
}
|
||||
if Min > es.TotalCount() {
|
||||
return RedundancyStrategy{}, Error.New("minimum threshold greater than total count")
|
||||
if repairThreshold > es.TotalCount() {
|
||||
return RedundancyStrategy{}, Error.New("repair threshold greater than total count")
|
||||
}
|
||||
if Opt < 0 {
|
||||
return RedundancyStrategy{}, Error.New("negative optimum threshold")
|
||||
if optimalThreshold < 0 {
|
||||
return RedundancyStrategy{}, Error.New("negative optimal threshold")
|
||||
}
|
||||
if Opt > 0 && Opt < es.RequiredCount() {
|
||||
return RedundancyStrategy{}, Error.New("optimum threshold less than required count")
|
||||
if optimalThreshold > 0 && optimalThreshold < es.RequiredCount() {
|
||||
return RedundancyStrategy{}, Error.New("optimal threshold less than required count")
|
||||
}
|
||||
if Opt > es.TotalCount() {
|
||||
return RedundancyStrategy{}, Error.New("optimum threshold greater than total count")
|
||||
if optimalThreshold > es.TotalCount() {
|
||||
return RedundancyStrategy{}, Error.New("optimal threshold greater than total count")
|
||||
}
|
||||
if Min > Opt {
|
||||
return RedundancyStrategy{}, Error.New("minimum threshold greater than optimum threshold")
|
||||
if repairThreshold > optimalThreshold {
|
||||
return RedundancyStrategy{}, Error.New("repair threshold greater than optimal threshold")
|
||||
}
|
||||
return RedundancyStrategy{ErasureScheme: es, Min: Min, Opt: Opt}, nil
|
||||
return RedundancyStrategy{ErasureScheme: es, repairThreshold: repairThreshold, optimalThreshold: optimalThreshold}, nil
|
||||
}
|
||||
|
||||
// MinimumThreshold is the number of available erasure pieces below which
|
||||
// RepairThreshold is the number of available erasure pieces below which
|
||||
// the data must be repaired to avoid loss
|
||||
func (rs *RedundancyStrategy) MinimumThreshold() int {
|
||||
return rs.Min
|
||||
func (rs *RedundancyStrategy) RepairThreshold() int {
|
||||
return rs.repairThreshold
|
||||
}
|
||||
|
||||
// OptimumThreshold is the number of available erasure pieces above which
|
||||
// OptimalThreshold is the number of available erasure pieces above which
|
||||
// there is no need for the data to be repaired
|
||||
func (rs *RedundancyStrategy) OptimumThreshold() int {
|
||||
return rs.Opt
|
||||
func (rs *RedundancyStrategy) OptimalThreshold() int {
|
||||
return rs.optimalThreshold
|
||||
}
|
||||
|
||||
type encodedReader struct {
|
||||
@ -121,11 +121,10 @@ type block struct {
|
||||
// mbm is the maximum memory (in bytes) to be allocated for read buffers. If
|
||||
// set to 0, the minimum possible memory will be used.
|
||||
//
|
||||
// When the minimum threshold is reached a timer will be started with another
|
||||
// When the repair threshold is reached a timer will be started with another
|
||||
// 1.5x the amount of time that took so far. The Readers will be aborted as
|
||||
// soon as the timer expires or the optimum threshold is reached.
|
||||
func EncodeReader(ctx context.Context, r io.Reader, rs RedundancyStrategy,
|
||||
mbm int) ([]io.Reader, error) {
|
||||
// soon as the timer expires or the optimal threshold is reached.
|
||||
func EncodeReader(ctx context.Context, r io.Reader, rs RedundancyStrategy, mbm int) ([]io.Reader, error) {
|
||||
if err := checkMBM(mbm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -255,7 +254,7 @@ func (er *encodedReader) checkSlowChannel(num int) (closed bool) {
|
||||
// check if more than the required buffer channels are empty, i.e. the
|
||||
// current channel is slow and should be closed and its context should be
|
||||
// canceled
|
||||
closed = ec >= er.rs.MinimumThreshold()
|
||||
closed = ec >= er.rs.RepairThreshold()
|
||||
if closed {
|
||||
er.eps[num].closed = true
|
||||
close(er.eps[num].ch)
|
||||
@ -269,13 +268,13 @@ func (er *encodedReader) readerDone() {
|
||||
er.mux.Lock()
|
||||
defer er.mux.Unlock()
|
||||
er.done++
|
||||
if er.done == er.rs.MinimumThreshold() {
|
||||
// minimum threshold reached, wait for 1.5x the duration and cancel
|
||||
// the context regardless if optimum threshold is reached
|
||||
if er.done == er.rs.RepairThreshold() {
|
||||
// repair threshold reached, wait for 1.5x the duration and cancel
|
||||
// the context regardless if optimal threshold is reached
|
||||
time.AfterFunc(time.Since(er.start)*3/2, er.cancel)
|
||||
}
|
||||
if er.done == er.rs.OptimumThreshold() {
|
||||
// optimum threshold reached - cancel the context
|
||||
if er.done == er.rs.OptimalThreshold() {
|
||||
// optimal threshold reached - cancel the context
|
||||
er.cancel()
|
||||
}
|
||||
}
|
||||
@ -313,8 +312,8 @@ func (ep *encodedPiece) Read(p []byte) (n int, err error) {
|
||||
case <-ep.ctx.Done():
|
||||
// context was canceled due to:
|
||||
// - slowness
|
||||
// - optimum threshold reached
|
||||
// - timeout after reaching minimum threshold expired
|
||||
// - optimal threshold reached
|
||||
// - timeout after reaching repair threshold expired
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
}
|
||||
@ -338,7 +337,7 @@ type EncodedRanger struct {
|
||||
}
|
||||
|
||||
// NewEncodedRanger from the given Ranger and RedundancyStrategy. See the
|
||||
// comments for EncodeReader about the minimum and optimum thresholds, and the
|
||||
// comments for EncodeReader about the repair and optimal thresholds, and the
|
||||
// max buffer memory.
|
||||
func NewEncodedRanger(rr ranger.Ranger, rs RedundancyStrategy, mbm int) (*EncodedRanger, error) {
|
||||
if rr.Size()%int64(rs.DecodedBlockSize()) != 0 {
|
||||
|
@ -144,22 +144,22 @@ func TestRSRanger(t *testing.T) {
|
||||
|
||||
func TestNewRedundancyStrategy(t *testing.T) {
|
||||
for i, tt := range []struct {
|
||||
min int
|
||||
rep int
|
||||
opt int
|
||||
expMin int
|
||||
expRep int
|
||||
expOpt int
|
||||
errString string
|
||||
}{
|
||||
{0, 0, 4, 4, ""},
|
||||
{-1, 0, 0, 0, "eestream error: negative minimum threshold"},
|
||||
{1, 0, 0, 0, "eestream error: minimum threshold less than required count"},
|
||||
{5, 0, 0, 0, "eestream error: minimum threshold greater than total count"},
|
||||
{0, -1, 0, 0, "eestream error: negative optimum threshold"},
|
||||
{0, 1, 0, 0, "eestream error: optimum threshold less than required count"},
|
||||
{0, 5, 0, 0, "eestream error: optimum threshold greater than total count"},
|
||||
{-1, 0, 0, 0, "eestream error: negative repair threshold"},
|
||||
{1, 0, 0, 0, "eestream error: repair threshold less than required count"},
|
||||
{5, 0, 0, 0, "eestream error: repair threshold greater than total count"},
|
||||
{0, -1, 0, 0, "eestream error: negative optimal threshold"},
|
||||
{0, 1, 0, 0, "eestream error: optimal threshold less than required count"},
|
||||
{0, 5, 0, 0, "eestream error: optimal threshold greater than total count"},
|
||||
{3, 4, 3, 4, ""},
|
||||
{0, 3, 0, 0, "eestream error: minimum threshold greater than optimum threshold"},
|
||||
{4, 3, 0, 0, "eestream error: minimum threshold greater than optimum threshold"},
|
||||
{0, 3, 0, 0, "eestream error: repair threshold greater than optimal threshold"},
|
||||
{4, 3, 0, 0, "eestream error: repair threshold greater than optimal threshold"},
|
||||
{4, 4, 4, 4, ""},
|
||||
} {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
@ -168,14 +168,14 @@ func TestNewRedundancyStrategy(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
es := NewRSScheme(fc, 8*1024)
|
||||
rs, err := NewRedundancyStrategy(es, tt.min, tt.opt)
|
||||
rs, err := NewRedundancyStrategy(es, tt.rep, tt.opt)
|
||||
if tt.errString != "" {
|
||||
assert.EqualError(t, err, tt.errString, errTag)
|
||||
continue
|
||||
}
|
||||
assert.NoError(t, err, errTag)
|
||||
assert.Equal(t, tt.expMin, rs.MinimumThreshold(), errTag)
|
||||
assert.Equal(t, tt.expOpt, rs.OptimumThreshold(), errTag)
|
||||
assert.Equal(t, tt.expRep, rs.RepairThreshold(), errTag)
|
||||
assert.Equal(t, tt.expOpt, rs.OptimalThreshold(), errTag)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -143,9 +143,7 @@ func (c Config) GetBucketStore(ctx context.Context, identity *provider.FullIdent
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rs, err := eestream.NewRedundancyStrategy(
|
||||
eestream.NewRSScheme(fc, c.ErasureShareSize),
|
||||
c.RepairThreshold, c.SuccessThreshold)
|
||||
rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, c.ErasureShareSize), c.RepairThreshold, c.SuccessThreshold)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -107,10 +107,10 @@ func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.Redun
|
||||
}
|
||||
allerrs := collectErrors(errs, len(readers))
|
||||
sc := len(readers) - len(allerrs)
|
||||
if sc < rs.MinimumThreshold() {
|
||||
if sc < rs.RepairThreshold() {
|
||||
return Error.New(
|
||||
"successful puts (%d) less than minimum threshold (%d)",
|
||||
sc, rs.MinimumThreshold())
|
||||
"successful puts (%d) less than repair threshold (%d)",
|
||||
sc, rs.RepairThreshold())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -144,15 +144,15 @@ TestLoop:
|
||||
[]error{nil, nil, nil, nil}, ""},
|
||||
{[]*pb.Node{node0, node1, node2, node3}, 0, 0, false,
|
||||
[]error{nil, ErrDialFailed, nil, nil},
|
||||
"ecclient error: successful puts (3) less than minimum threshold (4)"},
|
||||
"ecclient error: successful puts (3) less than repair threshold (4)"},
|
||||
{[]*pb.Node{node0, node1, node2, node3}, 0, 0, false,
|
||||
[]error{nil, ErrOpFailed, nil, nil},
|
||||
"ecclient error: successful puts (3) less than minimum threshold (4)"},
|
||||
"ecclient error: successful puts (3) less than repair threshold (4)"},
|
||||
{[]*pb.Node{node0, node1, node2, node3}, 2, 0, false,
|
||||
[]error{nil, ErrDialFailed, nil, nil}, ""},
|
||||
{[]*pb.Node{node0, node1, node2, node3}, 2, 0, false,
|
||||
[]error{ErrOpFailed, ErrDialFailed, nil, ErrDialFailed},
|
||||
"ecclient error: successful puts (1) less than minimum threshold (2)"},
|
||||
"ecclient error: successful puts (1) less than repair threshold (2)"},
|
||||
} {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
|
||||
|
@ -162,8 +162,8 @@ func (s *segmentStore) makeRemotePointer(nodes []*pb.Node, pieceID client.PieceI
|
||||
Type: pb.RedundancyScheme_RS,
|
||||
MinReq: int32(s.rs.RequiredCount()),
|
||||
Total: int32(s.rs.TotalCount()),
|
||||
RepairThreshold: int32(s.rs.Min),
|
||||
SuccessThreshold: int32(s.rs.Opt),
|
||||
RepairThreshold: int32(s.rs.RepairThreshold()),
|
||||
SuccessThreshold: int32(s.rs.OptimalThreshold()),
|
||||
ErasureShareSize: int32(s.rs.EncodedBlockSize()),
|
||||
},
|
||||
PieceId: string(pieceID),
|
||||
|
Loading…
Reference in New Issue
Block a user