overlay.UpdateStats removes node from containment mode (#2419)
parent f56cb1c612
commit d32c907440
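In short: after this change, any overlay.UpdateStats call clears the node's Contained flag and deletes its pending-audit row in the same transaction, so the audit reporter no longer needs to call containment.Delete explicitly. Below is a condensed, illustrative sketch of that behavior, adapted from the new TestContainUpdateStats in the diff that follows; the test name TestUpdateStatsClearsContainment and the trimmed PendingAudit fields are illustrative only, not part of the commit.

package audit_test

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"storj.io/storj/internal/testcontext"
	"storj.io/storj/internal/testplanet"
	"storj.io/storj/internal/testrand"
	"storj.io/storj/pkg/audit"
	"storj.io/storj/pkg/overlay"
	"storj.io/storj/pkg/pkcrypto"
)

// Illustrative only: condensed from the new TestContainUpdateStats below.
func TestUpdateStatsClearsContainment(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		containment := planet.Satellites[0].DB.Containment()
		cache := planet.Satellites[0].DB.OverlayCache()

		// Putting a pending audit on the node marks it as contained in the overlay.
		pending := &audit.PendingAudit{
			NodeID:            planet.StorageNodes[0].ID(),
			ExpectedShareHash: pkcrypto.SHA256Hash(testrand.Bytes(10)),
		}
		require.NoError(t, containment.IncrementPending(ctx, pending))

		// With this commit, recording any node stats update also exits containment mode.
		_, err := cache.UpdateStats(ctx, &overlay.UpdateRequest{NodeID: pending.NodeID})
		require.NoError(t, err)

		// The Contained flag is cleared and the pending audit row is gone.
		node, err := cache.Get(ctx, pending.NodeID)
		require.NoError(t, err)
		assert.False(t, node.Contained)

		_, err = containment.Get(ctx, pending.NodeID)
		assert.True(t, audit.ErrContainedNotFound.Has(err))
	})
}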
@@ -6,20 +6,25 @@ package audit_test
 import (
 	"testing"

+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

 	"storj.io/storj/internal/testcontext"
 	"storj.io/storj/internal/testplanet"
 	"storj.io/storj/internal/testrand"
 	"storj.io/storj/pkg/audit"
+	"storj.io/storj/pkg/overlay"
 	"storj.io/storj/pkg/pkcrypto"
 	"storj.io/storj/pkg/storj"
 )

 func TestContainIncrementAndGet(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
-		SatelliteCount: 1, StorageNodeCount: 4,
+		SatelliteCount: 1, StorageNodeCount: 2,
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		containment := planet.Satellites[0].DB.Containment()
+		cache := planet.Satellites[0].DB.OverlayCache()
+
 		input := &audit.PendingAudit{
 			NodeID:            planet.StorageNodes[0].ID(),
 			PieceID:           storj.PieceID{},
@@ -29,30 +34,32 @@ func TestContainIncrementAndGet(t *testing.T) {
 			ReverifyCount:     0,
 		}

-		err := planet.Satellites[0].DB.Containment().IncrementPending(ctx, input)
+		err := containment.IncrementPending(ctx, input)
 		require.NoError(t, err)

-		output, err := planet.Satellites[0].DB.Containment().Get(ctx, input.NodeID)
+		output, err := containment.Get(ctx, input.NodeID)
 		require.NoError(t, err)

-		require.Equal(t, input, output)
+		assert.Equal(t, input, output)

 		// check contained flag set to true
-		node, err := planet.Satellites[0].DB.OverlayCache().Get(ctx, input.NodeID)
+		node, err := cache.Get(ctx, input.NodeID)
 		require.NoError(t, err)
-		require.True(t, node.Contained)
+		assert.True(t, node.Contained)

 		nodeID1 := planet.StorageNodes[1].ID()
-		_, err = planet.Satellites[0].DB.Containment().Get(ctx, nodeID1)
+		_, err = containment.Get(ctx, nodeID1)
 		require.Error(t, err, audit.ErrContainedNotFound.New(nodeID1.String()))
-		require.True(t, audit.ErrContainedNotFound.Has(err))
+		assert.True(t, audit.ErrContainedNotFound.Has(err))
 	})
 }

 func TestContainIncrementPendingEntryExists(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
-		SatelliteCount: 1, StorageNodeCount: 4,
+		SatelliteCount: 1, StorageNodeCount: 1,
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		containment := planet.Satellites[0].DB.Containment()
+
 		info1 := &audit.PendingAudit{
 			NodeID:            planet.StorageNodes[0].ID(),
 			PieceID:           storj.PieceID{},
@@ -62,7 +69,7 @@ func TestContainIncrementPendingEntryExists(t *testing.T) {
 			ReverifyCount:     0,
 		}

-		err := planet.Satellites[0].DB.Containment().IncrementPending(ctx, info1)
+		err := containment.IncrementPending(ctx, info1)
 		require.NoError(t, err)

 		info2 := &audit.PendingAudit{
@@ -75,28 +82,30 @@ func TestContainIncrementPendingEntryExists(t *testing.T) {
 		}

 		// expect failure when an entry with the same nodeID but different expected share data already exists
-		err = planet.Satellites[0].DB.Containment().IncrementPending(ctx, info2)
-		require.Error(t, err)
-		require.True(t, audit.ErrAlreadyExists.Has(err))
+		err = containment.IncrementPending(ctx, info2)
+		assert.True(t, audit.ErrAlreadyExists.Has(err))

 		// expect reverify count for an entry to be 0 after first IncrementPending call
-		pending, err := planet.Satellites[0].DB.Containment().Get(ctx, info1.NodeID)
+		pending, err := containment.Get(ctx, info1.NodeID)
 		require.NoError(t, err)
-		require.EqualValues(t, 0, pending.ReverifyCount)
+		assert.EqualValues(t, 0, pending.ReverifyCount)

 		// expect reverify count to be 1 after second IncrementPending call
-		err = planet.Satellites[0].DB.Containment().IncrementPending(ctx, info1)
+		err = containment.IncrementPending(ctx, info1)
 		require.NoError(t, err)
-		pending, err = planet.Satellites[0].DB.Containment().Get(ctx, info1.NodeID)
+		pending, err = containment.Get(ctx, info1.NodeID)
 		require.NoError(t, err)
-		require.EqualValues(t, 1, pending.ReverifyCount)
+		assert.EqualValues(t, 1, pending.ReverifyCount)
 	})
 }

 func TestContainDelete(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
-		SatelliteCount: 1, StorageNodeCount: 4,
+		SatelliteCount: 1, StorageNodeCount: 1,
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		containment := planet.Satellites[0].DB.Containment()
+		cache := planet.Satellites[0].DB.OverlayCache()
+
 		info1 := &audit.PendingAudit{
 			NodeID:            planet.StorageNodes[0].ID(),
 			PieceID:           storj.PieceID{},
@@ -106,31 +115,62 @@ func TestContainDelete(t *testing.T) {
 			ReverifyCount:     0,
 		}

-		err := planet.Satellites[0].DB.Containment().IncrementPending(ctx, info1)
+		err := containment.IncrementPending(ctx, info1)
 		require.NoError(t, err)

-		// check contained flag set to true
-		node, err := planet.Satellites[0].DB.OverlayCache().Get(ctx, info1.NodeID)
+		// delete the node from containment db
+		isDeleted, err := containment.Delete(ctx, info1.NodeID)
 		require.NoError(t, err)
-		require.True(t, node.Contained)
-
-		isDeleted, err := planet.Satellites[0].DB.Containment().Delete(ctx, info1.NodeID)
-		require.NoError(t, err)
-		require.True(t, isDeleted)
+		assert.True(t, isDeleted)

 		// check contained flag set to false
-		node, err = planet.Satellites[0].DB.OverlayCache().Get(ctx, info1.NodeID)
+		node, err := cache.Get(ctx, info1.NodeID)
 		require.NoError(t, err)
-		require.False(t, node.Contained)
+		assert.False(t, node.Contained)

 		// get pending audit that doesn't exist
-		_, err = planet.Satellites[0].DB.Containment().Get(ctx, info1.NodeID)
-		require.Error(t, err, audit.ErrContainedNotFound.New(info1.NodeID.String()))
-		require.True(t, audit.ErrContainedNotFound.Has(err))
+		_, err = containment.Get(ctx, info1.NodeID)
+		assert.Error(t, err, audit.ErrContainedNotFound.New(info1.NodeID.String()))
+		assert.True(t, audit.ErrContainedNotFound.Has(err))

 		// delete pending audit that doesn't exist
-		isDeleted, err = planet.Satellites[0].DB.Containment().Delete(ctx, info1.NodeID)
+		isDeleted, err = containment.Delete(ctx, info1.NodeID)
 		require.NoError(t, err)
-		require.False(t, isDeleted)
+		assert.False(t, isDeleted)
 	})
 }
+
+func TestContainUpdateStats(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 1,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		containment := planet.Satellites[0].DB.Containment()
+		cache := planet.Satellites[0].DB.OverlayCache()
+
+		info1 := &audit.PendingAudit{
+			NodeID:            planet.StorageNodes[0].ID(),
+			PieceID:           storj.PieceID{},
+			StripeIndex:       0,
+			ShareSize:         0,
+			ExpectedShareHash: pkcrypto.SHA256Hash(testrand.Bytes(10)),
+			ReverifyCount:     0,
+		}
+
+		err := containment.IncrementPending(ctx, info1)
+		require.NoError(t, err)
+
+		// update node stats
+		_, err = cache.UpdateStats(ctx, &overlay.UpdateRequest{NodeID: info1.NodeID})
+		require.NoError(t, err)
+
+		// check contained flag set to false
+		node, err := cache.Get(ctx, info1.NodeID)
+		require.NoError(t, err)
+		assert.False(t, node.Contained)
+
+		// get pending audit that doesn't exist
+		_, err = containment.Get(ctx, info1.NodeID)
+		assert.Error(t, err, audit.ErrContainedNotFound.New(info1.NodeID.String()))
+		assert.True(t, audit.ErrContainedNotFound.Has(err))
+	})
+}
@@ -129,13 +129,6 @@ func (reporter *Reporter) recordAuditFailStatus(ctx context.Context, failedAudit
 			failed = append(failed, nodeID)
 			errlist.Add(err)
 		}
-
-		// TODO(kaloyan): Perhaps, this should be executed in the same Tx as overlay.UpdateStats above
-		_, err = reporter.containment.Delete(ctx, nodeID)
-		if err != nil {
-			failed = append(failed, nodeID)
-			errlist.Add(err)
-		}
 	}
 	if len(failed) > 0 {
 		return failed, errs.Combine(Error.New("failed to record some audit fail statuses in overlay"), errlist.Err())
@@ -174,13 +167,6 @@ func (reporter *Reporter) recordAuditSuccessStatus(ctx context.Context, successN
 			failed = append(failed, nodeID)
 			errlist.Add(err)
 		}
-
-		// TODO(kaloyan): Perhaps, this should be executed in the same Tx as overlay.UpdateStats above
-		_, err = reporter.containment.Delete(ctx, nodeID)
-		if err != nil {
-			failed = append(failed, nodeID)
-			errlist.Add(err)
-		}
 	}
 	if len(failed) > 0 {
 		return failed, errs.Combine(Error.New("failed to record some audit success statuses in overlay"), errlist.Err())
@@ -210,13 +196,6 @@ func (reporter *Reporter) recordPendingAudits(ctx context.Context, pendingAudits
 				failed = append(failed, pendingAudit)
 				errlist.Add(err)
 			}
-
-			// TODO(kaloyan): Perhaps, this should be executed in the same Tx as overlay.UpdateStats above
-			_, err = reporter.containment.Delete(ctx, pendingAudit.NodeID)
-			if err != nil {
-				failed = append(failed, pendingAudit)
-				errlist.Add(err)
-			}
 		}
 	}
 	if len(failed) > 0 {
@@ -688,11 +688,20 @@ func (cache *overlaycache) UpdateStats(ctx context.Context, updateReq *overlay.U
 		updateFields.AuditSuccessCount = dbx.Node_AuditSuccessCount(dbNode.AuditSuccessCount + 1)
 	}

+	// Updating node stats always exits it from containment mode
+	updateFields.Contained = dbx.Node_Contained(false)
+
 	dbNode, err = tx.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
 	if err != nil {
 		return nil, Error.Wrap(errs.Combine(err, tx.Rollback()))
 	}

+	// Cleanup containment table too
+	_, err = tx.Delete_PendingAudits_By_NodeId(ctx, dbx.PendingAudits_NodeId(nodeID.Bytes()))
+	if err != nil {
+		return nil, Error.Wrap(errs.Combine(err, tx.Rollback()))
+	}
+
 	// TODO: Allegedly tx.Get_Node_By_Id and tx.Update_Node_By_Id should never return a nil value for dbNode,
 	// however we've seen from some crashes that it does. We need to track down the cause of these crashes
 	// but for now we're adding a nil check to prevent a panic.