cmd/tools/segment-verify: fix read-csv subcommand
We were reading in a segment's stream ID and position, and assuming that was enough for the downloader. But of course, the downloader needs AliasPieces filled in. So now we request each segment record from the metabase and fill in the VerifySegment records entirely. Change-Id: If85236388eb99a65e2cb739aa976bd49ee2b2c89
This commit is contained in:
parent
8850fde9f5
commit
740cb0d9c7
@ -212,7 +212,7 @@ func (s *SegmentCSVSource) Close() error {
|
||||
|
||||
// Next returns the next segment from the CSV file. If there are no more, it
|
||||
// returns (nil, io.EOF).
|
||||
func (s *SegmentCSVSource) Next() (*Segment, error) {
|
||||
func (s *SegmentCSVSource) Next() (*metabase.GetSegmentByPosition, error) {
|
||||
entry, err := s.csvReader.Read()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -225,10 +225,8 @@ func (s *SegmentCSVSource) Next() (*Segment, error) {
|
||||
if err != nil {
|
||||
return nil, Error.New("position encoding: %w", err)
|
||||
}
|
||||
return &Segment{
|
||||
VerifySegment: metabase.VerifySegment{
|
||||
StreamID: segmentUUID,
|
||||
Position: metabase.SegmentPositionFromEncoded(positionEncoded),
|
||||
},
|
||||
return &metabase.GetSegmentByPosition{
|
||||
StreamID: segmentUUID,
|
||||
Position: metabase.SegmentPositionFromEncoded(positionEncoded),
|
||||
}, nil
|
||||
}
|
||||
|
@ -213,23 +213,45 @@ func verifySegments(cmd *cobra.Command, args []string) error {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
var (
|
||||
verifyConfig VerifierConfig
|
||||
serviceConfig ServiceConfig
|
||||
commandFunc func(ctx context.Context, service *Service) error
|
||||
)
|
||||
switch cmd.Name() {
|
||||
case "range":
|
||||
verifyConfig = rangeCfg.Verify
|
||||
serviceConfig = rangeCfg.Service
|
||||
commandFunc = func(ctx context.Context, service *Service) error {
|
||||
return verifySegmentsRange(ctx, service, rangeCfg)
|
||||
}
|
||||
case "buckets":
|
||||
verifyConfig = bucketsCfg.Verify
|
||||
serviceConfig = bucketsCfg.Service
|
||||
commandFunc = func(ctx context.Context, service *Service) error {
|
||||
return verifySegmentsBuckets(ctx, service, bucketsCfg)
|
||||
}
|
||||
case "read-csv":
|
||||
verifyConfig = readCSVCfg.Verify
|
||||
serviceConfig = readCSVCfg.Service
|
||||
commandFunc = func(ctx context.Context, service *Service) error {
|
||||
return verifySegmentsCSV(ctx, service, readCSVCfg)
|
||||
}
|
||||
default:
|
||||
return errors.New("unknown command: " + cmd.Name())
|
||||
}
|
||||
|
||||
// setup verifier
|
||||
verifier := NewVerifier(log.Named("verifier"), dialer, ordersService, rangeCfg.Verify)
|
||||
service, err := NewService(log.Named("service"), metabaseDB, verifier, overlay, rangeCfg.Service)
|
||||
verifier := NewVerifier(log.Named("verifier"), dialer, ordersService, verifyConfig)
|
||||
service, err := NewService(log.Named("service"), metabaseDB, verifier, overlay, serviceConfig)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
verifier.reportPiece = service.problemPieces.Write
|
||||
defer func() { err = errs.Combine(err, service.Close()) }()
|
||||
switch cmd.Name() {
|
||||
case "range":
|
||||
return verifySegmentsRange(ctx, service, rangeCfg)
|
||||
case "buckets":
|
||||
return verifySegmentsBuckets(ctx, service, bucketsCfg)
|
||||
case "read-csv":
|
||||
return verifySegmentsCSV(ctx, service, readCSVCfg)
|
||||
}
|
||||
return errors.New("unknown commnand: " + cmd.Name())
|
||||
|
||||
log.Debug("starting", zap.Any("config", service.config), zap.String("command", cmd.Name()))
|
||||
return commandFunc(ctx, service)
|
||||
}
|
||||
|
||||
func verifySegmentsRange(ctx context.Context, service *Service, rangeCfg RangeConfig) error {
|
||||
|
@ -444,12 +444,21 @@ func (service *Service) ProcessBuckets(ctx context.Context, buckets []metabase.B
|
||||
func (service *Service) ProcessSegmentsFromCSV(ctx context.Context, segmentSource *SegmentCSVSource) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
segments := make([]*Segment, 0, service.config.BatchSize)
|
||||
aliasMap, err := service.metabase.LatestNodesAliasMap(ctx)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
service.aliasMap = aliasMap
|
||||
service.log.Debug("got aliasMap", zap.Int("length", service.aliasMap.Size()))
|
||||
|
||||
streamIDs := make([]uuid.UUID, 0, service.config.BatchSize)
|
||||
exhausted := false
|
||||
segmentsData := make([]Segment, service.config.BatchSize)
|
||||
segments := make([]*Segment, service.config.BatchSize)
|
||||
for {
|
||||
segments = segments[:0]
|
||||
streamIDs = streamIDs[:0]
|
||||
for n := 0; n < service.config.BatchSize; n++ {
|
||||
seg, err := segmentSource.Next()
|
||||
streamIDAndPosition, err := segmentSource.Next()
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
exhausted = true
|
||||
@ -457,11 +466,39 @@ func (service *Service) ProcessSegmentsFromCSV(ctx context.Context, segmentSourc
|
||||
}
|
||||
return Error.New("could not read csv: %w", err)
|
||||
}
|
||||
segments = append(segments, seg)
|
||||
streamIDs = append(streamIDs, streamIDAndPosition.StreamID)
|
||||
}
|
||||
if err := service.ProcessSegments(ctx, segments); err != nil {
|
||||
return Error.Wrap(err)
|
||||
var cursorStreamID uuid.UUID
|
||||
var cursorPosition metabase.SegmentPosition
|
||||
for {
|
||||
verifySegments, err := service.metabase.ListVerifySegments(ctx, metabase.ListVerifySegments{
|
||||
CursorStreamID: cursorStreamID,
|
||||
CursorPosition: cursorPosition,
|
||||
StreamIDs: streamIDs,
|
||||
Limit: service.config.BatchSize,
|
||||
AsOfSystemInterval: service.config.AsOfSystemInterval,
|
||||
})
|
||||
segmentsData = segmentsData[:len(verifySegments.Segments)]
|
||||
segments = segments[:len(verifySegments.Segments)]
|
||||
if err != nil {
|
||||
return Error.New("could not query metabase: %w", err)
|
||||
}
|
||||
for n, verifySegment := range verifySegments.Segments {
|
||||
segmentsData[n].VerifySegment = verifySegment
|
||||
segments[n] = &segmentsData[n]
|
||||
}
|
||||
|
||||
if err := service.ProcessSegments(ctx, segments); err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
if len(verifySegments.Segments) < service.config.BatchSize {
|
||||
break
|
||||
}
|
||||
last := &verifySegments.Segments[len(verifySegments.Segments)-1]
|
||||
cursorStreamID, cursorPosition = last.StreamID, last.Position
|
||||
}
|
||||
|
||||
if exhausted {
|
||||
return nil
|
||||
}
|
||||
|
@ -333,6 +333,10 @@ func (service *NodeVerifier) verifySegmentsWithExists(ctx context.Context, clien
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
service.log.Debug("Exists response",
|
||||
zap.Int("request-count", len(pieceIds)),
|
||||
zap.Int("missing-count", len(response.Missing)))
|
||||
|
||||
if len(response.Missing) == 0 {
|
||||
for i := range segments {
|
||||
segments[i].Status.MarkFound()
|
||||
|
Loading…
Reference in New Issue
Block a user