satellite/audit: do not contain nodes for unknown errors (#3592)

* skip unknown errors (wip)

* add tests to make sure nodes that time out are added to containment

* add bad blobs store

* call "Skipped" "Unknown"

* add tests to ensure unknown errors do not trigger containment

* add monkit stats to lockfile

* typo

* add periods to end of bad blobs comments
Maximillian von Briesen 2019-11-19 11:30:28 -05:00 committed by littleskunk
parent 8e1e4cc342
commit 8653dda2b1
7 changed files with 596 additions and 38 deletions
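Summary of the behavior change: before this commit, the verifier treated any unclassified error from a storage node like a timeout and created a pending audit, which placed the node in containment. The diff below instead collects those nodes in a new Unknown list on the audit report, while genuine timeouts continue to produce pending audits and containment. A rough sketch of the new classification (isTimeout and isOffline are hypothetical helpers standing in for the error checks in verifier.go):

// Sketch only; not the verifier's literal control flow.
func classifyShareError(err error, nodeID storj.NodeID, pending *PendingAudit, report *Report) {
    switch {
    case isTimeout(err):
        // Slow or stalled downloads still lead to containment via a pending audit.
        report.PendingAudits = append(report.PendingAudits, pending)
    case isOffline(err):
        report.Offlines = append(report.Offlines, nodeID)
    default:
        // Unrecognized errors are only recorded; they no longer trigger containment.
        report.Unknown = append(report.Unknown, nodeID)
    }
}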

View File

@@ -5,6 +5,38 @@ storj.io/storj/satellite/accounting/tally."total.objects" IntVal
storj.io/storj/satellite/accounting/tally."total.remote_bytes" IntVal
storj.io/storj/satellite/accounting/tally."total.remote_segments" IntVal
storj.io/storj/satellite/accounting/tally."total.segments" IntVal
storj.io/storj/satellite/audit."audit_contained_nodes" IntVal
storj.io/storj/satellite/audit."audit_contained_nodes_global" Meter
storj.io/storj/satellite/audit."audit_contained_percentage" FloatVal
storj.io/storj/satellite/audit."audit_fail_nodes" IntVal
storj.io/storj/satellite/audit."audit_fail_nodes_global" Meter
storj.io/storj/satellite/audit."audit_failed_percentage" FloatVal
storj.io/storj/satellite/audit."audit_offline_nodes" IntVal
storj.io/storj/satellite/audit."audit_offline_nodes_global" Meter
storj.io/storj/satellite/audit."audit_offline_percentage" FloatVal
storj.io/storj/satellite/audit."audit_success_nodes" IntVal
storj.io/storj/satellite/audit."audit_success_nodes_global" Meter
storj.io/storj/satellite/audit."audit_successful_percentage" FloatVal
storj.io/storj/satellite/audit."audit_total_nodes" IntVal
storj.io/storj/satellite/audit."audit_total_nodes_global" Meter
storj.io/storj/satellite/audit."audit_total_pointer_nodes" IntVal
storj.io/storj/satellite/audit."audit_total_pointer_nodes_global" Meter
storj.io/storj/satellite/audit."audit_unknown_nodes" IntVal
storj.io/storj/satellite/audit."audit_unknown_nodes_global" Meter
storj.io/storj/satellite/audit."audit_unknown_percentage" FloatVal
storj.io/storj/satellite/audit."audited_percentage" FloatVal
storj.io/storj/satellite/audit."reverify_contained" IntVal
storj.io/storj/satellite/audit."reverify_contained_global" Meter
storj.io/storj/satellite/audit."reverify_contained_in_segment" IntVal
storj.io/storj/satellite/audit."reverify_fails" IntVal
storj.io/storj/satellite/audit."reverify_fails_global" Meter
storj.io/storj/satellite/audit."reverify_offlines" IntVal
storj.io/storj/satellite/audit."reverify_offlines_global" Meter
storj.io/storj/satellite/audit."reverify_successes" IntVal
storj.io/storj/satellite/audit."reverify_successes_global" Meter
storj.io/storj/satellite/audit."reverify_total_in_segment" IntVal
storj.io/storj/satellite/audit."reverify_unknown" IntVal
storj.io/storj/satellite/audit."reverify_unknown_global" Meter
storj.io/storj/satellite/gracefulexit."graceful_exit_fail_max_failures_percentage" Meter
storj.io/storj/satellite/gracefulexit."graceful_exit_fail_validation" Meter
storj.io/storj/satellite/gracefulexit."graceful_exit_final_bytes_transferred" IntVal
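The new lockfile entries mirror the unknown-node metrics the verifier now emits; condensed from the verifier.go hunk further down, the calls that produce them are:

mon.Meter("audit_unknown_nodes_global").Mark(numUnknown)            //locked
mon.IntVal("audit_unknown_nodes").Observe(int64(numUnknown))        //locked
mon.FloatVal("audit_unknown_percentage").Observe(unknownPercentage) //locked
mon.Meter("reverify_unknown_global").Mark(len(report.Unknown))      //locked
mon.IntVal("reverify_unknown").Observe(int64(len(report.Unknown)))  //locked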

private/testblobs/bad.go (new file, 187 lines)
View File

@@ -0,0 +1,187 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package testblobs
import (
"context"
"go.uber.org/zap"
"storj.io/storj/storage"
"storj.io/storj/storagenode"
)
// BadDB implements a bad storage node DB.
type BadDB struct {
storagenode.DB
blobs *BadBlobs
log *zap.Logger
}
// NewBadDB creates a new bad storage node DB.
// Use SetError to manually configure the error returned by all piece operations.
func NewBadDB(log *zap.Logger, db storagenode.DB) *BadDB {
return &BadDB{
DB: db,
blobs: newBadBlobs(log, db.Pieces()),
log: log,
}
}
// Pieces returns the blob store.
func (bad *BadDB) Pieces() storage.Blobs {
return bad.blobs
}
// SetError sets an error to be returned for all piece operations.
func (bad *BadDB) SetError(err error) {
bad.blobs.SetError(err)
}
// BadBlobs implements a bad blob store.
type BadBlobs struct {
err error
blobs storage.Blobs
log *zap.Logger
}
// newBadBlobs creates a new bad blob store wrapping the provided blobs.
// Use SetError to manually configure the error returned by all operations.
func newBadBlobs(log *zap.Logger, blobs storage.Blobs) *BadBlobs {
return &BadBlobs{
log: log,
blobs: blobs,
}
}
// Create creates a new blob that can be written. It optionally takes a size
// argument for performance improvements; -1 means the size is unknown.
func (bad *BadBlobs) Create(ctx context.Context, ref storage.BlobRef, size int64) (storage.BlobWriter, error) {
if bad.err != nil {
return nil, bad.err
}
return bad.blobs.Create(ctx, ref, size)
}
// Close closes the blob store and any resources associated with it.
func (bad *BadBlobs) Close() error {
if bad.err != nil {
return bad.err
}
return bad.blobs.Close()
}
// Open opens a reader with the specified namespace and key.
func (bad *BadBlobs) Open(ctx context.Context, ref storage.BlobRef) (storage.BlobReader, error) {
if bad.err != nil {
return nil, bad.err
}
return bad.blobs.Open(ctx, ref)
}
// OpenWithStorageFormat opens a reader for the already-located blob, avoiding the potential need
// to check multiple storage formats to find the blob.
func (bad *BadBlobs) OpenWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (storage.BlobReader, error) {
if bad.err != nil {
return nil, bad.err
}
return bad.blobs.OpenWithStorageFormat(ctx, ref, formatVer)
}
// Trash deletes the blob with the namespace and key.
func (bad *BadBlobs) Trash(ctx context.Context, ref storage.BlobRef) error {
if bad.err != nil {
return bad.err
}
return bad.blobs.Trash(ctx, ref)
}
// RestoreTrash restores all files in the trash.
func (bad *BadBlobs) RestoreTrash(ctx context.Context, namespace []byte) error {
if bad.err != nil {
return bad.err
}
return bad.blobs.RestoreTrash(ctx, namespace)
}
// Delete deletes the blob with the namespace and key.
func (bad *BadBlobs) Delete(ctx context.Context, ref storage.BlobRef) error {
if bad.err != nil {
return bad.err
}
return bad.blobs.Delete(ctx, ref)
}
// DeleteWithStorageFormat deletes the blob with the namespace, key, and format version.
func (bad *BadBlobs) DeleteWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) error {
if bad.err != nil {
return bad.err
}
return bad.blobs.DeleteWithStorageFormat(ctx, ref, formatVer)
}
// Stat looks up disk metadata on the blob file.
func (bad *BadBlobs) Stat(ctx context.Context, ref storage.BlobRef) (storage.BlobInfo, error) {
if bad.err != nil {
return nil, bad.err
}
return bad.blobs.Stat(ctx, ref)
}
// StatWithStorageFormat looks up disk metadata for the blob file with the given storage format
// version. This avoids the potential need to check multiple storage formats for the blob
// when the format is already known.
func (bad *BadBlobs) StatWithStorageFormat(ctx context.Context, ref storage.BlobRef, formatVer storage.FormatVersion) (storage.BlobInfo, error) {
if bad.err != nil {
return nil, bad.err
}
return bad.blobs.StatWithStorageFormat(ctx, ref, formatVer)
}
// WalkNamespace executes walkFunc for each locally stored blob in the given namespace.
// If walkFunc returns a non-nil error, WalkNamespace will stop iterating and return the
// error immediately.
func (bad *BadBlobs) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(storage.BlobInfo) error) error {
if bad.err != nil {
return bad.err
}
return bad.blobs.WalkNamespace(ctx, namespace, walkFunc)
}
// ListNamespaces returns all namespaces that might be storing data.
func (bad *BadBlobs) ListNamespaces(ctx context.Context) ([][]byte, error) {
if bad.err != nil {
return make([][]byte, 0), bad.err
}
return bad.blobs.ListNamespaces(ctx)
}
// FreeSpace returns how much free space is left for writing.
func (bad *BadBlobs) FreeSpace() (int64, error) {
if bad.err != nil {
return 0, bad.err
}
return bad.blobs.FreeSpace()
}
// SpaceUsed adds up how much is used in all namespaces.
func (bad *BadBlobs) SpaceUsed(ctx context.Context) (int64, error) {
if bad.err != nil {
return 0, bad.err
}
return bad.blobs.SpaceUsed(ctx)
}
// SpaceUsedInNamespace adds up how much is used in the given namespace.
func (bad *BadBlobs) SpaceUsedInNamespace(ctx context.Context, namespace []byte) (int64, error) {
if bad.err != nil {
return 0, bad.err
}
return bad.blobs.SpaceUsedInNamespace(ctx, namespace)
}
// SetError configures the blob store to return a specific error for all operations.
func (bad *BadBlobs) SetError(err error) {
bad.err = err
}
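The audit tests further down wire this store in through testplanet and then trip it on a single node; the essential pattern, excerpted from those tests, is:

// Swap in BadDB when testplanet constructs each storage node's DB.
Reconfigure: testplanet.Reconfigure{
    NewStorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
        return testblobs.NewBadDB(log.Named("baddb"), db), nil
    },
},

// Later, force every piece operation on the chosen node to fail.
badNodeDB := node.DB.(*testblobs.BadDB)
badNodeDB.SetError(errs.New("unknown error"))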

View File

@@ -24,12 +24,13 @@ type Reporter struct {
maxReverifyCount int32
}
// Report contains audit result lists for nodes that succeeded, failed, were offline, or have pending audits
// Report contains audit result lists for nodes that succeeded, failed, were offline, have pending audits, or failed for unknown reasons
type Report struct {
Successes storj.NodeIDList
Fails storj.NodeIDList
Offlines storj.NodeIDList
PendingAudits []*PendingAudit
Unknown storj.NodeIDList
}
// NewReporter instantiates a reporter

View File

@@ -10,6 +10,8 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/peertls/tlsopts"
@@ -17,11 +19,14 @@ import (
"storj.io/storj/pkg/rpc"
"storj.io/storj/pkg/storj"
"storj.io/storj/private/memory"
"storj.io/storj/private/testblobs"
"storj.io/storj/private/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/private/testrand"
"storj.io/storj/satellite"
"storj.io/storj/satellite/audit"
"storj.io/storj/storage"
"storj.io/storj/storagenode"
"storj.io/storj/uplink"
)
@@ -955,3 +960,192 @@ func TestReverifyExpired2(t *testing.T) {
require.Error(t, err)
})
}
// TestReverifySlowDownload checks that a node that times out while sending data to the
// audit service gets put into containment mode
func TestReverifySlowDownload(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
NewStorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
},
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
// These config values are chosen to force the slow node to time out without timing out on the three normal nodes
config.Audit.MinBytesPerSecond = 100 * memory.KiB
config.Audit.MinDownloadTimeout = 500 * time.Millisecond
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
MinThreshold: 2,
RepairThreshold: 2,
SuccessThreshold: 4,
MaxThreshold: 4,
}, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, path)
require.NoError(t, err)
slowPiece := pointer.Remote.RemotePieces[0]
slowNode := slowPiece.NodeId
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
require.NoError(t, err)
orders := satellite.Orders.Service
containment := satellite.DB.Containment()
projects, err := satellite.DB.Console().Projects().GetAll(ctx)
require.NoError(t, err)
bucketID := []byte(storj.JoinPaths(projects[0].ID.String(), "testbucket"))
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
pieces := pointer.GetRemote().GetRemotePieces()
rootPieceID := pointer.GetRemote().RootPieceId
limit, privateKey, err := orders.CreateAuditOrderLimit(ctx, bucketID, slowNode, slowPiece.PieceNum, rootPieceID, shareSize)
require.NoError(t, err)
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, randomIndex, shareSize, int(pieces[0].PieceNum))
require.NoError(t, err)
pending := &audit.PendingAudit{
NodeID: slowNode,
PieceID: pointer.Remote.RootPieceId,
StripeIndex: randomIndex,
ShareSize: shareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
ReverifyCount: 0,
Path: path,
}
err = containment.IncrementPending(ctx, pending)
require.NoError(t, err)
for _, node := range planet.StorageNodes {
if node.ID() == slowNode {
slowNodeDB := node.DB.(*testblobs.SlowDB)
// make downloads on storage node slower than the timeout on the satellite for downloading shares
delay := 1 * time.Second
slowNodeDB.SetLatency(delay)
break
}
}
report, err := audits.Verifier.Reverify(ctx, path)
require.NoError(t, err)
require.Len(t, report.Successes, 0)
require.Len(t, report.Fails, 0)
require.Len(t, report.Offlines, 0)
require.Len(t, report.PendingAudits, 1)
require.Len(t, report.Unknown, 0)
require.Equal(t, report.PendingAudits[0].NodeID, slowNode)
})
}
// TestReverifyUnknownError checks that a node that returns an unknown error during an audit does not get marked as successful, failed, or contained.
func TestReverifyUnknownError(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
NewStorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return testblobs.NewBadDB(log.Named("baddb"), db), nil
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
MinThreshold: 2,
RepairThreshold: 2,
SuccessThreshold: 4,
MaxThreshold: 4,
}, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, path)
require.NoError(t, err)
badPiece := pointer.Remote.RemotePieces[0]
badNode := badPiece.NodeId
randomIndex, err := audit.GetRandomStripe(ctx, pointer)
require.NoError(t, err)
orders := satellite.Orders.Service
containment := satellite.DB.Containment()
projects, err := satellite.DB.Console().Projects().GetAll(ctx)
require.NoError(t, err)
bucketID := []byte(storj.JoinPaths(projects[0].ID.String(), "testbucket"))
shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()
pieces := pointer.GetRemote().GetRemotePieces()
rootPieceID := pointer.GetRemote().RootPieceId
limit, privateKey, err := orders.CreateAuditOrderLimit(ctx, bucketID, badNode, badPiece.PieceNum, rootPieceID, shareSize)
require.NoError(t, err)
share, err := audits.Verifier.GetShare(ctx, limit, privateKey, randomIndex, shareSize, int(pieces[0].PieceNum))
require.NoError(t, err)
pending := &audit.PendingAudit{
NodeID: badNode,
PieceID: pointer.Remote.RootPieceId,
StripeIndex: randomIndex,
ShareSize: shareSize,
ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
ReverifyCount: 0,
Path: path,
}
err = containment.IncrementPending(ctx, pending)
require.NoError(t, err)
for _, node := range planet.StorageNodes {
if node.ID() == badNode {
badNodeDB := node.DB.(*testblobs.BadDB)
// return an error when the satellite requests a share
badNodeDB.SetError(errs.New("unknown error"))
break
}
}
report, err := audits.Verifier.Reverify(ctx, path)
require.NoError(t, err)
require.Len(t, report.Successes, 0)
require.Len(t, report.Fails, 0)
require.Len(t, report.Offlines, 0)
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Unknown, 1)
require.Equal(t, report.Unknown[0], badNode)
})
}

View File

@@ -120,6 +120,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
var offlineNodes storj.NodeIDList
var failedNodes storj.NodeIDList
var unknownNodes storj.NodeIDList
containedNodes := make(map[int]storj.NodeID)
sharesToAudit := make(map[int]Share)
@@ -180,8 +181,8 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
continue
}
// unknown transport error
containedNodes[pieceNum] = share.NodeID
verifier.log.Debug("Verify: unknown transport error (contained)",
unknownNodes = append(unknownNodes, share.NodeID)
verifier.log.Debug("Verify: unknown transport error (skipped)",
zap.Bool("Piece Hash Verified", pointer.PieceHashesVerified),
zap.Binary("Segment", []byte(path)),
zap.Stringer("Node ID", share.NodeID),
@@ -212,8 +213,8 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
}
// unknown error
containedNodes[pieceNum] = share.NodeID
verifier.log.Debug("Verify: unknown error (contained)",
unknownNodes = append(unknownNodes, share.NodeID)
verifier.log.Debug("Verify: unknown error (skipped)",
zap.Bool("Piece Hash Verified", pointer.PieceHashesVerified),
zap.Binary("Segment", []byte(path)),
zap.Stringer("Node ID", share.NodeID),
@@ -227,6 +228,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
return Report{
Fails: failedNodes,
Offlines: offlineNodes,
Unknown: unknownNodes,
}, ErrNotEnoughShares.New("got %d, required %d", len(sharesToAudit), required)
}
@@ -235,6 +237,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
return Report{
Fails: failedNodes,
Offlines: offlineNodes,
Unknown: unknownNodes,
}, err
}
@@ -247,44 +250,50 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
verifier.log.Warn("Verify: failed to delete failed pieces", zap.Binary("Segment", []byte(path)), zap.Error(err))
}
successNodes := getSuccessNodes(ctx, shares, failedNodes, offlineNodes, containedNodes)
successNodes := getSuccessNodes(ctx, shares, failedNodes, offlineNodes, unknownNodes, containedNodes)
totalInPointer := len(pointer.GetRemote().GetRemotePieces())
numOffline := len(offlineNodes)
numSuccessful := len(successNodes)
numFailed := len(failedNodes)
numContained := len(containedNodes)
numUnknown := len(unknownNodes)
totalAudited := numSuccessful + numFailed + numOffline + numContained
auditedPercentage := float64(totalAudited) / float64(totalInPointer)
offlinePercentage := float64(0)
successfulPercentage := float64(0)
failedPercentage := float64(0)
containedPercentage := float64(0)
unknownPercentage := float64(0)
if totalAudited > 0 {
offlinePercentage = float64(numOffline) / float64(totalAudited)
successfulPercentage = float64(numSuccessful) / float64(totalAudited)
failedPercentage = float64(numFailed) / float64(totalAudited)
containedPercentage = float64(numContained) / float64(totalAudited)
unknownPercentage = float64(numUnknown) / float64(totalAudited)
}
mon.Meter("audit_success_nodes_global").Mark(numSuccessful)
mon.Meter("audit_fail_nodes_global").Mark(numFailed)
mon.Meter("audit_offline_nodes_global").Mark(numOffline)
mon.Meter("audit_contained_nodes_global").Mark(numContained)
mon.Meter("audit_total_nodes_global").Mark(totalAudited)
mon.Meter("audit_total_pointer_nodes_global").Mark(totalInPointer)
mon.Meter("audit_success_nodes_global").Mark(numSuccessful) //locked
mon.Meter("audit_fail_nodes_global").Mark(numFailed) //locked
mon.Meter("audit_offline_nodes_global").Mark(numOffline) //locked
mon.Meter("audit_contained_nodes_global").Mark(numContained) //locked
mon.Meter("audit_unknown_nodes_global").Mark(numUnknown) //locked
mon.Meter("audit_total_nodes_global").Mark(totalAudited) //locked
mon.Meter("audit_total_pointer_nodes_global").Mark(totalInPointer) //locked
mon.IntVal("audit_success_nodes").Observe(int64(numSuccessful))
mon.IntVal("audit_fail_nodes").Observe(int64(numFailed))
mon.IntVal("audit_offline_nodes").Observe(int64(numOffline))
mon.IntVal("audit_contained_nodes").Observe(int64(numContained))
mon.IntVal("audit_total_nodes").Observe(int64(totalAudited))
mon.IntVal("audit_total_pointer_nodes").Observe(int64(totalInPointer))
mon.FloatVal("audited_percentage").Observe(auditedPercentage)
mon.FloatVal("audit_offline_percentage").Observe(offlinePercentage)
mon.FloatVal("audit_successful_percentage").Observe(successfulPercentage)
mon.FloatVal("audit_failed_percentage").Observe(failedPercentage)
mon.FloatVal("audit_contained_percentage").Observe(containedPercentage)
mon.IntVal("audit_success_nodes").Observe(int64(numSuccessful)) //locked
mon.IntVal("audit_fail_nodes").Observe(int64(numFailed)) //locked
mon.IntVal("audit_offline_nodes").Observe(int64(numOffline)) //locked
mon.IntVal("audit_contained_nodes").Observe(int64(numContained)) //locked
mon.IntVal("audit_unknown_nodes").Observe(int64(numUnknown)) //locked
mon.IntVal("audit_total_nodes").Observe(int64(totalAudited)) //locked
mon.IntVal("audit_total_pointer_nodes").Observe(int64(totalInPointer)) //locked
mon.FloatVal("audited_percentage").Observe(auditedPercentage) //locked
mon.FloatVal("audit_offline_percentage").Observe(offlinePercentage) //locked
mon.FloatVal("audit_successful_percentage").Observe(successfulPercentage) //locked
mon.FloatVal("audit_failed_percentage").Observe(failedPercentage) //locked
mon.FloatVal("audit_contained_percentage").Observe(containedPercentage) //locked
mon.FloatVal("audit_unknown_percentage").Observe(unknownPercentage) //locked
pendingAudits, err := createPendingAudits(ctx, containedNodes, correctedShares, pointer, randomIndex, path)
if err != nil {
@@ -292,6 +301,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
Successes: successNodes,
Fails: failedNodes,
Offlines: offlineNodes,
Unknown: unknownNodes,
}, err
}
@@ -300,6 +310,7 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
Fails: failedNodes,
Offlines: offlineNodes,
PendingAudits: pendingAudits,
Unknown: unknownNodes,
}, nil
}
@@ -351,6 +362,7 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
offline
failed
contained
unknown
erred
)
@@ -534,8 +546,8 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
return
}
// unknown transport error
ch <- result{nodeID: pending.NodeID, status: contained, pendingAudit: pending}
verifier.log.Debug("Reverify: unknown transport error (contained)", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
ch <- result{nodeID: pending.NodeID, status: unknown, pendingAudit: pending}
verifier.log.Debug("Reverify: unknown transport error (skipped)", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
return
}
if errs2.IsRPC(err, rpcstatus.NotFound) {
@@ -563,8 +575,8 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
return
}
// unknown error
ch <- result{nodeID: pending.NodeID, status: contained, pendingAudit: pending}
verifier.log.Debug("Reverify: unknown error (contained)", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
ch <- result{nodeID: pending.NodeID, status: unknown, pendingAudit: pending}
verifier.log.Debug("Reverify: unknown error (skipped)", zap.Binary("Segment", []byte(pending.Path)), zap.Stringer("Node ID", pending.NodeID), zap.Error(err))
return
}
downloadedHash := pkcrypto.SHA256Hash(share.Data)
@@ -601,23 +613,27 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
report.Fails = append(report.Fails, result.nodeID)
case contained:
report.PendingAudits = append(report.PendingAudits, result.pendingAudit)
case unknown:
report.Unknown = append(report.Unknown, result.nodeID)
case erred:
err = errs.Combine(err, result.err)
}
}
mon.Meter("reverify_successes_global").Mark(len(report.Successes))
mon.Meter("reverify_offlines_global").Mark(len(report.Offlines))
mon.Meter("reverify_fails_global").Mark(len(report.Fails))
mon.Meter("reverify_contained_global").Mark(len(report.PendingAudits))
mon.Meter("reverify_successes_global").Mark(len(report.Successes)) //locked
mon.Meter("reverify_offlines_global").Mark(len(report.Offlines)) //locked
mon.Meter("reverify_fails_global").Mark(len(report.Fails)) //locked
mon.Meter("reverify_contained_global").Mark(len(report.PendingAudits)) //locked
mon.Meter("reverify_unknown_global").Mark(len(report.Unknown)) //locked
mon.IntVal("reverify_successes").Observe(int64(len(report.Successes)))
mon.IntVal("reverify_offlines").Observe(int64(len(report.Offlines)))
mon.IntVal("reverify_fails").Observe(int64(len(report.Fails)))
mon.IntVal("reverify_contained").Observe(int64(len(report.PendingAudits)))
mon.IntVal("reverify_successes").Observe(int64(len(report.Successes))) //locked
mon.IntVal("reverify_offlines").Observe(int64(len(report.Offlines))) //locked
mon.IntVal("reverify_fails").Observe(int64(len(report.Fails))) //locked
mon.IntVal("reverify_contained").Observe(int64(len(report.PendingAudits))) //locked
mon.IntVal("reverify_unknown").Observe(int64(len(report.Unknown))) //locked
mon.IntVal("reverify_contained_in_segment").Observe(containedInSegment)
mon.IntVal("reverify_total_in_segment").Observe(int64(len(pieces)))
mon.IntVal("reverify_contained_in_segment").Observe(containedInSegment) //locked
mon.IntVal("reverify_total_in_segment").Observe(int64(len(pieces))) //locked
return report, err
}
@@ -784,7 +800,7 @@ func getOfflineNodes(pointer *pb.Pointer, limits []*pb.AddressedOrderLimit, skip
}
// getSuccessNodes uses the failed nodes, offline nodes and contained nodes arrays to determine which nodes passed the audit
func getSuccessNodes(ctx context.Context, shares map[int]Share, failedNodes, offlineNodes storj.NodeIDList, containedNodes map[int]storj.NodeID) (successNodes storj.NodeIDList) {
func getSuccessNodes(ctx context.Context, shares map[int]Share, failedNodes, offlineNodes, unknownNodes storj.NodeIDList, containedNodes map[int]storj.NodeID) (successNodes storj.NodeIDList) {
defer mon.Task()(&ctx)(nil)
fails := make(map[storj.NodeID]bool)
for _, fail := range failedNodes {
@@ -793,6 +809,9 @@ func getSuccessNodes(ctx context.Context, shares map[int]Share, failedNodes, off
for _, offline := range offlineNodes {
fails[offline] = true
}
for _, unknown := range unknownNodes {
fails[unknown] = true
}
for _, contained := range containedNodes {
fails[contained] = true
}
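getSuccessNodes now folds unknown nodes into the same exclusion set as failed, offline, and contained nodes. The rest of the function is unchanged and therefore not part of the hunk; it treats every share whose node is not in that set as a success, roughly:

// Sketch of the unchanged tail of getSuccessNodes, assuming the pre-existing logic.
for _, share := range shares {
    if !fails[share.NodeID] {
        successNodes = append(successNodes, share.NodeID)
    }
}
return successNodes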

View File

@@ -25,9 +25,11 @@ import (
"storj.io/storj/private/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/private/testrand"
"storj.io/storj/satellite"
"storj.io/storj/satellite/audit"
"storj.io/storj/storage"
"storj.io/storj/storagenode"
"storj.io/storj/uplink"
)
// TestDownloadSharesHappyPath checks that the Share.Error field of all shares
@@ -750,3 +752,123 @@ func TestVerifierModifiedSegmentFailsOnce(t *testing.T) {
}
})
}
// TestVerifierSlowDownload checks that a node that times out while sending data to the
// audit service gets put into containment mode
func TestVerifierSlowDownload(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
NewStorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
},
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
// These config values are chosen to force the slow node to time out without timing out on the three normal nodes
config.Audit.MinBytesPerSecond = 100 * memory.KiB
config.Audit.MinDownloadTimeout = 500 * time.Millisecond
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
MinThreshold: 2,
RepairThreshold: 2,
SuccessThreshold: 4,
MaxThreshold: 4,
}, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, path)
require.NoError(t, err)
slowNode := pointer.Remote.RemotePieces[0].NodeId
for _, node := range planet.StorageNodes {
if node.ID() == slowNode {
slowNodeDB := node.DB.(*testblobs.SlowDB)
// make downloads on storage node slower than the timeout on the satellite for downloading shares
delay := 1 * time.Second
slowNodeDB.SetLatency(delay)
break
}
}
report, err := audits.Verifier.Verify(ctx, path, nil)
require.NoError(t, err)
require.Len(t, report.Successes, 3)
require.Len(t, report.Fails, 0)
require.Len(t, report.Offlines, 0)
require.Len(t, report.PendingAudits, 1)
require.Len(t, report.Unknown, 0)
require.Equal(t, report.PendingAudits[0].NodeID, slowNode)
})
}
// TestVerifierUnknownError checks that a node that returns an unknown error in response to an audit request
// does not get marked as successful, failed, or contained
func TestVerifierUnknownError(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
NewStorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return testblobs.NewBadDB(log.Named("baddb"), db), nil
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
queue := audits.Queue
audits.Worker.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
MinThreshold: 2,
RepairThreshold: 2,
SuccessThreshold: 4,
MaxThreshold: 4,
}, "testbucket", "test/path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
path, err := queue.Next()
require.NoError(t, err)
pointer, err := satellite.Metainfo.Service.Get(ctx, path)
require.NoError(t, err)
badNode := pointer.Remote.RemotePieces[0].NodeId
for _, node := range planet.StorageNodes {
if node.ID() == badNode {
badNodeDB := node.DB.(*testblobs.BadDB)
// return an error when the verifier attempts to download from this node
badNodeDB.SetError(errs.New("unknown error"))
break
}
}
report, err := audits.Verifier.Verify(ctx, path, nil)
require.NoError(t, err)
require.Len(t, report.Successes, 3)
require.Len(t, report.Fails, 0)
require.Len(t, report.Offlines, 0)
require.Len(t, report.PendingAudits, 0)
require.Len(t, report.Unknown, 1)
require.Equal(t, report.Unknown[0], badNode)
})
}
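Assuming a standard checkout of the repository (satellite tests may also need a test database configured, depending on the environment), the new cases can be run on their own, for example:

go test ./satellite/audit -run 'TestVerifierUnknownError|TestVerifierSlowDownload|TestReverifyUnknownError|TestReverifySlowDownload' -v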

View File

@@ -130,6 +130,9 @@ func (worker *Worker) work(ctx context.Context, path storj.Path) error {
for _, pending := range report.PendingAudits {
skip[pending.NodeID] = true
}
for _, nodeID := range report.Unknown {
skip[nodeID] = true
}
// Next, audit the remaining nodes that are not in containment mode.
report, err = worker.verifier.Verify(ctx, path, skip)
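With the added loop, the worker excludes both contained nodes and nodes that just returned unknown errors from the immediate follow-up audit. A condensed view of the surrounding flow, with the unchanged parts paraphrased from context:

// Sketch of worker.work, assuming the surrounding (unchanged) code.
report, err := worker.verifier.Reverify(ctx, path)
// ...record the reverification report...
skip := make(map[storj.NodeID]bool)
for _, pending := range report.PendingAudits {
    skip[pending.NodeID] = true
}
for _, nodeID := range report.Unknown {
    skip[nodeID] = true
}
// Next, audit the remaining nodes that are not in containment mode.
report, err = worker.verifier.Verify(ctx, path, skip)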