satellite/cmd: segment-verify verifies segments in given bucket list

Provides the `segment-verify run buckets` command for verifying segments within a list of buckets.

The bucket list is a CSV file of `project_id,bucket_name` rows naming the buckets to be checked.

https://github.com/storj/storj-private/issues/101

Change-Id: I3d25c27b56fcab4a6a1aebb6f87514d6c97de3ff
Fadila Khadar 2022-12-03 19:12:05 +01:00 committed by Storj Robot
parent 021c98c17a
commit 995f78d579
4 changed files with 352 additions and 10 deletions

View File

@@ -25,7 +25,11 @@ There are a few parameters for controlling the verification itself:
```
## Running the tool
- by specifying range boundaries:
```
segment-verify run range --low 00 --high ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff --config-dir ./satellite-config-dir
```
- by specifying buckets to be checked:
```
segment-verify run buckets --buckets-csv bucket.csv
```
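
For illustration (not part of the commit), a `bucket.csv` could look like the following — each row pairs a project ID in compact hex form (a UUID without dashes, as decoded by `projectIDFromCompactString` below) with a bucket name; the bucket names here are hypothetical:

```
00000000000000000000000000000001,photos
00000000000000000000000000000002,backups
```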

View File

@@ -4,7 +4,12 @@
package main
import (
"context"
"encoding/csv"
"encoding/hex"
"errors"
"os"
"strings"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
@@ -46,7 +51,13 @@ var (
rangeCmd = &cobra.Command{
Use: "range",
Short: "runs the command on a range of segments",
RunE: verifySegments,
}
bucketsCmd = &cobra.Command{
Use: "buckets",
Short: "runs the command on segments from specified buckets",
RunE: verifySegments,
}
summarizeCmd = &cobra.Command{
@@ -58,6 +69,7 @@ var (
satelliteCfg Satellite
rangeCfg RangeConfig
bucketsCfg BucketConfig
confDir string
identityDir string
@@ -73,10 +85,13 @@ func init() {
rootCmd.AddCommand(runCmd)
rootCmd.AddCommand(summarizeCmd)
runCmd.AddCommand(rangeCmd)
runCmd.AddCommand(bucketsCmd)
process.Bind(runCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(rangeCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(rangeCmd, &rangeCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(bucketsCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(bucketsCmd, &bucketsCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
}
// RangeConfig defines configuration for verifying segment existence.
@@ -88,7 +103,16 @@ type RangeConfig struct {
High string `help:"hex highest segment id prefix to verify (excluded)"`
}
// BucketConfig defines configuration for verifying segment existence within a list of buckets.
type BucketConfig struct {
Service ServiceConfig
Verify VerifierConfig
BucketsCSV string `help:"csv file of project_id,bucket_name of buckets to verify" default:""`
}
func verifySegments(cmd *cobra.Command, args []string) error {
ctx, _ := process.Ctx(cmd)
log := zap.L()
@@ -166,7 +190,16 @@ func verifySegmentsRange(cmd *cobra.Command, args []string) error {
return Error.Wrap(err)
}
defer func() { err = errs.Combine(err, service.Close()) }()
if cmd.Name() == "range" {
return verifySegmentsRange(ctx, service, rangeCfg)
}
if cmd.Name() == "buckets" {
return verifySegmentsBuckets(ctx, service, bucketsCfg)
}
return errors.New("unknown command: " + cmd.Name())
}
func verifySegmentsRange(ctx context.Context, service *Service, rangeCfg RangeConfig) error {
// parse arguments
var low, high uuid.UUID
@@ -189,6 +222,58 @@ func verifySegmentsRange(cmd *cobra.Command, args []string) error {
return service.ProcessRange(ctx, low, high)
}
func verifySegmentsBuckets(ctx context.Context, service *Service, bucketCfg BucketConfig) error {
if bucketCfg.BucketsCSV == "" {
return Error.New("bucket list file path not provided")
}
bucketList, err := service.ParseBucketFile(bucketCfg.BucketsCSV)
if err != nil {
return Error.Wrap(err)
}
return service.ProcessBuckets(ctx, bucketList.Buckets)
}
func main() { func main() {
process.Exec(rootCmd) process.Exec(rootCmd)
} }
// ParseBucketFile parses a csv file containing project_id and bucket names.
func (service *Service) ParseBucketFile(path string) (_ BucketList, err error) {
csvFile, err := os.Open(path)
if err != nil {
return BucketList{}, err
}
defer func() {
err = errs.Combine(err, csvFile.Close())
}()
csvReader := csv.NewReader(csvFile)
allEntries, err := csvReader.ReadAll()
if err != nil {
return BucketList{}, err
}
bucketList := BucketList{}
for _, entry := range allEntries {
if len(entry) < 2 {
return BucketList{}, Error.New("unable to parse buckets file: expected project_id,bucket_name, got %d fields", len(entry))
}
projectID, err := projectIDFromCompactString(strings.TrimSpace(entry[0]))
if err != nil {
return BucketList{}, Error.New("unable to parse buckets file: %w", err)
}
bucketList.Add(projectID, strings.TrimSpace(entry[1]))
}
return bucketList, nil
}
func projectIDFromCompactString(s string) (uuid.UUID, error) {
decoded, err := hex.DecodeString(s)
if err != nil {
return uuid.UUID{}, Error.New("invalid project id string: %w", err)
}
}
return uuid.FromBytes(decoded)
}
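
For a quick sense of the compact project-ID form that `projectIDFromCompactString` accepts, here is a small standalone sketch; it assumes the `storj.io/common/uuid` package that this code already uses for `uuid.UUID` and `uuid.FromBytes`:

```go
package main

import (
	"encoding/hex"
	"fmt"

	"storj.io/common/uuid"
)

func main() {
	// The CSV carries project IDs as 32 hex characters, no dashes.
	decoded, err := hex.DecodeString("00000000000000000000000000000001")
	if err != nil {
		panic(err)
	}
	id, err := uuid.FromBytes(decoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(id) // 00000000-0000-0000-0000-000000000001
}
```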

View File

@@ -31,6 +31,7 @@ type Metabase interface {
LatestNodesAliasMap(ctx context.Context) (*metabase.NodeAliasMap, error)
GetSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) (segment metabase.Segment, err error)
ListVerifySegments(ctx context.Context, opts metabase.ListVerifySegments) (result metabase.ListVerifySegmentsResult, err error)
ListBucketsStreamIDs(ctx context.Context, opts metabase.ListBucketsStreamIDs) (result metabase.ListBucketsStreamIDsResult, err error)
}
// Verifier verifies a batch of segments.
@@ -83,6 +84,7 @@ type Service struct {
priorityNodes NodeAliasSet
onlineNodes NodeAliasSet
offlineCount map[metabase.NodeAlias]int
bucketList BucketList
}
// NewService returns a new service for verifying segments.
@@ -216,6 +218,19 @@ func (service *Service) parseNodeFile(path string) (NodeAliasSet, error) {
return set, nil
}
// BucketList contains a list of buckets to check segments from.
type BucketList struct {
Buckets []metabase.BucketLocation
}
// Add adds a bucket to the bucket list.
func (list *BucketList) Add(projectID uuid.UUID, bucketName string) {
list.Buckets = append(list.Buckets, metabase.BucketLocation{
ProjectID: projectID,
BucketName: bucketName,
})
}
// ProcessRange processes segments between low and high uuid.UUID with the specified batchSize.
func (service *Service) ProcessRange(ctx context.Context, low, high uuid.UUID) (err error) {
defer mon.Task()(&ctx)(&err)
@@ -300,6 +315,107 @@ func (service *Service) ProcessRange(ctx context.Context, low, high uuid.UUID) (
}
}
// ProcessBuckets processes segments in buckets with the specified batchSize.
func (service *Service) ProcessBuckets(ctx context.Context, buckets []metabase.BucketLocation) (err error) {
defer mon.Task()(&ctx)(&err)
aliasMap, err := service.metabase.LatestNodesAliasMap(ctx)
if err != nil {
return Error.Wrap(err)
}
service.aliasMap = aliasMap
err = service.loadOnlineNodes(ctx)
if err != nil {
return Error.Wrap(err)
}
err = service.loadPriorityNodes(ctx)
if err != nil {
return Error.Wrap(err)
}
err = service.applyIgnoreNodes(ctx)
if err != nil {
return Error.Wrap(err)
}
var progress int64
cursorBucket := metabase.BucketLocation{}
cursorStreamID := uuid.UUID{}
cursorPosition := metabase.SegmentPosition{} // Convert to struct that contains the status.
segmentsData := make([]Segment, service.config.BatchSize)
segments := make([]*Segment, service.config.BatchSize)
for {
listStreamIDsResult, err := service.metabase.ListBucketsStreamIDs(ctx, metabase.ListBucketsStreamIDs{
BucketList: metabase.ListVerifyBucketList{
Buckets: buckets,
},
CursorBucket: cursorBucket,
CursorStreamID: cursorStreamID,
Limit: service.config.BatchSize,
AsOfSystemInterval: service.config.AsOfSystemInterval,
})
if err != nil {
return Error.Wrap(err)
}
for {
// TODO loop for this
result, err := service.metabase.ListVerifySegments(ctx, metabase.ListVerifySegments{
StreamIDs: listStreamIDsResult.StreamIDs,
CursorStreamID: cursorStreamID,
CursorPosition: cursorPosition,
Limit: service.config.BatchSize,
AsOfSystemInterval: service.config.AsOfSystemInterval,
})
if err != nil {
return Error.Wrap(err)
}
// All done?
if len(result.Segments) == 0 {
break
}
segmentsData = segmentsData[:len(result.Segments)]
segments = segments[:len(result.Segments)]
last := &result.Segments[len(result.Segments)-1]
cursorStreamID, cursorPosition = last.StreamID, last.Position
for i := range segments {
segmentsData[i].VerifySegment = result.Segments[i]
segments[i] = &segmentsData[i]
}
service.log.Info("processing segments",
zap.Int64("progress", progress),
zap.Int("count", len(segments)),
zap.Stringer("first", segments[0].StreamID),
zap.Stringer("last", segments[len(segments)-1].StreamID),
)
progress += int64(len(segments))
// Process the data.
err = service.ProcessSegments(ctx, segments)
if err != nil {
return Error.Wrap(err)
}
}
if len(listStreamIDsResult.StreamIDs) == 0 {
return nil
}
cursorBucket = listStreamIDsResult.LastBucket
// TODO remove processed project_ids and bucket_names?
}
}
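
`ProcessBuckets` pages on two levels: the outer loop advances a bucket/stream-ID cursor through `ListBucketsStreamIDs`, and the inner loop advances a stream-ID/position cursor through `ListVerifySegments` until an empty page comes back. Below is a simplified, self-contained sketch of that nested-cursor shape — toy `listStreams`/`listSegments` functions stand in for the metabase calls, and the inner cursor is reset per outer page:

```go
package main

import "fmt"

// listStreams stands in for ListBucketsStreamIDs:
// it returns at most limit stream IDs after the cursor.
func listStreams(cursor, limit int) []int {
	all := []int{1, 2, 3, 4, 5} // pretend stream IDs, already ordered
	var out []int
	for _, id := range all {
		if id > cursor && len(out) < limit {
			out = append(out, id)
		}
	}
	return out
}

// listSegments stands in for ListVerifySegments: at most limit segments
// (identified here by their stream ID) after the cursor, within the streams.
func listSegments(streams []int, cursor, limit int) []int {
	var out []int
	for _, id := range streams {
		if id > cursor && len(out) < limit {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	const limit = 2
	streamCursor := 0
	for {
		streams := listStreams(streamCursor, limit)
		if len(streams) == 0 {
			return // outer listing exhausted: all buckets processed
		}
		segCursor := 0
		for {
			segs := listSegments(streams, segCursor, limit)
			if len(segs) == 0 {
				break // this page of streams is fully processed
			}
			fmt.Println("processing segments", segs)
			segCursor = segs[len(segs)-1]
		}
		streamCursor = streams[len(streams)-1]
	}
}
```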
// ProcessSegments processes a collection of segments.
func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment) (err error) {
defer mon.Task()(&ctx)(&err)

View File

@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"os"
"sort"
"sync"
"testing"
"time"
@@ -69,6 +70,12 @@ func TestService_Success(t *testing.T) {
err := os.WriteFile(config.PriorityNodesPath, []byte((storj.NodeID{1}).String()+"\n"), 0755)
require.NoError(t, err)
bucketListPath := ctx.File("buckets.csv")
err = os.WriteFile(bucketListPath, []byte(`
00000000000000000000000000000001,67616c617879
00000000000000000000000000000002,7368696e6f6269`), 0755)
require.NoError(t, err)
func() {
nodes := map[metabase.NodeAlias]storj.NodeID{}
for i := 1; i <= 0xFF; i++ {
@@ -93,6 +100,10 @@ func TestService_Success(t *testing.T) {
metabase := newMetabaseMock(nodes, segments...)
verifier := &verifierMock{allSuccess: true}
metabase.AddStreamIDToBucket(uuid.UUID{1}, "67616c617879", uuid.UUID{0x10, 0x10})
metabase.AddStreamIDToBucket(uuid.UUID{2}, "7368696e6f6269", uuid.UUID{0x20, 0x20})
metabase.AddStreamIDToBucket(uuid.UUID{2}, "7777777", uuid.UUID{0x30, 0x30})
service, err := segmentverify.NewService(log.Named("segment-verify"), metabase, verifier, metabase, config)
require.NoError(t, err)
require.NotNil(t, service)
@@ -127,6 +138,92 @@
require.Equal(t, "stream id,position,found,not found,retry\n", string(notFoundCSV))
}
func TestService_Buckets_Success(t *testing.T) {
ctx := testcontext.New(t)
log := testplanet.NewLogger(t)
config := segmentverify.ServiceConfig{
NotFoundPath: ctx.File("not-found.csv"),
RetryPath: ctx.File("retry.csv"),
PriorityNodesPath: ctx.File("priority-nodes.txt"),
Check: 3,
BatchSize: 100,
Concurrency: 3,
MaxOffline: 2,
}
// node 1 is going to be a priority node
err := os.WriteFile(config.PriorityNodesPath, []byte((storj.NodeID{1}).String()+"\n"), 0755)
require.NoError(t, err)
bucketListPath := ctx.File("buckets.csv")
err = os.WriteFile(bucketListPath, []byte(`
00000000000000000000000000000001,67616c617879
00000000000000000000000000000002,7368696e6f6269`), 0755)
require.NoError(t, err)
func() {
nodes := map[metabase.NodeAlias]storj.NodeID{}
for i := 1; i <= 0xFF; i++ {
nodes[metabase.NodeAlias(i)] = storj.NodeID{byte(i)}
}
segments := []metabase.VerifySegment{
{
StreamID: uuid.UUID{0x10, 0x10},
AliasPieces: metabase.AliasPieces{{Number: 1, Alias: 8}, {Number: 3, Alias: 9}, {Number: 5, Alias: 10}, {Number: 0, Alias: 1}},
},
{
StreamID: uuid.UUID{0x20, 0x20},
AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
},
{ // this segment belongs to bucket "7777777", which is not in the CSV, so it won't get processed
StreamID: uuid.UUID{0x30, 0x30},
AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
},
}
metabase := newMetabaseMock(nodes, segments...)
verifier := &verifierMock{allSuccess: true}
metabase.AddStreamIDToBucket(uuid.UUID{1}, "67616c617879", uuid.UUID{0x10, 0x10})
metabase.AddStreamIDToBucket(uuid.UUID{2}, "7368696e6f6269", uuid.UUID{0x20, 0x20})
metabase.AddStreamIDToBucket(uuid.UUID{2}, "7777777", uuid.UUID{0x30, 0x30})
service, err := segmentverify.NewService(log.Named("segment-verify"), metabase, verifier, metabase, config)
require.NoError(t, err)
require.NotNil(t, service)
defer ctx.Check(service.Close)
bucketList, err := service.ParseBucketFile(bucketListPath)
require.NoError(t, err)
err = service.ProcessBuckets(ctx, bucketList.Buckets)
require.NoError(t, err)
for node, list := range verifier.processed {
assert.True(t, isUnique(list), "each node should process only once: %v %#v", node, list)
}
// node 1 is a priority node in segments[0]
assert.Len(t, verifier.processed[nodes[1]], 1)
// segments[0] should get two more checks, spread across nodes 8, 9, and 10
assert.Equal(t, 2,
len(verifier.processed[nodes[8]])+len(verifier.processed[nodes[9]])+len(verifier.processed[nodes[10]]),
)
}()
retryCSV, err := os.ReadFile(config.RetryPath)
require.NoError(t, err)
require.Equal(t, "stream id,position,found,not found,retry\n", string(retryCSV))
notFoundCSV, err := os.ReadFile(config.NotFoundPath)
require.NoError(t, err)
require.Equal(t, "stream id,position,found,not found,retry\n", string(notFoundCSV))
}
func TestService_Failures(t *testing.T) {
ctx := testcontext.New(t)
log := testplanet.NewLogger(t)
@@ -222,6 +319,7 @@ func isUnique(segments []*segmentverify.Segment) bool {
type metabaseMock struct {
nodeIDToAlias map[storj.NodeID]metabase.NodeAlias
aliasToNodeID map[metabase.NodeAlias]storj.NodeID
streamIDsPerBucket map[metabase.BucketLocation][]uuid.UUID
segments []metabase.VerifySegment
}
@@ -230,6 +328,7 @@ func newMetabaseMock(nodes map[metabase.NodeAlias]storj.NodeID, segments ...meta
nodeIDToAlias: map[storj.NodeID]metabase.NodeAlias{},
aliasToNodeID: nodes,
segments: segments,
streamIDsPerBucket: make(map[metabase.BucketLocation][]uuid.UUID),
}
for n, id := range nodes {
mock.nodeIDToAlias[id] = n
@@ -237,6 +336,11 @@ func newMetabaseMock(nodes map[metabase.NodeAlias]storj.NodeID, segments ...meta
return mock
}
func (db *metabaseMock) AddStreamIDToBucket(projectID uuid.UUID, bucketName string, streamIDs ...uuid.UUID) {
bucket := metabase.BucketLocation{ProjectID: projectID, BucketName: bucketName}
db.streamIDsPerBucket[bucket] = append(db.streamIDsPerBucket[bucket], streamIDs...)
}
func (db *metabaseMock) Get(ctx context.Context, nodeID storj.NodeID) (*overlay.NodeDossier, error) {
return &overlay.NodeDossier{
Node: pb.Node{
@@ -309,6 +413,39 @@ func (db *metabaseMock) GetSegmentByPosition(ctx context.Context, opts metabase.
return metabase.Segment{}, metabase.ErrSegmentNotFound.New("%v", opts)
}
func (db *metabaseMock) ListBucketsStreamIDs(ctx context.Context, opts metabase.ListBucketsStreamIDs) (metabase.ListBucketsStreamIDsResult, error) {
result := metabase.ListBucketsStreamIDsResult{}
for _, bucket := range opts.BucketList.Buckets {
if bucket.ProjectID.Compare(opts.CursorBucket.ProjectID) <= 0 {
continue
}
if bucket.BucketName <= opts.CursorBucket.BucketName {
continue
}
if opts.CursorStreamID.IsZero() {
result.StreamIDs = append(result.StreamIDs, db.streamIDsPerBucket[bucket]...)
continue
}
for cursorIndex, streamID := range db.streamIDsPerBucket[bucket] {
if opts.CursorStreamID.Less(streamID) {
result.StreamIDs = append(result.StreamIDs, db.streamIDsPerBucket[bucket][cursorIndex:]...)
break
}
}
if len(result.StreamIDs) > opts.Limit {
break
}
}
if len(result.StreamIDs) > opts.Limit {
result.StreamIDs = result.StreamIDs[:opts.Limit]
}
sort.Slice(result.StreamIDs, func(i, j int) bool {
return result.StreamIDs[i].Less(result.StreamIDs[j])
})
return result, nil
}
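
The mock's cursor handling is a simplification that is sufficient for the fixtures above: it filters project ID and bucket name independently rather than as a lexicographic pair, and it never sets `LastBucket`. As a hedged illustration of how a caller pages through it with `CursorStreamID`, a hypothetical extra test (not part of the commit) might look like:

```go
func TestMockListBucketsStreamIDsPagination(t *testing.T) {
	// Hypothetical sketch: drive the mock's pagination by hand,
	// two stream IDs per page, until an empty page comes back.
	ctx := testcontext.New(t)
	db := newMetabaseMock(map[metabase.NodeAlias]storj.NodeID{})
	bucket := metabase.BucketLocation{ProjectID: uuid.UUID{1}, BucketName: "b"}
	db.AddStreamIDToBucket(bucket.ProjectID, bucket.BucketName,
		uuid.UUID{0x10}, uuid.UUID{0x20}, uuid.UUID{0x30})

	var seen []uuid.UUID
	cursor := uuid.UUID{}
	for {
		result, err := db.ListBucketsStreamIDs(ctx, metabase.ListBucketsStreamIDs{
			BucketList:     metabase.ListVerifyBucketList{Buckets: []metabase.BucketLocation{bucket}},
			CursorStreamID: cursor,
			Limit:          2,
		})
		require.NoError(t, err)
		if len(result.StreamIDs) == 0 {
			break
		}
		seen = append(seen, result.StreamIDs...)
		cursor = result.StreamIDs[len(result.StreamIDs)-1]
	}
	require.Len(t, seen, 3)
}
```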
func (db *metabaseMock) ListVerifySegments(ctx context.Context, opts metabase.ListVerifySegments) (result metabase.ListVerifySegmentsResult, err error) {
r := metabase.ListVerifySegmentsResult{}