cmd/tools/segment-verify: add test for whole command line tool

Clarify and expand some tests, and add a large test that tries to cover
the whole segment-verify stack.

The new test also tries to reproduce a problem we saw in production,
where the Found count in the output CSV increases with each entry,
eventually reaching the thousands, instead of representing the actual
number of pieces found for each entry.
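
To make that failure mode concrete, here is a minimal, hypothetical sketch
(simplified types, not the actual segment-verify code) of how a Found count
can grow with every CSV row when one status value is shared across entries
instead of being kept, or reset, per segment:

// Illustration only: hypothetical, simplified types, not the segment-verify code.
package main

import "fmt"

type status struct{ found int }

func main() {
	segments := []string{"seg-1", "seg-2", "seg-3"}
	const piecesPerSegment = 10

	// Buggy pattern: one shared status accumulates across segments, so each
	// successive CSV row reports a larger Found value (10, 20, 30, ...).
	var shared status
	for _, seg := range segments {
		shared.found += piecesPerSegment
		fmt.Printf("%s,found=%d\n", seg, shared.found)
	}

	// Correct pattern: a fresh status per segment, so every row reports only
	// that segment's own pieces (always 10 here).
	for _, seg := range segments {
		var perSegment status
		perSegment.found += piecesPerSegment
		fmt.Printf("%s,found=%d\n", seg, perSegment.found)
	}
}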

Change-Id: I65e342fad7dc1c350830fd0c8ce75a87a01d495c
paul cannon 2023-10-16 15:57:23 -05:00 committed by Storj Robot
parent 6939c7ec25
commit 39c2bb9e4b
4 changed files with 343 additions and 48 deletions

View File

@@ -247,7 +247,6 @@ func verifySegments(cmd *cobra.Command, args []string) error {
if err != nil {
return Error.Wrap(err)
}
verifier.reportPiece = service.problemPieces.Write
defer func() { err = errs.Combine(err, service.Close()) }()
log.Debug("starting", zap.Any("config", service.config), zap.String("command", cmd.Name()))

View File

@@ -0,0 +1,278 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"encoding/csv"
"errors"
"fmt"
"io"
"math/rand"
"os"
"strconv"
"strings"
"testing"
"time"
"github.com/jackc/pgx/v5/stdlib"
"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/private/dbutil/cockroachutil"
"storj.io/private/tagsql"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/metabase"
"storj.io/storj/storagenode/pieces"
)
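// TestCommandLineTool exercises the whole segment-verify stack end to end against a
// testplanet cluster: it uploads data, takes one node offline, deletes a fraction of
// the stored pieces, runs verifySegments, and then checks the retry, not-found, and
// problem-pieces CSV outputs against what was actually changed on the nodes.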
func TestCommandLineTool(t *testing.T) {
const (
nodeCount = 10
uplinkCount = 10
)
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: nodeCount, UplinkCount: uplinkCount,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(nodeCount, nodeCount, nodeCount, nodeCount),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
// get the db connstrings that we can set in the global config (these are hilariously hard to get,
// but we really don't need to get them anywhere else in the codebase)
dbConnString := getConnStringFromDBConn(t, ctx, satellite.DB.Testing().RawDB())
metaDBConnString := getConnStringFromDBConn(t, ctx, satellite.Metabase.DB.UnderlyingTagSQL())
notFoundCSV := ctx.File("notfound.csv")
retryCSV := ctx.File("retry.csv")
problemPiecesCSV := ctx.File("problempieces.csv")
// set up global config that the main func will use
satelliteCfg.Config = satellite.Config
satelliteCfg.Database = dbConnString
satelliteCfg.Metainfo.DatabaseURL = metaDBConnString
satelliteCfg.Identity.KeyPath = ctx.File("identity-key")
satelliteCfg.Identity.CertPath = ctx.File("identity-cert")
require.NoError(t, satelliteCfg.Identity.Save(satellite.Identity))
rangeCfg.Verify = VerifierConfig{
PerPieceTimeout: time.Second,
OrderRetryThrottle: 500 * time.Millisecond,
RequestThrottle: 500 * time.Millisecond,
}
rangeCfg.Service = ServiceConfig{
NotFoundPath: notFoundCSV,
RetryPath: retryCSV,
ProblemPiecesPath: problemPiecesCSV,
Check: 0,
BatchSize: 10000,
Concurrency: 1000,
MaxOffline: 2,
OfflineStatusCacheTime: 10 * time.Second,
AsOfSystemInterval: -1 * time.Microsecond,
}
rangeCfg.Low = strings.Repeat("0", 32)
rangeCfg.High = strings.Repeat("f", 32)
// upload some data
data := testrand.Bytes(8 * memory.KiB)
for u, up := range planet.Uplinks {
for i := 0; i < nodeCount; i++ {
err := up.Upload(ctx, satellite, "bucket1", fmt.Sprintf("uplink%d/i%d", u, i), data)
require.NoError(t, err)
}
}
// take one node offline so there will be some pieces in the retry list
offlineNode := planet.StorageNodes[0]
require.NoError(t, planet.StopPeer(offlineNode))
// and delete 10% of pieces at random so there will be some pieces in the not-found list
const deleteFrac = 0.10
allDeletedPieces := make(map[storj.NodeID]map[storj.PieceID]struct{})
numDeletedPieces := 0
for nodeNum, node := range planet.StorageNodes {
if node.ID() == offlineNode.ID() {
continue
}
deletedPieces, err := deletePiecesRandomly(ctx, satellite.ID(), node, deleteFrac)
require.NoError(t, err, nodeNum)
allDeletedPieces[node.ID()] = deletedPieces
numDeletedPieces += len(deletedPieces)
}
// check that the number of segments we expect are present in the metainfo db
result, err := satellite.Metabase.DB.ListVerifySegments(ctx, metabase.ListVerifySegments{
CursorStreamID: uuid.UUID{},
CursorPosition: metabase.SegmentPosition{},
Limit: 10000,
})
require.NoError(t, err)
require.Len(t, result.Segments, uplinkCount*nodeCount)
// perform the verify!
err = verifySegments(&cobra.Command{Use: "range"}, nil)
require.NoError(t, err)
// open the CSVs to check that we get the expected results
retryCSVHandle, err := os.Open(retryCSV)
require.NoError(t, err)
defer ctx.Check(retryCSVHandle.Close)
retryCSVReader := csv.NewReader(retryCSVHandle)
notFoundCSVHandle, err := os.Open(notFoundCSV)
require.NoError(t, err)
defer ctx.Check(notFoundCSVHandle.Close)
notFoundCSVReader := csv.NewReader(notFoundCSVHandle)
problemPiecesCSVHandle, err := os.Open(problemPiecesCSV)
require.NoError(t, err)
defer ctx.Check(problemPiecesCSVHandle.Close)
problemPiecesCSVReader := csv.NewReader(problemPiecesCSVHandle)
// in the retry CSV, we don't expect any rows, because there would need to be more than 5
// nodes offline to produce records here.
// TODO: make that 5 configurable so we can override it here and check results
header, err := retryCSVReader.Read()
require.NoError(t, err)
assert.Equal(t, []string{"stream id", "position", "found", "not found", "retry"}, header)
for {
record, err := retryCSVReader.Read()
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
assert.Fail(t, "unexpected record in retry.csv", "%v", record)
}
// we do expect plenty of rows in not-found.csv. we don't know exactly what pieces these
// pertain to, but we can add up all the reported not-found pieces and expect the total
// to match numDeletedPieces. In addition, for each segment, found+notfound+retry should
// equal nodeCount.
header, err = notFoundCSVReader.Read()
require.NoError(t, err)
assert.Equal(t, []string{"stream id", "position", "found", "not found", "retry"}, header)
identifiedNotFoundPieces := 0
for {
record, err := notFoundCSVReader.Read()
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
found, err := strconv.Atoi(record[2])
require.NoError(t, err)
notFound, err := strconv.Atoi(record[3])
require.NoError(t, err)
retry, err := strconv.Atoi(record[4])
require.NoError(t, err)
lineNum, _ := notFoundCSVReader.FieldPos(0)
assert.Equal(t, nodeCount, found+notFound+retry,
"line %d of not-found.csv contains record: %v where found+notFound+retry != %d", lineNum, record, nodeCount)
identifiedNotFoundPieces += notFound
}
assert.Equal(t, numDeletedPieces, identifiedNotFoundPieces)
// finally, in problem-pieces.csv, we can check results with more precision. we expect
// that all deleted pieces were identified, and that no pieces were identified as not found
// unless we deleted them specifically.
header, err = problemPiecesCSVReader.Read()
require.NoError(t, err)
assert.Equal(t, []string{"stream id", "position", "node id", "piece number", "outcome"}, header)
for {
record, err := problemPiecesCSVReader.Read()
if errors.Is(err, io.EOF) {
break
}
streamID, err := uuid.FromString(record[0])
require.NoError(t, err)
position, err := strconv.ParseUint(record[1], 10, 64)
require.NoError(t, err)
nodeID, err := storj.NodeIDFromString(record[2])
require.NoError(t, err)
pieceNum, err := strconv.ParseInt(record[3], 10, 16)
require.NoError(t, err)
outcome := record[4]
switch outcome {
case "NODE_OFFLINE":
// expect that this was the node we took offline
assert.Equal(t, offlineNode.ID(), nodeID,
"record %v said node %s was offline, but we didn't take it offline", record, nodeID)
case "NOT_FOUND":
segmentPosition := metabase.SegmentPositionFromEncoded(position)
segment, err := satellite.Metabase.DB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: streamID,
Position: segmentPosition,
})
require.NoError(t, err)
pieceID := segment.RootPieceID.Derive(nodeID, int32(pieceNum))
deletedPiecesForNode, ok := allDeletedPieces[nodeID]
require.True(t, ok)
_, ok = deletedPiecesForNode[pieceID]
assert.True(t, ok, "we did not delete piece ID %s, but it was identified as not found", pieceID)
delete(deletedPiecesForNode, pieceID)
default:
assert.Fail(t, "unexpected outcome from problem-pieces.csv", "got %q, but expected \"NODE_OFFLINE\" or \"NOT_FOUND\"", outcome)
}
}
for node, deletedPieces := range allDeletedPieces {
assert.Empty(t, deletedPieces, "pieces were deleted from %v but were not reported in problem-pieces.csv", node)
}
})
}
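// deletePiecesRandomly walks the pieces that node stores for satelliteID and removes
// each one from disk with probability rate, returning the set of deleted piece IDs.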
func deletePiecesRandomly(ctx context.Context, satelliteID storj.NodeID, node *testplanet.StorageNode, rate float64) (deletedPieces map[storj.PieceID]struct{}, err error) {
deletedPieces = make(map[storj.PieceID]struct{})
err = node.Storage2.FileWalker.WalkSatellitePieces(ctx, satelliteID, func(access pieces.StoredPieceAccess) error {
if rand.Float64() < rate {
path, err := access.FullPath(ctx)
if err != nil {
return err
}
err = os.Remove(path)
if err != nil {
return err
}
deletedPieces[access.PieceID()] = struct{}{}
}
return nil
})
return deletedPieces, err
}
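// getConnStringFromDBConn digs the connection string back out of an open tagsql.DB so
// that the test can put it in the global satellite config; for CockroachDB-backed
// databases the postgres:// scheme is rewritten to cockroach://.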
func getConnStringFromDBConn(t *testing.T, ctx *testcontext.Context, tagsqlDB tagsql.DB) (dbConnString string) {
type dbConnGetter interface {
StdlibConn() *stdlib.Conn
}
dbConn, err := tagsqlDB.Conn(ctx)
require.NoError(t, err)
defer ctx.Check(dbConn.Close)
err = dbConn.Raw(ctx, func(driverConn interface{}) error {
var stdlibConn *stdlib.Conn
switch conn := driverConn.(type) {
case dbConnGetter:
stdlibConn = conn.StdlibConn()
case *stdlib.Conn:
stdlibConn = conn
}
dbConnString = stdlibConn.Conn().Config().ConnString()
return nil
})
require.NoError(t, err)
if _, ok := tagsqlDB.Driver().(*cockroachutil.Driver); ok {
dbConnString = strings.ReplaceAll(dbConnString, "postgres://", "cockroach://")
}
return dbConnString
}

View File

@@ -123,6 +123,10 @@ func NewService(log *zap.Logger, metabaseDB Metabase, verifier Verifier, overlay
return nil, errs.Combine(Error.Wrap(err), retry.Close(), notFound.Close())
}
if nodeVerifier, ok := verifier.(*NodeVerifier); ok {
nodeVerifier.reportPiece = problemPieces.Write
}
return &Service{
log: log,
config: config,

View File

@@ -4,7 +4,7 @@
package main_test
import (
"strconv"
"fmt"
"testing"
"time"
@@ -23,15 +23,19 @@ import (
)
func TestVerifier(t *testing.T) {
const (
nodeCount = 10
uplinkCount = 10
)
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
SatelliteCount: 1, StorageNodeCount: nodeCount, UplinkCount: uplinkCount,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(4, 4, 4, 4),
Satellite: testplanet.ReconfigureRS(nodeCount, nodeCount, nodeCount, nodeCount),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
snoCount := int32(len(planet.StorageNodes))
olderNodeVersion := "v1.68.1" // version without Exists endpoint
newerNodeVersion := "v1.69.2" // minimum version with Exists endpoint
@@ -46,7 +50,7 @@ func TestVerifier(t *testing.T) {
observedZapCore, observedLogs := observer.New(zap.DebugLevel)
observedLogger := zap.New(observedZapCore).Named("verifier")
service := segmentverify.NewVerifier(
verifier := segmentverify.NewVerifier(
observedLogger,
satellite.Dialer,
satellite.Orders.Service,
@@ -54,9 +58,9 @@ func TestVerifier(t *testing.T) {
// upload some data
data := testrand.Bytes(8 * memory.KiB)
for _, up := range planet.Uplinks {
for i := 0; i < 10; i++ {
err := up.Upload(ctx, satellite, "bucket1", strconv.Itoa(i), data)
for u, up := range planet.Uplinks {
for i := 0; i < nodeCount; i++ {
err := up.Upload(ctx, satellite, "bucket1", fmt.Sprintf("uplink%d/i%d", u, i), data)
require.NoError(t, err)
}
}
@@ -67,50 +71,57 @@ func TestVerifier(t *testing.T) {
Limit: 10000,
})
require.NoError(t, err)
require.Len(t, result.Segments, uplinkCount*nodeCount)
validSegments := []*segmentverify.Segment{}
for _, raw := range result.Segments {
validSegments = append(validSegments, &segmentverify.Segment{
VerifySegment: raw,
Status: segmentverify.Status{Retry: snoCount},
})
validSegments := make([]*segmentverify.Segment, len(result.Segments))
for i, raw := range result.Segments {
validSegments[i] = &segmentverify.Segment{VerifySegment: raw}
}
resetStatuses := func() {
for _, seg := range validSegments {
seg.Status = segmentverify.Status{Retry: nodeCount}
}
}
resetStatuses()
aliasMap, err := satellite.Metabase.DB.LatestNodesAliasMap(ctx)
require.NoError(t, err)
nodeWithExistsEndpoint := planet.StorageNodes[testrand.Intn(len(planet.StorageNodes)-1)]
t.Run("verify all", func(t *testing.T) {
nodeWithExistsEndpoint := planet.StorageNodes[testrand.Intn(len(planet.StorageNodes)-1)]
var g errgroup.Group
for _, node := range planet.StorageNodes {
node := node
nodeVersion := olderNodeVersion
if node == nodeWithExistsEndpoint {
nodeVersion = newerNodeVersion
var g errgroup.Group
for _, node := range planet.StorageNodes {
node := node
nodeVersion := olderNodeVersion
if node == nodeWithExistsEndpoint {
nodeVersion = newerNodeVersion
}
alias, ok := aliasMap.Alias(node.ID())
require.True(t, ok)
g.Go(func() error {
_, err := verifier.Verify(ctx, alias, node.NodeURL(), nodeVersion, validSegments, true)
return err
})
}
alias, ok := aliasMap.Alias(node.ID())
require.True(t, ok)
g.Go(func() error {
_, err := service.Verify(ctx, alias, node.NodeURL(), nodeVersion, validSegments, true)
return err
})
}
require.NoError(t, g.Wait())
require.NotZero(t, len(observedLogs.All()))
require.NoError(t, g.Wait())
require.NotZero(t, len(observedLogs.All()))
// check that segments were verified with download method
fallbackLogs := observedLogs.FilterMessage("fallback to download method").All()
require.Equal(t, 3, len(fallbackLogs))
require.Equal(t, zap.DebugLevel, fallbackLogs[0].Level)
// check that segments were verified with download method
fallbackLogs := observedLogs.FilterMessage("fallback to download method").All()
require.Equal(t, nodeCount-1, len(fallbackLogs))
require.Equal(t, zap.DebugLevel, fallbackLogs[0].Level)
// check that segments were verified with exists endpoint
existsLogs := observedLogs.FilterMessage("verify segments using Exists method").All()
require.Equal(t, 1, len(existsLogs))
require.Equal(t, zap.DebugLevel, existsLogs[0].Level)
// check that segments were verified with exists endpoint
existsLogs := observedLogs.FilterMessage("verify segments using Exists method").All()
require.Equal(t, 1, len(existsLogs))
require.Equal(t, zap.DebugLevel, existsLogs[0].Level)
for _, seg := range validSegments {
require.Equal(t, segmentverify.Status{Found: snoCount, NotFound: 0, Retry: 0}, seg.Status)
}
for segNum, seg := range validSegments {
require.Equal(t, segmentverify.Status{Found: nodeCount, NotFound: 0, Retry: 0}, seg.Status, segNum)
}
})
// segment not found
alias0, ok := aliasMap.Alias(planet.StorageNodes[0].ID())
@@ -138,7 +149,7 @@ func TestVerifier(t *testing.T) {
var count int
t.Run("segment not found using download method", func(t *testing.T) {
// for older node version
count, err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), olderNodeVersion,
count, err = verifier.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), olderNodeVersion,
[]*segmentverify.Segment{validSegment0, missingSegment, validSegment1}, true)
require.NoError(t, err)
require.Equal(t, 3, count)
@@ -153,7 +164,7 @@ func TestVerifier(t *testing.T) {
validSegment1.Status = segmentverify.Status{Retry: 1}
t.Run("segment not found using exists method", func(t *testing.T) {
count, err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), newerNodeVersion,
count, err = verifier.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), newerNodeVersion,
[]*segmentverify.Segment{validSegment0, missingSegment, validSegment1}, true)
require.NoError(t, err)
require.Equal(t, 3, count)
@@ -162,31 +173,34 @@ func TestVerifier(t *testing.T) {
require.Equal(t, segmentverify.Status{Found: 1}, validSegment1.Status)
})
resetStatuses()
t.Run("test throttling", func(t *testing.T) {
// Test throttling
verifyStart := time.Now()
const throttleN = 5
count, err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), olderNodeVersion, validSegments[:throttleN], false)
count, err = verifier.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), olderNodeVersion, validSegments[:throttleN], false)
require.NoError(t, err)
verifyDuration := time.Since(verifyStart)
require.Equal(t, throttleN, count)
require.Greater(t, verifyDuration, config.RequestThrottle*(throttleN-1))
})
// TODO: test download timeout
resetStatuses()
// TODO: test download timeout
t.Run("Node offline", func(t *testing.T) {
err = planet.StopNodeAndUpdate(ctx, planet.StorageNodes[0])
require.NoError(t, err)
// for older node version
count, err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), olderNodeVersion, validSegments, true)
count, err = verifier.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), olderNodeVersion, validSegments, true)
require.Error(t, err)
require.Equal(t, 0, count)
require.True(t, segmentverify.ErrNodeOffline.Has(err))
// for node version with Exists endpoint
count, err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), newerNodeVersion, validSegments, true)
count, err = verifier.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), newerNodeVersion, validSegments, true)
require.Error(t, err)
require.Equal(t, 0, count)
require.True(t, segmentverify.ErrNodeOffline.Has(err))