[V3-1927] Repairer uploads to max threshold instead of success… (#2423)
* pkg/datarepair: Add test to check num upload pieces. Add a new test that verifies the number of pieces the repair process uploads when a segment is injured.
* satellite/orders: Don't create "put order limits" over total. Repair must not create more "put order limits" than the total piece count.
* pkg/datarepair: Update upload repair pieces test. Update the test which checks the number of pieces uploaded during a repair so it uses the same excess-over-success-threshold value as the implementation.
* satellite/orders: Limit repair put orders so they don't reach the total. Limit the number of put orders used by repair so it only uploads pieces up to a percentage excess over the success threshold.
* pkg/datarepair: Change DataRepair test to pass again. Adjust the DataRepair test so it passes now that repair uploads repaired pieces only up to a percentage excess over the success threshold. Also update the test's step description to match the new behavior and make it more generic, so minor future refactorings don't require updating it.
* satellite: Make repair excess optimal threshold configurable. Add a new satellite configuration parameter for the percentage excess over the optimal threshold used to determine how many pieces should be repaired/uploaded, rather than hard-coding the value.
* repairer: Add configurable param to segments/repairer. Add a new parameter to segments/repairer to calculate the maximum number of excess nodes, based on the optimal threshold, to which repaired pieces can be uploaded. It ensures the repairer does not return more nodes than the number of upload orders the satellite data repair service calculates for repairing pieces.
* pkg/storage/ec: Update log message in client.Repair.
* satellite: Update configuration lock file.
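All of these changes revolve around one cap: repair uploads pieces only up to the optimal (success) threshold plus a small configurable excess, instead of filling every slot up to the maximum threshold. A minimal standalone Go sketch of that calculation, using illustrative values rather than the satellite's actual wiring (the real code paths are in the diff below):

package main

import (
	"fmt"
	"math"
)

// maxRepairUploadPieces returns the maximum number of pieces repair should
// upload for a segment: the optimal (success) threshold plus a configurable
// excess rate, never more than the total piece count.
func maxRepairUploadPieces(optimalThreshold, totalPieces int, maxExcessRate float64) int {
	n := int(math.Ceil(float64(optimalThreshold) * (1 + maxExcessRate)))
	if n > totalPieces {
		n = totalPieces
	}
	return n
}

func main() {
	// With the default ratio of 0.05 and the RS numbers used in the tests
	// (success threshold 7, total 10): ceil(7 * 1.05) = 8 pieces, not 10.
	fmt.Println(maxRepairUploadPieces(7, 10, 0.05))
}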
This commit is contained in:
parent: 8a9c63809b
commit: f420b29d35
@@ -153,10 +153,11 @@ func (planet *Planet) newSatellites(count int) ([]*satellite.Peer, error) {
ReliabilityCacheStaleness: 5 * time.Minute,
},
Repairer: repairer.Config{
MaxRepair: 10,
Interval: time.Hour,
Timeout: 1 * time.Minute, // Repairs can take up to 10 seconds. Leaving room for outliers
MaxBufferMem: 4 * memory.MiB,
MaxRepair: 10,
Interval: time.Hour,
Timeout: 1 * time.Minute, // Repairs can take up to 10 seconds. Leaving room for outliers
MaxBufferMem: 4 * memory.MiB,
MaxExcessRateOptimalThreshold: 0.05,
},
Audit: audit.Config{
MaxRetriesStatDB: 0,
@@ -4,6 +4,8 @@
package datarepair_test

import (
"context"
"math"
"testing"

"github.com/stretchr/testify/require"
@@ -21,20 +23,25 @@ import (
)

// TestDataRepair does the following:
// - Uploads test data to 7 nodes
// - Kills 2 nodes and disqualifies 1
// - Triggers data repair, which repairs the data from the remaining 4 nodes to additional 3 new nodes
// - Shuts down the 4 nodes from which the data was repaired
// - Now we have just the 3 new nodes to which the data was repaired
// - Downloads the data from these 3 nodes (succeeds because 3 nodes are enough for download)
// - Uploads test data
// - Kills some nodes and disqualifies 1
// - Triggers data repair, which repairs the data from the remaining nodes to
// the numbers of nodes determined by the upload repair max threshold
// - Shuts down several nodes, but keeping up a number equal to the minim
// threshold
// - Downloads the data from those left nodes and check that it's the same than
// the uploaded one
func TestDataRepair(t *testing.T) {
var repairMaxExcessRateOptimalThreshold float64

testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 12,
StorageNodeCount: 14,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.OnlineWindow = 0
repairMaxExcessRateOptimalThreshold = config.Repairer.MaxExcessRateOptimalThreshold
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
@@ -50,47 +57,44 @@ func TestDataRepair(t *testing.T) {
satellite.Repair.Checker.Loop.Pause()
satellite.Repair.Repairer.Loop.Pause()

testData := testrand.Bytes(1 * memory.MiB)

var (
testData = testrand.Bytes(8 * memory.KiB)
minThreshold = 3
successThreshold = 7
)
err := ul.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
MinThreshold: 3,
MinThreshold: minThreshold,
RepairThreshold: 5,
SuccessThreshold: 7,
MaxThreshold: 7,
SuccessThreshold: successThreshold,
MaxThreshold: 10,
}, "testbucket", "test/path", testData)
require.NoError(t, err)

// get a remote segment from metainfo
metainfo := satellite.Metainfo.Service
listResponse, _, err := metainfo.List(ctx, "", "", "", true, 0, 0)
require.NoError(t, err)

var path string
var pointer *pb.Pointer
for _, v := range listResponse {
path = v.GetPath()
pointer, err = metainfo.Get(ctx, path)
require.NoError(t, err)
if pointer.GetType() == pb.Pointer_REMOTE {
break
}
}
pointer, path := getRemoteSegment(t, ctx, satellite)

// calculate how many storagenodes to kill
numStorageNodes := len(planet.StorageNodes)
redundancy := pointer.GetRemote().GetRedundancy()
remotePieces := pointer.GetRemote().GetRemotePieces()
minReq := redundancy.GetMinReq()
remotePieces := pointer.GetRemote().GetRemotePieces()
numPieces := len(remotePieces)
// disqualify one storage node
toDisqualify := 1
toKill := numPieces - (int(minReq+1) + toDisqualify)
toKill := numPieces - toDisqualify - int(minReq+1)
require.True(t, toKill >= 1)
// we should have enough storage nodes to repair on
require.True(t, (numStorageNodes-toKill-toDisqualify) >= numPieces)
maxNumRepairedPieces := int(
math.Ceil(
float64(successThreshold) * (1 + repairMaxExcessRateOptimalThreshold),
),
)
numStorageNodes := len(planet.StorageNodes)
// Ensure that there are enough storage nodes to upload repaired segments
require.Falsef(t,
(numStorageNodes-toKill-toDisqualify) < maxNumRepairedPieces,
"there is not enough available nodes for repairing: need= %d, have= %d",
maxNumRepairedPieces, (numStorageNodes - toKill - toDisqualify),
)

// kill nodes and track lost pieces
var lostPieces []int32
nodesToKill := make(map[storj.NodeID]bool)
nodesToDisqualify := make(map[storj.NodeID]bool)
nodesToKeepAlive := make(map[storj.NodeID]bool)
@@ -106,7 +110,6 @@ func TestDataRepair(t *testing.T) {
continue
}
nodesToKill[piece.NodeId] = true
lostPieces = append(lostPieces, piece.GetPieceNum())
}

for _, node := range planet.StorageNodes {
@@ -130,14 +133,22 @@ func TestDataRepair(t *testing.T) {
satellite.Repair.Repairer.Loop.Pause()
satellite.Repair.Repairer.Limiter.Wait()

// kill nodes kept alive to ensure repair worked
for _, node := range planet.StorageNodes {
if nodesToKeepAlive[node.ID()] {
err = planet.StopPeer(node)
require.NoError(t, err)
// repaired segment should not contain any piece in the killed and DQ nodes
metainfo := satellite.Metainfo.Service
pointer, err = metainfo.Get(ctx, path)
require.NoError(t, err)

_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
require.NoError(t, err)
nodesToKillForMinThreshold := len(remotePieces) - minThreshold
remotePieces = pointer.GetRemote().GetRemotePieces()
for _, piece := range remotePieces {
require.NotContains(t, nodesToKill, piece.NodeId, "there shouldn't be pieces in killed nodes")
require.NotContains(t, nodesToDisqualify, piece.NodeId, "there shouldn't be pieces in DQ nodes")

// Kill the original nodes which were kept alive to ensure that we can
// download from the new nodes that the repaired pieces have been uploaded
if _, ok := nodesToKeepAlive[piece.NodeId]; ok && nodesToKillForMinThreshold > 0 {
stopNodeByID(t, ctx, planet, piece.NodeId)
nodesToKillForMinThreshold--
}
}
@@ -145,16 +156,6 @@ func TestDataRepair(t *testing.T) {
newData, err := ul.Download(ctx, satellite, "testbucket", "test/path")
require.NoError(t, err)
require.Equal(t, newData, testData)

// updated pointer should not contain any of the killed nodes
pointer, err = metainfo.Get(ctx, path)
require.NoError(t, err)

remotePieces = pointer.GetRemote().GetRemotePieces()
for _, piece := range remotePieces {
require.False(t, nodesToKill[piece.NodeId])
require.False(t, nodesToDisqualify[piece.NodeId])
}
})
}
@@ -181,7 +182,7 @@ func TestRepairMultipleDisqualified(t *testing.T) {
satellite.Repair.Checker.Loop.Pause()
satellite.Repair.Repairer.Loop.Pause()

testData := testrand.Bytes(1 * memory.MiB)
testData := testrand.Bytes(8 * memory.KiB)

err := ul.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
MinThreshold: 3,
@@ -271,6 +272,134 @@ func TestRepairMultipleDisqualified(t *testing.T) {
})
}

// TestDataRepairUploadLimits does the following:
// - Uploads test data to nodes
// - Get one segment of that data to check in which nodes its pieces are stored
// - Kills as many nodes as needed which store such segment pieces
// - Triggers data repair
// - Verify that the number of pieces which repaired has uploaded don't overpass
// the established limit (success threshold + % of excess)
func TestDataRepairUploadLimit(t *testing.T) {
var repairMaxExcessRateOptimalThreshold float64

testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 13,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
repairMaxExcessRateOptimalThreshold = config.Repairer.MaxExcessRateOptimalThreshold
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
// stop discovery service so that we do not get a race condition when we delete nodes from overlay cache
satellite.Discovery.Service.Discovery.Stop()
satellite.Discovery.Service.Refresh.Stop()
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Service.Loop.Stop()
satellite.Repair.Checker.Loop.Pause()
satellite.Repair.Repairer.Loop.Pause()

const (
repairThreshold = 5
successThreshold = 7
maxThreshold = 9
)
var (
maxRepairUploadThreshold = int(
math.Ceil(
float64(successThreshold) * (1 + repairMaxExcessRateOptimalThreshold),
),
)
ul = planet.Uplinks[0]
testData = testrand.Bytes(8 * memory.KiB)
)

err := ul.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
MinThreshold: 3,
RepairThreshold: repairThreshold,
SuccessThreshold: successThreshold,
MaxThreshold: maxThreshold,
}, "testbucket", "test/path", testData)
require.NoError(t, err)

pointer, path := getRemoteSegment(t, ctx, satellite)
originalPieces := pointer.GetRemote().GetRemotePieces()
require.True(t, len(originalPieces) <= maxThreshold)

{ // Check that there is enough nodes in the network which don't contain
// pieces of the segment for being able to repair the lost pieces
availableNumNodes := len(planet.StorageNodes) - len(originalPieces)
neededNodesForRepair := maxRepairUploadThreshold - repairThreshold
require.Truef(t,
availableNumNodes >= neededNodesForRepair,
"Not enough remaining nodes in the network for repairing the pieces: have= %d, need= %d",
availableNumNodes, neededNodesForRepair,
)
}

originalStorageNodes := make(map[storj.NodeID]struct{})
for _, p := range originalPieces {
originalStorageNodes[p.NodeId] = struct{}{}
}

killedNodes := make(map[storj.NodeID]struct{})
{ // Register nodes of the network which don't have pieces for the segment
// to be injured and ill nodes which have pieces of the segment in order
// to injure it
numNodesToKill := len(originalPieces) - repairThreshold
for _, node := range planet.StorageNodes {
if _, ok := originalStorageNodes[node.ID()]; !ok {
continue
}

if len(killedNodes) < numNodesToKill {
err = planet.StopPeer(node)
require.NoError(t, err)
_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
require.NoError(t, err)

killedNodes[node.ID()] = struct{}{}
}
}
}

satellite.Repair.Checker.Loop.Restart()
satellite.Repair.Checker.Loop.TriggerWait()
satellite.Repair.Checker.Loop.Pause()
satellite.Repair.Repairer.Loop.Restart()
satellite.Repair.Repairer.Loop.TriggerWait()
satellite.Repair.Repairer.Loop.Pause()
satellite.Repair.Repairer.Limiter.Wait()

// Get the pointer after repair to check the nodes where the pieces are
// stored
pointer, err = satellite.Metainfo.Service.Get(ctx, path)
require.NoError(t, err)

// Check that repair has uploaded missed pieces to an expected number of
// nodes
afterRepairPieces := pointer.GetRemote().GetRemotePieces()
require.Falsef(t,
len(afterRepairPieces) > maxRepairUploadThreshold,
"Repaired pieces cannot be over max repair upload threshold. maxRepairUploadThreshold= %d, have= %d",
maxRepairUploadThreshold, len(afterRepairPieces),
)
require.Falsef(t,
len(afterRepairPieces) < successThreshold,
"Repaired pieces shouldn't be under success threshold. successThreshold= %d, have= %d",
successThreshold, len(afterRepairPieces),
)

// Check that after repair, the segment doesn't have more pieces on the
// killed nodes
for _, p := range afterRepairPieces {
require.NotContains(t, killedNodes, p.NodeId, "there shouldn't be pieces in killed nodes")
}
})
}

func isDisqualified(t *testing.T, ctx *testcontext.Context, satellite *satellite.Peer, nodeID storj.NodeID) bool {
node, err := satellite.Overlay.Service.Get(ctx, nodeID)
require.NoError(t, err)
@@ -293,3 +422,47 @@ func disqualifyNode(t *testing.T, ctx *testcontext.Context, satellite *satellite
require.NoError(t, err)
require.True(t, isDisqualified(t, ctx, satellite, nodeID))
}

// getRemoteSegment returns a remote pointer its path from satellite.
// nolint:golint
func getRemoteSegment(
t *testing.T, ctx context.Context, satellite *satellite.Peer,
) (_ *pb.Pointer, path string) {
t.Helper()

// get a remote segment from metainfo
metainfo := satellite.Metainfo.Service
listResponse, _, err := metainfo.List(ctx, "", "", "", true, 0, 0)
require.NoError(t, err)

for _, v := range listResponse {
path := v.GetPath()
pointer, err := metainfo.Get(ctx, path)
require.NoError(t, err)
if pointer.GetType() == pb.Pointer_REMOTE {
return pointer, path
}
}

t.Fatal("satellite doesn't have any remote segment")
return nil, ""
}

// nolint:golint
func stopNodeByID(t *testing.T, ctx context.Context, planet *testplanet.Planet, nodeID storj.NodeID) {
t.Helper()

for _, node := range planet.StorageNodes {
if node.ID() == nodeID {
err := planet.StopPeer(node)
require.NoError(t, err)

for _, sat := range planet.Satellites {
_, err = sat.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
require.NoError(t, err)
}

break
}
}
}
@@ -34,10 +34,11 @@ var (

// Config contains configurable values for repairer
type Config struct {
MaxRepair int `help:"maximum segments that can be repaired concurrently" releaseDefault:"5" devDefault:"1"`
Interval time.Duration `help:"how frequently repairer should try and repair more data" releaseDefault:"1h" devDefault:"0h5m0s"`
Timeout time.Duration `help:"time limit for uploading repaired pieces to new storage nodes" devDefault:"10m0s" releaseDefault:"2h"`
MaxBufferMem memory.Size `help:"maximum buffer memory (in bytes) to be allocated for read buffers" default:"4M"`
MaxRepair int `help:"maximum segments that can be repaired concurrently" releaseDefault:"5" devDefault:"1"`
Interval time.Duration `help:"how frequently repairer should try and repair more data" releaseDefault:"1h" devDefault:"0h5m0s"`
Timeout time.Duration `help:"time limit for uploading repaired pieces to new storage nodes" devDefault:"10m0s" releaseDefault:"2h"`
MaxBufferMem memory.Size `help:"maximum buffer memory (in bytes) to be allocated for read buffers" default:"4M"`
MaxExcessRateOptimalThreshold float64 `help:"ratio applied to the optimal threshold to calculate the excess of the maximum number of repaired pieces to upload" default:"0.05"`
}

// GetSegmentRepairer creates a new segment repairer from storeConfig values
@@ -46,7 +47,9 @@ func (c Config) GetSegmentRepairer(ctx context.Context, log *zap.Logger, tc tran

ec := ecclient.NewClient(log.Named("ecclient"), tc, c.MaxBufferMem.Int())

return segments.NewSegmentRepairer(log.Named("repairer"), metainfo, orders, cache, ec, identity, c.Timeout), nil
return segments.NewSegmentRepairer(
log.Named("repairer"), metainfo, orders, cache, ec, identity, c.Timeout, c.MaxExcessRateOptimalThreshold,
), nil
}

// SegmentRepairer is a repairer for segments
@@ -193,7 +193,7 @@ func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit
}(i, addressedLimit)
}

ec.log.Sugar().Infof("Starting a timer for %s for repairing %s to %d nodes to reach at least the success threshold (%d nodes)...",
ec.log.Sugar().Infof("Starting a timer for %s for repairing %s up to %d nodes to try to have a number of pieces around the successful threshold (%d)",
timeout, path, nonNilCount(limits), rs.OptimalThreshold())

var successfulCount int32
@@ -5,6 +5,7 @@ package segments

import (
"context"
"math"
"time"

"github.com/zeebo/errs"
@@ -29,18 +30,37 @@ type Repairer struct {
ec ecclient.Client
identity *identity.FullIdentity
timeout time.Duration

// multiplierOptimalThreshold is the value that multiplied by the optimal
// threshold results in the maximum limit of number of nodes to upload
// repaired pieces
multiplierOptimalThreshold float64
}

// NewSegmentRepairer creates a new instance of SegmentRepairer
func NewSegmentRepairer(log *zap.Logger, metainfo *metainfo.Service, orders *orders.Service, cache *overlay.Cache, ec ecclient.Client, identity *identity.FullIdentity, timeout time.Duration) *Repairer {
// NewSegmentRepairer creates a new instance of SegmentRepairer.
//
// excessPercentageOptimalThreshold is the percentage to apply over the optimal
// threshould to determine the maximum limit of nodes to upload repaired pieces,
// when negative, 0 is applied.
func NewSegmentRepairer(
log *zap.Logger, metainfo *metainfo.Service, orders *orders.Service,
cache *overlay.Cache, ec ecclient.Client, identity *identity.FullIdentity, timeout time.Duration,
excessOptimalThreshold float64,
) *Repairer {

if excessOptimalThreshold < 0 {
excessOptimalThreshold = 0
}

return &Repairer{
log: log,
metainfo: metainfo,
orders: orders,
cache: cache,
ec: ec.WithForceErrorDetection(true),
identity: identity,
timeout: timeout,
log: log,
metainfo: metainfo,
orders: orders,
cache: cache,
ec: ec.WithForceErrorDetection(true),
identity: identity,
timeout: timeout,
multiplierOptimalThreshold: 1 + excessOptimalThreshold,
}
}
@@ -122,9 +142,17 @@ func (repairer *Repairer) Repair(ctx context.Context, path storj.Path) (err erro
return Error.Wrap(err)
}

var requestCount int
{
totalNeeded := math.Ceil(float64(redundancy.OptimalThreshold()) *
repairer.multiplierOptimalThreshold,
)
requestCount = int(totalNeeded) - len(healthyPieces)
}

// Request Overlay for n-h new storage nodes
request := overlay.FindStorageNodesRequest{
RequestedCount: redundancy.TotalCount() - len(healthyPieces),
RequestedCount: requestCount,
FreeBandwidth: pieceSize,
FreeDisk: pieceSize,
ExcludedNodes: excludeNodeIDs,
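To make the new requestCount concrete, here is a worked example using values that appear in the tests above (optimal/success threshold 7, default MaxExcessRateOptimalThreshold 0.05, total piece count 10); the healthy-piece count of 4 is only illustrative:

totalNeeded  = ceil(7 * (1 + 0.05)) = ceil(7.35) = 8
requestCount = totalNeeded - len(healthyPieces) = 8 - 4 = 4

The previous code would instead have requested TotalCount() - len(healthyPieces) = 10 - 4 = 6 new nodes, repairing all the way up to the maximum threshold.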
@@ -6,6 +6,7 @@ package orders
import (
"bytes"
"context"
"math"
"time"

"github.com/skyrings/skyring-common/tools/uuid"
@@ -26,24 +27,29 @@ type Config struct {

// Service for creating order limits.
type Service struct {
log *zap.Logger
satellite signing.Signer
cache *overlay.Cache
orders DB
satelliteAddress *pb.NodeAddress

orderExpiration time.Duration
log *zap.Logger
satellite signing.Signer
cache *overlay.Cache
orders DB
satelliteAddress *pb.NodeAddress
orderExpiration time.Duration
repairMaxExcessRateOptimalThreshold float64
}

// NewService creates new service for creating order limits.
func NewService(log *zap.Logger, satellite signing.Signer, cache *overlay.Cache, orders DB, orderExpiration time.Duration, satelliteAddress *pb.NodeAddress) *Service {
func NewService(
log *zap.Logger, satellite signing.Signer, cache *overlay.Cache,
orders DB, orderExpiration time.Duration, satelliteAddress *pb.NodeAddress,
repairMaxExcessRateOptimalThreshold float64,
) *Service {
return &Service{
log: log,
satellite: satellite,
cache: cache,
orders: orders,
satelliteAddress: satelliteAddress,
orderExpiration: orderExpiration,
log: log,
satellite: satellite,
cache: cache,
orders: orders,
satelliteAddress: satelliteAddress,
orderExpiration: orderExpiration,
repairMaxExcessRateOptimalThreshold: repairMaxExcessRateOptimalThreshold,
}
}
@@ -498,6 +504,11 @@ func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucketID []by
return limit, piecePrivateKey, nil
}

// CreateGetRepairOrderLimits creates the order limits for downloading the
// healthy pieces of pointer as the source for repair.
//
// The length of the returned orders slice is the total number of pieces of the
// segment, setting to null the ones which don't correspond to a healthy piece.
// CreateGetRepairOrderLimits creates the order limits for downloading the healthy pieces of pointer as the source for repair.
func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, healthy []*pb.RemotePiece) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
defer mon.Task()(&ctx)(&err)
@@ -594,16 +605,6 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucketID
// CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of pointer to newNodes.
func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*pb.Node) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
defer mon.Task()(&ctx)(&err)

rootPieceID := pointer.GetRemote().RootPieceId
redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}

pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
totalPieces := redundancy.TotalCount()
pieceExpiration := pointer.ExpirationDate
orderExpiration := time.Now().Add(service.orderExpiration)

piecePublicKey, piecePrivateKey, err := storj.NewPieceKey()
@@ -616,39 +617,75 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucketID
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}

limits := make([]*pb.AddressedOrderLimit, totalPieces)
var pieceNum int32
for _, node := range newNodes {
for int(pieceNum) < totalPieces && getOrderLimits[pieceNum] != nil {
pieceNum++
}

if int(pieceNum) >= totalPieces { // should not happen
return nil, storj.PiecePrivateKey{}, Error.New("piece num greater than total pieces: %d >= %d", pieceNum, totalPieces)
}

orderLimit, err := signing.SignOrderLimit(ctx, service.satellite, &pb.OrderLimit{
SerialNumber: serialNumber,
SatelliteId: service.satellite.ID(),
SatelliteAddress: service.satelliteAddress,
UplinkPublicKey: piecePublicKey,
StorageNodeId: node.Id,
PieceId: rootPieceID.Derive(node.Id, pieceNum),
Action: pb.PieceAction_PUT_REPAIR,
Limit: pieceSize,
PieceExpiration: pieceExpiration,
OrderCreation: time.Now(),
OrderExpiration: orderExpiration,
})
var limits []*pb.AddressedOrderLimit
{ // Create the order limits for being used to upload the repaired pieces
redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}

limits[pieceNum] = &pb.AddressedOrderLimit{
Limit: orderLimit,
StorageNodeAddress: node.Address,
totalPieces := redundancy.TotalCount()
limits = make([]*pb.AddressedOrderLimit, totalPieces)

totalPiecesAfterRepair := int(
math.Ceil(
float64(redundancy.OptimalThreshold()) * (1 + service.repairMaxExcessRateOptimalThreshold),
),
)
if totalPiecesAfterRepair > totalPieces {
totalPiecesAfterRepair = totalPieces
}

var numCurrentPieces int
for _, o := range getOrderLimits {
if o != nil {
numCurrentPieces++
}
}

var (
totalPiecesToRepair = totalPiecesAfterRepair - numCurrentPieces
rootPieceID = pointer.GetRemote().RootPieceId
pieceSize = eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
pieceNum int32
)
for _, node := range newNodes {
for int(pieceNum) < totalPieces && getOrderLimits[pieceNum] != nil {
pieceNum++
}

if int(pieceNum) >= totalPieces { // should not happen
return nil, storj.PiecePrivateKey{}, Error.New("piece num greater than total pieces: %d >= %d", pieceNum, totalPieces)
}

orderLimit, err := signing.SignOrderLimit(ctx, service.satellite, &pb.OrderLimit{
SerialNumber: serialNumber,
SatelliteId: service.satellite.ID(),
SatelliteAddress: service.satelliteAddress,
UplinkPublicKey: piecePublicKey,
StorageNodeId: node.Id,
PieceId: rootPieceID.Derive(node.Id, pieceNum),
Action: pb.PieceAction_PUT_REPAIR,
Limit: pieceSize,
PieceExpiration: pointer.ExpirationDate,
OrderCreation: time.Now(),
OrderExpiration: orderExpiration,
})
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
}

limits[pieceNum] = &pb.AddressedOrderLimit{
Limit: orderLimit,
StorageNodeAddress: node.Address,
}
pieceNum++
totalPiecesToRepair--

if totalPiecesToRepair == 0 {
break
}
}
pieceNum++
}

err = service.saveSerial(ctx, serialNumber, bucketID, orderExpiration)
@@ -390,6 +390,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: config.Kademlia.ExternalAddress,
},
config.Repairer.MaxExcessRateOptimalThreshold,
)
pb.RegisterOrdersServer(peer.Server.GRPC(), peer.Orders.Endpoint)
}
scripts/testdata/satellite-config.yaml.lock (vendored, 3 changes)
@@ -289,6 +289,9 @@ kademlia.operator.wallet: ""
# maximum buffer memory (in bytes) to be allocated for read buffers
# repairer.max-buffer-mem: 4.0 MB

# ratio applied to the optimal threshold to calculate the excess of the maximum number of repaired pieces to upload
# repairer.max-excess-rate-optimal-threshold: 0.05

# maximum segments that can be repaired concurrently
# repairer.max-repair: 5
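For operators who want a different excess rate than the 0.05 default, the new entry can presumably be set in the satellite's config.yaml (or through the corresponding --repairer.max-excess-rate-optimal-threshold flag) like any other value in this lock file; the 0.10 below is only an example value:

# allow repair to upload up to 10% more pieces than the optimal threshold
repairer.max-excess-rate-optimal-threshold: 0.10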