satellite/audit: fix reservoir sampling bias
While researching logs from a large set of audits, I noticed that nearly all of them had streamIDs starting with 0 or 1. This seemed very odd, because streamIDs are supposed to be pretty much entirely random, and every hex digit from 0-f should have been represented with roughly equal frequency. It turned out that our A-Chao implementation of reservoir sampling is flawed. As far as we can tell, so is the Wikipedia implementation. No one has yet reviewed the original 1982 paper by Dr. Chao in enough detail to know where the error originated, but we do know that we have been auditing segments near the beginning of the segment loop (low streamIDs) far more often than segments near the end of the segment loop (high streamIDs). This change uses an algorithm Wikipedia calls "A-Res" instead, and adds a test to check for that sort of bias creeping back in somehow. A-Res will be slightly slower than A-Chao, because of a few extra steps that need to be done, but it does appear to be selecting items uniformly. Change-Id: I45eba4c522bafc729cebe2aab6f3fe65cd6336be
This commit is contained in:
parent
b562cbf98f
commit
231c783698
@ -4,6 +4,7 @@
|
|||||||
package audit
|
package audit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -17,9 +18,9 @@ const maxReservoirSize = 3
|
|||||||
// Reservoir holds a certain number of segments to reflect a random sample.
|
// Reservoir holds a certain number of segments to reflect a random sample.
|
||||||
type Reservoir struct {
|
type Reservoir struct {
|
||||||
Segments [maxReservoirSize]segmentloop.Segment
|
Segments [maxReservoirSize]segmentloop.Segment
|
||||||
|
Keys [maxReservoirSize]float64
|
||||||
size int8
|
size int8
|
||||||
index int64
|
index int64
|
||||||
wSum int64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewReservoir instantiates a Reservoir.
|
// NewReservoir instantiates a Reservoir.
|
||||||
@ -35,23 +36,28 @@ func NewReservoir(size int) *Reservoir {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sample makes sure that for every segment in metainfo from index i=size..n-1,
|
// Sample tries to ensure that each segment passed in has a chance (proportional
|
||||||
// compute the relative weight based on segment size, and pick a random floating
|
// to its size) to be in the reservoir when sampling is complete.
|
||||||
// point number r = rand(0..1), and if r < the relative weight of the segment,
|
//
|
||||||
// select uniformly a random segment reservoir.Segments[rand(0..i)] to replace with
|
// The tricky part is that we do not know ahead of time how many segments will
|
||||||
// segment. See https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Chao
|
// be passed in. The way this is accomplished is known as _Reservoir Sampling_.
|
||||||
// for the algorithm used.
|
// The specific algorithm we are using here is called A-Res on the Wikipedia
|
||||||
|
// article: https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Res
|
||||||
func (reservoir *Reservoir) Sample(r *rand.Rand, segment *segmentloop.Segment) {
|
func (reservoir *Reservoir) Sample(r *rand.Rand, segment *segmentloop.Segment) {
|
||||||
|
k := -math.Log(r.Float64()) / float64(segment.EncryptedSize)
|
||||||
if reservoir.index < int64(reservoir.size) {
|
if reservoir.index < int64(reservoir.size) {
|
||||||
reservoir.Segments[reservoir.index] = *segment
|
reservoir.Segments[reservoir.index] = *segment
|
||||||
reservoir.wSum += int64(segment.EncryptedSize)
|
reservoir.Keys[reservoir.index] = k
|
||||||
} else {
|
} else {
|
||||||
reservoir.wSum += int64(segment.EncryptedSize)
|
max := 0
|
||||||
p := float64(segment.EncryptedSize) / float64(reservoir.wSum)
|
for i := 1; i < int(reservoir.size); i++ {
|
||||||
random := r.Float64()
|
if reservoir.Keys[i] > reservoir.Keys[max] {
|
||||||
if random < p {
|
max = i
|
||||||
index := r.Int31n(int32(reservoir.size))
|
}
|
||||||
reservoir.Segments[index] = *segment
|
}
|
||||||
|
if k < reservoir.Keys[max] {
|
||||||
|
reservoir.Segments[max] = *segment
|
||||||
|
reservoir.Keys[max] = k
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
reservoir.index++
|
reservoir.index++
|
||||||
|
@ -4,10 +4,13 @@
|
|||||||
package audit
|
package audit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/binary"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"storj.io/common/testrand"
|
"storj.io/common/testrand"
|
||||||
@ -30,7 +33,7 @@ func TestReservoir(t *testing.T) {
|
|||||||
require.Equal(t, r.Segments[:], []segmentloop.Segment{*seg(1), *seg(2), *seg(3)})
|
require.Equal(t, r.Segments[:], []segmentloop.Segment{*seg(1), *seg(2), *seg(3)})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReservoirBias(t *testing.T) {
|
func TestReservoirWeights(t *testing.T) {
|
||||||
var weight10StreamID = testrand.UUID()
|
var weight10StreamID = testrand.UUID()
|
||||||
var weight5StreamID = testrand.UUID()
|
var weight5StreamID = testrand.UUID()
|
||||||
var weight2StreamID = testrand.UUID()
|
var weight2StreamID = testrand.UUID()
|
||||||
@ -92,3 +95,57 @@ func TestReservoirBias(t *testing.T) {
|
|||||||
require.Greater(t, streamIDCountsMap[weight5StreamID], streamIDCountsMap[weight2StreamID])
|
require.Greater(t, streamIDCountsMap[weight5StreamID], streamIDCountsMap[weight2StreamID])
|
||||||
require.Greater(t, streamIDCountsMap[weight2StreamID], streamIDCountsMap[weight1StreamID])
|
require.Greater(t, streamIDCountsMap[weight2StreamID], streamIDCountsMap[weight1StreamID])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sample many segments, with equal weight, uniformly distributed, and in order,
|
||||||
|
// through the reservoir. Expect that elements show up in the result set with
|
||||||
|
// equal chance, whether they were inserted near the beginning of the list or
|
||||||
|
// near the end.
|
||||||
|
func TestReservoirBias(t *testing.T) {
|
||||||
|
const (
|
||||||
|
reservoirSize = 3
|
||||||
|
useBits = 14
|
||||||
|
numSegments = 1 << useBits
|
||||||
|
weight = 100000 // any number; same for all segments
|
||||||
|
numRounds = 1000
|
||||||
|
)
|
||||||
|
|
||||||
|
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
numsSelected := make([]uint64, numRounds*reservoirSize)
|
||||||
|
|
||||||
|
for r := 0; r < numRounds; r++ {
|
||||||
|
res := NewReservoir(reservoirSize)
|
||||||
|
for n := 0; n < numSegments; n++ {
|
||||||
|
seg := segmentloop.Segment{
|
||||||
|
EncryptedSize: weight,
|
||||||
|
}
|
||||||
|
binary.BigEndian.PutUint64(seg.StreamID[0:8], uint64(n)<<(64-useBits))
|
||||||
|
res.Sample(rng, &seg)
|
||||||
|
}
|
||||||
|
for i, seg := range res.Segments {
|
||||||
|
num := binary.BigEndian.Uint64(seg.StreamID[0:8]) >> (64 - useBits)
|
||||||
|
numsSelected[r*reservoirSize+i] = num
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Sort(uint64Slice(numsSelected))
|
||||||
|
|
||||||
|
// this delta is probably way too generous. but, the A-Chao
|
||||||
|
// implementation failed the test with this value, so maybe it's fine.
|
||||||
|
delta := float64(numSegments / 8)
|
||||||
|
quartile0 := numsSelected[len(numsSelected)*0/4]
|
||||||
|
assert.InDelta(t, numSegments*0/4, quartile0, delta)
|
||||||
|
quartile1 := numsSelected[len(numsSelected)*1/4]
|
||||||
|
assert.InDelta(t, numSegments*1/4, quartile1, delta)
|
||||||
|
quartile2 := numsSelected[len(numsSelected)*2/4]
|
||||||
|
assert.InDelta(t, numSegments*2/4, quartile2, delta)
|
||||||
|
quartile3 := numsSelected[len(numsSelected)*3/4]
|
||||||
|
assert.InDelta(t, numSegments*3/4, quartile3, delta)
|
||||||
|
quartile4 := numsSelected[len(numsSelected)-1]
|
||||||
|
assert.InDelta(t, numSegments*4/4, quartile4, delta)
|
||||||
|
}
|
||||||
|
|
||||||
|
type uint64Slice []uint64
|
||||||
|
|
||||||
|
func (us uint64Slice) Len() int { return len(us) }
|
||||||
|
func (us uint64Slice) Swap(i, j int) { us[i], us[j] = us[j], us[i] }
|
||||||
|
func (us uint64Slice) Less(i, j int) bool { return us[i] < us[j] }
|
||||||
|
Loading…
Reference in New Issue
Block a user