satellite/repair: repair is configurable to work only on included/excluded placements

This patch finishes the placement-aware repair.

We already introduced the parameters to select only the jobs for specific placements; the remaining part is to make the include/exclude rules configurable. This patch also adds a full e2e unit test.

Change-Id: I223ba84e8ab7481a53e5a444596c7a5ae51573c5
Márton Elek 2023-09-27 15:13:13 +02:00 committed by Storj Robot
parent 63645205c0
commit 58b98bc335
5 changed files with 260 additions and 8 deletions
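As a quick illustration of the intended usage (a sketch, not part of this diff; it assumes the PlacementList flag type and the Config fields introduced in the repairer package below):

package main

import (
	"fmt"

	"storj.io/storj/satellite/repair/repairer"
)

func main() {
	cfg := repairer.Config{}

	// repair only jobs with placement 1 ...
	if err := cfg.IncludedPlacements.Set("1"); err != nil {
		panic(err)
	}
	// ... and never pick up jobs with placements 2 or 3
	if err := cfg.ExcludedPlacements.Set("2,3"); err != nil {
		panic(err)
	}

	// the repair service hands these slices to queue.Select
	fmt.Println(cfg.IncludedPlacements.Placements, cfg.ExcludedPlacements.Placements)
}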

go.mod

@@ -47,6 +47,7 @@ require (
	github.com/zeebo/errs v1.3.0
	github.com/zeebo/errs/v2 v2.0.3
	github.com/zeebo/ini v0.0.0-20210514163846-cc8fbd8d9599
	github.com/zeebo/structs v1.0.3-0.20230601144555-f2db46069602
	github.com/zyedidia/generic v1.2.1
	go.etcd.io/bbolt v1.3.5
	go.uber.org/zap v1.16.0
@@ -127,7 +128,6 @@ require (
	github.com/zeebo/float16 v0.1.0 // indirect
	github.com/zeebo/incenc v0.0.0-20180505221441-0d92902eec54 // indirect
	github.com/zeebo/mwc v0.0.4 // indirect
	github.com/zeebo/structs v1.0.3-0.20230601144555-f2db46069602 // indirect
	go.opencensus.io v0.24.0 // indirect
	go.uber.org/atomic v1.7.0 // indirect
	go.uber.org/multierr v1.6.0 // indirect


@@ -5,14 +5,19 @@ package repairer
import (
	"context"
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/spacemonkeygo/monkit/v3"
	"github.com/spf13/pflag"
	"github.com/zeebo/errs"
	"go.uber.org/zap"
	"golang.org/x/sync/semaphore"

	"storj.io/common/memory"
	"storj.io/common/storj"
	"storj.io/common/sync2"

	"storj.io/storj/satellite/repair/queue"
)
@@ -39,8 +44,49 @@ type Config struct {
	RepairExcludedCountryCodes []string `help:"list of country codes to treat node from this country as offline" default:"" hidden:"true"`
	DoDeclumping bool `help:"repair pieces on the same network to other nodes" default:"true"`
	DoPlacementCheck bool `help:"repair pieces out of segment placement" default:"true"`
	IncludedPlacements PlacementList `help:"comma separated placement IDs (numbers) which should be checked by the repairer (other placements are ignored)" default:""`
	ExcludedPlacements PlacementList `help:"comma separated placement IDs (numbers) which should be ignored by the repairer" default:""`
}
// PlacementList is a configurable, comma separated list of PlacementConstraint IDs.
type PlacementList struct {
	Placements []storj.PlacementConstraint
}

// String implements pflag.Value.
func (p *PlacementList) String() string {
	var s []string
	for _, pl := range p.Placements {
		s = append(s, fmt.Sprintf("%d", pl))
	}
	return strings.Join(s, ",")
}

// Set implements pflag.Value.
func (p *PlacementList) Set(s string) error {
	parts := strings.Split(s, ",")
	for _, pNumStr := range parts {
		pNumStr = strings.TrimSpace(pNumStr)
		if pNumStr == "" {
			continue
		}
		pNum, err := strconv.Atoi(pNumStr)
		if err != nil {
			return errs.New("Placement list should contain numbers: %s", s)
		}
		p.Placements = append(p.Placements, storj.PlacementConstraint(pNum))
	}
	return nil
}

// Type implements pflag.Value.
func (p PlacementList) Type() string {
	return "placement-list"
}

var _ pflag.Value = &PlacementList{}

// Service contains the information needed to run the repair service.
//
// architecture: Worker
@@ -138,7 +184,7 @@ func (service *Service) process(ctx context.Context) (err error) {
	// return from service.Run when queue fetch fails.
	ctx, cancel := context.WithTimeout(ctx, service.config.TotalTimeout)
	seg, err := service.queue.Select(ctx, nil, nil)
	seg, err := service.queue.Select(ctx, service.config.IncludedPlacements.Placements, service.config.ExcludedPlacements.Placements)
	if err != nil {
		service.JobLimiter.Release(1)
		cancel()
@@ -151,7 +197,6 @@
	go func() {
		defer service.JobLimiter.Release(1)
		defer cancel()
		if err := service.worker(ctx, seg); err != nil {
			service.log.Error("repair worker failed:", zap.Error(err))
		}
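For context, a short sketch (not part of the diff) of how the pflag.Value implementation above behaves when bound to a standalone flag set; the flag name here simply mirrors the generated satellite config key:

package main

import (
	"fmt"

	"github.com/spf13/pflag"

	"storj.io/storj/satellite/repair/repairer"
)

func main() {
	var list repairer.PlacementList

	fs := pflag.NewFlagSet("example", pflag.ContinueOnError)
	fs.Var(&list, "repairer.included-placements", "comma separated placement IDs (numbers)")

	// Set tolerates surrounding whitespace and skips empty entries
	if err := fs.Parse([]string{"--repairer.included-placements=1, 3 ,5"}); err != nil {
		panic(err)
	}

	fmt.Println(list.Placements) // [1 3 5]
}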


@@ -0,0 +1,28 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package repairer

import (
	"testing"

	"github.com/stretchr/testify/require"
	"github.com/zeebo/structs"

	"storj.io/common/storj"
)

func TestPlacementList(t *testing.T) {
	pl := Config{}
	decode := structs.Decode(map[string]string{
		"excluded-placements": "1,3,5,6",
	}, &pl)
	require.NoError(t, decode.Error)
	require.Len(t, decode.Broken, 0)
	require.Len(t, decode.Missing, 0)
	require.Len(t, decode.Used, 1)
	require.Equal(t, []storj.PlacementConstraint{1, 3, 5, 6}, pl.ExcludedPlacements.Placements)
}
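A possible companion test (a sketch, not part of the diff; the test name is hypothetical) covering the error path of PlacementList.Set for non-numeric input:

package repairer

import (
	"testing"

	"github.com/stretchr/testify/require"

	"storj.io/common/storj"
)

func TestPlacementListSetErrors(t *testing.T) {
	var list PlacementList

	// valid entries; surrounding whitespace is trimmed
	require.NoError(t, list.Set(" 1, 3 "))
	require.Equal(t, []storj.PlacementConstraint{1, 3}, list.Placements)

	// non-numeric entries are rejected
	require.Error(t, list.Set("1,abc"))
}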


@@ -29,6 +29,7 @@ import (
	"storj.io/storj/satellite/nodeselection"
	"storj.io/storj/satellite/overlay"
	"storj.io/storj/satellite/repair/queue"
	"storj.io/storj/satellite/repair/repairer"
	"storj.io/storj/storagenode"
	"storj.io/storj/storagenode/contact"
)
@@ -160,11 +161,10 @@ func TestSegmentRepairWithNodeTags(t *testing.T) {
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.Combine(
				func(log *zap.Logger, index int, config *satellite.Config) {
					placementRules := overlay.ConfigurablePlacementRule{}
					tag := fmt.Sprintf("tag(\"%s\",\"selected\",\"true\")", satelliteIdentity.ID())
					err := placementRules.Set(fmt.Sprintf("0:exclude(%s);10:%s", tag, tag))
					require.NoError(t, err)
					config.Placement = placementRules
					tag := fmt.Sprintf(`tag("%s","selected","true")`, satelliteIdentity.ID())
					config.Placement = overlay.ConfigurablePlacementRule{
						PlacementRules: fmt.Sprintf("0:exclude(%s);10:%s", tag, tag),
					}
				},
				func(log *zap.Logger, index int, config *satellite.Config) {
@@ -508,3 +508,176 @@ func updateNodeStatus(ctx context.Context, satellite *testplanet.Satellite, node
		CountryCode: countryCode,
	}, timestamp, satellite.Config.Overlay.Node)
}

// This test creates two objects with two different placements (technically both are PL country restrictions, but with different IDs).
// When both end up on the wrong nodes (the nodes are moved to the wrong country), only one of them will be repaired, as the repairer
// is configured to include only that placement constraint.
func TestSegmentRepairPlacementRestrictions(t *testing.T) {
	placement := overlay.ConfigurablePlacementRule{}
	err := placement.Set(`1:country("PL");2:country("PL")`)
	require.NoError(t, err)

	piecesCount := 4
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 8, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.Combine(
				testplanet.ReconfigureRS(1, 1, piecesCount, piecesCount),
				func(log *zap.Logger, index int, config *satellite.Config) {
					config.Repairer.DoDeclumping = false
					config.Placement = placement
					config.Repairer.IncludedPlacements = repairer.PlacementList{
						Placements: []storj.PlacementConstraint{1},
					}
					// only on-demand execution
					config.RangedLoop.Interval = 10 * time.Hour
				},
			),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		placement, err := planet.Satellites[0].Config.Placement.Parse()
		require.NoError(t, err)

		{
			require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket1"))
			_, err := planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{
				ProjectID: planet.Uplinks[0].Projects[0].ID,
				Name: "testbucket1",
				Placement: storj.PlacementConstraint(1),
			})
			require.NoError(t, err)

			require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket2"))
			_, err = planet.Satellites[0].API.Buckets.Service.UpdateBucket(ctx, buckets.Bucket{
				ProjectID: planet.Uplinks[0].Projects[0].ID,
				Name: "testbucket2",
				Placement: storj.PlacementConstraint(2),
			})
			require.NoError(t, err)
		}

		goodLocation := location.Poland
		badLocation := location.Germany

		{
			// both uploads will use only the first 4 nodes, as the right nodes are there
			for ix, node := range planet.StorageNodes {
				l := goodLocation
				if ix > 3 {
					l = badLocation
				}
				require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), l.String()))
			}
			require.NoError(t, planet.Satellites[0].Repairer.Overlay.UploadSelectionCache.Refresh(ctx))
			require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
		}

		expectedData := testrand.Bytes(5 * memory.KiB)
		{
			err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket1", "object", expectedData)
			require.NoError(t, err)

			err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket2", "object", expectedData)
			require.NoError(t, err)
		}

		{
			segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
			require.NoError(t, err)
			require.Len(t, segments, 2)
			require.Len(t, segments[0].Pieces, piecesCount)

			// confirm that pieces are at the good place
			for i := 0; i < 2; i++ {
				ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[i].Pieces, segments[i].Placement, placement.CreateFilters)
				require.NoError(t, err)
				require.True(t, ok)
			}
		}

		{
			// time to move the current nodes out of the right country
			for ix, node := range planet.StorageNodes {
				l := goodLocation
				if ix < 4 {
					l = badLocation
				}
				require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), l.String()))
			}
			require.NoError(t, planet.Satellites[0].Repairer.Overlay.UploadSelectionCache.Refresh(ctx))
		}

		{
			// confirm that there are out of placement pieces
			segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
			require.NoError(t, err)
			require.Len(t, segments, 2)
			require.Len(t, segments[0].Pieces, piecesCount)

			ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement, placement.CreateFilters)
			require.NoError(t, err)
			require.False(t, ok)
		}

		{
			// hey repair-checker, do you see any problems?
			planet.Satellites[0].RangedLoop.RangedLoop.Service.Loop.TriggerWait()

			// we should see both segments in the repair queue
			n, err := planet.Satellites[0].DB.RepairQueue().SelectN(ctx, 10)
			require.NoError(t, err)
			require.Len(t, n, 2)
		}

		{
			// this should repair only one segment (where placement=1)
			planet.Satellites[0].Repairer.Repairer.Loop.TriggerWait()
			planet.Satellites[0].Repairer.Repairer.WaitForPendingRepairs()

			// one of the segments is repaired
			n, err := planet.Satellites[0].DB.RepairQueue().SelectN(ctx, 10)
			require.NoError(t, err)
			require.Len(t, n, 1)

			// segment no. 2 is still in the repair queue
			require.Equal(t, storj.PlacementConstraint(2), n[0].Placement)

			segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
			require.NoError(t, err)
			require.Len(t, segments, 2)
			require.Len(t, segments[0].Pieces, piecesCount)
			require.NotEqual(t, segments[0].Placement, segments[1].Placement)

			for _, segment := range segments {
				ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segment.Pieces, segment.Placement, placement.CreateFilters)
				require.NoError(t, err)
				if segment.Placement == 1 {
					require.True(t, ok, "Segment is at wrong place %s", segment.StreamID)
				} else {
					require.False(t, ok)
				}
			}
		}

		// download is still working
		{
			// this is repaired
			data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket1", "object")
			require.NoError(t, err)
			require.Equal(t, expectedData, data)

			// this is not repaired; wrong nodes are filtered out during download --> error
			_, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket2", "object")
			require.Error(t, err)
		}
	})
}


@@ -982,9 +982,15 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# time limit for downloading pieces from a node for repair
# repairer.download-timeout: 5m0s
# comma separated placement IDs (numbers) which should be ignored by the repairer
# repairer.excluded-placements: ""
# whether to download pieces for repair in memory (true) or download to disk (false)
# repairer.in-memory-repair: false
# comma separated placement IDs (numbers) which should be checked by the repairer (other placements are ignored)
# repairer.included-placements: ""
# how frequently repairer should try and repair more data
# repairer.interval: 5m0s