cadb435d25
Since we increased the number of concurrent audit workers to two, there are going to be instances of a single node being audited simultaneously for different segments. If the node times out for both, we will try to write them both to the pending audits table, and the second will return an error since the path is not the same as what already exists. Since with concurrent workers this is expected, we will log the occurrence rather than return an error. Since the release default audit concurrency is 2, update testplanet default to run with concurrent workers as well. Change-Id: I4e657693fa3e825713a219af3835ae287bb062cb
147 lines
4.6 KiB
Go
147 lines
4.6 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package audit_test
|
|
|
|
import (
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"storj.io/common/pkcrypto"
|
|
"storj.io/common/testcontext"
|
|
"storj.io/common/testrand"
|
|
"storj.io/storj/private/testplanet"
|
|
"storj.io/storj/satellite/audit"
|
|
"storj.io/storj/satellite/overlay"
|
|
)
|
|
|
|
func TestContainIncrementAndGet(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 2,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
containment := planet.Satellites[0].DB.Containment()
|
|
cache := planet.Satellites[0].DB.OverlayCache()
|
|
|
|
input := &audit.PendingAudit{
|
|
NodeID: planet.StorageNodes[0].ID(),
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(testrand.Bytes(10)),
|
|
}
|
|
|
|
err := containment.IncrementPending(ctx, input)
|
|
require.NoError(t, err)
|
|
|
|
output, err := containment.Get(ctx, input.NodeID)
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, input, output)
|
|
|
|
// check contained flag set to true
|
|
node, err := cache.Get(ctx, input.NodeID)
|
|
require.NoError(t, err)
|
|
assert.True(t, node.Contained)
|
|
|
|
nodeID1 := planet.StorageNodes[1].ID()
|
|
_, err = containment.Get(ctx, nodeID1)
|
|
require.Error(t, err, audit.ErrContainedNotFound.New("%v", nodeID1))
|
|
assert.True(t, audit.ErrContainedNotFound.Has(err))
|
|
})
|
|
}
|
|
|
|
func TestContainIncrementPendingEntryExists(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
containment := planet.Satellites[0].DB.Containment()
|
|
|
|
info1 := &audit.PendingAudit{
|
|
NodeID: planet.StorageNodes[0].ID(),
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(testrand.Bytes(10)),
|
|
}
|
|
|
|
err := containment.IncrementPending(ctx, info1)
|
|
require.NoError(t, err)
|
|
|
|
// expect reverify count for an entry to be 0 after first IncrementPending call
|
|
pending, err := containment.Get(ctx, info1.NodeID)
|
|
require.NoError(t, err)
|
|
assert.EqualValues(t, 0, pending.ReverifyCount)
|
|
|
|
// expect reverify count to be 1 after second IncrementPending call
|
|
err = containment.IncrementPending(ctx, info1)
|
|
require.NoError(t, err)
|
|
pending, err = containment.Get(ctx, info1.NodeID)
|
|
require.NoError(t, err)
|
|
assert.EqualValues(t, 1, pending.ReverifyCount)
|
|
})
|
|
}
|
|
|
|
func TestContainDelete(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
containment := planet.Satellites[0].DB.Containment()
|
|
cache := planet.Satellites[0].DB.OverlayCache()
|
|
|
|
info1 := &audit.PendingAudit{
|
|
NodeID: planet.StorageNodes[0].ID(),
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(testrand.Bytes(10)),
|
|
}
|
|
|
|
err := containment.IncrementPending(ctx, info1)
|
|
require.NoError(t, err)
|
|
|
|
// delete the node from containment db
|
|
isDeleted, err := containment.Delete(ctx, info1.NodeID)
|
|
require.NoError(t, err)
|
|
assert.True(t, isDeleted)
|
|
|
|
// check contained flag set to false
|
|
node, err := cache.Get(ctx, info1.NodeID)
|
|
require.NoError(t, err)
|
|
assert.False(t, node.Contained)
|
|
|
|
// get pending audit that doesn't exist
|
|
_, err = containment.Get(ctx, info1.NodeID)
|
|
assert.Error(t, err, audit.ErrContainedNotFound.New("%v", info1.NodeID))
|
|
assert.True(t, audit.ErrContainedNotFound.Has(err))
|
|
|
|
// delete pending audit that doesn't exist
|
|
isDeleted, err = containment.Delete(ctx, info1.NodeID)
|
|
require.NoError(t, err)
|
|
assert.False(t, isDeleted)
|
|
})
|
|
}
|
|
|
|
func TestContainUpdateStats(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
containment := planet.Satellites[0].DB.Containment()
|
|
cache := planet.Satellites[0].DB.OverlayCache()
|
|
|
|
info1 := &audit.PendingAudit{
|
|
NodeID: planet.StorageNodes[0].ID(),
|
|
ExpectedShareHash: pkcrypto.SHA256Hash(testrand.Bytes(10)),
|
|
}
|
|
|
|
err := containment.IncrementPending(ctx, info1)
|
|
require.NoError(t, err)
|
|
|
|
// update node stats
|
|
_, err = planet.Satellites[0].Overlay.Service.BatchUpdateStats(ctx, []*overlay.UpdateRequest{{NodeID: info1.NodeID}})
|
|
require.NoError(t, err)
|
|
|
|
// check contained flag set to false
|
|
node, err := cache.Get(ctx, info1.NodeID)
|
|
require.NoError(t, err)
|
|
assert.False(t, node.Contained)
|
|
|
|
// get pending audit that doesn't exist
|
|
_, err = containment.Get(ctx, info1.NodeID)
|
|
assert.Error(t, err, audit.ErrContainedNotFound.New("%v", info1.NodeID))
|
|
assert.True(t, audit.ErrContainedNotFound.Has(err))
|
|
})
|
|
}
|