cmd/satellite: command for verifying piece hashes
Jira: https://storjlabs.atlassian.net/browse/PG-69 There are a number of segments with piece_hashes_verified = false in their metadata) on US-Central-1, Europe-West-1, and Asia-East-1 satellites. Most probably, this happened due to a bug we had in the past. We want to verify them before executing the main migration to metabase. This would simplify the main migration to metabase with one less issue to think about. Change-Id: I8831af1a254c560d45bb87d7104e49abd8242236
This commit is contained in:
parent
d508c4c985
commit
b409b53f7f
@ -229,6 +229,12 @@ var (
|
||||
Long: "Fixes the old-style objects by adding the number of segments to the metadata.",
|
||||
RunE: cmdFixOldStyleObjects,
|
||||
}
|
||||
verifyPieceHashesCmd = &cobra.Command{
|
||||
Use: "verify-piece-hashes",
|
||||
Short: "Verifies piece hashes for unverified segments",
|
||||
Long: "Verifies piece hashes for all segments with PieceHashesVerifeid = false in their pointer.",
|
||||
RunE: cmdVerifyPieceHashes,
|
||||
}
|
||||
|
||||
runCfg Satellite
|
||||
setupCfg Satellite
|
||||
@ -264,7 +270,8 @@ var (
|
||||
}
|
||||
verifyGracefulExitReceiptCfg struct {
|
||||
}
|
||||
fixOldStyleObjectsCfg struct {
|
||||
dryRunCfg struct {
|
||||
Satellite
|
||||
DryRun bool `help:"only prints logs for the changes to be made without apply them" default:"true"`
|
||||
}
|
||||
confDir string
|
||||
@ -303,6 +310,7 @@ func init() {
|
||||
billingCmd.AddCommand(finalizeCustomerInvoicesCmd)
|
||||
billingCmd.AddCommand(stripeCustomerCmd)
|
||||
metainfoCmd.AddCommand(fixOldStyleObjectsCmd)
|
||||
metainfoCmd.AddCommand(verifyPieceHashesCmd)
|
||||
process.Bind(runCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||
process.Bind(runMigrationCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||
process.Bind(runAPICmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||
@ -324,7 +332,8 @@ func init() {
|
||||
process.Bind(createCustomerInvoicesCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||
process.Bind(finalizeCustomerInvoicesCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||
process.Bind(stripeCustomerCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||
process.Bind(fixOldStyleObjectsCmd, &fixOldStyleObjectsCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||
process.Bind(fixOldStyleObjectsCmd, &dryRunCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||
process.Bind(verifyPieceHashesCmd, &dryRunCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||
}
|
||||
|
||||
func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
@ -713,7 +722,13 @@ func cmdStripeCustomer(cmd *cobra.Command, args []string) (err error) {
|
||||
func cmdFixOldStyleObjects(cmd *cobra.Command, args []string) (err error) {
|
||||
ctx, _ := process.Ctx(cmd)
|
||||
|
||||
return fixOldStyleObjects(ctx, fixOldStyleObjectsCfg.DryRun)
|
||||
return fixOldStyleObjects(ctx)
|
||||
}
|
||||
|
||||
func cmdVerifyPieceHashes(cmd *cobra.Command, args []string) (err error) {
|
||||
ctx, _ := process.Ctx(cmd)
|
||||
|
||||
return verifyPieceHashes(ctx)
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@ -7,19 +7,28 @@ import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/peertls/tlsopts"
|
||||
"storj.io/common/rpc"
|
||||
"storj.io/common/signing"
|
||||
"storj.io/storj/pkg/revocation"
|
||||
"storj.io/storj/satellite/audit"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/satellite/orders"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/satellite/satellitedb"
|
||||
)
|
||||
|
||||
func runMetainfoCmd(cmdFunc func(*metainfo.Service) error) error {
|
||||
func runMetainfoCmd(ctx context.Context, cmdFunc func(*metainfo.Service) error) error {
|
||||
logger := zap.L()
|
||||
|
||||
db, err := satellitedb.New(logger.Named("db"), runCfg.Database, satellitedb.Options{})
|
||||
db, err := satellitedb.New(logger.Named("db"), dryRunCfg.Database, satellitedb.Options{})
|
||||
if err != nil {
|
||||
return errs.New("error connecting to master database on satellite: %+v", err)
|
||||
}
|
||||
@ -27,7 +36,12 @@ func runMetainfoCmd(cmdFunc func(*metainfo.Service) error) error {
|
||||
err = errs.Combine(err, db.Close())
|
||||
}()
|
||||
|
||||
pointerDB, err := metainfo.NewStore(logger.Named("pointerdb"), runCfg.Metainfo.DatabaseURL)
|
||||
err = db.CheckVersion(ctx)
|
||||
if err != nil {
|
||||
return errs.New("Error checking version for satellitedb: %+v", err)
|
||||
}
|
||||
|
||||
pointerDB, err := metainfo.NewStore(logger.Named("pointerdb"), dryRunCfg.Metainfo.DatabaseURL)
|
||||
if err != nil {
|
||||
return errs.New("Error creating metainfo database connection: %+v", err)
|
||||
}
|
||||
@ -44,8 +58,95 @@ func runMetainfoCmd(cmdFunc func(*metainfo.Service) error) error {
|
||||
return cmdFunc(service)
|
||||
}
|
||||
|
||||
func fixOldStyleObjects(ctx context.Context, dryRun bool) (err error) {
|
||||
return runMetainfoCmd(func(metainfo *metainfo.Service) error {
|
||||
func runVerifierCmd(ctx context.Context, cmdFunc func(*audit.Verifier) error) error {
|
||||
logger := zap.L()
|
||||
|
||||
identity, err := dryRunCfg.Identity.Load()
|
||||
if err != nil {
|
||||
log.Fatal("Failed to load identity.", zap.Error(err))
|
||||
}
|
||||
|
||||
db, err := satellitedb.New(logger.Named("db"), dryRunCfg.Database, satellitedb.Options{})
|
||||
if err != nil {
|
||||
return errs.New("error connecting to master database on satellite: %+v", err)
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, db.Close())
|
||||
}()
|
||||
|
||||
err = db.CheckVersion(ctx)
|
||||
if err != nil {
|
||||
return errs.New("Error checking version for satellitedb: %+v", err)
|
||||
}
|
||||
|
||||
pointerDB, err := metainfo.NewStore(logger.Named("pointerdb"), dryRunCfg.Metainfo.DatabaseURL)
|
||||
if err != nil {
|
||||
return errs.New("Error creating metainfo database connection: %+v", err)
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, pointerDB.Close())
|
||||
}()
|
||||
|
||||
revocationDB, err := revocation.NewDBFromCfg(dryRunCfg.Server.Config)
|
||||
if err != nil {
|
||||
return errs.New("Error creating revocation database: %+v", err)
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, revocationDB.Close())
|
||||
}()
|
||||
|
||||
tlsOptions, err := tlsopts.NewOptions(identity, dryRunCfg.Server.Config, revocationDB)
|
||||
if err != nil {
|
||||
return errs.New("Error creating TLS options: %+v", err)
|
||||
}
|
||||
|
||||
dialer := rpc.NewDefaultDialer(tlsOptions)
|
||||
|
||||
metainfoService := metainfo.NewService(
|
||||
logger.Named("metainfo:service"),
|
||||
pointerDB,
|
||||
db.Buckets(),
|
||||
)
|
||||
|
||||
overlayService := overlay.NewService(
|
||||
logger.Named("overlay"),
|
||||
db.OverlayCache(),
|
||||
runCfg.Overlay,
|
||||
)
|
||||
|
||||
ordersService, err := orders.NewService(
|
||||
logger.Named("orders:service"),
|
||||
signing.SignerFromFullIdentity(identity),
|
||||
overlayService,
|
||||
db.Orders(),
|
||||
db.Buckets(),
|
||||
runCfg.Orders,
|
||||
&pb.NodeAddress{
|
||||
Transport: pb.NodeTransport_TCP_TLS_GRPC,
|
||||
Address: runCfg.Contact.ExternalAddress,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return errs.New("Error creating orders service: %+v", err)
|
||||
}
|
||||
|
||||
verifier := audit.NewVerifier(
|
||||
logger.Named("audit:verifier"),
|
||||
metainfoService,
|
||||
dialer,
|
||||
overlayService,
|
||||
db.Containment(),
|
||||
ordersService,
|
||||
identity,
|
||||
runCfg.Audit.MinBytesPerSecond,
|
||||
runCfg.Audit.MinDownloadTimeout,
|
||||
)
|
||||
|
||||
return cmdFunc(verifier)
|
||||
}
|
||||
|
||||
func fixOldStyleObjects(ctx context.Context) (err error) {
|
||||
return runMetainfoCmd(ctx, func(metainfo *metainfo.Service) error {
|
||||
var total, fixed int
|
||||
|
||||
scanner := bufio.NewScanner(os.Stdin)
|
||||
@ -55,7 +156,7 @@ func fixOldStyleObjects(ctx context.Context, dryRun bool) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
changed, err := metainfo.FixOldStyleObject(ctx, key, dryRun)
|
||||
changed, err := metainfo.FixOldStyleObject(ctx, key, dryRunCfg.DryRun)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -66,12 +167,36 @@ func fixOldStyleObjects(ctx context.Context, dryRun bool) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
return err
|
||||
zap.L().Info("Completed.", zap.Int("Fixed", fixed), zap.Int("From Total", total))
|
||||
|
||||
return scanner.Err()
|
||||
})
|
||||
}
|
||||
|
||||
func verifyPieceHashes(ctx context.Context) (err error) {
|
||||
return runVerifierCmd(ctx, func(verifier *audit.Verifier) error {
|
||||
var total, fixed int
|
||||
|
||||
scanner := bufio.NewScanner(os.Stdin)
|
||||
for scanner.Scan() {
|
||||
key, err := hex.DecodeString(scanner.Text())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
changed, err := verifier.VerifyPieceHashes(ctx, string(key), dryRunCfg.DryRun)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
total++
|
||||
if changed {
|
||||
fixed++
|
||||
}
|
||||
}
|
||||
|
||||
zap.L().Info("Completed.", zap.Int("Fixed", fixed), zap.Int("From Total", total))
|
||||
|
||||
return nil
|
||||
return scanner.Err()
|
||||
})
|
||||
}
|
||||
|
2
go.mod
2
go.mod
@ -42,7 +42,7 @@ require (
|
||||
golang.org/x/sys v0.0.0-20200808120158-1030fc2bf1d9
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
|
||||
golang.org/x/tools v0.0.0-20200428211428-0c9eba77bc32 // indirect
|
||||
storj.io/common v0.0.0-20200904063801-15a4e772a2f2
|
||||
storj.io/common v0.0.0-20200925121432-61f74bdf4b5c
|
||||
storj.io/drpc v0.0.14
|
||||
storj.io/monkit-jaeger v0.0.0-20200518165323-80778fc3f91b
|
||||
storj.io/private v0.0.0-20200925142346-4c879709882f
|
||||
|
4
go.sum
4
go.sum
@ -728,8 +728,8 @@ storj.io/common v0.0.0-20200729140050-4c1ddac6fa63 h1:BkRvlginTJGi0yAkpN+4ZKm2Yp
|
||||
storj.io/common v0.0.0-20200729140050-4c1ddac6fa63/go.mod h1:ILr54ISCqCQ6MmIwT7eaR/fEGrBfgfxiPt8nmpWqnUM=
|
||||
storj.io/common v0.0.0-20200818131620-f9cddf66b4be h1:kyX4v2M3ZNjlj0cFGON9as91Qm08Jg3XtVz7MQwjMy8=
|
||||
storj.io/common v0.0.0-20200818131620-f9cddf66b4be/go.mod h1:ILr54ISCqCQ6MmIwT7eaR/fEGrBfgfxiPt8nmpWqnUM=
|
||||
storj.io/common v0.0.0-20200904063801-15a4e772a2f2 h1:ZCODzZM42kgs+yODctj8TOKfVJ1fPETXiOrS+C2muQU=
|
||||
storj.io/common v0.0.0-20200904063801-15a4e772a2f2/go.mod h1:ILr54ISCqCQ6MmIwT7eaR/fEGrBfgfxiPt8nmpWqnUM=
|
||||
storj.io/common v0.0.0-20200925121432-61f74bdf4b5c h1:4B1CHdMDbHZ3sgs6yliXsH2MzQR5EuEY8utSEVxsSRU=
|
||||
storj.io/common v0.0.0-20200925121432-61f74bdf4b5c/go.mod h1:ILr54ISCqCQ6MmIwT7eaR/fEGrBfgfxiPt8nmpWqnUM=
|
||||
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
|
||||
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
|
||||
storj.io/drpc v0.0.14 h1:GCBdymTt1BRw4oHmmUZZlxYXLVRxxYj6x3Ivide2J+I=
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
"storj.io/storj/satellite/metainfo/metabase"
|
||||
"storj.io/storj/satellite/orders"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/storage"
|
||||
"storj.io/uplink/private/eestream"
|
||||
"storj.io/uplink/private/piecestore"
|
||||
)
|
||||
@ -66,6 +67,7 @@ type Verifier struct {
|
||||
minDownloadTimeout time.Duration
|
||||
|
||||
OnTestingCheckSegmentAlteredHook func()
|
||||
OnTestingVerifyMockFunc func() (Report, error)
|
||||
}
|
||||
|
||||
// NewVerifier creates a Verifier.
|
||||
@ -890,3 +892,110 @@ func GetRandomStripe(ctx context.Context, pointer *pb.Pointer) (index int64, err
|
||||
|
||||
return randomStripeIndex, nil
|
||||
}
|
||||
|
||||
// VerifyPieceHashes verifies the piece hashes for segments with piece_hashes_verified = false.
|
||||
func (verifier *Verifier) VerifyPieceHashes(ctx context.Context, path storj.Path, dryRun bool) (changed bool, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
verifier.log.Info("Verifying piece hashes.", zap.String("Path", path))
|
||||
|
||||
maxAttempts := 3
|
||||
for attempts := 0; attempts < maxAttempts; attempts++ {
|
||||
attempts++
|
||||
|
||||
_, pointer, err := verifier.metainfo.GetWithBytes(ctx, metabase.SegmentKey(path))
|
||||
if err != nil {
|
||||
if storj.ErrObjectNotFound.Has(err) {
|
||||
verifier.log.Info("Segment not found.")
|
||||
return false, nil
|
||||
}
|
||||
return false, Error.Wrap(err)
|
||||
}
|
||||
|
||||
if pointer.PieceHashesVerified {
|
||||
verifier.log.Info("Piece hashes already verified.")
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if pointer.Type != pb.Pointer_REMOTE {
|
||||
verifier.log.Info("Not a remote segment.")
|
||||
return false, nil
|
||||
}
|
||||
|
||||
var report Report
|
||||
if verifier.OnTestingVerifyMockFunc != nil {
|
||||
report, err = verifier.OnTestingVerifyMockFunc()
|
||||
} else {
|
||||
report, err = verifier.Verify(ctx, path, nil)
|
||||
}
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
verifier.log.Info("Audit report received.",
|
||||
zap.Int("Success", report.Successes.Len()),
|
||||
zap.Int("Fails", report.Fails.Len()),
|
||||
zap.Int("Offlines", report.Offlines.Len()),
|
||||
zap.Int("Pending Audits", len(report.PendingAudits)),
|
||||
zap.Int("Unknown", report.Unknown.Len()),
|
||||
)
|
||||
|
||||
if report.Successes.Len() == 0 {
|
||||
// skip it - this could happen if there was deleted or expired
|
||||
verifier.log.Info("Empty success list. Skipping the segment.")
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if report.Successes.Len() < int(pointer.Remote.Redundancy.MinReq) {
|
||||
verifier.log.Warn("Segment would be irreparable. Not fixing it.",
|
||||
zap.Int("Successful Nodes", report.Successes.Len()),
|
||||
zap.Int32("Minimum Required", pointer.Remote.Redundancy.MinReq))
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if report.Successes.Len() < int(pointer.Remote.Redundancy.RepairThreshold) {
|
||||
verifier.log.Warn("Segment would require repair. Not fixing it.",
|
||||
zap.Int("Successful Nodes", report.Successes.Len()),
|
||||
zap.Int32("Repair Threshold", pointer.Remote.Redundancy.RepairThreshold))
|
||||
return false, nil
|
||||
}
|
||||
|
||||
toRemoveCount := report.Fails.Len() + report.Offlines.Len() + len(report.PendingAudits) + report.Unknown.Len()
|
||||
|
||||
toRemove := make([]*pb.RemotePiece, 0, toRemoveCount)
|
||||
for _, piece := range pointer.Remote.RemotePieces {
|
||||
if !report.Successes.Contains(piece.NodeId) {
|
||||
toRemove = append(toRemove, piece)
|
||||
}
|
||||
}
|
||||
|
||||
// sanity check
|
||||
if len(toRemove) != toRemoveCount {
|
||||
return false, Error.New("Pieces to remove (%d) do not match unsuccessful nodes (%d)", len(toRemove), toRemoveCount)
|
||||
}
|
||||
|
||||
verifier.log.Info("Removing unsuccessful pieces from pointer.", zap.Int("Pieces To Remove", toRemoveCount))
|
||||
|
||||
if dryRun {
|
||||
verifier.log.Info("Dry run, skipping the actual fix.", zap.Int("Successful Nodes", report.Successes.Len()))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
_, err = verifier.metainfo.UpdatePiecesCheckDuplicatesVerifyHashes(ctx, metabase.SegmentKey(path), pointer, nil, toRemove, false, true)
|
||||
if err != nil {
|
||||
if storage.ErrValueChanged.Has(err) {
|
||||
verifier.log.Info("Race detected while modifying segment pointer. Retrying...")
|
||||
continue
|
||||
}
|
||||
if storage.ErrKeyNotFound.Has(err) {
|
||||
verifier.log.Info("Object not found.")
|
||||
return false, nil
|
||||
}
|
||||
return false, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, Error.New("Failed to modify segment pointer in %d attempts.", maxAttempts)
|
||||
}
|
||||
|
@ -5,6 +5,8 @@ package audit_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -873,3 +875,178 @@ func TestVerifierUnknownError(t *testing.T) {
|
||||
require.Equal(t, report.Unknown[0], badNode.ID())
|
||||
})
|
||||
}
|
||||
|
||||
func TestVerifyPieceHashes(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: testplanet.ReconfigureRS(2, 2, 6, 6),
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
satellite := planet.Satellites[0]
|
||||
audits := satellite.Audit
|
||||
|
||||
nodes := storj.NodeIDList{
|
||||
planet.StorageNodes[0].ID(),
|
||||
planet.StorageNodes[1].ID(),
|
||||
planet.StorageNodes[2].ID(),
|
||||
planet.StorageNodes[3].ID(),
|
||||
planet.StorageNodes[4].ID(),
|
||||
planet.StorageNodes[5].ID(),
|
||||
}
|
||||
|
||||
// happy path test cases
|
||||
for i, tt := range []struct {
|
||||
report audit.Report
|
||||
err error
|
||||
changed bool
|
||||
}{
|
||||
{ // empty report is sometimes returned if the segment was expired or deleted.
|
||||
report: audit.Report{},
|
||||
changed: false,
|
||||
},
|
||||
{ // all nodes from the pointer responded successfully to the audit
|
||||
report: audit.Report{Successes: nodes},
|
||||
changed: true,
|
||||
},
|
||||
{ // one node failed the audit
|
||||
report: audit.Report{Successes: nodes[1:], Fails: nodes[:1]},
|
||||
changed: true,
|
||||
},
|
||||
{ // 4 nodes failed the audit
|
||||
report: audit.Report{Successes: nodes[4:], Fails: nodes[:4]},
|
||||
changed: true,
|
||||
},
|
||||
{ // one node was offline
|
||||
report: audit.Report{Successes: nodes[1:], Offlines: nodes[:1]},
|
||||
changed: true,
|
||||
},
|
||||
{ // 4 nodes were offline
|
||||
report: audit.Report{Successes: nodes[4:], Offlines: nodes[:4]},
|
||||
changed: true,
|
||||
},
|
||||
{ // one node was contained and scheduled for reverification
|
||||
report: audit.Report{Successes: nodes[1:], PendingAudits: []*audit.PendingAudit{{NodeID: nodes[0]}}},
|
||||
changed: true,
|
||||
},
|
||||
{ // 4 nodes were contained and scheduled for reverification
|
||||
report: audit.Report{Successes: nodes[4:], PendingAudits: []*audit.PendingAudit{{NodeID: nodes[0]}, {NodeID: nodes[1]}, {NodeID: nodes[2]}, {NodeID: nodes[3]}}},
|
||||
changed: true,
|
||||
},
|
||||
{ // one node returned unknown error
|
||||
report: audit.Report{Successes: nodes[1:], Unknown: nodes[:1]},
|
||||
changed: true,
|
||||
},
|
||||
{ // 4 nodes returned unknown error
|
||||
report: audit.Report{Successes: nodes[4:], Unknown: nodes[:4]},
|
||||
changed: true,
|
||||
},
|
||||
{ // one node failed the audit and 2 nodes were offline
|
||||
report: audit.Report{Successes: nodes[3:], Fails: nodes[:1], Offlines: nodes[1:3]},
|
||||
changed: true,
|
||||
},
|
||||
{ // one node failed the audit, one was offline, one was contained, and one returned unknown error
|
||||
report: audit.Report{Successes: nodes[4:], Fails: nodes[:1], Offlines: nodes[1:2], PendingAudits: []*audit.PendingAudit{{NodeID: nodes[2]}}, Unknown: nodes[3:4]},
|
||||
changed: true,
|
||||
},
|
||||
{ // remaining nodes are below repair threshold
|
||||
report: audit.Report{Successes: nodes[5:], Offlines: nodes[:5]},
|
||||
changed: false,
|
||||
},
|
||||
{ // Verify returns an error
|
||||
report: audit.Report{},
|
||||
err: errors.New("test error"),
|
||||
changed: false,
|
||||
},
|
||||
} {
|
||||
errTag := fmt.Sprintf("%d. %+v", i, tt)
|
||||
|
||||
testReport := tt.report
|
||||
testErr := tt.err
|
||||
|
||||
audits.Verifier.OnTestingVerifyMockFunc = func() (audit.Report, error) {
|
||||
return testReport, testErr
|
||||
}
|
||||
|
||||
ul := planet.Uplinks[0]
|
||||
testData := testrand.Bytes(8 * memory.KiB)
|
||||
|
||||
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
|
||||
require.NoError(t, err)
|
||||
|
||||
keys, err := satellite.Metainfo.Database.List(ctx, nil, 1)
|
||||
require.NoError(t, err, errTag)
|
||||
require.Equal(t, 1, len(keys))
|
||||
|
||||
key := metabase.SegmentKey(keys[0])
|
||||
|
||||
// verifying segments with piece_hashes_verified = true should return no error and changed = false
|
||||
changed, err := audits.Verifier.VerifyPieceHashes(ctx, string(key), false)
|
||||
require.NoError(t, err, errTag)
|
||||
assert.False(t, changed, errTag)
|
||||
|
||||
// assert that piece_hashes_verified = true before setting it to false
|
||||
pointer, err := satellite.Metainfo.Service.Get(ctx, key)
|
||||
require.NoError(t, err, errTag)
|
||||
require.True(t, pointer.PieceHashesVerified, errTag)
|
||||
|
||||
// set the piece_hashes_verified to false and store it in the pointer
|
||||
pointer.PieceHashesVerified = false
|
||||
err = satellite.Metainfo.Service.UnsynchronizedPut(ctx, key, pointer)
|
||||
require.NoError(t, err, errTag)
|
||||
|
||||
// verifying (dry run) segments with piece_hashes_verified = false should return no error and changed = true
|
||||
changed, err = audits.Verifier.VerifyPieceHashes(ctx, string(key), true)
|
||||
assert.Equal(t, tt.err, err, errTag)
|
||||
assert.Equal(t, tt.changed, changed, errTag)
|
||||
|
||||
// assert that piece_hashes_verified is still false after the dry run
|
||||
dryRunPointer, err := satellite.Metainfo.Service.Get(ctx, key)
|
||||
require.NoError(t, err, errTag)
|
||||
assert.False(t, dryRunPointer.PieceHashesVerified, errTag)
|
||||
|
||||
// assert the no piece was removed from the pointer by the dry run
|
||||
for i, piece := range dryRunPointer.Remote.RemotePieces {
|
||||
require.GreaterOrEqual(t, len(pointer.Remote.RemotePieces), i, errTag)
|
||||
assert.Equal(t, pointer.Remote.RemotePieces[i].NodeId, piece.NodeId, errTag)
|
||||
}
|
||||
|
||||
// verifying (no dry run) segments with piece_hashes_verified = false should return no error and changed = true
|
||||
changed, err = audits.Verifier.VerifyPieceHashes(ctx, string(key), false)
|
||||
assert.Equal(t, tt.err, err, errTag)
|
||||
assert.Equal(t, tt.changed, changed, errTag)
|
||||
|
||||
// assert that piece_hashes_verified = true if the segment was verified
|
||||
verifiedPointer, err := satellite.Metainfo.Service.Get(ctx, key)
|
||||
require.NoError(t, err, errTag)
|
||||
assert.Equal(t, tt.changed, verifiedPointer.PieceHashesVerified, errTag)
|
||||
|
||||
if changed {
|
||||
// assert the remaining pieces in the pointer are the expected ones
|
||||
for _, piece := range verifiedPointer.Remote.RemotePieces {
|
||||
assert.Contains(t, tt.report.Successes, piece.NodeId, errTag)
|
||||
assert.NotContains(t, tt.report.Fails, piece.NodeId, errTag)
|
||||
assert.NotContains(t, tt.report.Offlines, piece.NodeId, errTag)
|
||||
assert.NotContains(t, tt.report.Unknown, piece.NodeId, errTag)
|
||||
for _, pending := range tt.report.PendingAudits {
|
||||
assert.NotEqual(t, pending.NodeID, piece.NodeId, errTag)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// assert the no piece was removed from the pointer if it wasn't verified
|
||||
for i, piece := range verifiedPointer.Remote.RemotePieces {
|
||||
require.GreaterOrEqual(t, len(pointer.Remote.RemotePieces), i, errTag)
|
||||
assert.Equal(t, pointer.Remote.RemotePieces[i].NodeId, piece.NodeId, errTag)
|
||||
}
|
||||
}
|
||||
|
||||
// fixing non-existing object should return no error and changed = false
|
||||
err = satellite.Metainfo.Service.UnsynchronizedDelete(ctx, key)
|
||||
require.NoError(t, err, errTag)
|
||||
|
||||
changed, err = audits.Verifier.VerifyPieceHashes(ctx, string(key), false)
|
||||
require.NoError(t, err, errTag)
|
||||
assert.False(t, changed, errTag)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -95,6 +95,14 @@ func (s *Service) UpdatePieces(ctx context.Context, key metabase.SegmentKey, ref
|
||||
// Replacing the node ID and the hash of a piece can be done by adding the
|
||||
// piece to both toAdd and toRemove.
|
||||
func (s *Service) UpdatePiecesCheckDuplicates(ctx context.Context, key metabase.SegmentKey, ref *pb.Pointer, toAdd, toRemove []*pb.RemotePiece, checkDuplicates bool) (pointer *pb.Pointer, err error) {
|
||||
return s.UpdatePiecesCheckDuplicatesVerifyHashes(ctx, key, ref, toAdd, toRemove, checkDuplicates, false)
|
||||
}
|
||||
|
||||
// UpdatePiecesCheckDuplicatesVerifyHashes atomically adds toAdd pieces,
|
||||
// removes toRemove pieces, and sets PieceHashesVerified to verifyHashes in
|
||||
// the pointer under path. ref is the pointer that caller received via Get
|
||||
// prior to calling this method.
|
||||
func (s *Service) UpdatePiecesCheckDuplicatesVerifyHashes(ctx context.Context, key metabase.SegmentKey, ref *pb.Pointer, toAdd, toRemove []*pb.RemotePiece, checkDuplicates, verifyHashes bool) (pointer *pb.Pointer, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if err := sanityCheckPointer(key, ref); err != nil {
|
||||
@ -185,6 +193,10 @@ func (s *Service) UpdatePiecesCheckDuplicates(ctx context.Context, key metabase.
|
||||
pointer.LastRepaired = ref.LastRepaired
|
||||
pointer.RepairCount = ref.RepairCount
|
||||
|
||||
if verifyHashes {
|
||||
pointer.PieceHashesVerified = true
|
||||
}
|
||||
|
||||
// marshal the pointer
|
||||
newPointerBytes, err := pb.Marshal(pointer)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user