satellite/{audit,gracefulexit}: remove logic for PieceHashesVerified
We now have the piece hashes verified for all segments on all production satellites, so we can remove the code that handles the case where piece hashes are not verified. This will make it easier to migrate services from PointerDB to the new metabase. For consistency, PieceHashesVerified is still set to true in PointerDB for new segments.

Change-Id: Idf0ccce4c8d01ae812f11e8384a7221d90d4c183
This commit is contained in:
parent 9de1617db0 · commit 53b7fd7b00
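In condensed form, the guard being deleted worked roughly like the sketch below. This is an illustrative, self-contained Go snippet modeled on the defer block removed from the verifier's Verify method in the hunks that follow; the Report and verify names here are simplified stand-ins, not the satellite's actual types.

package main

import "fmt"

// Report mirrors the shape of the audit report seen in the hunks below:
// nodes that failed the audit and nodes with a pending (contained) audit.
type Report struct {
    Fails         []string
    PendingAudits []string
}

// verify sketches the guard this commit removes: when the segment's piece
// hashes were never verified, failures and pending audits were discarded so
// nodes were not penalized. Names here are illustrative only.
func verify(pieceHashesVerified bool, report Report) Report {
    if !pieceHashesVerified {
        report.PendingAudits = nil
        report.Fails = nil
    }
    return report
}

func main() {
    r := Report{Fails: []string{"node-a"}, PendingAudits: []string{"node-b"}}
    // Before this commit: an unverified segment wiped the penalties.
    fmt.Println(verify(false, r)) // {[] []}
    // After this commit the guard is gone, so the report is always kept as-is.
    fmt.Println(verify(true, r)) // {[node-a] [node-b]}
}

With the guard gone, audit failures and pending audits are reported unconditionally, which is safe now that every production segment has verified piece hashes.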
@@ -197,91 +197,6 @@ func TestReverifyFailMissingShare(t *testing.T) {
    })
}

// TestReverifyFailMissingShareHashesNotVerified tests that if piece hashes were not verified for a pointer,
// a node that fails an audit for that pointer does not get marked as failing an audit, but is removed from
// the pointer.
func TestReverifyFailMissingShareNotVerified(t *testing.T) {
    testplanet.Run(t, testplanet.Config{
        SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
    }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
        satellite := planet.Satellites[0]
        audits := satellite.Audit
        audits.Worker.Loop.Pause()
        audits.Chore.Loop.Pause()

        ul := planet.Uplinks[0]
        testData := testrand.Bytes(8 * memory.KiB)

        err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
        require.NoError(t, err)

        audits.Chore.Loop.TriggerWait()
        queue := audits.Queues.Fetch()
        path, err := queue.Next()
        require.NoError(t, err)

        pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
        require.NoError(t, err)

        randomIndex, err := audit.GetRandomStripe(ctx, pointer)
        require.NoError(t, err)

        orders := satellite.Orders.Service
        containment := satellite.DB.Containment()

        bucket := metabase.BucketLocation{ProjectID: ul.Projects[0].ID, BucketName: "testbucket"}
        shareSize := pointer.GetRemote().GetRedundancy().GetErasureShareSize()

        pieces := pointer.GetRemote().GetRemotePieces()
        rootPieceID := pointer.GetRemote().RootPieceId
        limit, privateKey, cachedIPAndPort, err := orders.CreateAuditOrderLimit(ctx, bucket, pieces[0].NodeId, pieces[0].PieceNum, rootPieceID, shareSize)
        require.NoError(t, err)

        share, err := audits.Verifier.GetShare(ctx, limit, privateKey, cachedIPAndPort, randomIndex, shareSize, int(pieces[0].PieceNum))
        require.NoError(t, err)

        pending := &audit.PendingAudit{
            NodeID:            pieces[0].NodeId,
            PieceID:           rootPieceID,
            StripeIndex:       randomIndex,
            ShareSize:         shareSize,
            ExpectedShareHash: pkcrypto.SHA256Hash(share.Data),
            ReverifyCount:     0,
            Path:              path,
        }

        err = containment.IncrementPending(ctx, pending)
        require.NoError(t, err)

        // update pointer to have PieceHashesVerified false
        err = satellite.Metainfo.Service.UnsynchronizedDelete(ctx, metabase.SegmentKey(path))
        require.NoError(t, err)
        pointer.PieceHashesVerified = false
        err = satellite.Metainfo.Service.Put(ctx, metabase.SegmentKey(path), pointer)
        require.NoError(t, err)

        // delete the piece from the first node
        piece := pieces[0]
        pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum)
        node := planet.FindNode(piece.NodeId)
        err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
        require.NoError(t, err)

        report, err := audits.Verifier.Reverify(ctx, path)
        require.NoError(t, err)

        require.Len(t, report.Successes, 0)
        require.Len(t, report.Offlines, 0)
        require.Len(t, report.PendingAudits, 0)
        // expect no failed audit
        require.Len(t, report.Fails, 0)

        // make sure that pending audit is removed
        _, err = containment.Get(ctx, pending.NodeID)
        require.True(t, audit.ErrContainedNotFound.Has(err))
    })
}

func TestReverifyFailBadData(t *testing.T) {
    testplanet.Run(t, testplanet.Config{
        SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
@@ -8,7 +8,6 @@ import (
    "context"
    "io"
    "math/rand"
    "sync"
    "time"

    "github.com/spacemonkeygo/monkit/v3"
@@ -100,14 +99,6 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
        return Report{}, nil
    }

    defer func() {
        // if piece hashes have not been verified for this segment, do not mark nodes as failing audit
        if !pointer.PieceHashesVerified {
            report.PendingAudits = nil
            report.Fails = nil
        }
    }()

    randomIndex, err := GetRandomStripe(ctx, pointer)
    if err != nil {
        return Report{}, err
@@ -135,7 +126,6 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
    offlineNodes = getOfflineNodes(pointer, orderLimits, skip)
    if len(offlineNodes) > 0 {
        verifier.log.Debug("Verify: order limits not created for some nodes (offline/disqualified)",
            zap.Bool("Piece Hash Verified", pointer.PieceHashesVerified),
            zap.Strings("Node IDs", offlineNodes.Strings()))
    }

@@ -172,7 +162,6 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
            // dial timeout
            offlineNodes = append(offlineNodes, share.NodeID)
            verifier.log.Debug("Verify: dial timeout (offline)",
                zap.Bool("Piece Hash Verified", pointer.PieceHashesVerified),
                zap.Stringer("Node ID", share.NodeID),
                zap.Error(share.Error))
            continue
@@ -181,7 +170,6 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
            // dial failed -- offline node
            offlineNodes = append(offlineNodes, share.NodeID)
            verifier.log.Debug("Verify: dial failed (offline)",
                zap.Bool("Piece Hash Verified", pointer.PieceHashesVerified),
                zap.Stringer("Node ID", share.NodeID),
                zap.Error(share.Error))
            continue
@@ -189,7 +177,6 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
            // unknown transport error
            unknownNodes = append(unknownNodes, share.NodeID)
            verifier.log.Info("Verify: unknown transport error (skipped)",
                zap.Bool("Piece Hash Verified", pointer.PieceHashesVerified),
                zap.Stringer("Node ID", share.NodeID),
                zap.Error(share.Error))
            continue
@@ -199,7 +186,6 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
            // missing share
            failedNodes = append(failedNodes, share.NodeID)
            verifier.log.Info("Verify: piece not found (audit failed)",
                zap.Bool("Piece Hash Verified", pointer.PieceHashesVerified),
                zap.Stringer("Node ID", share.NodeID),
                zap.Error(share.Error))
            continue
@@ -209,7 +195,6 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
            // dial successful, but download timed out
            containedNodes[pieceNum] = share.NodeID
            verifier.log.Info("Verify: download timeout (contained)",
                zap.Bool("Piece Hash Verified", pointer.PieceHashesVerified),
                zap.Stringer("Node ID", share.NodeID),
                zap.Error(share.Error))
            continue
@@ -218,7 +203,6 @@ func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[
            // unknown error
            unknownNodes = append(unknownNodes, share.NodeID)
            verifier.log.Info("Verify: unknown error (skipped)",
                zap.Bool("Piece Hash Verified", pointer.PieceHashesVerified),
                zap.Stringer("Node ID", share.NodeID),
                zap.Error(share.Error))
        }
@@ -389,43 +373,6 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
        return Report{}, nil
    }

    pieceHashesVerified := make(map[storj.NodeID]bool)
    pieceHashesVerifiedMutex := &sync.Mutex{}
    defer func() {
        pieceHashesVerifiedMutex.Lock()

        // for each node in Fails and PendingAudits, remove if piece hashes not verified for that segment
        // if the piece hashes are not verified, remove the "failed" node from containment
        newFails := storj.NodeIDList{}
        newPendingAudits := []*PendingAudit{}

        for _, id := range report.Fails {
            if pieceHashesVerified[id] {
                newFails = append(newFails, id)
            } else {
                _, errDelete := verifier.containment.Delete(ctx, id)
                if errDelete != nil {
                    verifier.log.Debug("Error deleting node from containment db", zap.Stringer("Node ID", id), zap.Error(errDelete))
                }
            }
        }
        for _, pending := range report.PendingAudits {
            if pieceHashesVerified[pending.NodeID] {
                newPendingAudits = append(newPendingAudits, pending)
            } else {
                _, errDelete := verifier.containment.Delete(ctx, pending.NodeID)
                if errDelete != nil {
                    verifier.log.Debug("Error deleting node from containment db", zap.Stringer("Node ID", pending.NodeID), zap.Error(errDelete))
                }
            }
        }

        report.Fails = newFails
        report.PendingAudits = newPendingAudits

        pieceHashesVerifiedMutex.Unlock()
    }()

    pieces := pointer.GetRemote().GetRemotePieces()
    ch := make(chan result, len(pieces))
    var containedInSegment int64
@@ -461,11 +408,6 @@ func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report
                return
            }

            // set whether piece hashes have been verified for this segment so we know whether to report a failed or pending audit for this node
            pieceHashesVerifiedMutex.Lock()
            pieceHashesVerified[pending.NodeID] = pendingPointer.PieceHashesVerified
            pieceHashesVerifiedMutex.Unlock()

            if pendingPointer.GetRemote().RootPieceId != pending.PieceID {
                ch <- result{nodeID: pending.NodeID, status: skipped}
                return
@@ -507,59 +507,6 @@ func TestVerifierMissingPiece(t *testing.T) {
    })
}

// TestVerifierMissingPieceHashesNotVerified tests that if piece hashes were not verified for a pointer,
// a node that fails an audit for that pointer does not get marked as failing an audit, but is removed from
// the pointer.
func TestVerifierMissingPieceHashesNotVerified(t *testing.T) {
    testplanet.Run(t, testplanet.Config{
        SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
    }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
        satellite := planet.Satellites[0]
        audits := satellite.Audit

        audits.Worker.Loop.Pause()
        audits.Chore.Loop.Pause()

        ul := planet.Uplinks[0]
        testData := testrand.Bytes(8 * memory.KiB)

        err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
        require.NoError(t, err)

        audits.Chore.Loop.TriggerWait()
        queue := audits.Queues.Fetch()
        path, err := queue.Next()
        require.NoError(t, err)

        pointer, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(path))
        require.NoError(t, err)

        // update pointer to have PieceHashesVerified false
        err = satellite.Metainfo.Service.UnsynchronizedDelete(ctx, metabase.SegmentKey(path))
        require.NoError(t, err)
        pointer.PieceHashesVerified = false
        err = satellite.Metainfo.Service.Put(ctx, metabase.SegmentKey(path), pointer)
        require.NoError(t, err)

        // delete the piece from the first node
        origNumPieces := len(pointer.GetRemote().GetRemotePieces())
        piece := pointer.GetRemote().GetRemotePieces()[0]
        pieceID := pointer.GetRemote().RootPieceId.Derive(piece.NodeId, piece.PieceNum)
        node := planet.FindNode(piece.NodeId)
        err = node.Storage2.Store.Delete(ctx, satellite.ID(), pieceID)
        require.NoError(t, err)

        report, err := audits.Verifier.Verify(ctx, path, nil)
        require.NoError(t, err)

        assert.Len(t, report.Successes, origNumPieces-1)
        // expect no failed audit
        assert.Len(t, report.Fails, 0)
        assert.Len(t, report.Offlines, 0)
        assert.Len(t, report.PendingAudits, 0)
    })
}

func TestVerifierDialTimeout(t *testing.T) {
    testplanet.Run(t, testplanet.Config{
        SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
@@ -583,13 +583,9 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
        return Error.Wrap(err)
    }

    // If the pointer was piece hash verified, we know this node definitely should have the piece
    // Otherwise, no penalty.
    if pointer.PieceHashesVerified {
        err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 1)
        if err != nil {
            return Error.Wrap(err)
        }
    err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 1)
    if err != nil {
        return Error.Wrap(err)
    }

    err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, transfer.PieceNum)
@@ -31,7 +31,6 @@ import (
    "storj.io/storj/satellite"
    "storj.io/storj/satellite/metainfo/metabase"
    "storj.io/storj/satellite/overlay"
    "storj.io/storj/storage"
    "storj.io/storj/storagenode"
    "storj.io/storj/storagenode/gracefulexit"
)
@@ -1070,7 +1069,7 @@ func TestPointerChangedOrDeleted(t *testing.T) {
    })
}

func TestFailureNotFoundPieceHashVerified(t *testing.T) {
func TestFailureNotFound(t *testing.T) {
    testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
        response, err := processClient.Recv()
        require.NoError(t, err)
@@ -1138,99 +1137,6 @@ func TestFailureNotFoundPieceHashVerified(t *testing.T) {

}

func TestFailureNotFoundPieceHashUnverified(t *testing.T) {
    testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
        // retrieve remote segment
        keys, err := satellite.Metainfo.Database.List(ctx, nil, -1)
        require.NoError(t, err)

        var oldPointer *pb.Pointer
        var path []byte
        for _, key := range keys {
            p, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(key))
            require.NoError(t, err)

            if p.GetRemote() != nil {
                oldPointer = p
                path = key
                break
            }
        }

        // replace pointer with non-piece-hash-verified pointer
        require.NotNil(t, oldPointer)
        oldPointerBytes, err := pb.Marshal(oldPointer)
        require.NoError(t, err)
        newPointer := &pb.Pointer{}
        err = pb.Unmarshal(oldPointerBytes, newPointer)
        require.NoError(t, err)
        newPointer.PieceHashesVerified = false
        newPointerBytes, err := pb.Marshal(newPointer)
        require.NoError(t, err)
        err = satellite.Metainfo.Database.CompareAndSwap(ctx, storage.Key(path), oldPointerBytes, newPointerBytes)
        require.NoError(t, err)

        // begin processing graceful exit messages
        response, err := processClient.Recv()
        require.NoError(t, err)

        switch m := response.GetMessage().(type) {
        case *pb.SatelliteMessage_TransferPiece:
            require.NotNil(t, m)

            message := &pb.StorageNodeMessage{
                Message: &pb.StorageNodeMessage_Failed{
                    Failed: &pb.TransferFailed{
                        OriginalPieceId: m.TransferPiece.OriginalPieceId,
                        Error:           pb.TransferFailed_NOT_FOUND,
                    },
                },
            }
            err = processClient.Send(message)
            require.NoError(t, err)
        default:
            require.FailNow(t, "should not reach this case: %#v", m)
        }

        response, err = processClient.Recv()
        require.NoError(t, err)

        switch m := response.GetMessage().(type) {
        case *pb.SatelliteMessage_ExitCompleted:
            require.NotNil(t, m)
        default:
            require.FailNow(t, "should not reach this case: %#v", m)
        }

        // check that node is no longer in the pointer
        keys, err = satellite.Metainfo.Database.List(ctx, nil, -1)
        require.NoError(t, err)

        var pointer *pb.Pointer
        for _, key := range keys {
            p, err := satellite.Metainfo.Service.Get(ctx, metabase.SegmentKey(key))
            require.NoError(t, err)

            if p.GetRemote() != nil {
                pointer = p
                break
            }
        }
        require.NotNil(t, pointer)
        for _, piece := range pointer.GetRemote().GetRemotePieces() {
            require.NotEqual(t, piece.NodeId, exitingNode.ID())
        }

        // check that the exit has completed and we have the correct transferred/failed values
        progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
        require.NoError(t, err)

        require.Equal(t, int64(0), progress.PiecesTransferred)
        require.Equal(t, int64(0), progress.PiecesFailed)
    })

}

func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
    var maxOrderLimitSendCount = 3
    testplanet.Run(t, testplanet.Config{