// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package main_test

import (
	"context"
	"fmt"
	"os"
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/zeebo/errs"

	"storj.io/common/pb"
	"storj.io/common/storj"
	"storj.io/common/testcontext"
	"storj.io/common/uuid"
	segmentverify "storj.io/storj/cmd/tools/segment-verify"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/overlay"
)

var maxUUID = uuid.UUID{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}

func TestService_EmptyRange(t *testing.T) {
	ctx := testcontext.New(t)
	log := testplanet.NewLogger(t)

	config := segmentverify.ServiceConfig{
		NotFoundPath: ctx.File("not-found.csv"),
		RetryPath:    ctx.File("retry.csv"),
		MaxOffline:   2,
	}

	metabase := newMetabaseMock(map[metabase.NodeAlias]storj.NodeID{})
	verifier := &verifierMock{allSuccess: true}

	service, err := segmentverify.NewService(log.Named("segment-verify"), metabase, verifier, metabase, config)
	require.NoError(t, err)
	require.NotNil(t, service)
	defer ctx.Check(service.Close)

	err = service.ProcessRange(ctx, uuid.UUID{}, maxUUID)
	require.NoError(t, err)
}

func TestService_Success(t *testing.T) {
	ctx := testcontext.New(t)
	log := testplanet.NewLogger(t)

	config := segmentverify.ServiceConfig{
		NotFoundPath:      ctx.File("not-found.csv"),
		RetryPath:         ctx.File("retry.csv"),
		PriorityNodesPath: ctx.File("priority-nodes.txt"),

		Check:       3,
		BatchSize:   100,
		Concurrency: 3,
		MaxOffline:  2,
	}

	// node 1 is going to be the priority node
	err := os.WriteFile(config.PriorityNodesPath, []byte((storj.NodeID{1}).String()+"\n"), 0755)
	require.NoError(t, err)

	bucketListPath := ctx.File("buckets.csv")
	err = os.WriteFile(bucketListPath, []byte(`
00000000000000000000000000000001,67616c617879
00000000000000000000000000000002,7368696e6f6269`), 0755)
	require.NoError(t, err)

	func() {
		nodes := map[metabase.NodeAlias]storj.NodeID{}
		for i := 1; i <= 0xFF; i++ {
			nodes[metabase.NodeAlias(i)] = storj.NodeID{byte(i)}
		}

		segments := []metabase.VerifySegment{
			{
				StreamID:    uuid.UUID{0x10, 0x10},
				AliasPieces: metabase.AliasPieces{{Number: 1, Alias: 8}, {Number: 3, Alias: 9}, {Number: 5, Alias: 10}, {Number: 0, Alias: 1}},
			},
			{
				StreamID:    uuid.UUID{0x20, 0x20},
				AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
			},
			{
				// this won't get processed due to the high limit
				StreamID:    uuid.UUID{0x30, 0x30},
				AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
			},
		}

		metabase := newMetabaseMock(nodes, segments...)
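		// a verifier that reports every piece it is asked to check as found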
		verifier := &verifierMock{allSuccess: true}

		metabase.AddStreamIDToBucket(uuid.UUID{1}, "67616c617879", uuid.UUID{0x10, 0x10})
		metabase.AddStreamIDToBucket(uuid.UUID{2}, "7368696e6f6269", uuid.UUID{0x20, 0x20})
		metabase.AddStreamIDToBucket(uuid.UUID{2}, "7777777", uuid.UUID{0x30, 0x30})

		service, err := segmentverify.NewService(log.Named("segment-verify"), metabase, verifier, metabase, config)
		require.NoError(t, err)
		require.NotNil(t, service)
		defer ctx.Check(service.Close)

		err = service.ProcessRange(ctx, uuid.UUID{0x10, 0x10}, uuid.UUID{0x30, 0x30})
		require.NoError(t, err)

		for node, list := range verifier.processed {
			assert.True(t, isUnique(list), "each node should process only once: %v %#v", node, list)
		}

		// node 1 is a priority node in segments[0]
		assert.Len(t, verifier.processed[nodes[1]], 1)
		// the remaining two checks for segments[0] go to some of nodes 8, 9, and 10
		assert.Equal(t, 2,
			len(verifier.processed[nodes[8]])+len(verifier.processed[nodes[9]])+len(verifier.processed[nodes[10]]),
		)
		// these correspond to checks against segments[1]
		assert.Len(t, verifier.processed[nodes[2]], 1)
		assert.Len(t, verifier.processed[nodes[3]], 1)
		assert.Len(t, verifier.processed[nodes[4]], 1)
	}()

	retryCSV, err := os.ReadFile(config.RetryPath)
	require.NoError(t, err)
	require.Equal(t, "stream id,position,found,not found,retry\n", string(retryCSV))

	notFoundCSV, err := os.ReadFile(config.NotFoundPath)
	require.NoError(t, err)
	require.Equal(t, "stream id,position,found,not found,retry\n", string(notFoundCSV))
}

func TestService_Buckets_Success(t *testing.T) {
	ctx := testcontext.New(t)
	log := testplanet.NewLogger(t)

	config := segmentverify.ServiceConfig{
		NotFoundPath:      ctx.File("not-found.csv"),
		RetryPath:         ctx.File("retry.csv"),
		PriorityNodesPath: ctx.File("priority-nodes.txt"),

		Check:       3,
		BatchSize:   100,
		Concurrency: 3,
		MaxOffline:  2,
	}

	// node 1 is going to be the priority node
	err := os.WriteFile(config.PriorityNodesPath, []byte((storj.NodeID{1}).String()+"\n"), 0755)
	require.NoError(t, err)

	bucketListPath := ctx.File("buckets.csv")
	err = os.WriteFile(bucketListPath, []byte(`
00000000000000000000000000000001,67616c617879
00000000000000000000000000000002,7368696e6f6269`), 0755)
	require.NoError(t, err)

	func() {
		nodes := map[metabase.NodeAlias]storj.NodeID{}
		for i := 1; i <= 0xFF; i++ {
			nodes[metabase.NodeAlias(i)] = storj.NodeID{byte(i)}
		}

		segments := []metabase.VerifySegment{
			{
				StreamID:    uuid.UUID{0x10, 0x10},
				AliasPieces: metabase.AliasPieces{{Number: 1, Alias: 8}, {Number: 3, Alias: 9}, {Number: 5, Alias: 10}, {Number: 0, Alias: 1}},
			},
			{
				StreamID:    uuid.UUID{0x20, 0x20},
				AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
			},
			{
				// this won't get processed, since its bucket isn't in the bucket list
				StreamID:    uuid.UUID{0x30, 0x30},
				AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
			},
		}

		metabase := newMetabaseMock(nodes, segments...)
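		// as in TestService_Success, the verifier reports every piece as found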
		verifier := &verifierMock{allSuccess: true}

		metabase.AddStreamIDToBucket(uuid.UUID{1}, "67616c617879", uuid.UUID{0x10, 0x10})
		metabase.AddStreamIDToBucket(uuid.UUID{2}, "7368696e6f6269", uuid.UUID{0x20, 0x20})
		metabase.AddStreamIDToBucket(uuid.UUID{2}, "7777777", uuid.UUID{0x30, 0x30})

		service, err := segmentverify.NewService(log.Named("segment-verify"), metabase, verifier, metabase, config)
		require.NoError(t, err)
		require.NotNil(t, service)
		defer ctx.Check(service.Close)

		bucketList, err := service.ParseBucketFile(bucketListPath)
		require.NoError(t, err)

		err = service.ProcessBuckets(ctx, bucketList.Buckets)
		require.NoError(t, err)

		for node, list := range verifier.processed {
			assert.True(t, isUnique(list), "each node should process only once: %v %#v", node, list)
		}

		// node 1 is a priority node in segments[0]
		assert.Len(t, verifier.processed[nodes[1]], 1)
		// the remaining two checks for segments[0] go to some of nodes 8, 9, and 10
		assert.Equal(t, 2,
			len(verifier.processed[nodes[8]])+len(verifier.processed[nodes[9]])+len(verifier.processed[nodes[10]]),
		)
	}()

	retryCSV, err := os.ReadFile(config.RetryPath)
	require.NoError(t, err)
	require.Equal(t, "stream id,position,found,not found,retry\n", string(retryCSV))

	notFoundCSV, err := os.ReadFile(config.NotFoundPath)
	require.NoError(t, err)
	require.Equal(t, "stream id,position,found,not found,retry\n", string(notFoundCSV))
}

func TestService_Failures(t *testing.T) {
	ctx := testcontext.New(t)
	log := testplanet.NewLogger(t)

	config := segmentverify.ServiceConfig{
		NotFoundPath:      ctx.File("not-found.csv"),
		RetryPath:         ctx.File("retry.csv"),
		PriorityNodesPath: ctx.File("priority-nodes.txt"),

		Check:       2,
		BatchSize:   100,
		Concurrency: 3,
		MaxOffline:  2,
	}

	// node 1 is going to be the priority node
	err := os.WriteFile(config.PriorityNodesPath, []byte((storj.NodeID{1}).String()+"\n"), 0755)
	require.NoError(t, err)

	func() {
		nodes := map[metabase.NodeAlias]storj.NodeID{}
		for i := 1; i <= 0xFF; i++ {
			nodes[metabase.NodeAlias(i)] = storj.NodeID{byte(i)}
		}

		segments := []metabase.VerifySegment{
			{
				StreamID:    uuid.UUID{0x10, 0x10},
				AliasPieces: metabase.AliasPieces{{Number: 1, Alias: 8}, {Number: 3, Alias: 9}, {Number: 5, Alias: 10}, {Number: 0, Alias: 1}},
			},
			{
				StreamID:    uuid.UUID{0x20, 0x20},
				AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
			},
			{
				StreamID:    uuid.UUID{0x30, 0x30},
				AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 2}, {Number: 1, Alias: 3}, {Number: 7, Alias: 4}},
			},
		}

		metabase := newMetabaseMock(nodes, segments...)
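		// nodes 2, 8, 9, and 10 are offline; segments[1] is reported as not
		// found, while segments[0] and segments[2] verify successfully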
		verifier := &verifierMock{
			offline:  []storj.NodeID{{0x02}, {0x08}, {0x09}, {0x0A}},
			success:  []uuid.UUID{segments[0].StreamID, segments[2].StreamID},
			notFound: []uuid.UUID{segments[1].StreamID},
		}

		service, err := segmentverify.NewService(log.Named("segment-verify"), metabase, verifier, metabase, config)
		require.NoError(t, err)
		require.NotNil(t, service)
		defer ctx.Check(service.Close)

		err = service.ProcessRange(ctx, uuid.UUID{}, maxUUID)
		require.NoError(t, err)

		for node, list := range verifier.processed {
			assert.True(t, isUnique(list), "each node should process only once: %v %#v", node, list)
		}
	}()

	retryCSV, err := os.ReadFile(config.RetryPath)
	require.NoError(t, err)
	require.Equal(t, ""+
		"stream id,position,found,not found,retry\n"+
		"10100000-0000-0000-0000-000000000000,0,1,0,1\n",
		string(retryCSV))

	notFoundCSV, err := os.ReadFile(config.NotFoundPath)
	require.NoError(t, err)
	require.Equal(t, ""+
		"stream id,position,found,not found,retry\n"+
		"20200000-0000-0000-0000-000000000000,0,0,2,0\n",
		string(notFoundCSV))
}

func isUnique(segments []*segmentverify.Segment) bool {
	type segmentID struct {
		StreamID uuid.UUID
		Position metabase.SegmentPosition
	}
	seen := map[segmentID]bool{}
	for _, seg := range segments {
		id := segmentID{StreamID: seg.StreamID, Position: seg.Position}
		if seen[id] {
			return false
		}
		seen[id] = true
	}
	return true
}

type metabaseMock struct {
	nodeIDToAlias      map[storj.NodeID]metabase.NodeAlias
	aliasToNodeID      map[metabase.NodeAlias]storj.NodeID
	streamIDsPerBucket map[metabase.BucketLocation][]uuid.UUID
	segments           []metabase.VerifySegment
}

func newMetabaseMock(nodes map[metabase.NodeAlias]storj.NodeID, segments ...metabase.VerifySegment) *metabaseMock {
	mock := &metabaseMock{
		nodeIDToAlias:      map[storj.NodeID]metabase.NodeAlias{},
		aliasToNodeID:      nodes,
		segments:           segments,
		streamIDsPerBucket: make(map[metabase.BucketLocation][]uuid.UUID),
	}
	for n, id := range nodes {
		mock.nodeIDToAlias[id] = n
	}
	return mock
}

func (db *metabaseMock) AddStreamIDToBucket(projectID uuid.UUID, bucketName string, streamIDs ...uuid.UUID) {
	bucket := metabase.BucketLocation{ProjectID: projectID, BucketName: bucketName}
	db.streamIDsPerBucket[bucket] = append(db.streamIDsPerBucket[bucket], streamIDs...)
}
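
// Get returns a minimal node dossier with a synthetic "nodeid:<id>" address.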
func (db *metabaseMock) Get(ctx context.Context, nodeID storj.NodeID) (*overlay.NodeDossier, error) {
	return &overlay.NodeDossier{
		Node: pb.Node{
			Id: nodeID,
			Address: &pb.NodeAddress{
				Transport: pb.NodeTransport_TCP_TLS_GRPC,
				Address:   fmt.Sprintf("nodeid:%v", nodeID),
			},
		},
	}, nil
}

func (db *metabaseMock) SelectAllStorageNodesDownload(ctx context.Context, onlineWindow time.Duration, asOf overlay.AsOfSystemTimeConfig) ([]*overlay.SelectedNode, error) {
	var xs []*overlay.SelectedNode
	for nodeID := range db.nodeIDToAlias {
		xs = append(xs, &overlay.SelectedNode{
			ID: nodeID,
			Address: &pb.NodeAddress{
				Transport: pb.NodeTransport_TCP_TLS_GRPC,
				Address:   fmt.Sprintf("nodeid:%v", nodeID),
			},
			LastNet:     "nodeid",
			LastIPPort:  fmt.Sprintf("nodeid:%v", nodeID),
			CountryCode: 0,
		})
	}
	return xs, nil
}

func (db *metabaseMock) LatestNodesAliasMap(ctx context.Context) (*metabase.NodeAliasMap, error) {
	var entries []metabase.NodeAliasEntry
	for id, alias := range db.nodeIDToAlias {
		entries = append(entries, metabase.NodeAliasEntry{
			ID:    id,
			Alias: alias,
		})
	}
	return metabase.NewNodeAliasMap(entries), nil
}

func (db *metabaseMock) DeleteSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) error {
	for i, s := range db.segments {
		if opts.StreamID == s.StreamID && opts.Position == s.Position {
			db.segments = append(db.segments[:i], db.segments[i+1:]...)
			return nil
		}
	}
	return metabase.ErrSegmentNotFound.New("%v", opts)
}

func (db *metabaseMock) GetSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) (segment metabase.Segment, err error) {
	for _, s := range db.segments {
		if opts.StreamID == s.StreamID && opts.Position == s.Position {
			var pieces metabase.Pieces
			for _, p := range s.AliasPieces {
				pieces = append(pieces, metabase.Piece{
					Number:      p.Number,
					StorageNode: db.aliasToNodeID[p.Alias],
				})
			}
			return metabase.Segment{
				StreamID: s.StreamID,
				Position: s.Position,
				Pieces:   pieces,
			}, nil
		}
	}
	return metabase.Segment{}, metabase.ErrSegmentNotFound.New("%v", opts)
}
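
// ListBucketsStreamIDs returns the stream IDs of the requested buckets that
// sort after the cursor, truncated to opts.Limit.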
func (db *metabaseMock) ListBucketsStreamIDs(ctx context.Context, opts metabase.ListBucketsStreamIDs) (metabase.ListBucketsStreamIDsResult, error) {
	result := metabase.ListBucketsStreamIDsResult{}
	for _, bucket := range opts.BucketList.Buckets {
		if bucket.ProjectID.Compare(opts.CursorBucket.ProjectID) <= 0 {
			continue
		}
		if bucket.BucketName <= opts.CursorBucket.BucketName {
			continue
		}
		if opts.CursorStreamID.IsZero() {
			result.StreamIDs = append(result.StreamIDs, db.streamIDsPerBucket[bucket]...)
			continue
		}
		for cursorIndex, streamID := range db.streamIDsPerBucket[bucket] {
			if opts.CursorStreamID.Less(streamID) {
				result.StreamIDs = append(result.StreamIDs, db.streamIDsPerBucket[bucket][cursorIndex:]...)
				break
			}
		}
		if len(result.StreamIDs) > opts.Limit {
			break
		}
	}
	if len(result.StreamIDs) > opts.Limit {
		result.StreamIDs = result.StreamIDs[:opts.Limit]
	}
	sort.Slice(result.StreamIDs, func(i, j int) bool {
		return result.StreamIDs[i].Less(result.StreamIDs[j])
	})
	return result, nil
}

func (db *metabaseMock) ListVerifySegments(ctx context.Context, opts metabase.ListVerifySegments) (result metabase.ListVerifySegmentsResult, err error) {
	r := metabase.ListVerifySegmentsResult{}
	for _, s := range db.segments {
		if s.StreamID.Less(opts.CursorStreamID) {
			continue
		}
		if s.StreamID == opts.CursorStreamID && !opts.CursorPosition.Less(s.Position) {
			continue
		}
		r.Segments = append(r.Segments, s)
		if len(r.Segments) >= opts.Limit {
			break
		}
	}
	return r, nil
}

type verifierMock struct {
	allSuccess bool
	fail       error
	offline    []storj.NodeID
	success    []uuid.UUID
	notFound   []uuid.UUID

	mu        sync.Mutex
	processed map[storj.NodeID][]*segmentverify.Segment
}

func (v *verifierMock) Verify(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, segments []*segmentverify.Segment, _ bool) (int, error) {
	v.mu.Lock()
	if v.processed == nil {
		v.processed = map[storj.NodeID][]*segmentverify.Segment{}
	}
	v.processed[target.ID] = append(v.processed[target.ID], segments...)
	v.mu.Unlock()

	for _, n := range v.offline {
		if n == target.ID {
			return 0, segmentverify.ErrNodeOffline.New("node did not respond %v", target)
		}
	}
	if v.fail != nil {
		return 0, errs.Wrap(v.fail)
	}

	if v.allSuccess {
		for _, seg := range segments {
			seg.Status.MarkFound()
		}
		return len(segments), nil
	}

	for _, seg := range v.success {
		for _, t := range segments {
			if t.StreamID == seg {
				t.Status.MarkFound()
			}
		}
	}
	for _, seg := range v.notFound {
		for _, t := range segments {
			if t.StreamID == seg {
				t.Status.MarkNotFound()
			}
		}
	}

	return len(segments), nil
}