satellite/gc/bloomfilter: uploading bloom filters

We would like to have a separate process/command to collect bloom
filters from a source other than the production DBs. Such a process will
use the segment loop to build bloom filters for all storage nodes and
will send them to a Storj bucket.

This change adds the main logic to the new service. After collecting all
bloom filters with the segment loop and the piece tracker, the filters are
marshaled and packed into zip files. Each zip contains up to "ZipBatchSize"
bloom filters and is uploaded to the bucket specified in the configuration.

All uploaded objects have an expiration time set, so they do not have to be
deleted manually.

Updates https://github.com/storj/team-metainfo/issues/120

Change-Id: I2b6bc02a7dd7c3a639e75810fd013ae4afdc80a2
Michal Niewrzal 2022-08-30 17:41:09 +02:00 committed by Storj Robot
parent eea3fac0d3
commit 158eb2381e
4 changed files with 383 additions and 18 deletions
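
To illustrate the batching described in the commit message, here is a small, self-contained sketch (not part of this change; the helper name is made up) of how many zip packs end up in the bucket for a given ZipBatchSize, matching the expectations in the tests below:

package main

import "fmt"

// expectedPackNames mirrors the batching arithmetic used by the service and its
// tests: one "bloomfilters-N.zip" object per ZipBatchSize bloom filters, plus a
// separate empty "gc-done" marker object uploaded at the end.
func expectedPackNames(nodeCount, zipBatchSize int) []string {
	packs := (nodeCount + zipBatchSize - 1) / zipBatchSize // ceiling division
	names := make([]string, 0, packs)
	for i := 0; i < packs; i++ {
		names = append(names, fmt.Sprintf("bloomfilters-%d.zip", i))
	}
	return names
}

func main() {
	// 7 storage nodes with ZipBatchSize=2 yield 4 packs, as the test cases expect.
	fmt.Println(expectedPackNames(7, 2))
}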


@@ -0,0 +1,25 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
/*
Package bloomfilter contains the functions needed to run part of the garbage
collection process.
The bloomfilter.PieceTracker implements the segment loop Observer interface,
allowing us to subscribe to the loop to get information for every segment
in the metabase db.
The bloomfilter.PieceTracker handling functions are used by the bloomfilter.Service
to periodically account for all existing pieces on storage nodes and create
"retain requests" which contain a bloom filter of all pieces that possibly exist
on a storage node.
The bloomfilter.Service will send those requests to the Storj bucket after a full
segment loop iteration. After that, the bloom filters will be downloaded and sent
to the storage nodes by a separate service from the storj/satellite/gc package.
This bloom filter service should be run only against an immutable database snapshot.
See storj/docs/design/garbage-collection.md for more info.
*/
package bloomfilter
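
For orientation, here is a minimal sketch of how this service might be constructed and run once, mirroring the wiring used in the tests of this change; the helper function, its parameters, and the bucket name are illustrative assumptions rather than part of the commit:

package gcbf // hypothetical example package

import (
	"context"
	"time"

	"go.uber.org/zap"

	"storj.io/storj/satellite/gc/bloomfilter"
	"storj.io/storj/satellite/metabase/segmentloop"
	"storj.io/storj/satellite/overlay"
)

// collectBloomFilters runs a single bloom filter collection, assuming the overlay
// DB and the segment loop are wired the same way as in the tests.
func collectBloomFilters(ctx context.Context, log *zap.Logger, overlayDB overlay.DB, loop *segmentloop.Service, accessGrant string) error {
	config := bloomfilter.Config{
		Enabled:           true,
		AccessGrant:       accessGrant,
		Bucket:            "bloomfilters", // assumed bucket name
		InitialPieces:     400000,
		FalsePositiveRate: 0.1,
		ZipBatchSize:      500,
		ExpireIn:          336 * time.Hour,
		// the loop interval is left at its zero value because RunOnce does not use the internal cycle
	}
	service := bloomfilter.NewService(log, config, overlayDB, loop)
	// RunOnce joins the segment loop, builds the per-node filters, and uploads the zip packs.
	return service.RunOnce(ctx)
}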


@@ -4,16 +4,22 @@
package bloomfilter
import (
"archive/zip"
"context"
"strconv"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/overlay"
"storj.io/uplink"
)
var mon = monkit.Package()
@@ -27,6 +33,11 @@ type Config struct {
// value for InitialPieces currently based on average pieces per node
InitialPieces int `help:"the initial number of pieces expected for a storage node to have, used for creating a filter" releaseDefault:"400000" devDefault:"10"`
FalsePositiveRate float64 `help:"the false positive rate used for creating a garbage collection bloom filter" releaseDefault:"0.1" devDefault:"0.1"`
AccessGrant string `help:"Access Grant which will be used to upload bloom filters to the bucket" default:""`
Bucket string `help:"Bucket which will be used to upload bloom filters" default:""` // TODO do we need full location?
ZipBatchSize int `help:"how many bloom filters will be packed in a single zip" default:"500" testDefault:"2"`
ExpireIn time.Duration `help:"how quickly uploaded bloom filters will be automatically deleted" default:"336h"`
}
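
To get a sense of how InitialPieces and FalsePositiveRate trade off against per-node filter size, the standard optimal bloom filter sizing is sketched below; this is illustrative math only, not necessarily the exact formula used by the piece tracker:

package main

import (
	"fmt"
	"math"
)

// optimalSizeBytes returns the textbook optimal bloom filter size in bytes for
// n expected elements and false positive rate p: m = -n*ln(p)/ln(2)^2 bits.
func optimalSizeBytes(n int64, p float64) int64 {
	bits := math.Ceil(-float64(n) * math.Log(p) / (math.Ln2 * math.Ln2))
	return int64(math.Ceil(bits / 8))
}

func main() {
	// With the release defaults (400000 pieces, 0.1 false positive rate)
	// a single per-node filter is roughly 240 KB before marshaling.
	fmt.Println(optimalSizeBytes(400000, 0.1))
}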
// Service implements the garbage collection service.
@@ -60,6 +71,22 @@ func (service *Service) Run(ctx context.Context) (err error) {
return nil
}
switch {
case service.config.AccessGrant == "":
return errs.New("Access Grant is not set")
case service.config.Bucket == "":
return errs.New("Bucket is not set")
}
return service.Loop.Run(ctx, service.RunOnce)
}
// RunOnce runs service only once.
func (service *Service) RunOnce(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
service.log.Debug("collecting bloom filters started")
// load last piece counts from overlay db
lastPieceCounts, err := service.overlay.AllPieceCounts(ctx)
if err != nil {
@@ -70,24 +97,172 @@ func (service *Service) Run(ctx context.Context) (err error) {
lastPieceCounts = make(map[storj.NodeID]int)
}
pieceTracker := NewPieceTracker(service.log.Named("gc observer"), service.config, lastPieceCounts)
// collect things to retain
err = service.segmentLoop.Join(ctx, pieceTracker)
if err != nil {
service.log.Error("error joining metainfoloop", zap.Error(err))
return nil
}
err = service.uploadBloomFilters(ctx, pieceTracker.RetainInfos)
if err != nil {
return err
}
service.log.Debug("collecting bloom filters finished")
return nil
}
func (service *Service) uploadBloomFilters(ctx context.Context, retainInfos map[storj.NodeID]*RetainInfo) (err error) {
defer mon.Task()(&ctx)(&err)
if len(retainInfos) == 0 {
return nil
}
expirationTime := time.Now().Add(service.config.ExpireIn)
accessGrant, err := uplink.ParseAccess(service.config.AccessGrant)
if err != nil {
return err
}
project, err := uplink.OpenProject(ctx, accessGrant)
if err != nil {
return err
}
defer func() {
// do cleanup in case of any error while uploading bloom filters
if err != nil {
// TODO should we drop whole bucket if cleanup will fail
err = errs.Combine(err, service.cleanup(ctx, project))
}
err = errs.Combine(err, project.Close())
}()
_, err = project.EnsureBucket(ctx, service.config.Bucket)
if err != nil {
return err
}
// TODO move it before segment loop is started
iterator := project.ListObjects(ctx, service.config.Bucket, nil)
for iterator.Next() {
if iterator.Item().IsPrefix {
continue
}
service.log.Warn("target bucket was not empty, stop operation and wait for next execution", zap.String("bucket", service.config.Bucket))
return nil
}
infos := make([]internalpb.RetainInfo, 0, service.config.ZipBatchSize)
batchNumber := 0
for nodeID, info := range retainInfos {
infos = append(infos, internalpb.RetainInfo{
Filter: info.Filter.Bytes(),
CreationDate: info.CreationDate,
PieceCount: int64(info.Count),
StorageNodeId: nodeID,
})
if len(infos) == service.config.ZipBatchSize {
err = service.uploadPack(ctx, project, batchNumber, expirationTime, infos)
if err != nil {
return err
}
infos = infos[:0]
batchNumber++
}
}
// upload rest of infos if any
if err := service.uploadPack(ctx, project, batchNumber, expirationTime, infos); err != nil {
return err
}
// upload empty object as a flag that bloom filters can be consumed.
upload, err := project.UploadObject(ctx, service.config.Bucket, "gc-done", &uplink.UploadOptions{
Expires: expirationTime,
})
if err != nil {
return err
}
return upload.Commit()
}
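
Because the empty "gc-done" object is committed only after every pack has been uploaded, a downstream consumer can treat it as a completion marker. Below is a minimal consumer-side sketch under that assumption; the helper and its name are hypothetical and not part of this change:

package gcconsumer // hypothetical example package

import (
	"context"

	"storj.io/uplink"
)

// listFinishedPacks returns the keys of the uploaded bloom filter packs, but only
// once the "gc-done" marker object exists, i.e. after a full collection run finished.
func listFinishedPacks(ctx context.Context, project *uplink.Project, bucket string) ([]string, error) {
	if _, err := project.StatObject(ctx, bucket, "gc-done"); err != nil {
		return nil, err // marker is missing or not accessible yet
	}
	var packs []string
	iterator := project.ListObjects(ctx, bucket, nil)
	for iterator.Next() {
		item := iterator.Item()
		if item.IsPrefix || item.Key == "gc-done" {
			continue
		}
		packs = append(packs, item.Key)
	}
	return packs, iterator.Err()
}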
// uploadPack uploads single zip pack with multiple bloom filters.
func (service *Service) uploadPack(ctx context.Context, project *uplink.Project, batchNumber int, expirationTime time.Time, infos []internalpb.RetainInfo) (err error) {
defer mon.Task()(&ctx)(&err)
if len(infos) == 0 {
return nil
}
upload, err := project.UploadObject(ctx, service.config.Bucket, "bloomfilters-"+strconv.Itoa(batchNumber)+".zip", &uplink.UploadOptions{
Expires: expirationTime,
})
if err != nil {
return err
}
zipWriter := zip.NewWriter(upload)
defer func() {
err = errs.Combine(err, zipWriter.Close())
if err != nil {
err = errs.Combine(err, upload.Abort())
} else {
err = upload.Commit()
}
}()
for _, info := range infos {
retainInfoBytes, err := pb.Marshal(&info)
if err != nil {
return err
}
writer, err := zipWriter.Create(info.StorageNodeId.String())
if err != nil {
return err
}
write, err := writer.Write(retainInfoBytes)
if err != nil {
return err
}
if len(retainInfoBytes) != write {
return errs.New("not whole bloom filter was written")
}
}
return nil
}
// cleanup moves all objects from the root location to a unique prefix. Objects will be deleted
// automatically when they expire.
func (service *Service) cleanup(ctx context.Context, project *uplink.Project) (err error) {
defer mon.Task()(&ctx)(&err)
prefix := strconv.FormatInt(time.Now().Unix(), 10)
iterator := project.ListObjects(ctx, service.config.Bucket, nil)
for iterator.Next() {
item := iterator.Item()
if item.IsPrefix {
continue
}
err := project.MoveObject(ctx, service.config.Bucket, item.Key, service.config.Bucket, prefix+"/"+item.Key, nil)
if err != nil {
return err
}
}
return iterator.Err()
}


@@ -4,27 +4,180 @@
package bloomfilter_test
import (
"archive/zip"
"bytes"
"io/ioutil"
"sort"
"strconv"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/gc/bloomfilter"
"storj.io/storj/satellite/internalpb"
)
func TestGarbageCollectionBloomFilters(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount:   1,
StorageNodeCount: 7,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.GarbageCollectionBF.Enabled = true
config.Metainfo.SegmentLoop.AsOfSystemInterval = 1
testplanet.ReconfigureRS(2, 2, 7, 7)(log, index, config)
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// TODO test will be replaced with something more meaningful when service
// will be fully implemented
planet.Satellites[0].GarbageCollection.BloomFilters.Loop.Pause()
planet.Satellites[0].GarbageCollection.BloomFilters.Loop.TriggerWait()
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(10*memory.KiB))
require.NoError(t, err)
access := planet.Uplinks[0].Access[planet.Satellites[0].ID()]
accessString, err := access.Serialize()
require.NoError(t, err)
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err)
defer ctx.Check(project.Close)
type testCase struct {
Bucket string
ZipBatchSize int
ExpectedPacks int
}
testCases := []testCase{
{"bloomfilters-bucket-1", 1, 7},
{"bloomfilters-bucket-2", 2, 4},
{"bloomfilters-bucket-7", 7, 1},
{"bloomfilters-bucket-100", 100, 1},
}
for _, tc := range testCases {
config := planet.Satellites[0].Config.GarbageCollectionBF
config.Enabled = true
config.AccessGrant = accessString
config.Bucket = tc.Bucket
config.ZipBatchSize = tc.ZipBatchSize
service := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop)
err = service.RunOnce(ctx)
require.NoError(t, err)
iterator := project.ListObjects(ctx, tc.Bucket, nil)
count := 0
nodeIds := []string{}
packNames := []string{}
for iterator.Next() {
if iterator.Item().Key == "gc-done" {
continue
}
packNames = append(packNames, iterator.Item().Key)
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], tc.Bucket, iterator.Item().Key)
require.NoError(t, err)
zipReader, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
require.NoError(t, err)
for _, file := range zipReader.File {
bfReader, err := file.Open()
require.NoError(t, err)
bloomfilter, err := ioutil.ReadAll(bfReader)
require.NoError(t, err)
var pbRetainInfo internalpb.RetainInfo
err = pb.Unmarshal(bloomfilter, &pbRetainInfo)
require.NoError(t, err)
require.NotEmpty(t, pbRetainInfo.Filter)
require.NotZero(t, pbRetainInfo.PieceCount)
require.NotZero(t, pbRetainInfo.CreationDate)
require.Equal(t, file.Name, pbRetainInfo.StorageNodeId.String())
nodeIds = append(nodeIds, pbRetainInfo.StorageNodeId.String())
}
count++
}
require.NoError(t, iterator.Err())
require.Equal(t, tc.ExpectedPacks, count)
expectedPackNames := []string{}
for i := 0; i < tc.ExpectedPacks; i++ {
expectedPackNames = append(expectedPackNames, "bloomfilters-"+strconv.Itoa(i)+".zip")
}
sort.Strings(expectedPackNames)
sort.Strings(packNames)
require.Equal(t, expectedPackNames, packNames)
expectedNodeIds := []string{}
for _, node := range planet.StorageNodes {
expectedNodeIds = append(expectedNodeIds, node.ID().String())
}
sort.Strings(expectedNodeIds)
sort.Strings(nodeIds)
require.Equal(t, expectedNodeIds, nodeIds)
_, err = project.StatObject(ctx, tc.Bucket, "gc-done")
require.NoError(t, err)
}
})
}
func TestGarbageCollectionBloomFilters_NotEmptyBucket(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 4,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Metainfo.SegmentLoop.AsOfSystemInterval = 1
testplanet.ReconfigureRS(2, 2, 4, 4)(log, index, config)
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(10*memory.KiB))
require.NoError(t, err)
access := planet.Uplinks[0].Access[planet.Satellites[0].ID()]
accessString, err := access.Serialize()
require.NoError(t, err)
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err)
defer ctx.Check(project.Close)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bloomfilters", "some object", testrand.Bytes(1*memory.KiB))
require.NoError(t, err)
config := planet.Satellites[0].Config.GarbageCollectionBF
config.AccessGrant = accessString
config.Bucket = "bloomfilters"
service := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop)
err = service.RunOnce(ctx)
require.NoError(t, err)
iterator := project.ListObjects(ctx, "bloomfilters", nil)
// no new uploads, bucket structure was not changed
require.True(t, iterator.Next())
require.Equal(t, "some object", iterator.Item().Key)
require.False(t, iterator.Next())
require.NoError(t, iterator.Err())
})
}


@@ -379,9 +379,18 @@ contact.external-address: ""
# how many expired objects to query in a batch
# expired-deletion.list-limit: 100
# Access Grant which will be used to upload bloom filters to the bucket
# garbage-collection-bf.access-grant: ""
# Bucket which will be used to upload bloom filters
# garbage-collection-bf.bucket: ""
# set if garbage collection bloom filters is enabled or not
# garbage-collection-bf.enabled: true
# how quickly uploaded bloom filters will be automatically deleted
# garbage-collection-bf.expire-in: 336h0m0s
# the false positive rate used for creating a garbage collection bloom filter
# garbage-collection-bf.false-positive-rate: 0.1
@@ -391,6 +400,9 @@ contact.external-address: ""
# the time between each garbage collection executions
# garbage-collection-bf.interval: 120h0m0s
# how many bloom filters will be packed in a single zip
# garbage-collection-bf.zip-batch-size: 500
# the number of nodes to concurrently send garbage collection bloom filters to
# garbage-collection.concurrent-sends: 1