// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"github.com/spacemonkeygo/monkit/v3"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/storj"
	"storj.io/common/uuid"
	"storj.io/storj/satellite/audit"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/nodeselection/uploadselection"
	"storj.io/storj/satellite/overlay"
)

var mon = monkit.Package()

// Error is the global error class for the segment-verify tool.
var Error = errs.Class("segment-verify")

// Metabase defines the implementation dependencies we need from metabase.
type Metabase interface {
	LatestNodesAliasMap(ctx context.Context) (*metabase.NodeAliasMap, error)
	GetSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) (segment metabase.Segment, err error)
	ListVerifySegments(ctx context.Context, opts metabase.ListVerifySegments) (result metabase.ListVerifySegmentsResult, err error)
	ListBucketsStreamIDs(ctx context.Context, opts metabase.ListBucketsStreamIDs) (result metabase.ListBucketsStreamIDsResult, err error)
}

// Verifier verifies a batch of segments.
type Verifier interface {
	Verify(ctx context.Context, nodeAlias metabase.NodeAlias, target storj.NodeURL, targetVersion string, segments []*Segment, ignoreThrottle bool) (verifiedCount int, err error)
}

// Overlay is used to fetch information about nodes.
type Overlay interface {
	// Get looks up the node by nodeID.
	Get(ctx context.Context, nodeID storj.NodeID) (*overlay.NodeDossier, error)
	SelectAllStorageNodesDownload(ctx context.Context, onlineWindow time.Duration, asOf overlay.AsOfSystemTimeConfig) ([]*uploadselection.SelectedNode, error)
}

// SegmentWriter allows writing segments to some output.
type SegmentWriter interface {
	Write(ctx context.Context, segments []*Segment) error
	Close() error
}

// ServiceConfig contains configurable options for Service.
type ServiceConfig struct {
	NotFoundPath      string `help:"segments not found on storage nodes" default:"segments-not-found.csv"`
	RetryPath         string `help:"segments unable to check against satellite" default:"segments-retry.csv"`
	ProblemPiecesPath string `help:"pieces that could not be fetched successfully" default:"problem-pieces.csv"`
	PriorityNodesPath string `help:"list of priority node IDs" default:""`
	IgnoreNodesPath   string `help:"list of node IDs to ignore" default:""`

	Check       int `help:"how many storagenodes to query per segment (if 0, query all)" default:"3"`
	BatchSize   int `help:"number of segments to process per batch" default:"10000"`
	Concurrency int `help:"number of concurrent verifiers" default:"1000"`
	MaxOffline  int `help:"maximum number of offline nodes in a sequence (if 0, no limit)" default:"2"`

	OfflineStatusCacheTime time.Duration `help:"how long to cache a \"node offline\" status" default:"30m"`

	AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
}
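
// For reference, a ServiceConfig populated by hand with the defaults declared
// above would look roughly like the sketch below. This is illustrative only;
// in practice the values come from the configuration loader that reads the
// struct tags (AsOfSystemInterval is shown with its release default).
//
//	config := ServiceConfig{
//		NotFoundPath:           "segments-not-found.csv",
//		RetryPath:              "segments-retry.csv",
//		ProblemPiecesPath:      "problem-pieces.csv",
//		Check:                  3,
//		BatchSize:              10000,
//		Concurrency:            1000,
//		MaxOffline:             2,
//		OfflineStatusCacheTime: 30 * time.Minute,
//		AsOfSystemInterval:     -5 * time.Minute,
//	}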

// pieceReporterFunc reports the outcome of checking a single piece of a
// segment on a particular node.
type pieceReporterFunc func(
	ctx context.Context,
	segment *metabase.VerifySegment,
	nodeID storj.NodeID,
	pieceNum int,
	outcome audit.Outcome) error

// Service implements segment verification logic.
type Service struct {
	log    *zap.Logger
	config ServiceConfig

	notFound      SegmentWriter
	retry         SegmentWriter
	problemPieces *pieceCSVWriter

	metabase Metabase
	verifier Verifier
	overlay  Overlay

	aliasMap        *metabase.NodeAliasMap
	aliasToNodeURL  map[metabase.NodeAlias]storj.NodeURL
	priorityNodes   NodeAliasSet
	ignoreNodes     NodeAliasSet
	offlineNodes    *nodeAliasExpiringSet
	offlineCount    map[metabase.NodeAlias]int
	bucketList      BucketList
	nodesVersionMap map[metabase.NodeAlias]string
}

// NewService returns a new service for verifying segments.
func NewService(log *zap.Logger, metabaseDB Metabase, verifier Verifier, overlay Overlay, config ServiceConfig) (*Service, error) {
	notFound, err := NewCSVWriter(config.NotFoundPath)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	retry, err := NewCSVWriter(config.RetryPath)
	if err != nil {
		return nil, errs.Combine(Error.Wrap(err), notFound.Close())
	}

	problemPieces, err := newPieceCSVWriter(config.ProblemPiecesPath)
	if err != nil {
		return nil, errs.Combine(Error.Wrap(err), retry.Close(), notFound.Close())
	}

	return &Service{
		log:    log,
		config: config,

		notFound:      notFound,
		retry:         retry,
		problemPieces: problemPieces,

		metabase: metabaseDB,
		verifier: verifier,
		overlay:  overlay,

		aliasToNodeURL:  map[metabase.NodeAlias]storj.NodeURL{},
		priorityNodes:   NodeAliasSet{},
		ignoreNodes:     NodeAliasSet{},
		offlineNodes:    newNodeAliasExpiringSet(config.OfflineStatusCacheTime),
		offlineCount:    map[metabase.NodeAlias]int{},
		nodesVersionMap: map[metabase.NodeAlias]string{},
	}, nil
}
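
// Usage sketch: one possible way to wire the service together and run a
// verification pass. The concrete dependency values (metabaseDB, verifier,
// overlayService) and the low/high range bounds are placeholders; the real
// wiring lives in the command entry point.
//
//	service, err := NewService(log, metabaseDB, verifier, overlayService, config)
//	if err != nil {
//		return err
//	}
//	defer func() { err = errs.Combine(err, service.Close()) }()
//
//	return service.ProcessRange(ctx, low, high)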

// Close closes the outputs from the service.
func (service *Service) Close() error {
	return Error.Wrap(errs.Combine(
		service.notFound.Close(),
		service.retry.Close(),
		service.problemPieces.Close(),
	))
}

// loadOnlineNodes loads the list of online nodes.
func (service *Service) loadOnlineNodes(ctx context.Context) (err error) {
	interval := overlay.AsOfSystemTimeConfig{
		Enabled:         service.config.AsOfSystemInterval != 0,
		DefaultInterval: service.config.AsOfSystemInterval,
	}

	// should this use some other methods?
	nodes, err := service.overlay.SelectAllStorageNodesDownload(ctx, time.Hour, interval)
	if err != nil {
		return Error.Wrap(err)
	}

	for _, node := range nodes {
		alias, ok := service.aliasMap.Alias(node.ID)
		if !ok {
			// This means the node does not hold any data in metabase.
			continue
		}

		addr := node.Address.Address
		if node.LastIPPort != "" {
			addr = node.LastIPPort
		}

		service.aliasToNodeURL[alias] = storj.NodeURL{
			ID:      node.ID,
			Address: addr,
		}
	}

	return nil
}

// loadPriorityNodes loads the list of priority nodes.
func (service *Service) loadPriorityNodes(ctx context.Context) (err error) {
	if service.config.PriorityNodesPath == "" {
		return nil
	}

	service.priorityNodes, err = service.parseNodeFile(service.config.PriorityNodesPath)
	return Error.Wrap(err)
}

// applyIgnoreNodes loads the list of nodes to ignore completely and modifies priority nodes.
func (service *Service) applyIgnoreNodes(ctx context.Context) (err error) {
	if service.config.IgnoreNodesPath == "" {
		return nil
	}

	service.ignoreNodes, err = service.parseNodeFile(service.config.IgnoreNodesPath)
	if err != nil {
		return Error.Wrap(err)
	}

	service.priorityNodes.RemoveAll(service.ignoreNodes)

	return nil
}

// parseNodeFile parses a file containing node IDs, one per line.
func (service *Service) parseNodeFile(path string) (NodeAliasSet, error) {
	set := NodeAliasSet{}
	data, err := os.ReadFile(path)
	if err != nil {
		return set, Error.New("unable to read nodes file: %w", err)
	}

	for _, line := range strings.Split(string(data), "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}

		nodeID, err := storj.NodeIDFromString(line)
		if err != nil {
			return set, Error.Wrap(err)
		}

		alias, ok := service.aliasMap.Alias(nodeID)
		if !ok {
			// The node does not appear in the alias map, which means it
			// holds no data in metabase, so there is nothing to check.
			service.log.Info("node ID not used", zap.Stringer("node id", nodeID))
			continue
		}

		set.Add(alias)
	}

	return set, nil
}
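
// A node file accepted by parseNodeFile is plain text with one node ID (in its
// string form) per line; blank lines and lines starting with '#' are skipped.
// For example:
//
//	# priority nodes for this run
//	<node ID in string form>
//	<node ID in string form>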

// BucketList contains a list of buckets to check segments from.
type BucketList struct {
	Buckets []metabase.BucketLocation
}

// Add adds a bucket to the bucket list.
func (list *BucketList) Add(projectID uuid.UUID, bucketName string) {
	list.Buckets = append(list.Buckets, metabase.BucketLocation{
		ProjectID:  projectID,
		BucketName: bucketName,
	})
}
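
// Usage sketch for restricting a run to specific buckets (the project ID and
// bucket names below are placeholders); the resulting list is what
// ProcessBuckets reads via service.bucketList:
//
//	var list BucketList
//	projectID, _ := uuid.FromString("11111111-2222-3333-4444-555555555555")
//	list.Add(projectID, "backups")
//	list.Add(projectID, "images")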

// ProcessRange processes segments with stream IDs in the range [low, high),
// in batches of config.BatchSize.
func (service *Service) ProcessRange(ctx context.Context, low, high uuid.UUID) (err error) {
	defer mon.Task()(&ctx)(&err)

	aliasMap, err := service.metabase.LatestNodesAliasMap(ctx)
	if err != nil {
		return Error.Wrap(err)
	}
	service.aliasMap = aliasMap

	err = service.loadOnlineNodes(ctx)
	if err != nil {
		return Error.Wrap(err)
	}

	err = service.loadPriorityNodes(ctx)
	if err != nil {
		return Error.Wrap(err)
	}

	err = service.applyIgnoreNodes(ctx)
	if err != nil {
		return Error.Wrap(err)
	}

	// The listing cursor is exclusive, so start just before "low" to make sure
	// segments with a stream ID equal to "low" are included.
	cursorStreamID := low
	var cursorPosition metabase.SegmentPosition
	if !low.IsZero() {
		cursorStreamID = uuidBefore(low)
		cursorPosition = metabase.SegmentPosition{Part: 0xFFFFFFFF, Index: 0xFFFFFFFF}
	}

	var progress int64
	for {
		result, err := service.metabase.ListVerifySegments(ctx, metabase.ListVerifySegments{
			CursorStreamID: cursorStreamID,
			CursorPosition: cursorPosition,
			Limit:          service.config.BatchSize,

			AsOfSystemInterval: service.config.AsOfSystemInterval,
		})
		if err != nil {
			return Error.Wrap(err)
		}
		verifySegments := result.Segments
		result.Segments = nil

		// Drop any segment that's equal to or beyond "high".
		for len(verifySegments) > 0 && !verifySegments[len(verifySegments)-1].StreamID.Less(high) {
			verifySegments = verifySegments[:len(verifySegments)-1]
		}

		// All done?
		if len(verifySegments) == 0 {
			return nil
		}

		last := &verifySegments[len(verifySegments)-1]
		cursorStreamID, cursorPosition = last.StreamID, last.Position

		// Convert to the struct that contains the status.
		segmentsData := make([]Segment, len(verifySegments))
		segments := make([]*Segment, len(verifySegments))
		for i := range segments {
			segmentsData[i].VerifySegment = verifySegments[i]
			segments[i] = &segmentsData[i]
		}

		service.log.Info("processing segments",
			zap.Int64("progress", progress),
			zap.Int("count", len(segments)),
			zap.Stringer("first", segments[0].StreamID),
			zap.Stringer("last", segments[len(segments)-1].StreamID),
		)
		progress += int64(len(segments))

		// Process the data.
		err = service.ProcessSegments(ctx, segments)
		if err != nil {
			return Error.Wrap(err)
		}
	}
}

// ProcessBuckets processes segments from the buckets in service.bucketList,
// in batches of config.BatchSize.
func (service *Service) ProcessBuckets(ctx context.Context, buckets []metabase.BucketLocation) (err error) {
	defer mon.Task()(&ctx)(&err)

	aliasMap, err := service.metabase.LatestNodesAliasMap(ctx)
	if err != nil {
		return Error.Wrap(err)
	}
	service.aliasMap = aliasMap

	err = service.loadOnlineNodes(ctx)
	if err != nil {
		return Error.Wrap(err)
	}

	err = service.loadPriorityNodes(ctx)
	if err != nil {
		return Error.Wrap(err)
	}

	err = service.applyIgnoreNodes(ctx)
	if err != nil {
		return Error.Wrap(err)
	}

	var progress int64

	cursorBucket := metabase.BucketLocation{}
	cursorStreamID := uuid.UUID{}
	cursorPosition := metabase.SegmentPosition{}

	// Reusable buffers for converting to the struct that contains the status.
	segmentsData := make([]Segment, service.config.BatchSize)
	segments := make([]*Segment, service.config.BatchSize)
	for {
		listStreamIDsResult, err := service.metabase.ListBucketsStreamIDs(ctx, metabase.ListBucketsStreamIDs{
			BucketList: metabase.ListVerifyBucketList{
				Buckets: service.bucketList.Buckets,
			},
			CursorBucket:   cursorBucket,
			CursorStreamID: cursorStreamID,
			Limit:          service.config.BatchSize,

			AsOfSystemInterval: service.config.AsOfSystemInterval,
		})
		if err != nil {
			return Error.Wrap(err)
		}

		for {
			// TODO loop for this
			result, err := service.metabase.ListVerifySegments(ctx, metabase.ListVerifySegments{
				StreamIDs:      listStreamIDsResult.StreamIDs,
				CursorStreamID: cursorStreamID,
				CursorPosition: cursorPosition,
				Limit:          service.config.BatchSize,

				AsOfSystemInterval: service.config.AsOfSystemInterval,
			})
			if err != nil {
				return Error.Wrap(err)
			}

			// All done?
			if len(result.Segments) == 0 {
				break
			}

			segmentsData = segmentsData[:len(result.Segments)]
			segments = segments[:len(result.Segments)]

			last := &result.Segments[len(result.Segments)-1]
			cursorStreamID, cursorPosition = last.StreamID, last.Position

			for i := range segments {
				segmentsData[i].VerifySegment = result.Segments[i]
				segments[i] = &segmentsData[i]
			}

			service.log.Info("processing segments",
				zap.Int64("progress", progress),
				zap.Int("count", len(segments)),
				zap.Stringer("first", segments[0].StreamID),
				zap.Stringer("last", segments[len(segments)-1].StreamID),
			)
			progress += int64(len(segments))

			// Process the data.
			err = service.ProcessSegments(ctx, segments)
			if err != nil {
				return Error.Wrap(err)
			}
		}

		if len(listStreamIDsResult.StreamIDs) == 0 {
			return nil
		}

		cursorBucket = listStreamIDsResult.LastBucket
		// TODO remove processed project_ids and bucket_names?
	}
}

// ProcessSegmentsFromCSV processes all segments from the specified CSV source,
// in batches of config.BatchSize.
func (service *Service) ProcessSegmentsFromCSV(ctx context.Context, segmentSource *SegmentCSVSource) (err error) {
	defer mon.Task()(&ctx)(&err)

	aliasMap, err := service.metabase.LatestNodesAliasMap(ctx)
	if err != nil {
		return Error.Wrap(err)
	}
	service.aliasMap = aliasMap
	service.log.Debug("got aliasMap", zap.Int("length", service.aliasMap.Size()))

	streamIDs := make([]uuid.UUID, 0, service.config.BatchSize)
	exhausted := false
	segmentsData := make([]Segment, service.config.BatchSize)
	segments := make([]*Segment, service.config.BatchSize)
	for {
		streamIDs = streamIDs[:0]
		for n := 0; n < service.config.BatchSize; n++ {
			streamIDAndPosition, err := segmentSource.Next()
			if err != nil {
				if errors.Is(err, io.EOF) {
					exhausted = true
					break
				}
				return Error.New("could not read csv: %w", err)
			}
			streamIDs = append(streamIDs, streamIDAndPosition.StreamID)
		}

		var cursorStreamID uuid.UUID
		var cursorPosition metabase.SegmentPosition
		for {
			verifySegments, err := service.metabase.ListVerifySegments(ctx, metabase.ListVerifySegments{
				CursorStreamID: cursorStreamID,
				CursorPosition: cursorPosition,
				StreamIDs:      streamIDs,
				Limit:          service.config.BatchSize,

				AsOfSystemInterval: service.config.AsOfSystemInterval,
			})
			if err != nil {
				return Error.New("could not query metabase: %w", err)
			}

			segmentsData = segmentsData[:len(verifySegments.Segments)]
			segments = segments[:len(verifySegments.Segments)]
			for n, verifySegment := range verifySegments.Segments {
				segmentsData[n].VerifySegment = verifySegment
				segments[n] = &segmentsData[n]
			}

			if err := service.ProcessSegments(ctx, segments); err != nil {
				return Error.Wrap(err)
			}

			if len(verifySegments.Segments) < service.config.BatchSize {
				break
			}
			last := &verifySegments.Segments[len(verifySegments.Segments)-1]
			cursorStreamID, cursorPosition = last.StreamID, last.Position
		}

		if exhausted {
			return nil
		}
	}
}

// ProcessSegments processes a collection of segments.
func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment) (err error) {
	defer mon.Task()(&ctx)(&err)

	// Verify all the segments against storage nodes.
	err = service.Verify(ctx, segments)
	if err != nil {
		return Error.Wrap(err)
	}

	notFound := []*Segment{}
	retry := []*Segment{}

	// Find out which of the segments we did not find, or where some other
	// failure occurred. When only a subset of nodes is checked (Check > 0),
	// any unresolved check is worth retrying; otherwise only segments with
	// more than 5 unresolved checks are retried.
	for _, segment := range segments {
		if segment.Status.NotFound > 0 {
			notFound = append(notFound, segment)
		} else if (service.config.Check > 0 && segment.Status.Retry > 0) || segment.Status.Retry > 5 {
			retry = append(retry, segment)
		}
	}

	// Some segments might have been deleted during the
	// processing, so cross-reference and remove any deleted
	// segments from the list.
	notFound, err = service.RemoveDeleted(ctx, notFound)
	if err != nil {
		return Error.Wrap(err)
	}
	retry, err = service.RemoveDeleted(ctx, retry)
	if err != nil {
		return Error.Wrap(err)
	}

	// Output the problematic segments:
	errNotFound := service.notFound.Write(ctx, notFound)
	errRetry := service.retry.Write(ctx, retry)

	return errs.Combine(errNotFound, errRetry)
}

// RemoveDeleted modifies the slice and returns only the segments that
// still exist in the database.
func (service *Service) RemoveDeleted(ctx context.Context, segments []*Segment) (_ []*Segment, err error) {
	defer mon.Task()(&ctx)(&err)

	valid := segments[:0]
	for _, seg := range segments {
		_, err := service.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
			StreamID: seg.StreamID,
			Position: seg.Position,
		})
		if metabase.ErrSegmentNotFound.Has(err) {
			continue
		}
		if err != nil {
			service.log.Error("get segment by id failed",
				zap.Stringer("stream-id", seg.StreamID),
				zap.String("position", fmt.Sprint(seg.Position)),
				zap.Error(err))
			if ctx.Err() != nil {
				return valid, ctx.Err()
			}
		}
		valid = append(valid, seg)
	}
	return valid, nil
}

// Segment contains minimal information necessary for verifying a single Segment.
type Segment struct {
	metabase.VerifySegment
	Status Status
}

// Status contains the statistics about the segment.
type Status struct {
	Retry    int32
	Found    int32
	NotFound int32
}
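
// Status counters are updated atomically (see MarkFound and MarkNotFound
// below) so that concurrent verifiers can report results for the same segment:
// each call consumes one Retry token and records the outcome as Found or
// NotFound.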

// MarkFound moves a retry token from retry to found.
func (status *Status) MarkFound() {
	atomic.AddInt32(&status.Retry, -1)
	atomic.AddInt32(&status.Found, 1)
}

// MarkNotFound moves a retry token from retry to not found.
func (status *Status) MarkNotFound() {
	atomic.AddInt32(&status.Retry, -1)
	atomic.AddInt32(&status.NotFound, 1)
}

// Batch is a list of segments to be verified on a single node.
type Batch struct {
	Alias metabase.NodeAlias
	Items []*Segment
}

// Len returns the length of the batch.
func (b *Batch) Len() int { return len(b.Items) }

// uuidBefore returns a uuid.UUID that's immediately before v.
// It might not be a valid uuid after this operation.
func uuidBefore(v uuid.UUID) uuid.UUID {
	for i := len(v) - 1; i >= 0; i-- {
		v[i]--
		if v[i] != 0xFF { // we didn't wrap around
			break
		}
	}
	return v
}
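
// A worked example for uuidBefore: if v ends in the bytes 0x01 0x00, the final
// byte wraps from 0x00 to 0xff, so the loop carries into the preceding byte
// and decrements it to 0x00, yielding a value ending in 0x00 0xff.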