From 98562d06c80f36cd6df3c04de2950260024699bf Mon Sep 17 00:00:00 2001
From: Michal Niewrzal
Date: Wed, 26 Apr 2023 17:20:13 +0200
Subject: [PATCH] satellite/gc/bloomfilter: add sync observer

The current observer used with the ranged loop consumes a massive
amount of memory because each range generates a separate set of bloom
filters, and a single bloom filter can take up to 2MB of memory. That's
a lot.

This is an initial change to reduce memory usage by sharing the bloom
filters between ranges and synchronizing access to them. The
implementation is rather simple, even naive, but it may be enough
without doing something more complex.

https://github.com/storj/storj/issues/5803

Change-Id: Ie62d19276aa9023076b1c97f712b788bce963cbe
---
 satellite/gc-bf.go                          |  18 +-
 satellite/gc/bloomfilter/observer_sync.go   | 166 +++++++++++++++
 satellite/gc/bloomfilter/observer_test.go   | 216 ++++++++++++++------
 satellite/gc/bloomfilter/service.go         |   3 +-
 scripts/testdata/satellite-config.yaml.lock |   3 +
 5 files changed, 334 insertions(+), 72 deletions(-)
 create mode 100644 satellite/gc/bloomfilter/observer_sync.go

diff --git a/satellite/gc-bf.go b/satellite/gc-bf.go
index f73f3354a..9f6e3632f 100644
--- a/satellite/gc-bf.go
+++ b/satellite/gc-bf.go
@@ -98,13 +98,21 @@ func NewGarbageCollectionBF(log *zap.Logger, db DB, metabaseDB *metabase.DB, rev
 	if config.GarbageCollectionBF.UseRangedLoop {
 		log.Info("using ranged loop")
 
-		provider := rangedloop.NewMetabaseRangeSplitter(metabaseDB, config.RangedLoop.AsOfSystemInterval, config.RangedLoop.BatchSize)
-		peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, provider, []rangedloop.Observer{
-			bloomfilter.NewObserver(log.Named("gc-bf"),
+		var observer rangedloop.Observer
+		if config.GarbageCollectionBF.UseSyncObserver {
+			observer = bloomfilter.NewSyncObserver(log.Named("gc-bf"),
 				config.GarbageCollectionBF,
 				peer.Overlay.DB,
-			),
-		})
+			)
+		} else {
+			observer = bloomfilter.NewObserver(log.Named("gc-bf"),
+				config.GarbageCollectionBF,
+				peer.Overlay.DB,
+			)
+		}
+
+		provider := rangedloop.NewMetabaseRangeSplitter(metabaseDB, config.RangedLoop.AsOfSystemInterval, config.RangedLoop.BatchSize)
+		peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, provider, []rangedloop.Observer{observer})
 
 		if !config.GarbageCollectionBF.RunOnce {
 			peer.Services.Add(lifecycle.Item{
diff --git a/satellite/gc/bloomfilter/observer_sync.go b/satellite/gc/bloomfilter/observer_sync.go
new file mode 100644
index 000000000..8072fb012
--- /dev/null
+++ b/satellite/gc/bloomfilter/observer_sync.go
@@ -0,0 +1,166 @@
+// Copyright (C) 2022 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package bloomfilter
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/zeebo/errs"
+	"go.uber.org/zap"
+
+	"storj.io/common/bloomfilter"
+	"storj.io/common/memory"
+	"storj.io/common/storj"
+	"storj.io/storj/satellite/metabase/rangedloop"
+	"storj.io/storj/satellite/metabase/segmentloop"
+	"storj.io/storj/satellite/overlay"
+)
+
+// SyncObserver implements a rangedloop observer to collect bloom filters for the garbage collection.
+type SyncObserver struct {
+	log     *zap.Logger
+	config  Config
+	overlay overlay.DB
+	upload  *Upload
+
+	// The following fields are reset for each loop.
+	startTime       time.Time
+	lastPieceCounts map[storj.NodeID]int64
+	seed            byte
+
+	mu          sync.Mutex
+	retainInfos map[storj.NodeID]*RetainInfo
+	// latestCreationTime will be used to set the bloom filter CreationDate.
+	// Because the bloom filter service needs to be run against an immutable database snapshot,
+	// we can use the latest segment CreatedAt value as the CreationDate for the bloom filters.
+	latestCreationTime time.Time
+}
+
+var _ (rangedloop.Observer) = (*SyncObserver)(nil)
+
+// NewSyncObserver creates a new instance of the gc rangedloop observer.
+func NewSyncObserver(log *zap.Logger, config Config, overlay overlay.DB) *SyncObserver {
+	return &SyncObserver{
+		log:     log,
+		overlay: overlay,
+		upload:  NewUpload(log, config),
+		config:  config,
+	}
+}
+
+// Start is called at the beginning of each segment loop.
+func (obs *SyncObserver) Start(ctx context.Context, startTime time.Time) (err error) {
+	defer mon.Task()(&ctx)(&err)
+	switch {
+	case obs.config.AccessGrant == "":
+		return errs.New("Access Grant is not set")
+	case obs.config.Bucket == "":
+		return errs.New("Bucket is not set")
+	}
+
+	obs.log.Debug("collecting bloom filters started")
+
+	// load last piece counts from overlay db
+	lastPieceCounts, err := obs.overlay.AllPieceCounts(ctx)
+	if err != nil {
+		obs.log.Error("error getting last piece counts", zap.Error(err))
+		err = nil
+	}
+	if lastPieceCounts == nil {
+		lastPieceCounts = make(map[storj.NodeID]int64)
+	}
+
+	obs.startTime = startTime
+	obs.lastPieceCounts = lastPieceCounts
+	obs.retainInfos = make(map[storj.NodeID]*RetainInfo, len(lastPieceCounts))
+	obs.latestCreationTime = time.Time{}
+	obs.seed = bloomfilter.GenerateSeed()
+	return nil
+}
+
+// Fork returns the shared observer itself as the Partial, so all ranges add pieces to the same synchronized set of bloom filters.
+func (obs *SyncObserver) Fork(ctx context.Context) (_ rangedloop.Partial, err error) {
+	return obs, nil
+}
+
+// Join is a no-op because the shared bloom filters are already updated (under a mutex) in Process.
+func (obs *SyncObserver) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
+	return nil
+}
+
+// Finish uploads the bloom filters.
+func (obs *SyncObserver) Finish(ctx context.Context) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	if err := obs.upload.UploadBloomFilters(ctx, obs.latestCreationTime, obs.retainInfos); err != nil {
+		return err
+	}
+	obs.log.Debug("collecting bloom filters finished")
+	return nil
+}
+
+// Process adds pieces to the bloom filter from remote segments.
+func (obs *SyncObserver) Process(ctx context.Context, segments []segmentloop.Segment) error {
+	latestCreationTime := time.Time{}
+	for _, segment := range segments {
+		if segment.Inline() {
+			continue
+		}
+
+		// sanity check to detect whether the loop is mistakenly running against a live database
+		if segment.CreatedAt.After(obs.startTime) {
+			obs.log.Error("segment created after loop started", zap.Stringer("StreamID", segment.StreamID),
+				zap.Time("loop started", obs.startTime),
+				zap.Time("segment created", segment.CreatedAt))
+			return errs.New("segment created after loop started")
+		}
+
+		if latestCreationTime.Before(segment.CreatedAt) {
+			latestCreationTime = segment.CreatedAt
+		}
+
+		deriver := segment.RootPieceID.Deriver()
+		for _, piece := range segment.Pieces {
+			pieceID := deriver.Derive(piece.StorageNode, int32(piece.Number))
+			obs.add(piece.StorageNode, pieceID)
+		}
+	}
+
+	obs.mu.Lock()
+	defer obs.mu.Unlock()
+
+	if obs.latestCreationTime.Before(latestCreationTime) {
+		obs.latestCreationTime = latestCreationTime
+	}
+
+	return nil
+}
+
+// add adds a pieceID to the relevant node's RetainInfo.
+func (obs *SyncObserver) add(nodeID storj.NodeID, pieceID storj.PieceID) { + obs.mu.Lock() + defer obs.mu.Unlock() + + info, ok := obs.retainInfos[nodeID] + if !ok { + // If we know how many pieces a node should be storing, use that number. Otherwise use default. + numPieces := obs.config.InitialPieces + if pieceCounts := obs.lastPieceCounts[nodeID]; pieceCounts > 0 { + numPieces = pieceCounts + } + + hashCount, tableSize := bloomfilter.OptimalParameters(numPieces, obs.config.FalsePositiveRate, 2*memory.MiB) + // limit size of bloom filter to ensure we are under the limit for RPC + filter := bloomfilter.NewExplicit(obs.seed, hashCount, tableSize) + info = &RetainInfo{ + Filter: filter, + } + obs.retainInfos[nodeID] = info + } + + info.Filter.Add(pieceID) + info.Count++ +} diff --git a/satellite/gc/bloomfilter/observer_test.go b/satellite/gc/bloomfilter/observer_test.go index a40616ae3..bcb40d259 100644 --- a/satellite/gc/bloomfilter/observer_test.go +++ b/satellite/gc/bloomfilter/observer_test.go @@ -6,6 +6,7 @@ package bloomfilter_test import ( "archive/zip" "bytes" + "fmt" "io" "sort" "strconv" @@ -24,6 +25,8 @@ import ( "storj.io/storj/satellite/gc/bloomfilter" "storj.io/storj/satellite/internalpb" "storj.io/storj/satellite/metabase/rangedloop" + "storj.io/storj/satellite/metabase/rangedloop/rangedlooptest" + "storj.io/storj/satellite/metabase/segmentloop" "storj.io/uplink" ) @@ -76,83 +79,91 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) { config.AccessGrant = accessString config.Bucket = tc.Bucket config.ZipBatchSize = tc.ZipBatchSize - observer := bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB) + observers := []rangedloop.Observer{ + bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB), + bloomfilter.NewSyncObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB), + } - // TODO: see comment above. ideally this should use the rangedloop - // service instantiated for the testplanet. - rangedloopConfig := planet.Satellites[0].Config.RangedLoop - segments := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize) - rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, segments, - []rangedloop.Observer{observer}) + for _, observer := range observers { + name := fmt.Sprintf("%s-%T", tc.Bucket, observer) + t.Run(name, func(t *testing.T) { + // TODO: see comment above. ideally this should use the rangedloop + // service instantiated for the testplanet. 
+ rangedloopConfig := planet.Satellites[0].Config.RangedLoop + segments := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize) + rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, segments, + []rangedloop.Observer{observer}) - _, err = rangedLoop.RunOnce(ctx) - require.NoError(t, err) - - download, err := project.DownloadObject(ctx, tc.Bucket, bloomfilter.LATEST, nil) - require.NoError(t, err) - - value, err := io.ReadAll(download) - require.NoError(t, err) - - err = download.Close() - require.NoError(t, err) - - prefix := string(value) - iterator := project.ListObjects(ctx, tc.Bucket, &uplink.ListObjectsOptions{ - Prefix: prefix + "/", - }) - - count := 0 - nodeIds := []string{} - packNames := []string{} - for iterator.Next() { - packNames = append(packNames, iterator.Item().Key) - - data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], tc.Bucket, iterator.Item().Key) - require.NoError(t, err) - - zipReader, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) - require.NoError(t, err) - - for _, file := range zipReader.File { - bfReader, err := file.Open() + _, err = rangedLoop.RunOnce(ctx) require.NoError(t, err) - bloomfilter, err := io.ReadAll(bfReader) + download, err := project.DownloadObject(ctx, tc.Bucket, bloomfilter.LATEST, nil) require.NoError(t, err) - var pbRetainInfo internalpb.RetainInfo - err = pb.Unmarshal(bloomfilter, &pbRetainInfo) + value, err := io.ReadAll(download) require.NoError(t, err) - require.NotEmpty(t, pbRetainInfo.Filter) - require.NotZero(t, pbRetainInfo.PieceCount) - require.NotZero(t, pbRetainInfo.CreationDate) - require.Equal(t, file.Name, pbRetainInfo.StorageNodeId.String()) + err = download.Close() + require.NoError(t, err) - nodeIds = append(nodeIds, pbRetainInfo.StorageNodeId.String()) - } + prefix := string(value) + iterator := project.ListObjects(ctx, tc.Bucket, &uplink.ListObjectsOptions{ + Prefix: prefix + "/", + }) - count++ + count := 0 + nodeIds := []string{} + packNames := []string{} + for iterator.Next() { + packNames = append(packNames, iterator.Item().Key) + + data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], tc.Bucket, iterator.Item().Key) + require.NoError(t, err) + + zipReader, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) + require.NoError(t, err) + + for _, file := range zipReader.File { + bfReader, err := file.Open() + require.NoError(t, err) + + bloomfilter, err := io.ReadAll(bfReader) + require.NoError(t, err) + + var pbRetainInfo internalpb.RetainInfo + err = pb.Unmarshal(bloomfilter, &pbRetainInfo) + require.NoError(t, err) + + require.NotEmpty(t, pbRetainInfo.Filter) + require.NotZero(t, pbRetainInfo.PieceCount) + require.NotZero(t, pbRetainInfo.CreationDate) + require.Equal(t, file.Name, pbRetainInfo.StorageNodeId.String()) + + nodeIds = append(nodeIds, pbRetainInfo.StorageNodeId.String()) + } + + count++ + } + require.NoError(t, iterator.Err()) + require.Equal(t, tc.ExpectedPacks, count) + + expectedPackNames := []string{} + for i := 0; i < tc.ExpectedPacks; i++ { + expectedPackNames = append(expectedPackNames, prefix+"/bloomfilters-"+strconv.Itoa(i)+".zip") + } + sort.Strings(expectedPackNames) + sort.Strings(packNames) + require.Equal(t, expectedPackNames, packNames) + + expectedNodeIds := []string{} + for _, node := range planet.StorageNodes { + expectedNodeIds = append(expectedNodeIds, node.ID().String()) + } + sort.Strings(expectedNodeIds) + 
sort.Strings(nodeIds) + require.Equal(t, expectedNodeIds, nodeIds) + }) } - require.NoError(t, iterator.Err()) - require.Equal(t, tc.ExpectedPacks, count) - - expectedPackNames := []string{} - for i := 0; i < tc.ExpectedPacks; i++ { - expectedPackNames = append(expectedPackNames, prefix+"/bloomfilters-"+strconv.Itoa(i)+".zip") - } - sort.Strings(expectedPackNames) - sort.Strings(packNames) - require.Equal(t, expectedPackNames, packNames) - - expectedNodeIds := []string{} - for _, node := range planet.StorageNodes { - expectedNodeIds = append(expectedNodeIds, node.ID().String()) - } - sort.Strings(expectedNodeIds) - sort.Strings(nodeIds) - require.Equal(t, expectedNodeIds, nodeIds) } }) } @@ -219,3 +230,76 @@ func TestObserverGarbageCollectionBloomFilters_AllowNotEmptyBucket(t *testing.T) require.Contains(t, keys, bloomfilter.LATEST) }) } + +func TestObserverGarbageCollection_MultipleRanges(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, + StorageNodeCount: 4, + UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + access := planet.Uplinks[0].Access[planet.Satellites[0].ID()] + accessString, err := access.Serialize() + require.NoError(t, err) + + for i := 0; i < 21; i++ { + err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bloomfilters", "object"+strconv.Itoa(i), testrand.Bytes(5*memory.KiB)) + require.NoError(t, err) + } + + segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) + require.NoError(t, err) + + loopSegments := []segmentloop.Segment{} + for _, segment := range segments { + loopSegments = append(loopSegments, segmentloop.Segment{ + StreamID: segment.StreamID, + Position: segment.Position, + CreatedAt: segment.CreatedAt, + ExpiresAt: segment.ExpiresAt, + RepairedAt: segment.RepairedAt, + RootPieceID: segment.RootPieceID, + EncryptedSize: segment.EncryptedSize, + PlainOffset: segment.PlainOffset, + PlainSize: segment.PlainSize, + Redundancy: segment.Redundancy, + Pieces: segment.Pieces, + }) + } + + // TODO: this is a little chicken-and-eggy... the GCBF config is + // provided to the rangedloop service above, but we don't have the + // access grant available until after testplanet has configured + // everything. For now, just test the bloomfilter observer + // directly, as is already done for the service. Maybe we can + // improve this later. 
+ config := planet.Satellites[0].Config.GarbageCollectionBF + config.Enabled = true + config.AccessGrant = accessString + config.Bucket = "bloomfilters" + config.UseRangedLoop = true + observers := []rangedloop.Observer{ + bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB), + bloomfilter.NewSyncObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB), + } + + provider := &rangedlooptest.RangeSplitter{ + Segments: loopSegments, + } + + rangedloopConfig := planet.Satellites[0].Config.RangedLoop + rangedloopConfig.Parallelism = 5 + rangedloopConfig.BatchSize = 3 + + for _, observer := range observers { + name := fmt.Sprintf("%T", observer) + t.Run(name, func(t *testing.T) { + rangedLoop := rangedloop.NewService(zap.NewNop(), rangedloopConfig, provider, + []rangedloop.Observer{observer}, + ) + + _, err = rangedLoop.RunOnce(ctx) + require.NoError(t, err) + }) + } + }) +} diff --git a/satellite/gc/bloomfilter/service.go b/satellite/gc/bloomfilter/service.go index f14c48a67..def5eb3ee 100644 --- a/satellite/gc/bloomfilter/service.go +++ b/satellite/gc/bloomfilter/service.go @@ -32,7 +32,8 @@ type Config struct { RunOnce bool `help:"set if garbage collection bloom filter process should only run once then exit" default:"false"` - UseRangedLoop bool `help:"whether to use ranged loop instead of segment loop" default:"false"` + UseRangedLoop bool `help:"whether to use ranged loop instead of segment loop" default:"false"` + UseSyncObserver bool `help:"whether to use test GC SyncObserver with ranged loop" default:"false"` // value for InitialPieces currently based on average pieces per node InitialPieces int64 `help:"the initial number of pieces expected for a storage node to have, used for creating a filter" releaseDefault:"400000" devDefault:"10"` diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index c8f006894..50d94aa31 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -457,6 +457,9 @@ contact.external-address: "" # whether to use ranged loop instead of segment loop # garbage-collection-bf.use-ranged-loop: false +# whether to use test GC SyncObserver with ranged loop +# garbage-collection-bf.use-sync-observer: false + # how many bloom filters will be packed in a single zip # garbage-collection-bf.zip-batch-size: 500
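
Usage note (a minimal sketch, not shipped with this change): because the sync observer is only wired up when the ranged loop is enabled, both of the flags touched by this patch would need to be set for a satellite to use it; the exact deployment configuration may differ.

    garbage-collection-bf.use-ranged-loop: true
    garbage-collection-bf.use-sync-observer: true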