satellite/gc/bloomfilter: add sync observer

The current observer used with the ranged loop consumes a massive amount of
memory because each range generates its own set of bloom filters, and each
bloom filter can take up to 2MB of memory. That's a lot.

This change is an initial step toward reducing memory usage by sharing the
bloom filters between ranges and synchronizing access to them. The
implementation is simple, even naive, but it may be enough without resorting
to something more complex.
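For a rough sense of scale: with filters capped at 2MB each, N nodes and R concurrently processed ranges, per-range filters can occupy up to R×N×2MB at once, while a shared set stays at N×2MB. Below is a minimal sketch of the shared approach, using a hypothetical sharedFilters type (the real implementation is the SyncObserver added in this change):

package main

import (
	"sync"

	"storj.io/common/bloomfilter"
	"storj.io/common/memory"
	"storj.io/common/storj"
)

// sharedFilters is a simplified stand-in for the SyncObserver: one set of
// bloom filters shared by every range, with access guarded by a mutex.
type sharedFilters struct {
	mu      sync.Mutex
	seed    byte
	filters map[storj.NodeID]*bloomfilter.Filter
}

// add records a piece for a node, creating the node's filter on first use.
func (s *sharedFilters) add(nodeID storj.NodeID, pieceID storj.PieceID) {
	s.mu.Lock()
	defer s.mu.Unlock()

	filter, ok := s.filters[nodeID]
	if !ok {
		// example sizing parameters; the observer derives these from config
		hashCount, tableSize := bloomfilter.OptimalParameters(400000, 0.1, 2*memory.MiB)
		filter = bloomfilter.NewExplicit(s.seed, hashCount, tableSize)
		s.filters[nodeID] = filter
	}
	filter.Add(pieceID)
}

func main() {
	shared := &sharedFilters{
		seed:    bloomfilter.GenerateSeed(),
		filters: map[storj.NodeID]*bloomfilter.Filter{},
	}
	shared.add(storj.NodeID{}, storj.PieceID{})
}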

https://github.com/storj/storj/issues/5803

Change-Id: Ie62d19276aa9023076b1c97f712b788bce963cbe
Michal Niewrzal 2023-04-26 17:20:13 +02:00
parent 30dee52256
commit 98562d06c8
5 changed files with 334 additions and 72 deletions

View File

@@ -98,13 +98,21 @@ func NewGarbageCollectionBF(log *zap.Logger, db DB, metabaseDB *metabase.DB, rev
if config.GarbageCollectionBF.UseRangedLoop {
log.Info("using ranged loop")
provider := rangedloop.NewMetabaseRangeSplitter(metabaseDB, config.RangedLoop.AsOfSystemInterval, config.RangedLoop.BatchSize)
peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, provider, []rangedloop.Observer{
bloomfilter.NewObserver(log.Named("gc-bf"),
var observer rangedloop.Observer
if config.GarbageCollectionBF.UseSyncObserver {
observer = bloomfilter.NewSyncObserver(log.Named("gc-bf"),
config.GarbageCollectionBF,
peer.Overlay.DB,
),
})
)
} else {
observer = bloomfilter.NewObserver(log.Named("gc-bf"),
config.GarbageCollectionBF,
peer.Overlay.DB,
)
}
provider := rangedloop.NewMetabaseRangeSplitter(metabaseDB, config.RangedLoop.AsOfSystemInterval, config.RangedLoop.BatchSize)
peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, provider, []rangedloop.Observer{observer})
if !config.GarbageCollectionBF.RunOnce {
peer.Services.Add(lifecycle.Item{

View File

@@ -0,0 +1,166 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package bloomfilter
import (
"context"
"sync"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/bloomfilter"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/overlay"
)
// SyncObserver implements a rangedloop observer to collect bloom filters for the garbage collection.
type SyncObserver struct {
log *zap.Logger
config Config
overlay overlay.DB
upload *Upload
// The following fields are reset for each loop.
startTime time.Time
lastPieceCounts map[storj.NodeID]int64
seed byte
mu sync.Mutex
retainInfos map[storj.NodeID]*RetainInfo
// LatestCreationTime will be used to set bloom filter CreationDate.
// Because bloom filter service needs to be run against immutable database snapshot
// we can set CreationDate for bloom filters as a latest segment CreatedAt value.
latestCreationTime time.Time
}
var _ (rangedloop.Observer) = (*SyncObserver)(nil)
var _ (rangedloop.Partial) = (*SyncObserver)(nil)
// NewSyncObserver creates a new instance of the gc rangedloop observer.
func NewSyncObserver(log *zap.Logger, config Config, overlay overlay.DB) *SyncObserver {
return &SyncObserver{
log: log,
overlay: overlay,
upload: NewUpload(log, config),
config: config,
}
}
// Start is called at the beginning of each segment loop.
func (obs *SyncObserver) Start(ctx context.Context, startTime time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
switch {
case obs.config.AccessGrant == "":
return errs.New("Access Grant is not set")
case obs.config.Bucket == "":
return errs.New("Bucket is not set")
}
obs.log.Debug("collecting bloom filters started")
// load last piece counts from overlay db
lastPieceCounts, err := obs.overlay.AllPieceCounts(ctx)
if err != nil {
obs.log.Error("error getting last piece counts", zap.Error(err))
err = nil
}
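// lastPieceCounts is only used to size the per-node filters; missing counts fall back to config.InitialPieces in add().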
if lastPieceCounts == nil {
lastPieceCounts = make(map[storj.NodeID]int64)
}
obs.startTime = startTime
obs.lastPieceCounts = lastPieceCounts
obs.retainInfos = make(map[storj.NodeID]*RetainInfo, len(lastPieceCounts))
obs.latestCreationTime = time.Time{}
obs.seed = bloomfilter.GenerateSeed()
return nil
}
// Fork returns the observer itself so that all ranges share the same set of bloom filters; access is synchronized with a mutex.
func (obs *SyncObserver) Fork(ctx context.Context) (_ rangedloop.Partial, err error) {
return obs, nil
}
// Join is a no-op; the shared bloom filters are already updated in place by each range.
func (obs *SyncObserver) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
return nil
}
// Finish uploads the bloom filters.
func (obs *SyncObserver) Finish(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
if err := obs.upload.UploadBloomFilters(ctx, obs.latestCreationTime, obs.retainInfos); err != nil {
return err
}
obs.log.Debug("collecting bloom filters finished")
return nil
}
// Process adds pieces to the bloom filter from remote segments.
func (obs *SyncObserver) Process(ctx context.Context, segments []segmentloop.Segment) error {
latestCreationTime := time.Time{}
for _, segment := range segments {
if segment.Inline() {
continue
}
// sanity check to detect whether the loop is running against a live database instead of an immutable snapshot
if segment.CreatedAt.After(obs.startTime) {
obs.log.Error("segment created after loop started", zap.Stringer("StreamID", segment.StreamID),
zap.Time("loop started", obs.startTime),
zap.Time("segment created", segment.CreatedAt))
return errs.New("segment created after loop started")
}
if latestCreationTime.Before(segment.CreatedAt) {
latestCreationTime = segment.CreatedAt
}
deriver := segment.RootPieceID.Deriver()
for _, piece := range segment.Pieces {
pieceID := deriver.Derive(piece.StorageNode, int32(piece.Number))
obs.add(piece.StorageNode, pieceID)
}
}
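// merge this batch's latest creation time into the shared value, under the same mutex that guards retainInfos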
obs.mu.Lock()
defer obs.mu.Unlock()
if obs.latestCreationTime.Before(latestCreationTime) {
obs.latestCreationTime = latestCreationTime
}
return nil
}
// add adds a pieceID to the relevant node's RetainInfo.
func (obs *SyncObserver) add(nodeID storj.NodeID, pieceID storj.PieceID) {
obs.mu.Lock()
defer obs.mu.Unlock()
info, ok := obs.retainInfos[nodeID]
if !ok {
// If we know how many pieces a node should be storing, use that number. Otherwise use default.
numPieces := obs.config.InitialPieces
if pieceCounts := obs.lastPieceCounts[nodeID]; pieceCounts > 0 {
numPieces = pieceCounts
}
hashCount, tableSize := bloomfilter.OptimalParameters(numPieces, obs.config.FalsePositiveRate, 2*memory.MiB)
// limit size of bloom filter to ensure we are under the limit for RPC
filter := bloomfilter.NewExplicit(obs.seed, hashCount, tableSize)
info = &RetainInfo{
Filter: filter,
}
obs.retainInfos[nodeID] = info
}
info.Filter.Add(pieceID)
info.Count++
}

View File

@@ -6,6 +6,7 @@ package bloomfilter_test
import (
"archive/zip"
"bytes"
"fmt"
"io"
"sort"
"strconv"
@@ -24,6 +25,8 @@ import (
"storj.io/storj/satellite/gc/bloomfilter"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/rangedloop/rangedlooptest"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/uplink"
)
@@ -76,83 +79,91 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) {
config.AccessGrant = accessString
config.Bucket = tc.Bucket
config.ZipBatchSize = tc.ZipBatchSize
observer := bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB)
observers := []rangedloop.Observer{
bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB),
bloomfilter.NewSyncObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB),
}
// TODO: see comment above. ideally this should use the rangedloop
// service instantiated for the testplanet.
rangedloopConfig := planet.Satellites[0].Config.RangedLoop
segments := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize)
rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, segments,
[]rangedloop.Observer{observer})
for _, observer := range observers {
name := fmt.Sprintf("%s-%T", tc.Bucket, observer)
t.Run(name, func(t *testing.T) {
// TODO: see comment above. ideally this should use the rangedloop
// service instantiated for the testplanet.
rangedloopConfig := planet.Satellites[0].Config.RangedLoop
segments := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize)
rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, segments,
[]rangedloop.Observer{observer})
_, err = rangedLoop.RunOnce(ctx)
require.NoError(t, err)
download, err := project.DownloadObject(ctx, tc.Bucket, bloomfilter.LATEST, nil)
require.NoError(t, err)
value, err := io.ReadAll(download)
require.NoError(t, err)
err = download.Close()
require.NoError(t, err)
prefix := string(value)
iterator := project.ListObjects(ctx, tc.Bucket, &uplink.ListObjectsOptions{
Prefix: prefix + "/",
})
count := 0
nodeIds := []string{}
packNames := []string{}
for iterator.Next() {
packNames = append(packNames, iterator.Item().Key)
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], tc.Bucket, iterator.Item().Key)
require.NoError(t, err)
zipReader, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
require.NoError(t, err)
for _, file := range zipReader.File {
bfReader, err := file.Open()
_, err = rangedLoop.RunOnce(ctx)
require.NoError(t, err)
bloomfilter, err := io.ReadAll(bfReader)
download, err := project.DownloadObject(ctx, tc.Bucket, bloomfilter.LATEST, nil)
require.NoError(t, err)
var pbRetainInfo internalpb.RetainInfo
err = pb.Unmarshal(bloomfilter, &pbRetainInfo)
value, err := io.ReadAll(download)
require.NoError(t, err)
require.NotEmpty(t, pbRetainInfo.Filter)
require.NotZero(t, pbRetainInfo.PieceCount)
require.NotZero(t, pbRetainInfo.CreationDate)
require.Equal(t, file.Name, pbRetainInfo.StorageNodeId.String())
err = download.Close()
require.NoError(t, err)
nodeIds = append(nodeIds, pbRetainInfo.StorageNodeId.String())
}
prefix := string(value)
iterator := project.ListObjects(ctx, tc.Bucket, &uplink.ListObjectsOptions{
Prefix: prefix + "/",
})
count++
count := 0
nodeIds := []string{}
packNames := []string{}
for iterator.Next() {
packNames = append(packNames, iterator.Item().Key)
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], tc.Bucket, iterator.Item().Key)
require.NoError(t, err)
zipReader, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
require.NoError(t, err)
for _, file := range zipReader.File {
bfReader, err := file.Open()
require.NoError(t, err)
bloomfilter, err := io.ReadAll(bfReader)
require.NoError(t, err)
var pbRetainInfo internalpb.RetainInfo
err = pb.Unmarshal(bloomfilter, &pbRetainInfo)
require.NoError(t, err)
require.NotEmpty(t, pbRetainInfo.Filter)
require.NotZero(t, pbRetainInfo.PieceCount)
require.NotZero(t, pbRetainInfo.CreationDate)
require.Equal(t, file.Name, pbRetainInfo.StorageNodeId.String())
nodeIds = append(nodeIds, pbRetainInfo.StorageNodeId.String())
}
count++
}
require.NoError(t, iterator.Err())
require.Equal(t, tc.ExpectedPacks, count)
expectedPackNames := []string{}
for i := 0; i < tc.ExpectedPacks; i++ {
expectedPackNames = append(expectedPackNames, prefix+"/bloomfilters-"+strconv.Itoa(i)+".zip")
}
sort.Strings(expectedPackNames)
sort.Strings(packNames)
require.Equal(t, expectedPackNames, packNames)
expectedNodeIds := []string{}
for _, node := range planet.StorageNodes {
expectedNodeIds = append(expectedNodeIds, node.ID().String())
}
sort.Strings(expectedNodeIds)
sort.Strings(nodeIds)
require.Equal(t, expectedNodeIds, nodeIds)
})
}
require.NoError(t, iterator.Err())
require.Equal(t, tc.ExpectedPacks, count)
expectedPackNames := []string{}
for i := 0; i < tc.ExpectedPacks; i++ {
expectedPackNames = append(expectedPackNames, prefix+"/bloomfilters-"+strconv.Itoa(i)+".zip")
}
sort.Strings(expectedPackNames)
sort.Strings(packNames)
require.Equal(t, expectedPackNames, packNames)
expectedNodeIds := []string{}
for _, node := range planet.StorageNodes {
expectedNodeIds = append(expectedNodeIds, node.ID().String())
}
sort.Strings(expectedNodeIds)
sort.Strings(nodeIds)
require.Equal(t, expectedNodeIds, nodeIds)
}
})
}
@@ -219,3 +230,76 @@ func TestObserverGarbageCollectionBloomFilters_AllowNotEmptyBucket(t *testing.T)
require.Contains(t, keys, bloomfilter.LATEST)
})
}
func TestObserverGarbageCollection_MultipleRanges(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 4,
UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
access := planet.Uplinks[0].Access[planet.Satellites[0].ID()]
accessString, err := access.Serialize()
require.NoError(t, err)
for i := 0; i < 21; i++ {
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bloomfilters", "object"+strconv.Itoa(i), testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
}
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
loopSegments := []segmentloop.Segment{}
for _, segment := range segments {
loopSegments = append(loopSegments, segmentloop.Segment{
StreamID: segment.StreamID,
Position: segment.Position,
CreatedAt: segment.CreatedAt,
ExpiresAt: segment.ExpiresAt,
RepairedAt: segment.RepairedAt,
RootPieceID: segment.RootPieceID,
EncryptedSize: segment.EncryptedSize,
PlainOffset: segment.PlainOffset,
PlainSize: segment.PlainSize,
Redundancy: segment.Redundancy,
Pieces: segment.Pieces,
})
}
// TODO: this is a little chicken-and-eggy... the GCBF config is
// provided to the rangedloop service above, but we don't have the
// access grant available until after testplanet has configured
// everything. For now, just test the bloomfilter observer
// directly, as is already done for the service. Maybe we can
// improve this later.
config := planet.Satellites[0].Config.GarbageCollectionBF
config.Enabled = true
config.AccessGrant = accessString
config.Bucket = "bloomfilters"
config.UseRangedLoop = true
observers := []rangedloop.Observer{
bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB),
bloomfilter.NewSyncObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB),
}
provider := &rangedlooptest.RangeSplitter{
Segments: loopSegments,
}
rangedloopConfig := planet.Satellites[0].Config.RangedLoop
rangedloopConfig.Parallelism = 5
rangedloopConfig.BatchSize = 3
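// parallelism 5 with batch size 3 splits the uploaded segments into several
// concurrently processed ranges, so the SyncObserver's shared filters are
// exercised under concurrent access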
for _, observer := range observers {
name := fmt.Sprintf("%T", observer)
t.Run(name, func(t *testing.T) {
rangedLoop := rangedloop.NewService(zap.NewNop(), rangedloopConfig, provider,
[]rangedloop.Observer{observer},
)
_, err = rangedLoop.RunOnce(ctx)
require.NoError(t, err)
})
}
})
}

View File

@@ -32,7 +32,8 @@ type Config struct {
RunOnce bool `help:"set if garbage collection bloom filter process should only run once then exit" default:"false"`
UseRangedLoop bool `help:"whether to use ranged loop instead of segment loop" default:"false"`
UseSyncObserver bool `help:"whether to use test GC SyncObserver with ranged loop" default:"false"`
// value for InitialPieces currently based on average pieces per node
InitialPieces int64 `help:"the initial number of pieces expected for a storage node to have, used for creating a filter" releaseDefault:"400000" devDefault:"10"`

View File

@@ -457,6 +457,9 @@ contact.external-address: ""
# whether to use ranged loop instead of segment loop
# garbage-collection-bf.use-ranged-loop: false
# whether to use test GC SyncObserver with ranged loop
# garbage-collection-bf.use-sync-observer: false
# how many bloom filters will be packed in a single zip
# garbage-collection-bf.zip-batch-size: 500
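To actually try the shared observer, a deployment would need both flags set, since the sync observer is only selected when the ranged loop is in use (an illustration based on the peer wiring above, not part of this change):

garbage-collection-bf.use-ranged-loop: true
garbage-collection-bf.use-sync-observer: true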