storj/cmd/tools/segment-verify/main.go

293 lines
8.3 KiB
Go
Raw Normal View History

// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"encoding/csv"
"encoding/hex"
"errors"
"os"
"strings"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/fpath"
"storj.io/common/peertls/tlsopts"
"storj.io/common/rpc"
"storj.io/common/signing"
"storj.io/common/uuid"
"storj.io/private/cfgstruct"
"storj.io/private/process"
"storj.io/storj/private/revocation"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/satellitedb"
)
// Satellite defines satellite configuration.
type Satellite struct {
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
satellite.Config
}
var (
rootCmd = &cobra.Command{
Use: "segment-verify",
Short: "segment-verify",
}
runCmd = &cobra.Command{
Use: "run",
Short: "commands to process segments",
}
rangeCmd = &cobra.Command{
Use: "range",
Short: "runs the command on a range of segments",
RunE: verifySegments,
}
bucketsCmd = &cobra.Command{
Use: "buckets",
Short: "runs the command on segments from specified buckets",
RunE: verifySegments,
}
summarizeCmd = &cobra.Command{
Use: "summarize-log",
Short: "summarizes verification log",
Args: cobra.ExactArgs(1),
RunE: summarizeVerificationLog,
}
nodeCheckCmd = &cobra.Command{
Use: "node-check",
Short: "checks segments for too many duplicate or unvetted nodes",
RunE: verifySegmentsNodeCheck,
}
satelliteCfg Satellite
rangeCfg RangeConfig
bucketsCfg BucketConfig
nodeCheckCfg NodeCheckConfig
confDir string
identityDir string
)
func init() {
defaultConfDir := fpath.ApplicationDir("storj", "satellite")
defaultIdentityDir := fpath.ApplicationDir("storj", "identity", "satellite")
cfgstruct.SetupFlag(zap.L(), rootCmd, &confDir, "config-dir", defaultConfDir, "main directory for satellite configuration")
cfgstruct.SetupFlag(zap.L(), rootCmd, &identityDir, "identity-dir", defaultIdentityDir, "main directory for satellite identity credentials")
defaults := cfgstruct.DefaultsFlag(rootCmd)
rootCmd.AddCommand(runCmd)
rootCmd.AddCommand(summarizeCmd)
rootCmd.AddCommand(nodeCheckCmd)
runCmd.AddCommand(rangeCmd)
runCmd.AddCommand(bucketsCmd)
process.Bind(runCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(rangeCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(rangeCmd, &rangeCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(bucketsCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(bucketsCmd, &bucketsCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(nodeCheckCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(nodeCheckCmd, &nodeCheckCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
}
// RangeConfig defines configuration for verifying segment existence.
type RangeConfig struct {
Service ServiceConfig
Verify VerifierConfig
Low string `help:"hex lowest segment id prefix to verify"`
High string `help:"hex highest segment id prefix to verify (excluded)"`
}
// BucketConfig defines configuration for verifying segment existence within a list of buckets.
type BucketConfig struct {
Service ServiceConfig
Verify VerifierConfig
BucketsCSV string `help:"csv file of project_id,bucket_name of buckets to verify" default:""`
}
func verifySegments(cmd *cobra.Command, args []string) error {
ctx, _ := process.Ctx(cmd)
log := zap.L()
// open default satellite database
db, err := satellitedb.Open(ctx, log.Named("db"), satelliteCfg.Database, satellitedb.Options{
ApplicationName: "segment-verify",
SaveRollupBatchSize: satelliteCfg.Tally.SaveRollupBatchSize,
ReadRollupBatchSize: satelliteCfg.Tally.ReadRollupBatchSize,
})
if err != nil {
return errs.New("Error starting master database on satellite: %+v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
// open metabase
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), satelliteCfg.Metainfo.DatabaseURL,
satelliteCfg.Config.Metainfo.Metabase("satellite-core"))
if err != nil {
return Error.Wrap(err)
}
defer func() { _ = metabaseDB.Close() }()
// check whether satellite and metabase versions match
versionErr := db.CheckVersion(ctx)
if versionErr != nil {
log.Error("versions skewed", zap.Error(versionErr))
return Error.Wrap(versionErr)
}
versionErr = metabaseDB.CheckVersion(ctx)
if versionErr != nil {
log.Error("versions skewed", zap.Error(versionErr))
return Error.Wrap(versionErr)
}
// setup dialer
identity, err := satelliteCfg.Identity.Load()
if err != nil {
log.Error("Failed to load identity.", zap.Error(err))
return errs.New("Failed to load identity: %+v", err)
}
revocationDB, err := revocation.OpenDBFromCfg(ctx, satelliteCfg.Server.Config)
if err != nil {
return errs.New("Error creating revocation database: %+v", err)
}
defer func() {
err = errs.Combine(err, revocationDB.Close())
}()
tlsOptions, err := tlsopts.NewOptions(identity, satelliteCfg.Server.Config, revocationDB)
if err != nil {
return Error.Wrap(err)
}
dialer := rpc.NewDefaultDialer(tlsOptions)
// setup dependencies for verification
overlay, err := overlay.NewService(log.Named("overlay"), db.OverlayCache(), db.NodeEvents(), nil, "", "", satelliteCfg.Overlay)
if err != nil {
return Error.Wrap(err)
}
ordersService, err := orders.NewService(log.Named("orders"), signing.SignerFromFullIdentity(identity), overlay, db.Orders(), satelliteCfg.Orders)
if err != nil {
return Error.Wrap(err)
}
// setup verifier
verifier := NewVerifier(log.Named("verifier"), dialer, ordersService, rangeCfg.Verify)
service, err := NewService(log.Named("service"), metabaseDB, verifier, overlay, rangeCfg.Service)
if err != nil {
return Error.Wrap(err)
}
verifier.reportPiece = service.problemPieces.Write
defer func() { err = errs.Combine(err, service.Close()) }()
if cmd.Name() == "range" {
return verifySegmentsRange(ctx, service, rangeCfg)
}
if cmd.Name() == "buckets" {
return verifySegmentsBuckets(ctx, service, bucketsCfg)
}
return errors.New("unknown commnand: " + cmd.Name())
}
func verifySegmentsRange(ctx context.Context, service *Service, rangeCfg RangeConfig) error {
// parse arguments
var low, high uuid.UUID
lowBytes, err := hex.DecodeString(rangeCfg.Low)
if err != nil {
return Error.Wrap(err)
}
highBytes, err := hex.DecodeString(rangeCfg.High)
if err != nil {
return Error.Wrap(err)
}
copy(low[:], lowBytes)
copy(high[:], highBytes)
if high.IsZero() {
return Error.New("high argument not specified")
}
return service.ProcessRange(ctx, low, high)
}
func verifySegmentsBuckets(ctx context.Context, service *Service, bucketCfg BucketConfig) error {
if bucketsCfg.BucketsCSV == "" {
return Error.New("bucket list file path not provided")
}
bucketList, err := service.ParseBucketFile(bucketsCfg.BucketsCSV)
if err != nil {
return Error.Wrap(err)
}
return service.ProcessBuckets(ctx, bucketList.Buckets)
}
func main() {
process.Exec(rootCmd)
}
// ParseBucketFile parses a csv file containing project_id and bucket names.
func (service *Service) ParseBucketFile(path string) (_ BucketList, err error) {
csvFile, err := os.Open(path)
if err != nil {
return BucketList{}, err
}
defer func() {
err = errs.Combine(err, csvFile.Close())
}()
csvReader := csv.NewReader(csvFile)
allEntries, err := csvReader.ReadAll()
if err != nil {
return BucketList{}, err
}
bucketList := BucketList{}
for _, entry := range allEntries {
if len(entry) < 2 {
return BucketList{}, Error.New("unable to parse buckets file: %w", err)
}
projectId, err := projectIdFromCompactString(strings.TrimSpace(entry[0]))
if err != nil {
return BucketList{}, Error.New("unable to parse buckets file: %w", err)
}
bucketList.Add(projectId, strings.TrimSpace(entry[1]))
}
return bucketList, nil
}
func projectIdFromCompactString(s string) (uuid.UUID, error) {
decoded, err := hex.DecodeString(s)
if err != nil {
return uuid.UUID{}, Error.New("invalid string")
}
return uuid.FromBytes(decoded)
}