diff --git a/cmd/tools/segment-verify/README.md b/cmd/tools/segment-verify/README.md index 9f12a8e32..0d8b7c8b4 100644 --- a/cmd/tools/segment-verify/README.md +++ b/cmd/tools/segment-verify/README.md @@ -25,7 +25,11 @@ There are few parameters for controlling the verification itself: ``` ## Running the tool - +- by specifying range boundaries: ``` segment-verify run range --low 00 --high ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff --config-dir ./satellite-config-dir -``` \ No newline at end of file +``` +- by specifying buckets to be checked: +``` +segment-verify run buckets --buckets-csv bucket.csv +``` diff --git a/cmd/tools/segment-verify/main.go b/cmd/tools/segment-verify/main.go index 2de903257..215c1c267 100644 --- a/cmd/tools/segment-verify/main.go +++ b/cmd/tools/segment-verify/main.go @@ -4,7 +4,12 @@ package main import ( + "context" + "encoding/csv" "encoding/hex" + "errors" + "os" + "strings" "github.com/spf13/cobra" "github.com/zeebo/errs" @@ -46,7 +51,13 @@ var ( rangeCmd = &cobra.Command{ Use: "range", Short: "runs the command on a range of segments", - RunE: verifySegmentsRange, + RunE: verifySegments, + } + + bucketsCmd = &cobra.Command{ + Use: "buckets", + Short: "runs the command on segments from specified buckets", + RunE: verifySegments, } summarizeCmd = &cobra.Command{ @@ -58,6 +69,7 @@ var ( satelliteCfg Satellite rangeCfg RangeConfig + bucketsCfg BucketConfig confDir string identityDir string @@ -73,10 +85,13 @@ func init() { rootCmd.AddCommand(runCmd) rootCmd.AddCommand(summarizeCmd) runCmd.AddCommand(rangeCmd) + runCmd.AddCommand(bucketsCmd) process.Bind(runCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir)) process.Bind(rangeCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir)) process.Bind(rangeCmd, &rangeCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir)) + process.Bind(bucketsCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir)) + process.Bind(bucketsCmd, &bucketsCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir)) } // RangeConfig defines configuration for verifying segment existence. @@ -88,7 +103,16 @@ type RangeConfig struct { High string `help:"hex highest segment id prefix to verify (excluded)"` } -func verifySegmentsRange(cmd *cobra.Command, args []string) error { +// BucketConfig defines configuration for verifying segment existence within a list of buckets. 
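+//
+// The buckets csv file is expected to contain one project_id,bucket_name
+// pair per line, with the project id hex encoded in its compact
+// 32-character form (see projectIDFromCompactString below), for example
+// (hypothetical bucket name):
+//
+//	01000000000000000000000000000000,mybucket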
+type BucketConfig struct {
+	Service ServiceConfig
+	Verify  VerifierConfig
+
+	BucketsCSV string `help:"csv file with project_id,bucket_name of buckets to verify" default:""`
+}
+
+func verifySegments(cmd *cobra.Command, args []string) error {
+
 	ctx, _ := process.Ctx(cmd)
 	log := zap.L()
@@ -166,7 +190,16 @@ func verifySegmentsRange(cmd *cobra.Command, args []string) error {
 		return Error.Wrap(err)
 	}
 	defer func() { err = errs.Combine(err, service.Close()) }()
+	if cmd.Name() == "range" {
+		return verifySegmentsRange(ctx, service, rangeCfg)
+	}
+	if cmd.Name() == "buckets" {
+		return verifySegmentsBuckets(ctx, service, bucketsCfg)
+	}
+	return errors.New("unknown command: " + cmd.Name())
+}
 
+func verifySegmentsRange(ctx context.Context, service *Service, rangeCfg RangeConfig) error {
 	// parse arguments
 	var low, high uuid.UUID
@@ -189,6 +222,58 @@ func verifySegmentsRange(cmd *cobra.Command, args []string) error {
 	return service.ProcessRange(ctx, low, high)
 }
 
+func verifySegmentsBuckets(ctx context.Context, service *Service, bucketsCfg BucketConfig) error {
+	if bucketsCfg.BucketsCSV == "" {
+		return Error.New("bucket list file path not provided")
+	}
+
+	bucketList, err := service.ParseBucketFile(bucketsCfg.BucketsCSV)
+	if err != nil {
+		return Error.Wrap(err)
+	}
+	return service.ProcessBuckets(ctx, bucketList.Buckets)
+}
+
 func main() {
 	process.Exec(rootCmd)
 }
+
+// ParseBucketFile parses a csv file containing project_id,bucket_name pairs.
+func (service *Service) ParseBucketFile(path string) (_ BucketList, err error) {
+	csvFile, err := os.Open(path)
+	if err != nil {
+		return BucketList{}, err
+	}
+	defer func() {
+		err = errs.Combine(err, csvFile.Close())
+	}()
+
+	csvReader := csv.NewReader(csvFile)
+	allEntries, err := csvReader.ReadAll()
+	if err != nil {
+		return BucketList{}, err
+	}
+
+	bucketList := BucketList{}
+	for _, entry := range allEntries {
+		if len(entry) < 2 {
+			return BucketList{}, Error.New("unable to parse buckets file: expected project_id,bucket_name but got %q", entry)
+		}
+
+		projectID, err := projectIDFromCompactString(strings.TrimSpace(entry[0]))
+		if err != nil {
+			return BucketList{}, Error.New("unable to parse buckets file: %w", err)
+		}
+		bucketList.Add(projectID, strings.TrimSpace(entry[1]))
+	}
+	return bucketList, nil
+}
+
+func projectIDFromCompactString(s string) (uuid.UUID, error) {
+	decoded, err := hex.DecodeString(s)
+	if err != nil {
+		return uuid.UUID{}, Error.New("invalid project id %q", s)
+	}
+
+	return uuid.FromBytes(decoded)
+}
diff --git a/cmd/tools/segment-verify/service.go b/cmd/tools/segment-verify/service.go
index 762e30aa5..4cc61bebe 100644
--- a/cmd/tools/segment-verify/service.go
+++ b/cmd/tools/segment-verify/service.go
@@ -31,6 +31,7 @@ type Metabase interface {
 	LatestNodesAliasMap(ctx context.Context) (*metabase.NodeAliasMap, error)
 	GetSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) (segment metabase.Segment, err error)
 	ListVerifySegments(ctx context.Context, opts metabase.ListVerifySegments) (result metabase.ListVerifySegmentsResult, err error)
+	ListBucketsStreamIDs(ctx context.Context, opts metabase.ListBucketsStreamIDs) (result metabase.ListBucketsStreamIDsResult, err error)
 }
 
 // Verifier verifies a batch of segments.
@@ -83,6 +84,7 @@ type Service struct {
 	priorityNodes NodeAliasSet
 	onlineNodes   NodeAliasSet
 	offlineCount  map[metabase.NodeAlias]int
+	bucketList    BucketList
 }
 
 // NewService returns a new service for verifying segments.
@@ -216,6 +218,19 @@ func (service *Service) parseNodeFile(path string) (NodeAliasSet, error) {
 	return set, nil
 }
 
+// BucketList contains a list of buckets to check segments from.
+type BucketList struct {
+	Buckets []metabase.BucketLocation
+}
+
+// Add adds a bucket to the bucket list.
+func (list *BucketList) Add(projectID uuid.UUID, bucketName string) {
+	list.Buckets = append(list.Buckets, metabase.BucketLocation{
+		ProjectID:  projectID,
+		BucketName: bucketName,
+	})
+}
+
 // ProcessRange processes segments between low and high uuid.UUID with the specified batchSize.
 func (service *Service) ProcessRange(ctx context.Context, low, high uuid.UUID) (err error) {
 	defer mon.Task()(&ctx)(&err)
@@ -300,6 +315,107 @@ func (service *Service) ProcessRange(ctx context.Context, low, high uuid.UUID) (
 	}
 }
 
+// ProcessBuckets processes segments in buckets with the specified batchSize.
+func (service *Service) ProcessBuckets(ctx context.Context, buckets []metabase.BucketLocation) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	// store the requested buckets on the service so the listing below can page through them
+	service.bucketList = BucketList{Buckets: buckets}
+
+	aliasMap, err := service.metabase.LatestNodesAliasMap(ctx)
+	if err != nil {
+		return Error.Wrap(err)
+	}
+	service.aliasMap = aliasMap
+
+	err = service.loadOnlineNodes(ctx)
+	if err != nil {
+		return Error.Wrap(err)
+	}
+
+	err = service.loadPriorityNodes(ctx)
+	if err != nil {
+		return Error.Wrap(err)
+	}
+
+	err = service.applyIgnoreNodes(ctx)
+	if err != nil {
+		return Error.Wrap(err)
+	}
+
+	var progress int64
+
+	cursorBucket := metabase.BucketLocation{}
+	cursorStreamID := uuid.UUID{}
+	cursorPosition := metabase.SegmentPosition{} // TODO: convert to a struct that contains the status.
+	segmentsData := make([]Segment, service.config.BatchSize)
+	segments := make([]*Segment, service.config.BatchSize)
+	for {
+		// list the next page of stream ids from the requested buckets
+		listStreamIDsResult, err := service.metabase.ListBucketsStreamIDs(ctx, metabase.ListBucketsStreamIDs{
+			BucketList: metabase.ListVerifyBucketList{
+				Buckets: service.bucketList.Buckets,
+			},
+			CursorBucket:   cursorBucket,
+			CursorStreamID: cursorStreamID,
+			Limit:          service.config.BatchSize,
+
+			AsOfSystemInterval: service.config.AsOfSystemInterval,
+		})
+		if err != nil {
+			return Error.Wrap(err)
+		}
+
+		// all stream ids of the requested buckets have been processed
+		if len(listStreamIDsResult.StreamIDs) == 0 {
+			return nil
+		}
+
+		// verify the segments of the listed stream ids batch by batch
+		for {
+			result, err := service.metabase.ListVerifySegments(ctx, metabase.ListVerifySegments{
+				StreamIDs:      listStreamIDsResult.StreamIDs,
+				CursorStreamID: cursorStreamID,
+				CursorPosition: cursorPosition,
+				Limit:          service.config.BatchSize,
+
+				AsOfSystemInterval: service.config.AsOfSystemInterval,
+			})
+			if err != nil {
+				return Error.Wrap(err)
+			}
+
+			// all done with this page of stream ids?
+			if len(result.Segments) == 0 {
+				break
+			}
+
+			segmentsData = segmentsData[:len(result.Segments)]
+			segments = segments[:len(result.Segments)]
+
+			last := &result.Segments[len(result.Segments)-1]
+			cursorStreamID, cursorPosition = last.StreamID, last.Position
+
+			for i := range segments {
+				segmentsData[i].VerifySegment = result.Segments[i]
+				segments[i] = &segmentsData[i]
+			}
+
+			service.log.Info("processing segments",
+				zap.Int64("progress", progress),
+				zap.Int("count", len(segments)),
+				zap.Stringer("first", segments[0].StreamID),
+				zap.Stringer("last", segments[len(segments)-1].StreamID),
+			)
+			progress += int64(len(segments))
+
+			// Process the data.
+			err = service.ProcessSegments(ctx, segments)
+			if err != nil {
+				return Error.Wrap(err)
+			}
+		}
+
+		cursorBucket = listStreamIDsResult.LastBucket
+		// TODO remove processed project_ids and bucket_names?
+	}
+}
+
 // ProcessSegments processes a collection of segments.
 func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment) (err error) {
 	defer mon.Task()(&ctx)(&err)
diff --git a/cmd/tools/segment-verify/service_test.go b/cmd/tools/segment-verify/service_test.go
index 091697ef1..a3885fe3e 100644
--- a/cmd/tools/segment-verify/service_test.go
+++ b/cmd/tools/segment-verify/service_test.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"sort"
 	"sync"
 	"testing"
 	"time"
@@ -69,6 +70,12 @@ func TestService_Success(t *testing.T) {
 	err := os.WriteFile(config.PriorityNodesPath, []byte((storj.NodeID{1}).String()+"\n"), 0755)
 	require.NoError(t, err)
 
+	bucketListPath := ctx.File("buckets.csv")
+	err = os.WriteFile(bucketListPath, []byte(`
+	01000000000000000000000000000000,67616c617879
+	02000000000000000000000000000000,7368696e6f6269`), 0755)
+	require.NoError(t, err)
+
 	func() {
 		nodes := map[metabase.NodeAlias]storj.NodeID{}
 		for i := 1; i <= 0xFF; i++ {
@@ -93,6 +100,10 @@ func TestService_Success(t *testing.T) {
 		metabase := newMetabaseMock(nodes, segments...)
 		verifier := &verifierMock{allSuccess: true}
 
+		metabase.AddStreamIDToBucket(uuid.UUID{1}, "67616c617879", uuid.UUID{0x10, 0x10})
+		metabase.AddStreamIDToBucket(uuid.UUID{2}, "7368696e6f6269", uuid.UUID{0x20, 0x20})
+		metabase.AddStreamIDToBucket(uuid.UUID{2}, "7777777", uuid.UUID{0x30, 0x30})
+
 		service, err := segmentverify.NewService(log.Named("segment-verify"), metabase, verifier, metabase, config)
 		require.NoError(t, err)
 		require.NotNil(t, service)
@@ -127,6 +138,92 @@ func TestService_Success(t *testing.T) {
 	require.Equal(t, "stream id,position,found,not found,retry\n", string(notFoundCSV))
 }
 
+func TestService_Buckets_Success(t *testing.T) {
+	ctx := testcontext.New(t)
+	log := testplanet.NewLogger(t)
+
+	config := segmentverify.ServiceConfig{
+		NotFoundPath:      ctx.File("not-found.csv"),
+		RetryPath:         ctx.File("retry.csv"),
+		PriorityNodesPath: ctx.File("priority-nodes.txt"),
+
+		Check:       3,
+		BatchSize:   100,
+		Concurrency: 3,
+		MaxOffline:  2,
+	}
+
+	// node 1 is going to be a priority node
+	err := os.WriteFile(config.PriorityNodesPath, []byte((storj.NodeID{1}).String()+"\n"), 0755)
+	require.NoError(t, err)
+
+	bucketListPath := ctx.File("buckets.csv")
+	err = os.WriteFile(bucketListPath, []byte(`
+	01000000000000000000000000000000,67616c617879
+	02000000000000000000000000000000,7368696e6f6269`), 0755)
+	require.NoError(t, err)
+
+	func() {
+		nodes := map[metabase.NodeAlias]storj.NodeID{}
+		for i := 1; i <= 0xFF; i++ {
+			nodes[metabase.NodeAlias(i)] = storj.NodeID{byte(i)}
+		}
+
+		segments := []metabase.VerifySegment{
+			{
+				StreamID:    uuid.UUID{0x10, 0x10},
+				AliasPieces: metabase.AliasPieces{{Number: 1, Alias: 8}, {Number: 3, Alias: 9}, {Number: 5, Alias: 10}, {Number: 0, Alias: 1}},
+			},
+			{
+				StreamID:    uuid.UUID{0x20, 0x20},
+				AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
+			},
+			{ // this won't get processed, because its bucket is not on the bucket list
+				StreamID:    uuid.UUID{0x30, 0x30},
+				AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
+			},
+		}
+
+		metabase := newMetabaseMock(nodes, segments...)
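+		// the metabase mock stands in for several of the service's
+		// dependencies, which is why it is passed to NewService twice below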
+		verifier := &verifierMock{allSuccess: true}
+
+		metabase.AddStreamIDToBucket(uuid.UUID{1}, "67616c617879", uuid.UUID{0x10, 0x10})
+		metabase.AddStreamIDToBucket(uuid.UUID{2}, "7368696e6f6269", uuid.UUID{0x20, 0x20})
+		metabase.AddStreamIDToBucket(uuid.UUID{2}, "7777777", uuid.UUID{0x30, 0x30})
+
+		service, err := segmentverify.NewService(log.Named("segment-verify"), metabase, verifier, metabase, config)
+		require.NoError(t, err)
+		require.NotNil(t, service)
+
+		defer ctx.Check(service.Close)
+
+		bucketList, err := service.ParseBucketFile(bucketListPath)
+		require.NoError(t, err)
+
+		err = service.ProcessBuckets(ctx, bucketList.Buckets)
+		require.NoError(t, err)
+
+		for node, list := range verifier.processed {
+			assert.True(t, isUnique(list), "each node should process only once: %v %#v", node, list)
+		}
+
+		// node 1 is a priority node in segments[0]
+		assert.Len(t, verifier.processed[nodes[1]], 1)
+		// the two other checks of segments[0] should go to its remaining nodes 8, 9 and 10
+		assert.Equal(t, 2,
+			len(verifier.processed[nodes[8]])+len(verifier.processed[nodes[9]])+len(verifier.processed[nodes[10]]),
+		)
+	}()
+
+	retryCSV, err := os.ReadFile(config.RetryPath)
+	require.NoError(t, err)
+	require.Equal(t, "stream id,position,found,not found,retry\n", string(retryCSV))
+
+	notFoundCSV, err := os.ReadFile(config.NotFoundPath)
+	require.NoError(t, err)
+	require.Equal(t, "stream id,position,found,not found,retry\n", string(notFoundCSV))
+}
+
 func TestService_Failures(t *testing.T) {
 	ctx := testcontext.New(t)
 	log := testplanet.NewLogger(t)
@@ -220,16 +317,18 @@ func isUnique(segments []*segmentverify.Segment) bool {
 }
 
 type metabaseMock struct {
-	nodeIDToAlias map[storj.NodeID]metabase.NodeAlias
-	aliasToNodeID map[metabase.NodeAlias]storj.NodeID
-	segments      []metabase.VerifySegment
+	nodeIDToAlias      map[storj.NodeID]metabase.NodeAlias
+	aliasToNodeID      map[metabase.NodeAlias]storj.NodeID
+	streamIDsPerBucket map[metabase.BucketLocation][]uuid.UUID
+	segments           []metabase.VerifySegment
 }
 
 func newMetabaseMock(nodes map[metabase.NodeAlias]storj.NodeID, segments ...metabase.VerifySegment) *metabaseMock {
 	mock := &metabaseMock{
-		nodeIDToAlias: map[storj.NodeID]metabase.NodeAlias{},
-		aliasToNodeID: nodes,
-		segments:      segments,
+		nodeIDToAlias:      map[storj.NodeID]metabase.NodeAlias{},
+		aliasToNodeID:      nodes,
+		segments:           segments,
+		streamIDsPerBucket: make(map[metabase.BucketLocation][]uuid.UUID),
 	}
 	for n, id := range nodes {
 		mock.nodeIDToAlias[id] = n
@@ -237,6 +336,11 @@ func newMetabaseMock(nodes map[metabase.NodeAlias]storj.NodeID, segments ...meta
 	return mock
 }
 
+func (db *metabaseMock) AddStreamIDToBucket(projectID uuid.UUID, bucketName string, streamIDs ...uuid.UUID) {
+	bucket := metabase.BucketLocation{ProjectID: projectID, BucketName: bucketName}
+	db.streamIDsPerBucket[bucket] = append(db.streamIDsPerBucket[bucket], streamIDs...)
+}
+
 func (db *metabaseMock) Get(ctx context.Context, nodeID storj.NodeID) (*overlay.NodeDossier, error) {
 	return &overlay.NodeDossier{
 		Node: pb.Node{
@@ -309,6 +413,39 @@ func (db *metabaseMock) GetSegmentByPosition(ctx context.Context, opts metabase.
 	return metabase.Segment{}, metabase.ErrSegmentNotFound.New("%v", opts)
 }
 
+func (db *metabaseMock) ListBucketsStreamIDs(ctx context.Context, opts metabase.ListBucketsStreamIDs) (metabase.ListBucketsStreamIDsResult, error) {
+	result := metabase.ListBucketsStreamIDsResult{}
+
+	for _, bucket := range opts.BucketList.Buckets {
+		// skip buckets at or before the cursor, comparing (project_id, bucket_name) pairs
+		cmp := bucket.ProjectID.Compare(opts.CursorBucket.ProjectID)
+		if cmp < 0 || (cmp == 0 && bucket.BucketName <= opts.CursorBucket.BucketName) {
+			continue
+		}
+		if opts.CursorStreamID.IsZero() {
+			result.StreamIDs = append(result.StreamIDs, db.streamIDsPerBucket[bucket]...)
+		} else {
+			for cursorIndex, streamID := range db.streamIDsPerBucket[bucket] {
+				if opts.CursorStreamID.Less(streamID) {
+					result.StreamIDs = append(result.StreamIDs, db.streamIDsPerBucket[bucket][cursorIndex:]...)
+					break
+				}
+			}
+		}
+		result.LastBucket = bucket
+		if len(result.StreamIDs) > opts.Limit {
+			break
+		}
+	}
+	if len(result.StreamIDs) > opts.Limit {
+		result.StreamIDs = result.StreamIDs[:opts.Limit]
+	}
+	sort.Slice(result.StreamIDs, func(i, j int) bool {
+		return result.StreamIDs[i].Less(result.StreamIDs[j])
+	})
+	return result, nil
+}
+
 func (db *metabaseMock) ListVerifySegments(ctx context.Context, opts metabase.ListVerifySegments) (result metabase.ListVerifySegmentsResult, err error) {
 	r := metabase.ListVerifySegmentsResult{}