satellite/repair: Update repair override config to support multiple RS schemes.

Rather than having a single repair override value, we will now support
repair override values based on a particular segment's RS scheme.

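The new format for RS override values is
"k/o/n-override,k/o/n-override...", where k/o/n are the min/optimal/total
values of an RS scheme and override is the repair threshold to use for
segments with that scheme.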

Change-Id: Ieb422638446ef3a9357d59b2d279ee941367604d
This commit is contained in:
Moby von Briesen 2020-10-27 14:26:46 -04:00 committed by Maximillian von Briesen
parent 55d5e1fd7d
commit 575f50df84
7 changed files with 368 additions and 34 deletions

View File

@ -30,18 +30,6 @@ var (
mon = monkit.Package() mon = monkit.Package()
) )
// Config contains configurable values for checker.
type Config struct {
Interval time.Duration `help:"how frequently checker should check for bad segments" releaseDefault:"30s" devDefault:"0h0m10s"`
IrreparableInterval time.Duration `help:"how frequently irrepairable checker should check for lost pieces" releaseDefault:"30m" devDefault:"0h0m5s"`
ReliabilityCacheStaleness time.Duration `help:"how stale reliable node cache can be" releaseDefault:"5m" devDefault:"5m"`
RepairOverride int `help:"override value for repair threshold" releaseDefault:"52" devDefault:"0"`
// Node failure rate is an estimation based on a 6 hour checker run interval (4 checker iterations per day), a network of about 9200 nodes, and about 2 nodes churning per day.
// This results in `2/9200/4 = 0.00005435` being the probability of any single node going down in the interval of one checker iteration.
NodeFailureRate float64 `help:"the probability of a single node going down within the next checker iteration" default:"0.00005435"`
}
// durabilityStats remote segment information. // durabilityStats remote segment information.
type durabilityStats struct { type durabilityStats struct {
objectsChecked int64 objectsChecked int64
@ -65,7 +53,7 @@ type Checker struct {
metainfo *metainfo.Service metainfo *metainfo.Service
metaLoop *metainfo.Loop metaLoop *metainfo.Loop
nodestate *ReliabilityCache nodestate *ReliabilityCache
repairOverride int32 repairOverrides RepairOverridesMap
nodeFailureRate float64 nodeFailureRate float64
Loop *sync2.Cycle Loop *sync2.Cycle
IrreparableLoop *sync2.Cycle IrreparableLoop *sync2.Cycle
@ -81,7 +69,7 @@ func NewChecker(logger *zap.Logger, repairQueue queue.RepairQueue, irrdb irrepar
metainfo: metainfo, metainfo: metainfo,
metaLoop: metaLoop, metaLoop: metaLoop,
nodestate: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness), nodestate: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness),
repairOverride: int32(config.RepairOverride), repairOverrides: config.RepairOverrides.GetMap(),
nodeFailureRate: config.NodeFailureRate, nodeFailureRate: config.NodeFailureRate,
Loop: sync2.NewCycle(config.Interval), Loop: sync2.NewCycle(config.Interval),
@ -128,7 +116,7 @@ func (checker *Checker) IdentifyInjuredSegments(ctx context.Context) (err error)
irrdb: checker.irrdb, irrdb: checker.irrdb,
nodestate: checker.nodestate, nodestate: checker.nodestate,
monStats: durabilityStats{}, monStats: durabilityStats{},
overrideRepair: checker.repairOverride, repairOverrides: checker.repairOverrides,
nodeFailureRate: checker.nodeFailureRate, nodeFailureRate: checker.nodeFailureRate,
log: checker.logger, log: checker.logger,
} }
@ -201,8 +189,9 @@ func (checker *Checker) updateIrreparableSegmentStatus(ctx context.Context, poin
redundancy := pointer.Remote.Redundancy redundancy := pointer.Remote.Redundancy
repairThreshold := redundancy.RepairThreshold repairThreshold := redundancy.RepairThreshold
if checker.repairOverride != 0 { overrideValue := checker.repairOverrides.GetOverrideValuePB(redundancy)
repairThreshold = checker.repairOverride if overrideValue != 0 {
repairThreshold = overrideValue
} }
// we repair when the number of healthy pieces is less than or equal to the repair threshold and is greater or equal to // we repair when the number of healthy pieces is less than or equal to the repair threshold and is greater or equal to
@ -262,7 +251,7 @@ type checkerObserver struct {
irrdb irreparable.DB irrdb irreparable.DB
nodestate *ReliabilityCache nodestate *ReliabilityCache
monStats durabilityStats monStats durabilityStats
overrideRepair int32 repairOverrides RepairOverridesMap
nodeFailureRate float64 nodeFailureRate float64
log *zap.Logger log *zap.Logger
} }
@ -308,12 +297,14 @@ func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo
segmentAge := time.Since(segment.CreationDate) segmentAge := time.Since(segment.CreationDate)
mon.IntVal("checker_segment_age").Observe(int64(segmentAge.Seconds())) //mon:locked mon.IntVal("checker_segment_age").Observe(int64(segmentAge.Seconds())) //mon:locked
required := int(segment.Redundancy.RequiredShares) redundancy := segment.Redundancy
repairThreshold := int(segment.Redundancy.RepairShares) required := int(redundancy.RequiredShares)
if obs.overrideRepair != 0 { repairThreshold := int(redundancy.RepairShares)
repairThreshold = int(obs.overrideRepair) overrideValue := obs.repairOverrides.GetOverrideValue(redundancy)
if overrideValue != 0 {
repairThreshold = int(overrideValue)
} }
successThreshold := int(segment.Redundancy.OptimalShares) successThreshold := int(redundancy.OptimalShares)
segmentHealth := repair.SegmentHealth(numHealthy, required, obs.nodeFailureRate) segmentHealth := repair.SegmentHealth(numHealthy, required, obs.nodeFailureRate)
mon.FloatVal("checker_segment_health").Observe(segmentHealth) //mon:locked mon.FloatVal("checker_segment_health").Observe(segmentHealth) //mon:locked

View File

@ -0,0 +1,173 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package checker
import (
"fmt"
"strconv"
"strings"
"time"
"storj.io/common/pb"
"storj.io/common/storj"
)
// Config contains configurable values for checker.
//
// The struct tags on each field carry the setting's help text and its default
// values (separate release and dev defaults where they differ).
type Config struct {
// Interval is how frequently the checker checks for bad segments.
Interval time.Duration `help:"how frequently checker should check for bad segments" releaseDefault:"30s" devDefault:"0h0m10s"`
// IrreparableInterval is how frequently the irreparable checker checks for lost pieces.
IrreparableInterval time.Duration `help:"how frequently irrepairable checker should check for lost pieces" releaseDefault:"30m" devDefault:"0h0m5s"`
// ReliabilityCacheStaleness is how stale the reliable node cache may be.
ReliabilityCacheStaleness time.Duration `help:"how stale reliable node cache can be" releaseDefault:"5m" devDefault:"5m"`
// RepairOverrides lists repair threshold overrides per RS scheme, written as
// comma-separated k/o/n-override entries (min/optimal/total-override).
// The dev default is empty, i.e. no overrides.
RepairOverrides RepairOverrides `help:"comma-separated override values for repair threshold in the format k/o/n-override (min/optimal/total-override)" releaseDefault:"29/80/110-52,29/80/95-52" devDefault:""`
// Node failure rate is an estimation based on a 6 hour checker run interval (4 checker iterations per day), a network of about 9200 nodes, and about 2 nodes churning per day.
// This results in `2/9200/4 = 0.00005435` being the probability of any single node going down in the interval of one checker iteration.
NodeFailureRate float64 `help:"the probability of a single node going down within the next checker iteration" default:"0.00005435"`
}
// RepairOverride pairs one RS scheme, identified by its k/o/n
// (min/success/total) numbers, with the repair threshold that should be used
// for it instead of the scheme's own value.
//
// It satisfies the pflag.Value method set, so it can be used as a flag.
type RepairOverride struct {
	// Min is k, the first number of the k/o/n triple.
	Min int
	// Success is o, the second number of the k/o/n triple.
	Success int
	// Total is n, the third number of the k/o/n triple.
	Total int
	// Override is the repair threshold to apply for this scheme.
	Override int32
}

// Type implements pflag.Value and reports the flag's type name.
func (RepairOverride) Type() string {
	return "checker.RepairOverride"
}

// String is required for pflag.Value. It renders the override in the same
// k/o/n-override form that Set accepts.
func (ro *RepairOverride) String() string {
	scheme := fmt.Sprintf("%d/%d/%d", ro.Min, ro.Success, ro.Total)
	return fmt.Sprintf("%s-%d", scheme, ro.Override)
}
// Set sets the value from a string in the format k/o/n-override (min/optimal/total-repairOverride).
//
// The three RS numbers must be integers, the first at least 1 and each one no
// smaller than the one before it. The override must fit in an int32 and
// satisfy min <= override < success. An error is returned if any of these
// checks fail, and in that case ro is left unmodified.
func (ro *RepairOverride) Set(s string) error {
	// Split on dash. Expect two items. First item is RS numbers. Second item is Override.
	info := strings.Split(s, "-")
	if len(info) != 2 {
		return Error.New("Invalid default repair override config (expect format k/o/n-override, got %s)", s)
	}
	rsNumbersString, overrideString := info[0], info[1]

	// Split on forward slash. Expect exactly three positive non-decreasing integers.
	rsNumbers := strings.Split(rsNumbersString, "/")
	if len(rsNumbers) != 3 {
		return Error.New("Invalid default RS numbers (wrong size, expect 3): %s", rsNumbersString)
	}

	var values [3]int
	minValue := 1
	for i, nextValueString := range rsNumbers {
		nextValue, err := strconv.Atoi(nextValueString)
		if err != nil {
			return Error.New("Invalid default RS numbers (should all be valid integers): %s, %w", rsNumbersString, err)
		}
		if nextValue < minValue {
			return Error.New("Invalid default RS numbers (should be non-decreasing): %s", rsNumbersString)
		}
		values[i] = nextValue
		// Each number becomes the lower bound for the next one.
		minValue = nextValue
	}
	minShares, successShares, totalShares := values[0], values[1], values[2]

	// Attempt to parse "-override" part of config. Parsing with a 32-bit limit
	// guarantees the value fits the Override field without silently wrapping.
	override, err := strconv.ParseInt(overrideString, 10, 32)
	if err != nil {
		return Error.New("Invalid override value (should be valid integer): %s, %w", overrideString, err)
	}
	if override < int64(minShares) || override >= int64(successShares) {
		return Error.New("Invalid override value (should meet criteria min <= override < success). Min: %d, Override: %d, Success: %d.", minShares, override, successShares)
	}

	// Commit only after every part has been validated, so a failed Set never
	// leaves the receiver half-updated.
	ro.Min, ro.Success, ro.Total = minShares, successShares, totalShares
	ro.Override = int32(override)
	return nil
}
// RepairOverrides is a configuration struct that contains a list of override repair
// values for various given RS combinations of k/o/n (min/success/total).
//
// Can be used as a flag.
type RepairOverrides struct {
// List holds one entry per configured RS scheme, in the order they were parsed.
List []RepairOverride
}
// Type implements pflag.Value and reports the flag's type name.
func (RepairOverrides) Type() string {
	return "checker.RepairOverrides"
}
// String is required for pflag.Value. It renders every entry of List with
// RepairOverride.String and joins the results with commas; an empty list
// yields the empty string.
func (ros *RepairOverrides) String() string {
	rendered := make([]string, len(ros.List))
	for i := range ros.List {
		rendered[i] = ros.List[i].String()
	}
	return strings.Join(rendered, ",")
}
// Set sets the value from a string in the format "k/o/n-override,k/o/n-override,...".
//
// Items are trimmed of surrounding whitespace and empty items are skipped, so
// an empty string yields an empty list. Each remaining item is parsed with
// RepairOverride.Set; the first parse error is returned as-is. The previous
// contents of List are replaced only when the whole string parses
// successfully, so a failed Set leaves ros unchanged.
func (ros *RepairOverrides) Set(s string) error {
	// Build into a local slice and publish it at the end, so an error part-way
	// through does not leave a truncated list behind.
	var parsed []RepairOverride
	for _, roString := range strings.Split(s, ",") {
		roString = strings.TrimSpace(roString)
		if roString == "" {
			continue
		}
		var newRo RepairOverride
		if err := newRo.Set(roString); err != nil {
			return err
		}
		parsed = append(parsed, newRo)
	}
	ros.List = parsed
	return nil
}
// GetMap builds a RepairOverridesMap from the configured list, keyed by each
// entry's k/o/n numbers. If several entries share the same k/o/n, the one
// appearing last in List is the one stored.
func (ros *RepairOverrides) GetMap() RepairOverridesMap {
	overrides := make(map[string]int32)
	for i := range ros.List {
		entry := &ros.List[i]
		overrides[getRSKey(entry.Min, entry.Success, entry.Total)] = entry.Override
	}
	return RepairOverridesMap{overrideMap: overrides}
}
// RepairOverridesMap is derived from the RepairOverrides config, and is used for quickly retrieving
// repair override values.
//
// The zero value holds a nil map; lookups on it find nothing and return 0.
type RepairOverridesMap struct {
// map of "k/o/n" -> override value
// (keys are produced by getRSKey)
overrideMap map[string]int32
}
// GetOverrideValuePB looks up the override configured for a pb RS scheme,
// matching on its MinReq, SuccessThreshold and Total values. It returns 0 when
// no override is configured for that combination.
func (rom *RepairOverridesMap) GetOverrideValuePB(rs *pb.RedundancyScheme) int32 {
	return rom.overrideMap[getRSKey(int(rs.MinReq), int(rs.SuccessThreshold), int(rs.Total))]
}
// GetOverrideValue looks up the override configured for an RS scheme, matching
// on its RequiredShares, OptimalShares and TotalShares values. It returns 0
// when no override is configured for that combination.
func (rom *RepairOverridesMap) GetOverrideValue(rs storj.RedundancyScheme) int32 {
	return rom.overrideMap[getRSKey(int(rs.RequiredShares), int(rs.OptimalShares), int(rs.TotalShares))]
}
// getRSKey returns the map key for an RS scheme in the form "min/success/total",
// with each number in decimal.
//
// It is called for every lookup in RepairOverridesMap, so it builds the key
// with strconv and plain concatenation rather than fmt, which would box each
// argument and format through reflection.
func getRSKey(min, success, total int) string {
	return strconv.Itoa(min) + "/" + strconv.Itoa(success) + "/" + strconv.Itoa(total)
}

View File

@ -0,0 +1,159 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package checker_test
import (
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/satellite/repair/checker"
)
// TestRepairOverrideConfigValidation feeds a table of config strings to
// RepairOverrides.Set and checks that each one is accepted or rejected as
// expected, and that accepted strings produce the expected number of entries.
//
// Every case runs as its own subtest so that a failure is reported under the
// case's name and does not prevent the remaining cases from running.
func TestRepairOverrideConfigValidation(t *testing.T) {
	tests := []struct {
		description    string
		overrideConfig string
		expectError    bool
		size           int
	}{
		{
			description:    "valid multi repair override config",
			overrideConfig: "2/5/20-3,1/4/10-2",
			expectError:    false,
			size:           2,
		},
		{
			description:    "valid single repair override config",
			overrideConfig: "2/5/20-3",
			expectError:    false,
			size:           1,
		},
		{
			description:    "invalid repair override config - numbers decrease",
			overrideConfig: "1/5/4-3",
			expectError:    true,
		},
		{
			description:    "invalid repair override config - starts at 0",
			overrideConfig: "0/5/6-3",
			expectError:    true,
		},
		{
			description:    "invalid repair override config - string override value",
			overrideConfig: "1/2/4-a",
			expectError:    true,
		},
		{
			description:    "invalid repair override config - string rs number",
			overrideConfig: "1/b/4-3",
			expectError:    true,
		},
		{
			description:    "invalid repair override config - floating point rs number",
			overrideConfig: "2/3.2/4-3",
			expectError:    true,
		},
		{
			description:    "invalid repair override config - floating point override value",
			overrideConfig: "1/5/6-3.2",
			expectError:    true,
		},
		{
			description:    "invalid repair override config - no override value",
			overrideConfig: "1/2/4",
			expectError:    true,
		},
		{
			description:    "invalid repair override config - not enough rs numbers",
			overrideConfig: "1/6-3",
			expectError:    true,
		},
		{
			description:    "invalid repair override config - override < min",
			overrideConfig: "2/5/20-1",
			expectError:    true,
		},
		{
			description:    "invalid repair override config - override >= optimal",
			overrideConfig: "2/5/20-5",
			expectError:    true,
		},
		{
			description:    "valid repair override config - empty items in multi value",
			overrideConfig: ",2/5/20-4,,3/6/7-4",
			expectError:    false,
			size:           2,
		},
		{
			description:    "valid repair override config - empty",
			overrideConfig: "",
			expectError:    false,
			size:           0,
		},
	}

	for _, tt := range tests {
		tt := tt
		t.Run(tt.description, func(t *testing.T) {
			newOverrides := checker.RepairOverrides{}
			err := newOverrides.Set(tt.overrideConfig)
			if tt.expectError {
				require.Error(t, err)
				return
			}
			require.NoError(t, err)
			require.Len(t, newOverrides.List, tt.size)
		})
	}
}
// TestRepairOverride parses a two-entry override config and checks that
// lookups through both the storj and the pb redundancy scheme types return the
// expected override for each scheme.
func TestRepairOverride(t *testing.T) {
	overrides := checker.RepairOverrides{}
	require.NoError(t, overrides.Set("29/80/95-52,10/30/40-25"))
	ro := overrides.GetMap()

	cases := []struct {
		required, repair, optimal, total int16
		expected                         int32
	}{
		{required: 10, repair: 20, optimal: 30, total: 40, expected: 25},
		// the next two schemes should have the same override value (52) despite having a different repair threshold.
		{required: 29, repair: 35, optimal: 80, total: 95, expected: 52},
		{required: 29, repair: 60, optimal: 80, total: 95, expected: 52},
		// this scheme has no matching override config.
		{required: 2, repair: 5, optimal: 10, total: 30, expected: 0},
	}

	for _, c := range cases {
		storjScheme := storj.RedundancyScheme{
			RequiredShares: c.required,
			RepairShares:   c.repair,
			OptimalShares:  c.optimal,
			TotalShares:    c.total,
		}
		pbScheme := &pb.RedundancyScheme{
			MinReq:           int32(c.required),
			RepairThreshold:  int32(c.repair),
			SuccessThreshold: int32(c.optimal),
			Total:            int32(c.total),
		}

		require.EqualValues(t, c.expected, ro.GetOverrideValue(storjScheme))
		require.EqualValues(t, c.expected, ro.GetOverrideValuePB(pbScheme))
	}
}

View File

@ -22,6 +22,7 @@ import (
"storj.io/storj/satellite" "storj.io/storj/satellite"
"storj.io/storj/satellite/metainfo/metabase" "storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/satellite/overlay" "storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/checker"
"storj.io/storj/storage" "storj.io/storj/storage"
) )
@ -915,7 +916,11 @@ func testDataRepairOverrideHigherLimit(t *testing.T, inMemoryRepair bool) {
Satellite: testplanet.Combine( Satellite: testplanet.Combine(
func(log *zap.Logger, index int, config *satellite.Config) { func(log *zap.Logger, index int, config *satellite.Config) {
config.Repairer.InMemoryRepair = inMemoryRepair config.Repairer.InMemoryRepair = inMemoryRepair
config.Checker.RepairOverride = repairOverride config.Checker.RepairOverrides = checker.RepairOverrides{
List: []checker.RepairOverride{
{Min: 3, Success: 9, Total: 9, Override: repairOverride},
},
}
}, },
testplanet.ReconfigureRS(3, 4, 9, 9), testplanet.ReconfigureRS(3, 4, 9, 9),
), ),
@ -1007,7 +1012,11 @@ func testDataRepairOverrideLowerLimit(t *testing.T, inMemoryRepair bool) {
Satellite: testplanet.Combine( Satellite: testplanet.Combine(
func(log *zap.Logger, index int, config *satellite.Config) { func(log *zap.Logger, index int, config *satellite.Config) {
config.Repairer.InMemoryRepair = inMemoryRepair config.Repairer.InMemoryRepair = inMemoryRepair
config.Checker.RepairOverride = repairOverride config.Checker.RepairOverrides = checker.RepairOverrides{
List: []checker.RepairOverride{
{Min: 3, Success: 9, Total: 9, Override: repairOverride},
},
}
}, },
testplanet.ReconfigureRS(3, 6, 9, 9), testplanet.ReconfigureRS(3, 6, 9, 9),
), ),

View File

@ -20,6 +20,7 @@ import (
"storj.io/storj/satellite/metainfo/metabase" "storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/satellite/orders" "storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay" "storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/checker"
"storj.io/uplink/private/eestream" "storj.io/uplink/private/eestream"
) )
@ -61,8 +62,8 @@ type SegmentRepairer struct {
// repaired pieces // repaired pieces
multiplierOptimalThreshold float64 multiplierOptimalThreshold float64
// repairOverride is the value handed over from the checker to override the Repair Threshold // repairOverrides is the set of values configured by the checker to override the repair threshold for various RS schemes.
repairOverride int repairOverrides checker.RepairOverridesMap
} }
// NewSegmentRepairer creates a new instance of SegmentRepairer. // NewSegmentRepairer creates a new instance of SegmentRepairer.
@ -73,7 +74,7 @@ type SegmentRepairer struct {
func NewSegmentRepairer( func NewSegmentRepairer(
log *zap.Logger, metainfo *metainfo.Service, orders *orders.Service, log *zap.Logger, metainfo *metainfo.Service, orders *orders.Service,
overlay *overlay.Service, dialer rpc.Dialer, timeout time.Duration, overlay *overlay.Service, dialer rpc.Dialer, timeout time.Duration,
excessOptimalThreshold float64, repairOverride int, excessOptimalThreshold float64, repairOverrides checker.RepairOverrides,
downloadTimeout time.Duration, inMemoryRepair bool, downloadTimeout time.Duration, inMemoryRepair bool,
satelliteSignee signing.Signee, satelliteSignee signing.Signee,
) *SegmentRepairer { ) *SegmentRepairer {
@ -90,7 +91,7 @@ func NewSegmentRepairer(
ec: NewECRepairer(log.Named("ec repairer"), dialer, satelliteSignee, downloadTimeout, inMemoryRepair), ec: NewECRepairer(log.Named("ec repairer"), dialer, satelliteSignee, downloadTimeout, inMemoryRepair),
timeout: timeout, timeout: timeout,
multiplierOptimalThreshold: 1 + excessOptimalThreshold, multiplierOptimalThreshold: 1 + excessOptimalThreshold,
repairOverride: repairOverride, repairOverrides: repairOverrides.GetMap(),
} }
} }
@ -155,8 +156,9 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, path storj.Path) (s
mon.Counter("repairer_segments_below_min_req").Inc(0) //mon:locked mon.Counter("repairer_segments_below_min_req").Inc(0) //mon:locked
repairThreshold := pointer.Remote.Redundancy.RepairThreshold repairThreshold := pointer.Remote.Redundancy.RepairThreshold
if repairer.repairOverride != 0 { overrideValue := repairer.repairOverrides.GetOverrideValuePB(pointer.Remote.Redundancy)
repairThreshold = int32(repairer.repairOverride) if overrideValue != 0 {
repairThreshold = overrideValue
} }
// repair not needed // repair not needed

View File

@ -177,7 +177,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
peer.Dialer, peer.Dialer,
config.Repairer.Timeout, config.Repairer.Timeout,
config.Repairer.MaxExcessRateOptimalThreshold, config.Repairer.MaxExcessRateOptimalThreshold,
config.Checker.RepairOverride, config.Checker.RepairOverrides,
config.Repairer.DownloadTimeout, config.Repairer.DownloadTimeout,
config.Repairer.InMemoryRepair, config.Repairer.InMemoryRepair,
signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()), signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()),

View File

@ -37,8 +37,8 @@
# how stale reliable node cache can be # how stale reliable node cache can be
# checker.reliability-cache-staleness: 5m0s # checker.reliability-cache-staleness: 5m0s
# override value for repair threshold # comma-separated override values for repair threshold in the format k/o/n-override (min/optimal/total-override)
# checker.repair-override: 52 # checker.repair-overrides: 29/80/110-52,29/80/95-52
# percent of held amount disposed to node after leaving withheld # percent of held amount disposed to node after leaving withheld
compensation.dispose-percent: 50 compensation.dispose-percent: 50