diff --git a/cmd/tools/segment-verify/README.md b/cmd/tools/segment-verify/README.md
index 0d8b7c8b4..0c68d3758 100644
--- a/cmd/tools/segment-verify/README.md
+++ b/cmd/tools/segment-verify/README.md
@@ -22,6 +22,8 @@ There are few parameters for controlling the verification itself:
 --verify.per-piece-timeout duration    duration to wait per piece download (default 800ms)
 # Just the regular dialing timeout.
 --verify.dial-timeout duration         how long to wait for a successful dial (default 2s)
+# This allows specifying the minimum node version that implements the Exists endpoint.
+--verify.version-with-exists string    minimum storage node version with implemented Exists method (default "v1.69.2")
 ```
 
 ## Running the tool
diff --git a/cmd/tools/segment-verify/process.go b/cmd/tools/segment-verify/process.go
index 49ed3d961..cd7edf8eb 100644
--- a/cmd/tools/segment-verify/process.go
+++ b/cmd/tools/segment-verify/process.go
@@ -83,7 +83,7 @@ func (service *Service) VerifyBatches(ctx context.Context, batches []*Batch) err
 	for _, batch := range batches {
 		batch := batch
 
-		nodeURL, err := service.convertAliasToNodeURL(ctx, batch.Alias)
+		info, err := service.GetNodeInfo(ctx, batch.Alias)
 		if err != nil {
 			return Error.Wrap(err)
 		}
@@ -91,7 +91,7 @@ func (service *Service) VerifyBatches(ctx context.Context, batches []*Batch) err
 		ignoreThrottle := service.priorityNodes.Contains(batch.Alias)
 
 		limiter.Go(ctx, func() {
-			verifiedCount, err := service.verifier.Verify(ctx, batch.Alias, nodeURL, batch.Items, ignoreThrottle)
+			verifiedCount, err := service.verifier.Verify(ctx, batch.Alias, info.NodeURL, info.Version, batch.Items, ignoreThrottle)
 			if err != nil {
 				if ErrNodeOffline.Has(err) {
 					mu.Lock()
@@ -143,6 +143,9 @@ func (service *Service) convertAliasToNodeURL(ctx context.Context, alias metabas
 		return storj.NodeURL{}, Error.Wrap(err)
 	}
 
+	// TODO: single responsibility?
+	service.nodesVersionMap[alias] = info.Version.Version
+
 	nodeURL = storj.NodeURL{
 		ID:      info.Id,
 		Address: info.Address.Address,
@@ -152,3 +155,34 @@ func (service *Service) convertAliasToNodeURL(ctx context.Context, alias metabas
 	}
 	return nodeURL, nil
 }
+
+// NodeInfo contains node information.
+type NodeInfo struct {
+	Version string
+	NodeURL storj.NodeURL
+}
+
+// GetNodeInfo retrieves node information, using a cached node version when available.
+func (service *Service) GetNodeInfo(ctx context.Context, alias metabase.NodeAlias) (NodeInfo, error) {
+	nodeURL, err := service.convertAliasToNodeURL(ctx, alias)
+	if err != nil {
+		return NodeInfo{}, Error.Wrap(err)
+	}
+
+	version, ok := service.nodesVersionMap[alias]
+
+	if !ok {
+		info, err := service.overlay.Get(ctx, nodeURL.ID)
+		if err != nil {
+			return NodeInfo{}, Error.Wrap(err)
+		}
+
+		service.nodesVersionMap[alias] = info.Version.Version
+		version = info.Version.Version
+	}
+
+	return NodeInfo{
+		NodeURL: nodeURL,
+		Version: version,
+	}, nil
+}
diff --git a/cmd/tools/segment-verify/service.go b/cmd/tools/segment-verify/service.go
index a5bdcd953..c224cca38 100644
--- a/cmd/tools/segment-verify/service.go
+++ b/cmd/tools/segment-verify/service.go
@@ -37,7 +37,7 @@ type Metabase interface {
 
 // Verifier verifies a batch of segments.
 type Verifier interface {
-	Verify(ctx context.Context, nodeAlias metabase.NodeAlias, target storj.NodeURL, segments []*Segment, ignoreThrottle bool) (verifiedCount int, err error)
+	Verify(ctx context.Context, nodeAlias metabase.NodeAlias, target storj.NodeURL, targetVersion string, segments []*Segment, ignoreThrottle bool) (verifiedCount int, err error)
 }
 
 // Overlay is used to fetch information about nodes.
@@ -88,12 +88,13 @@ type Service struct {
 	verifier Verifier
 	overlay  Overlay
 
-	aliasMap       *metabase.NodeAliasMap
-	aliasToNodeURL map[metabase.NodeAlias]storj.NodeURL
-	priorityNodes  NodeAliasSet
-	onlineNodes    NodeAliasSet
-	offlineCount   map[metabase.NodeAlias]int
-	bucketList     BucketList
+	aliasMap        *metabase.NodeAliasMap
+	aliasToNodeURL  map[metabase.NodeAlias]storj.NodeURL
+	priorityNodes   NodeAliasSet
+	onlineNodes     NodeAliasSet
+	offlineCount    map[metabase.NodeAlias]int
+	bucketList      BucketList
+	nodesVersionMap map[metabase.NodeAlias]string
 
 	// this is a callback so that problematic pieces can be reported as they are found,
 	// rather than being kept in a list which might grow unreasonably large.
@@ -129,10 +130,11 @@ func NewService(log *zap.Logger, metabaseDB Metabase, verifier Verifier, overlay
 		verifier: verifier,
 		overlay:  overlay,
 
-		aliasToNodeURL: map[metabase.NodeAlias]storj.NodeURL{},
-		priorityNodes:  NodeAliasSet{},
-		onlineNodes:    NodeAliasSet{},
-		offlineCount:   map[metabase.NodeAlias]int{},
+		aliasToNodeURL:  map[metabase.NodeAlias]storj.NodeURL{},
+		priorityNodes:   NodeAliasSet{},
+		onlineNodes:     NodeAliasSet{},
+		offlineCount:    map[metabase.NodeAlias]int{},
+		nodesVersionMap: map[metabase.NodeAlias]string{},
 
 		reportPiece: problemPieces.Write,
 	}, nil
diff --git a/cmd/tools/segment-verify/service_test.go b/cmd/tools/segment-verify/service_test.go
index c6200cf82..d4b64e2a5 100644
--- a/cmd/tools/segment-verify/service_test.go
+++ b/cmd/tools/segment-verify/service_test.go
@@ -471,7 +471,7 @@ type verifierMock struct {
 	processed map[storj.NodeID][]*segmentverify.Segment
 }
 
-func (v *verifierMock) Verify(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, segments []*segmentverify.Segment, _ bool) (int, error) {
+func (v *verifierMock) Verify(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, targetVersion string, segments []*segmentverify.Segment, _ bool) (int, error) {
 	v.mu.Lock()
 	if v.processed == nil {
 		v.processed = map[storj.NodeID][]*segmentverify.Segment{}
diff --git a/cmd/tools/segment-verify/verify.go b/cmd/tools/segment-verify/verify.go
index 538b991ab..57b361613 100644
--- a/cmd/tools/segment-verify/verify.go
+++ b/cmd/tools/segment-verify/verify.go
@@ -8,10 +8,12 @@ import (
 	"io"
 	"time"
 
+	"github.com/blang/semver"
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
 
 	"storj.io/common/errs2"
+	"storj.io/common/pb"
 	"storj.io/common/rpc"
 	"storj.io/common/rpc/rpcpool"
 	"storj.io/common/rpc/rpcstatus"
@@ -26,13 +28,16 @@ import (
 // ErrNodeOffline is returned when it was not possible to contact a node or the node was not responding.
 var ErrNodeOffline = errs.Class("node offline")
 
+var errWrongNodeVersion = errs.Class("wrong node version")
+
 // VerifierConfig contains configurations for operation.
 type VerifierConfig struct {
 	DialTimeout        time.Duration `help:"how long to wait for a successful dial" default:"2s"`
 	PerPieceTimeout    time.Duration `help:"duration to wait per piece download" default:"800ms"`
 	OrderRetryThrottle time.Duration `help:"how much to wait before retrying order creation" default:"50ms"`
 
-	RequestThrottle time.Duration `help:"minimum interval for sending out each request" default:"150ms"`
+	RequestThrottle   time.Duration `help:"minimum interval for sending out each request" default:"150ms"`
+	VersionWithExists string        `help:"minimum storage node version with implemented Exists method" default:"v1.69.2"`
 }
 
 // NodeVerifier implements segment verification by dialing nodes.
@@ -45,6 +50,8 @@ type NodeVerifier struct {
 	orders *orders.Service
 
 	reportPiece pieceReporterFunc
+
+	versionWithExists semver.Version
 }
 
 var _ Verifier = (*NodeVerifier)(nil)
@@ -62,16 +69,34 @@ func NewVerifier(log *zap.Logger, dialer rpc.Dialer, orders *orders.Service, con
 		IdleExpiration: 10 * time.Minute,
 	})
 
+	version, err := semver.ParseTolerant(config.VersionWithExists)
+	if err != nil {
+		log.Warn("invalid VersionWithExists", zap.String("VersionWithExists", config.VersionWithExists), zap.Error(err))
+	}
+
 	return &NodeVerifier{
-		log:    log,
-		config: config,
-		dialer: configuredDialer,
-		orders: orders,
+		log:               log,
+		config:            config,
+		dialer:            configuredDialer,
+		orders:            orders,
+		versionWithExists: version,
 	}
 }
 
 // Verify a collection of segments by attempting to download a byte from each segment from the target node.
-func (service *NodeVerifier) Verify(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, segments []*Segment, ignoreThrottle bool) (verifiedCount int, err error) {
+func (service *NodeVerifier) Verify(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, targetVersion string, segments []*Segment, ignoreThrottle bool) (verifiedCount int, err error) {
+	verifiedCount, err = service.VerifyWithExists(ctx, alias, target, targetVersion, segments)
+	// if the Exists method is unimplemented or the node version predates it, fall back to download verification
+	if !errs2.IsRPC(err, rpcstatus.Unimplemented) && !errWrongNodeVersion.Has(err) {
+		return verifiedCount, err
+	}
+	if err != nil {
+		service.log.Debug("fallback to download method", zap.Error(err))
+		err = nil
+	}
+
+	service.log.Debug("verify segments by downloading pieces")
+
 	var client *piecestore.Client
 	defer func() {
 		if client != nil {
@@ -212,6 +237,99 @@ func findPieceNum(segment *Segment, alias metabase.NodeAlias) uint16 {
 	panic("piece number not found")
 }
 
+// VerifyWithExists verifies that the segments exist on the specified node by calling the piecestore Exists
+// endpoint if the node version supports it.
+func (service *NodeVerifier) VerifyWithExists(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, targetVersion string, segments []*Segment) (verifiedCount int, err error) {
+	if service.versionWithExists.String() == "" || targetVersion == "" {
+		return 0, errWrongNodeVersion.New("missing node version or no base version defined")
+	}
+
+	nodeVersion, err := semver.ParseTolerant(targetVersion)
+	if err != nil {
+		return 0, errWrongNodeVersion.Wrap(err)
+	}
+
+	if !nodeVersion.GE(service.versionWithExists) {
+		return 0, errWrongNodeVersion.New("too old version")
+	}
+
+	service.log.Debug("verify segments using Exists method", zap.Stringer("node-id", target.ID))
+
+	var conn *rpc.Conn
+	var client pb.DRPCPiecestoreClient
+	defer func() {
+		if conn != nil {
+			_ = conn.Close()
+		}
+	}()
+
+	const maxDials = 2
+	dialCount := 0
+
+	for client == nil {
+		dialCount++
+		if dialCount > maxDials {
+			return 0, ErrNodeOffline.New("too many redials")
+		}
+
+		conn, err = service.dialer.DialNodeURL(rpcpool.WithForceDial(ctx), target)
+		if err != nil {
+			service.log.Info("failed to dial node",
+				zap.Stringer("node-id", target.ID),
+				zap.Error(err))
+		} else {
+			client = pb.NewDRPCPiecestoreClient(conn)
+		}
+	}
+
+	err = service.verifySegmentsWithExists(ctx, client, alias, target, segments)
+	if err != nil {
+		// we could not do the verification, for a reason that implies we won't be able
+		// to do any more
+		return 0, Error.Wrap(err)
+	}
+
+	return len(segments), nil
+}
+
+// verifySegmentsWithExists calls the Exists endpoint on the target node with the piece IDs derived for all segments and updates each segment's status.
+func (service *NodeVerifier) verifySegmentsWithExists(ctx context.Context, client pb.DRPCPiecestoreClient, alias metabase.NodeAlias, target storj.NodeURL, segments []*Segment) (err error) {
+	pieceIds := make([]storj.PieceID, 0, len(segments))
+
+	for _, segment := range segments {
+		pieceNum := findPieceNum(segment, alias)
+
+		pieceId := segment.RootPieceID.Derive(target.ID, int32(pieceNum))
+		pieceIds = append(pieceIds, pieceId)
+	}
+
+	response, err := client.Exists(ctx, &pb.ExistsRequest{
+		PieceIds: pieceIds,
+	})
+	if err != nil {
+		return Error.Wrap(err)
+	}
+
+	for index := range pieceIds {
+		if missing(index, response.Missing) {
+			segments[index].Status.MarkNotFound()
+		} else {
+			segments[index].Status.MarkFound()
+		}
+	}
+
+	return nil
+}
+
+func missing(index int, missing []uint32) bool {
+	for _, m := range missing {
+		if uint32(index) == m {
+			return true
+		}
+	}
+	return false
+}
+
 // rateLimiter limits the rate of some type of event. It acts like a token
 // bucket, allowing for bursting, as long as the _average_ interval between
 // events over the lifetime of the rateLimiter is less than or equal to the
diff --git a/cmd/tools/segment-verify/verify_test.go b/cmd/tools/segment-verify/verify_test.go
index 73f8688d6..08dd40cf4 100644
--- a/cmd/tools/segment-verify/verify_test.go
+++ b/cmd/tools/segment-verify/verify_test.go
@@ -9,6 +9,8 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zaptest/observer"
 	"golang.org/x/sync/errgroup"
 
 	"storj.io/common/memory"
@@ -30,14 +32,22 @@ func TestVerifier(t *testing.T) {
 
 		satellite := planet.Satellites[0]
 		snoCount := int32(len(planet.StorageNodes))
+		olderNodeVersion := "v1.68.1" // version without Exists endpoint
+		newerNodeVersion := "v1.69.2" // minimum version with Exists endpoint
 
 		config := segmentverify.VerifierConfig{
 			PerPieceTimeout:    time.Second,
 			OrderRetryThrottle: 500 * time.Millisecond,
 			RequestThrottle:    500 * time.Millisecond,
+			VersionWithExists:  "v1.69.2",
 		}
+
+		// create new observed logger
+		observedZapCore, observedLogs := observer.New(zap.DebugLevel)
+		observedLogger := zap.New(observedZapCore).Named("verifier")
+
 		service := segmentverify.NewVerifier(
-			planet.Log().Named("verifier"),
+			observedLogger,
 			satellite.Dialer,
 			satellite.Orders.Service,
 			config)
@@ -69,17 +79,35 @@ func TestVerifier(t *testing.T) {
 		aliasMap, err := satellite.Metabase.DB.LatestNodesAliasMap(ctx)
 		require.NoError(t, err)
 
+		nodeWithExistsEndpoint := planet.StorageNodes[testrand.Intn(len(planet.StorageNodes)-1)]
+
 		var g errgroup.Group
 		for _, node := range planet.StorageNodes {
 			node := node
+			nodeVersion := olderNodeVersion
+			if node == nodeWithExistsEndpoint {
+				nodeVersion = newerNodeVersion
+			}
 			alias, ok := aliasMap.Alias(node.ID())
 			require.True(t, ok)
 			g.Go(func() error {
-				_, err := service.Verify(ctx, alias, node.NodeURL(), validSegments, true)
+				_, err := service.Verify(ctx, alias, node.NodeURL(), nodeVersion, validSegments, true)
 				return err
 			})
 		}
 		require.NoError(t, g.Wait())
+		require.NotZero(t, len(observedLogs.All()))
+
+		// check that segments were verified with download method
+		fallbackLogs := observedLogs.FilterMessage("fallback to download method").All()
+		require.Equal(t, 3, len(fallbackLogs))
+		require.Equal(t, zap.DebugLevel, fallbackLogs[0].Level)
+
+		// check that segments were verified with exists endpoint
+		existsLogs := observedLogs.FilterMessage("verify segments using Exists method").All()
+		require.Equal(t, 1, len(existsLogs))
+		require.Equal(t, zap.DebugLevel, existsLogs[0].Level)
+
 		for _, seg := range validSegments {
 			require.Equal(t, segmentverify.Status{Found: snoCount, NotFound: 0, Retry: 0}, seg.Status)
 		}
@@ -108,31 +136,60 @@ func TestVerifier(t *testing.T) {
 		}
 
 		var count int
-		count, err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(),
-			[]*segmentverify.Segment{validSegment0, missingSegment, validSegment1}, true)
-		require.NoError(t, err)
-		require.Equal(t, 3, count)
-		require.Equal(t, segmentverify.Status{Found: 1}, validSegment0.Status)
-		require.Equal(t, segmentverify.Status{NotFound: 1}, missingSegment.Status)
-		require.Equal(t, segmentverify.Status{Found: 1}, validSegment1.Status)
+		t.Run("segment not found using download method", func(t *testing.T) {
+			// for older node version
+			count, err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), olderNodeVersion,
+				[]*segmentverify.Segment{validSegment0, missingSegment, validSegment1}, true)
+			require.NoError(t, err)
+			require.Equal(t, 3, count)
+			require.Equal(t, segmentverify.Status{Found: 1}, validSegment0.Status)
+			require.Equal(t, segmentverify.Status{NotFound: 1}, missingSegment.Status)
+			require.Equal(t, segmentverify.Status{Found: 1}, validSegment1.Status)
+		})
 
-		// Test throttling
-		verifyStart := time.Now()
-		const throttleN = 5
-		count, err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), validSegments[:throttleN], false)
-		require.NoError(t, err)
-		verifyDuration := time.Since(verifyStart)
-		require.Equal(t, throttleN, count)
-		require.Greater(t, verifyDuration, config.RequestThrottle*(throttleN-1))
+		// reset status
+		validSegment0.Status = segmentverify.Status{Retry: 1}
+		missingSegment.Status = segmentverify.Status{Retry: 1}
+		validSegment1.Status = segmentverify.Status{Retry: 1}
+
+		t.Run("segment not found using exists method", func(t *testing.T) {
+			count, err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), newerNodeVersion,
+				[]*segmentverify.Segment{validSegment0, missingSegment, validSegment1}, true)
+			require.NoError(t, err)
+			require.Equal(t, 3, count)
+			require.Equal(t, segmentverify.Status{Found: 1}, validSegment0.Status)
+			require.Equal(t, segmentverify.Status{NotFound: 1}, missingSegment.Status)
+			require.Equal(t, segmentverify.Status{Found: 1}, validSegment1.Status)
+		})
+
+		t.Run("test throttling", func(t *testing.T) {
+			// Test throttling
+			verifyStart := time.Now()
+			const throttleN = 5
+			count, err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), olderNodeVersion, validSegments[:throttleN], false)
+			require.NoError(t, err)
+			verifyDuration := time.Since(verifyStart)
+			require.Equal(t, throttleN, count)
+			require.Greater(t, verifyDuration, config.RequestThrottle*(throttleN-1))
+		})
 
 		// TODO: test download timeout
 
-		// node offline
-		err = planet.StopNodeAndUpdate(ctx, planet.StorageNodes[0])
-		require.NoError(t, err)
-		count, err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), validSegments, true)
-		require.Error(t, err)
-		require.Equal(t, 0, count)
-		require.True(t, segmentverify.ErrNodeOffline.Has(err))
+		t.Run("Node offline", func(t *testing.T) {
+			err = planet.StopNodeAndUpdate(ctx, planet.StorageNodes[0])
+			require.NoError(t, err)
+
+			// for older node version
+			count, err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), olderNodeVersion, validSegments, true)
+			require.Error(t, err)
+			require.Equal(t, 0, count)
+			require.True(t, segmentverify.ErrNodeOffline.Has(err))
+
+			// for node version with Exists endpoint
+			count, err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), newerNodeVersion, validSegments, true)
+			require.Error(t, err)
+			require.Equal(t, 0, count)
+			require.True(t, segmentverify.ErrNodeOffline.Has(err))
+		})
 	})
 }
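
For reviewers who want to poke at the gating logic in isolation, here is a minimal, self-contained sketch of the version check that `VerifyWithExists` performs before issuing an `Exists` request. The `canUseExists` helper and its `main` driver are illustrative only and not part of this patch; they use the same `github.com/blang/semver` calls (`ParseTolerant`, `GE`) as the verifier.

```go
package main

import (
	"fmt"

	"github.com/blang/semver"
)

// canUseExists reports whether a node reporting nodeVersion should be verified
// via the Exists endpoint, given the configured minimum version. Empty or
// unparsable versions mean the caller falls back to download-based verification.
// Illustrative helper only; not part of the patch.
func canUseExists(minVersion, nodeVersion string) bool {
	if minVersion == "" || nodeVersion == "" {
		return false
	}
	base, err := semver.ParseTolerant(minVersion) // ParseTolerant accepts the leading "v"
	if err != nil {
		return false
	}
	node, err := semver.ParseTolerant(nodeVersion)
	if err != nil {
		return false
	}
	return node.GE(base)
}

func main() {
	fmt.Println(canUseExists("v1.69.2", "v1.68.1")) // false: verified by downloading pieces
	fmt.Println(canUseExists("v1.69.2", "v1.70.3")) // true: verified with a single Exists call
}
```

With the default `--verify.version-with-exists` of `v1.69.2`, nodes reporting an older or empty version keep being verified through piece downloads, which is also what the `fallback to download method` log assertions in the test above check.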