diff --git a/docs/design/garbage-collection.md b/docs/design/garbage-collection.md
index 0d8f54843..3491ea71d 100644
--- a/docs/design/garbage-collection.md
+++ b/docs/design/garbage-collection.md
@@ -99,7 +99,8 @@ The probability of having a false positive depends on the size of the Bloom filt
 - **n**: number of elements in the set
 - **m**: size of the Bloom filter array
 - **k**: number of hash functions used
-- **Probability of false positives**: (1-(1-1/m)^kn)^k which can be approximate by (1-e^(kn/m))^k.
+- **Probability of false positives**: (1-(1-1/m)^(kn))^k, which can be approximated by (1-e^(-kn/m))^k.
+
 
 | m/n|k|k=1 |k=2 |k=3 |k=4 |k=5 |k=6 |k=7 |k=8 |
 |---|---|---|---|---|---|---|---|---|---|
@@ -146,3 +147,74 @@ see: [Bloom filter math](http://pages.cs.wisc.edu/~cao/papers/summary-cache/node
 - Storage node should be able to receive a Delete request from a Satellite
 - Storage node should use the filter from the Delete request to decide which pieces to delete, then delete them
 - Eventually, this service should iterate over a db snapshot instead of being integrated with the data repair checker
+
+## Bloom filters benchmark
+
+Four Bloom filter implementations are considered:
+- **Zeebo**: Zeebo's Bloom filters (github.com/zeebo/sbloom)
+- **Willf**: Willf's Bloom filters (github.com/willf/bloom)
+- **Steakknife**: Steakknife's Bloom filters (github.com/steakknife/bloomfilter)
+- **Custom**: custom Bloom filter
+
+### Zeebo's Bloom filters
+- Parameters:
+  - **k**: the Bloom filter is built such that the probability of a false positive is less than (1/2)^k
+  - **h**: the hash functions to use
+- Serialization available
+- Hash functions are given as a parameter to the constructor
+
+### Willf's Bloom filters
+- Parameters:
+  - **m**: max size in bits
+  - **k**: number of hash functions
+- Hash functions not configurable
+
+### Steakknife's Bloom filters
+- Parameters:
+  - **maxElements**: max number of elements in the set
+  - **p**: probability of false positives
+- Serialization available
+- murmur3 hash function
+
+### Custom Bloom filter
+- Parameters:
+  - **maxElements**: max number of elements in the set
+  - **p**: probability of false positives
+- Subranges of the piece id itself are used as hash values.
+
+### Benchmark
+We assume a typical storage node has a capacity of 2 TB and a typical piece is ~2 MB, so we test the behavior with 1 million pieces.
+
+We create a list of 1 million piece ids and add 95% of them to the Bloom filter. We then check that those 95% are contained in the set (there should be no false negatives) and we evaluate the false positive rate by checking the remaining 5% of the piece ids.
+
+For each target false positive probability between 1% and 20% and each Bloom filter type, we measure the size (in bytes) of the encoded Bloom filter and the observed false positive rate.
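+
+For the custom filter, the measurement loop looks roughly like the sketch below. This is a simplified illustration, not the benchmark itself: `measure` is a hypothetical helper, written against the `pkg/bloomfilter` API introduced later in this change; the full code is in the gist linked below.
+
+```go
+// measure reports the observed false positive rate and the encoded size of a
+// filter built for a target false positive rate p. ids holds the one million
+// random piece ids.
+func measure(ids []storj.PieceID, p float64) (realP float64, size int) {
+	split := len(ids) * 95 / 100 // add 95%, validate with the remaining 5%
+
+	filter := bloomfilter.NewOptimal(len(ids), p)
+	for _, id := range ids[:split] {
+		filter.Add(id)
+	}
+
+	// every id that was added must be reported as contained:
+	// a Bloom filter has no false negatives
+	for _, id := range ids[:split] {
+		if !filter.Contains(id) {
+			panic("false negative")
+		}
+	}
+
+	// the remaining ids were never added, so every hit is a false positive
+	falsePositives := 0
+	for _, id := range ids[split:] {
+		if filter.Contains(id) {
+			falsePositives++
+		}
+	}
+
+	return float64(falsePositives) / float64(len(ids)-split), len(filter.Bytes())
+}
+```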
+
+|p|Zeebo size (bytes)|Zeebo real_p|Willf size (bytes)|Willf real_p|Steakknife size (bytes)|Steakknife real_p|Custom size (bytes)|Custom real_p|
+|---|---|---|---|---|---|---|---|---|
+|0.01| 9437456| 0.01| 1198160| 0.01| 1198264| 0.01| 1250012| 0.01|
+|0.02| 8913247| 0.02| 1017824| 0.02| 1017920| 0.02| 1125012| 0.01|
+|0.03| 8913247| 0.02| 912336| 0.03| 912432| 0.03| 1000012| 0.02|
+|0.04| 8389038| 0.03| 837488| 0.04| 837576| 0.03| 875012| 0.03|
+|0.05| 8389038| 0.03| 779432| 0.04| 779520| 0.04| 875012| 0.03|
+|0.06| 8389038| 0.03| 732000| 0.06| 732088| 0.05| 750012| 0.05|
+|0.07| 7864829| 0.06| 691888| 0.06| 691968| 0.06| 750012| 0.05|
+|0.08| 7864829| 0.06| 657152| 0.07| 657232| 0.07| 750012| 0.05|
+|0.09| 7864829| 0.06| 626504| 0.08| 626584| 0.08| 750012| 0.05|
+|0.10| 7864829| 0.06| 599096| 0.09| 599176| 0.09| 625012| 0.08|
+|0.11| 7864829| 0.06| 574296| 0.10| 574376| 0.10| 625012| 0.08|
+|0.12| 7864829| 0.06| 551656| 0.11| 551736| 0.11| 625012| 0.08|
+|0.13| 7340620| 0.12| 530832| 0.11| 530904| 0.12| 625012| 0.08|
+|0.14| 7340620| 0.12| 511552| 0.12| 511624| 0.13| 625012| 0.08|
+|0.15| 7340620| 0.12| 493600| 0.14| 493672| 0.14| 500012| 0.16|
+|0.16| 7340620| 0.12| 476816| 0.15| 476888| 0.15| 500012| 0.16|
+|0.17| 7340620| 0.12| 461040| 0.15| 461112| 0.16| 500012| 0.16|
+|0.18| 7340620| 0.12| 446168| 0.17| 446240| 0.17| 500012| 0.16|
+|0.19| 7340620| 0.12| 432104| 0.18| 432176| 0.18| 500012| 0.16|
+|0.20| 7340620| 0.12| 418760| 0.19| 418832| 0.19| 500012| 0.16|
+
+The benchmark code is available as a gist [here](https://gist.github.com/Fadila82/9f54c61b5f91f6b1a6f9207dfbb5dd2d).
+
+An estimated number of elements must be provided when creating the Bloom filter. We decide to use the last known piece count (obtained through the previous iteration) as the expected number of elements when creating the new Bloom filter.
+
+If the number of elements differs too much between the previous iteration and the current one (which would induce a high false positive rate), we do not send the Bloom filter to the storage node.
diff --git a/pkg/bloomfilter/filter.go b/pkg/bloomfilter/filter.go
new file mode 100644
index 000000000..562d830e0
--- /dev/null
+++ b/pkg/bloomfilter/filter.go
@@ -0,0 +1,146 @@
+// Copyright (C) 2019 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package bloomfilter
+
+import (
+	"encoding/binary"
+	"math"
+	"math/rand"
+
+	"github.com/zeebo/errs"
+
+	"storj.io/storj/pkg/storj"
+)
+
+const (
+	version1 = 1
+)
+
+// rangeOffsets contains offsets for selecting subranges
+// that minimize overlap in the first hash functions
+var rangeOffsets = [...]byte{9, 13, 19, 23}
+
+// Filter is a bloom filter implementation
+type Filter struct {
+	seed      byte
+	hashCount byte
+	table     []byte
+}
+
+// newExplicit returns a new custom filter.
+func newExplicit(seed, hashCount byte, sizeInBytes int) *Filter {
+	return &Filter{
+		seed:      seed,
+		hashCount: hashCount,
+		table:     make([]byte, sizeInBytes),
+	}
+}
+
+// NewOptimal returns a filter based on expected element count and false positive rate.
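+//
+// The constants below follow
+// https://en.wikipedia.org/wiki/Bloom_filter#Optimal_number_of_hash_functions:
+// an optimal table uses about -1.44*log2(p) bits per element, and the optimal
+// number of hash functions is that bit count multiplied by ln(2). For example,
+// one million expected elements with falsePositiveRate 0.1 need ~4.8 bits per
+// element (a table of ~600 kB) and a hashCount of 4.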
+func NewOptimal(expectedElements int, falsePositiveRate float64) *Filter {
+	seed := byte(rand.Intn(255))
+
+	// calculation based on https://en.wikipedia.org/wiki/Bloom_filter#Optimal_number_of_hash_functions
+	bitsPerElement := -1.44 * math.Log2(falsePositiveRate)
+	hashCount := math.Ceil(bitsPerElement * math.Log(2))
+	if hashCount > 32 {
+		// it will never be larger, but just in case to avoid overflow
+		hashCount = 32
+	}
+	sizeInBytes := int(math.Ceil(float64(expectedElements) * bitsPerElement / 8))
+
+	return newExplicit(seed, byte(hashCount), sizeInBytes)
+}
+
+// Parameters returns filter parameters.
+func (filter *Filter) Parameters() (hashCount, size int) {
+	return int(filter.hashCount), len(filter.table)
+}
+
+// Add adds an element to the bloom filter
+func (filter *Filter) Add(pieceID storj.PieceID) {
+	offset, rangeOffset := initialConditions(filter.seed)
+
+	for k := byte(0); k < filter.hashCount; k++ {
+		hash, bit := subrange(offset, pieceID)
+
+		// advance to the next subrange, wrapping around the id
+		offset += rangeOffset
+		if offset >= len(storj.PieceID{}) {
+			offset -= len(storj.PieceID{})
+		}
+
+		bucket := hash % uint64(len(filter.table))
+		filter.table[bucket] |= 1 << (bit % 8)
+	}
+}
+
+// Contains returns true if pieceID may be in the set
+func (filter *Filter) Contains(pieceID storj.PieceID) bool {
+	offset, rangeOffset := initialConditions(filter.seed)
+
+	for k := byte(0); k < filter.hashCount; k++ {
+		hash, bit := subrange(offset, pieceID)
+
+		offset += rangeOffset
+		if offset >= len(storj.PieceID{}) {
+			offset -= len(storj.PieceID{})
+		}
+
+		bucket := hash % uint64(len(filter.table))
+		if filter.table[bucket]&(1<<(bit%8)) == 0 {
+			return false
+		}
+	}
+
+	return true
+}
+
+// initialConditions derives the first subrange offset and the offset step from the seed.
+func initialConditions(seed byte) (initialOffset, rangeOffset int) {
+	initialOffset = int(seed % 32)
+	rangeOffset = int(rangeOffsets[int(seed/32)%len(rangeOffsets)])
+	return initialOffset, rangeOffset
+}
+
+// subrange reads 9 bytes of id starting at offset seed, wrapping around the
+// end: the first 8 bytes select the bucket and the 9th selects the bit.
+func subrange(seed int, id storj.PieceID) (uint64, byte) {
+	if seed > len(id)-9 {
+		var unwrap [9]byte
+		n := copy(unwrap[:], id[seed:])
+		copy(unwrap[n:], id[:])
+		return binary.LittleEndian.Uint64(unwrap[:]), unwrap[8]
+	}
+	return binary.LittleEndian.Uint64(id[seed : seed+8]), id[seed+8]
+}
+
+// NewFromBytes decodes the filter from a sequence of bytes.
+//
+// Note: data will be referenced inside the table.
+func NewFromBytes(data []byte) (*Filter, error) {
+	if len(data) < 3 {
+		return nil, errs.New("not enough data")
+	}
+	if data[0] != version1 {
+		return nil, errs.New("unsupported version %d", data[0])
+	}
+
+	filter := &Filter{}
+	filter.seed = data[1]
+	filter.hashCount = data[2]
+	filter.table = data[3:]
+
+	if filter.hashCount == 0 {
+		return nil, errs.New("invalid hash count %d", filter.hashCount)
+	}
+
+	return filter, nil
+}
+
+// Bytes encodes the filter into a sequence of bytes that can be transferred over the network.
+func (filter *Filter) Bytes() []byte {
+	// layout: 1 byte version, 1 byte seed, 1 byte hash count, then the table
+	bytes := make([]byte, 1+1+1+len(filter.table))
+	bytes[0] = version1
+	bytes[1] = filter.seed
+	bytes[2] = filter.hashCount
+	copy(bytes[3:], filter.table)
+	return bytes
+}
diff --git a/pkg/bloomfilter/filter_test.go b/pkg/bloomfilter/filter_test.go
new file mode 100644
index 000000000..58b86d2e0
--- /dev/null
+++ b/pkg/bloomfilter/filter_test.go
@@ -0,0 +1,152 @@
+// Copyright (C) 2019 Storj Labs, Inc.
+// See LICENSE for copying information.
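+
+// A quick sketch of typical usage of the package under test (names such as
+// expectedPieces are placeholders):
+//
+//	filter := bloomfilter.NewOptimal(expectedPieces, 0.1) // sized for the expected piece count
+//	filter.Add(pieceID)                                   // record a live piece
+//	data := filter.Bytes()                                // encode for the network
+//	decoded, _ := bloomfilter.NewFromBytes(data)          // decode on the other side
+//	found := decoded.Contains(pieceID)                    // always true for added ids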
+
+package bloomfilter_test
+
+import (
+	"flag"
+	"math/rand"
+	"sort"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"storj.io/storj/internal/memory"
+	"storj.io/storj/pkg/bloomfilter"
+	"storj.io/storj/pkg/storj"
+)
+
+func TestNoFalseNegative(t *testing.T) {
+	const numberOfPieces = 10000
+	pieceIDs := generateTestIDs(numberOfPieces)
+
+	for _, ratio := range []float32{0.5, 1, 2} {
+		size := int(numberOfPieces * ratio)
+		filter := bloomfilter.NewOptimal(size, 0.1)
+		for _, pieceID := range pieceIDs {
+			filter.Add(pieceID)
+		}
+		for _, pieceID := range pieceIDs {
+			require.True(t, filter.Contains(pieceID))
+		}
+	}
+}
+
+func TestBytes(t *testing.T) {
+	for _, count := range []int{0, 100, 1000, 10000} {
+		filter := bloomfilter.NewOptimal(count, 0.1)
+		for i := 0; i < count; i++ {
+			id := newTestPieceID()
+			filter.Add(id)
+		}
+
+		bytes := filter.Bytes()
+		unmarshaled, err := bloomfilter.NewFromBytes(bytes)
+		require.NoError(t, err)
+
+		require.Equal(t, filter, unmarshaled)
+	}
+}
+
+func TestBytes_Failing(t *testing.T) {
+	failing := [][]byte{
+		{},
+		{0},
+		{1},
+		{1, 0},
+		{255, 10, 10, 10},
+	}
+	for _, bytes := range failing {
+		_, err := bloomfilter.NewFromBytes(bytes)
+		require.Error(t, err)
+	}
+}
+
+// generateTestIDs generates n piece ids
+func generateTestIDs(n int) []storj.PieceID {
+	ids := make([]storj.PieceID, n)
+	for i := range ids {
+		ids[i] = newTestPieceID()
+	}
+	return ids
+}
+
+func newTestPieceID() storj.PieceID {
+	var id storj.PieceID
+	// using math/rand, for less overhead
+	_, _ = rand.Read(id[:])
+	return id
+}
+
+func BenchmarkFilterAdd(b *testing.B) {
+	ids := generateTestIDs(100000)
+	filter := bloomfilter.NewOptimal(len(ids), 0.1)
+
+	b.Run("Add", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			filter.Add(ids[i%len(ids)])
+		}
+	})
+
+	b.Run("Contains", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			filter.Contains(ids[i%len(ids)])
+		}
+	})
+}
+
+var approximateFalsePositives = flag.Bool("approximate-false-positive-rate", false, "run a diagnostic measurement of observed false positive rates")
+
+func TestApproximateFalsePositives(t *testing.T) {
+	if !*approximateFalsePositives {
+		t.Skip("Use --approximate-false-positive-rate to enable diagnostic test.")
+	}
+
+	const measurements = 100
+	const validation = 1000
+
+	for _, p := range []float64{0.01, 0.05, 0.1, 0.2, 0.3} {
+		for _, n := range []int{1000, 10000, 100000, 1000000} {
+			fpp := []float64{}
+
+			for k := 0; k < measurements; k++ {
+				filter := bloomfilter.NewOptimal(n, p)
+				for i := 0; i < n; i++ {
+					filter.Add(newTestPieceID())
+				}
+
+				positive := 0
+				for j := 0; j < validation; j++ {
+					if filter.Contains(newTestPieceID()) {
+						positive++
+					}
+				}
+				fpp = append(fpp, float64(positive)/validation)
+			}
+
+			hashCount, size := bloomfilter.NewOptimal(n, p).Parameters()
+			summary := summarize(p, fpp)
+			t.Logf("n=%8d p=%.2f avg=%.2f min=%.2f med=%.2f max=%.2f mse=%.3f hc=%d sz=%s", n, p, summary.avg, summary.min, summary.median, summary.max, summary.mse, hashCount, memory.Size(size))
+		}
+	}
+}
+
+type stats struct {
+	avg, min, median, max, mse float64
+}
+
+// summarize calculates the average, minimum, median, maximum and mean squared error
+func summarize(expected float64, values []float64) (r stats) {
+	sort.Float64s(values)
+
+	for _, v := range values {
+		r.avg += v
+		r.mse += (v - expected) * (v - expected)
+	}
+	r.avg /= float64(len(values))
+	r.mse /= float64(len(values))
+
+	r.min, r.median, r.max = values[0], values[len(values)/2], values[len(values)-1]
+
+	return r
+}
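+
+// TestEncodeDecodeFlow sketches the satellite -> storage node flow from the
+// design document: the satellite builds a filter over the piece ids it knows,
+// transfers the encoded bytes, and the storage node keeps every piece the
+// decoded filter contains. (The test name and scenario are illustrative.)
+func TestEncodeDecodeFlow(t *testing.T) {
+	kept := generateTestIDs(1000)
+
+	// satellite side: build and encode the filter
+	filter := bloomfilter.NewOptimal(len(kept), 0.1)
+	for _, id := range kept {
+		filter.Add(id)
+	}
+	data := filter.Bytes()
+
+	// storage node side: decode and keep every piece the filter contains
+	decoded, err := bloomfilter.NewFromBytes(data)
+	require.NoError(t, err)
+	for _, id := range kept {
+		// a piece the satellite knows about must never be deleted:
+		// the filter has no false negatives
+		require.True(t, decoded.Contains(id))
+	}
+}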