satellite/gc/bloomfilter: extract BF upload logic

This is a refactor/cleanup change before I start working on adding a
separate GC observer with optimized memory consumption.

https://github.com/storj/storj/issues/5803

Change-Id: I854cb3797802a32942c25f2765dbb72be88bacbd
Michal Niewrzal 2023-04-26 14:56:23 +02:00
parent 48f920809f
commit 3cd79d987d
3 changed files with 218 additions and 179 deletions


@@ -4,22 +4,17 @@
package bloomfilter
import (
"archive/zip"
"context"
"strconv"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/bloomfilter"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/overlay"
"storj.io/uplink"
)
// Observer implements a rangedloop observer to collect bloom filters for the garbage collection.
@@ -28,6 +23,7 @@ import (
type Observer struct {
log *zap.Logger
config Config
upload *Upload
overlay overlay.DB
// The following fields are reset for each loop.
@@ -44,19 +40,18 @@ var _ (rangedloop.Observer) = (*Observer)(nil)
func NewObserver(log *zap.Logger, config Config, overlay overlay.DB) *Observer {
return &Observer{
log: log,
config: config,
overlay: overlay,
upload: NewUpload(log, config),
config: config,
}
}
// Start is called at the beginning of each segment loop.
func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
switch {
case obs.config.AccessGrant == "":
return errs.New("Access Grant is not set")
case obs.config.Bucket == "":
return errs.New("Bucket is not set")
if err := obs.upload.CheckConfig(); err != nil {
return err
}
obs.log.Debug("collecting bloom filters started")
@@ -124,173 +119,9 @@ func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err
// Finish uploads the bloom filters.
func (obs *Observer) Finish(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
if err := obs.uploadBloomFilters(ctx, obs.latestCreationTime, obs.retainInfos); err != nil {
if err := obs.upload.UploadBloomFilters(ctx, obs.latestCreationTime, obs.retainInfos); err != nil {
return err
}
obs.log.Debug("collecting bloom filters finished")
return nil
}
// uploadBloomFilters stores a zipfile with multiple bloom filters in a bucket.
func (obs *Observer) uploadBloomFilters(ctx context.Context, latestCreationDate time.Time, retainInfos map[storj.NodeID]*RetainInfo) (err error) {
defer mon.Task()(&ctx)(&err)
if len(retainInfos) == 0 {
return nil
}
prefix := time.Now().Format(time.RFC3339)
expirationTime := time.Now().Add(obs.config.ExpireIn)
accessGrant, err := uplink.ParseAccess(obs.config.AccessGrant)
if err != nil {
return err
}
project, err := uplink.OpenProject(ctx, accessGrant)
if err != nil {
return err
}
defer func() {
// do cleanup in case of any error while uploading bloom filters
if err != nil {
// TODO should we drop the whole bucket if cleanup fails
err = errs.Combine(err, obs.cleanup(ctx, project, prefix))
}
err = errs.Combine(err, project.Close())
}()
_, err = project.EnsureBucket(ctx, obs.config.Bucket)
if err != nil {
return err
}
// TODO move this check before the segment loop is started
o := uplink.ListObjectsOptions{
Prefix: prefix + "/",
}
iterator := project.ListObjects(ctx, obs.config.Bucket, &o)
for iterator.Next() {
if iterator.Item().IsPrefix {
continue
}
obs.log.Warn("target bucket was not empty, stop operation and wait for next execution", zap.String("bucket", obs.config.Bucket))
return nil
}
infos := make([]internalpb.RetainInfo, 0, obs.config.ZipBatchSize)
batchNumber := 0
for nodeID, info := range retainInfos {
infos = append(infos, internalpb.RetainInfo{
Filter: info.Filter.Bytes(),
// because bloom filters should be created from an immutable database
// snapshot, we use the latest segment creation date
CreationDate: latestCreationDate,
PieceCount: int64(info.Count),
StorageNodeId: nodeID,
})
if len(infos) == obs.config.ZipBatchSize {
err = obs.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos)
if err != nil {
return err
}
infos = infos[:0]
batchNumber++
}
}
// upload the rest of the infos, if any
if err := obs.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos); err != nil {
return err
}
// update LATEST file
upload, err := project.UploadObject(ctx, obs.config.Bucket, LATEST, nil)
if err != nil {
return err
}
_, err = upload.Write([]byte(prefix))
if err != nil {
return err
}
return upload.Commit()
}
// uploadPack uploads a single zip pack with multiple bloom filters.
func (obs *Observer) uploadPack(ctx context.Context, project *uplink.Project, prefix string, batchNumber int, expirationTime time.Time, infos []internalpb.RetainInfo) (err error) {
defer mon.Task()(&ctx)(&err)
if len(infos) == 0 {
return nil
}
upload, err := project.UploadObject(ctx, obs.config.Bucket, prefix+"/bloomfilters-"+strconv.Itoa(batchNumber)+".zip", &uplink.UploadOptions{
Expires: expirationTime,
})
if err != nil {
return err
}
zipWriter := zip.NewWriter(upload)
defer func() {
err = errs.Combine(err, zipWriter.Close())
if err != nil {
err = errs.Combine(err, upload.Abort())
} else {
err = upload.Commit()
}
}()
for _, info := range infos {
retainInfoBytes, err := pb.Marshal(&info)
if err != nil {
return err
}
writer, err := zipWriter.Create(info.StorageNodeId.String())
if err != nil {
return err
}
write, err := writer.Write(retainInfoBytes)
if err != nil {
return err
}
if len(retainInfoBytes) != write {
return errs.New("not whole bloom filter was written")
}
}
return nil
}
// cleanup moves all objects from the root location to a unique prefix. Objects will be deleted
// automatically when they expire.
func (obs *Observer) cleanup(ctx context.Context, project *uplink.Project, prefix string) (err error) {
defer mon.Task()(&ctx)(&err)
errPrefix := "upload-error-" + time.Now().Format(time.RFC3339)
o := uplink.ListObjectsOptions{
Prefix: prefix + "/",
}
iterator := project.ListObjects(ctx, obs.config.Bucket, &o)
for iterator.Next() {
item := iterator.Item()
if item.IsPrefix {
continue
}
err := project.MoveObject(ctx, obs.config.Bucket, item.Key, obs.config.Bucket, prefix+"/"+errPrefix+"/"+item.Key, nil)
if err != nil {
return err
}
}
return iterator.Err()
}
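
For orientation (not part of the diff): a successful run of UploadBloomFilters leaves the target bucket with a layout like the following, where the timestamp prefix is illustrative:

2023-04-26T12:00:00Z/bloomfilters-0.zip
2023-04-26T12:00:00Z/bloomfilters-1.zip
LATEST

Each zip holds up to ZipBatchSize marshaled internalpb.RetainInfo entries, one per storage node, and LATEST contains the prefix of the most recently completed generation. If an error occurs mid-upload, cleanup moves the partially written objects under <prefix>/upload-error-<timestamp>/ so they are deleted when they expire instead of blocking the next run.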


@@ -22,9 +22,6 @@ import (
"storj.io/uplink"
)
// LATEST is the name of the file that contains the most recently completed bloomfilter generation prefix.
const LATEST = "LATEST"
var mon = monkit.Package()
// Config contains configurable values for garbage collection.


@@ -0,0 +1,211 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package bloomfilter
import (
"archive/zip"
"context"
"strconv"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/satellite/internalpb"
"storj.io/uplink"
)
// LATEST is the name of the file that contains the most recently completed bloomfilter generation prefix.
const LATEST = "LATEST"
// Upload is used to upload bloom filters to specified bucket.
type Upload struct {
log *zap.Logger
config Config
}
// NewUpload creates new upload for bloom filters.
func NewUpload(log *zap.Logger, config Config) *Upload {
return &Upload{
log: log,
config: config,
}
}
// CheckConfig checks configuration values.
func (bfu *Upload) CheckConfig() error {
switch {
case bfu.config.AccessGrant == "":
return errs.New("Access Grant is not set")
case bfu.config.Bucket == "":
return errs.New("Bucket is not set")
}
return nil
}
// UploadBloomFilters stores a zipfile with multiple bloom filters in a bucket.
func (bfu *Upload) UploadBloomFilters(ctx context.Context, latestCreationDate time.Time, retainInfos map[storj.NodeID]*RetainInfo) (err error) {
defer mon.Task()(&ctx)(&err)
if len(retainInfos) == 0 {
return nil
}
prefix := time.Now().Format(time.RFC3339)
expirationTime := time.Now().Add(bfu.config.ExpireIn)
accessGrant, err := uplink.ParseAccess(bfu.config.AccessGrant)
if err != nil {
return err
}
project, err := uplink.OpenProject(ctx, accessGrant)
if err != nil {
return err
}
defer func() {
// do cleanup in case of any error while uploading bloom filters
if err != nil {
// TODO should we drop the whole bucket if cleanup fails
err = errs.Combine(err, bfu.cleanup(ctx, project, prefix))
}
err = errs.Combine(err, project.Close())
}()
_, err = project.EnsureBucket(ctx, bfu.config.Bucket)
if err != nil {
return err
}
// TODO move this check before the segment loop is started
o := uplink.ListObjectsOptions{
Prefix: prefix + "/",
}
iterator := project.ListObjects(ctx, bfu.config.Bucket, &o)
for iterator.Next() {
if iterator.Item().IsPrefix {
continue
}
bfu.log.Warn("target bucket was not empty, stop operation and wait for next execution", zap.String("bucket", bfu.config.Bucket))
return nil
}
infos := make([]internalpb.RetainInfo, 0, bfu.config.ZipBatchSize)
batchNumber := 0
for nodeID, info := range retainInfos {
infos = append(infos, internalpb.RetainInfo{
Filter: info.Filter.Bytes(),
// because bloom filters should be created from an immutable database
// snapshot, we use the latest segment creation date
CreationDate: latestCreationDate,
PieceCount: int64(info.Count),
StorageNodeId: nodeID,
})
if len(infos) == bfu.config.ZipBatchSize {
err = bfu.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos)
if err != nil {
return err
}
infos = infos[:0]
batchNumber++
}
}
// upload the rest of the infos, if any
if err := bfu.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos); err != nil {
return err
}
// update LATEST file
upload, err := project.UploadObject(ctx, bfu.config.Bucket, LATEST, nil)
if err != nil {
return err
}
_, err = upload.Write([]byte(prefix))
if err != nil {
return err
}
return upload.Commit()
}
// uploadPack uploads a single zip pack with multiple bloom filters.
func (bfu *Upload) uploadPack(ctx context.Context, project *uplink.Project, prefix string, batchNumber int, expirationTime time.Time, infos []internalpb.RetainInfo) (err error) {
defer mon.Task()(&ctx)(&err)
if len(infos) == 0 {
return nil
}
upload, err := project.UploadObject(ctx, bfu.config.Bucket, prefix+"/bloomfilters-"+strconv.Itoa(batchNumber)+".zip", &uplink.UploadOptions{
Expires: expirationTime,
})
if err != nil {
return err
}
zipWriter := zip.NewWriter(upload)
defer func() {
err = errs.Combine(err, zipWriter.Close())
if err != nil {
err = errs.Combine(err, upload.Abort())
} else {
err = upload.Commit()
}
}()
for _, info := range infos {
retainInfoBytes, err := pb.Marshal(&info)
if err != nil {
return err
}
writer, err := zipWriter.Create(info.StorageNodeId.String())
if err != nil {
return err
}
write, err := writer.Write(retainInfoBytes)
if err != nil {
return err
}
if len(retainInfoBytes) != write {
return errs.New("not whole bloom filter was written")
}
}
return nil
}
// cleanup moves all objects from the root location to a unique prefix. Objects will be deleted
// automatically when they expire.
func (bfu *Upload) cleanup(ctx context.Context, project *uplink.Project, prefix string) (err error) {
defer mon.Task()(&ctx)(&err)
errPrefix := "upload-error-" + time.Now().Format(time.RFC3339)
o := uplink.ListObjectsOptions{
Prefix: prefix + "/",
}
iterator := project.ListObjects(ctx, bfu.config.Bucket, &o)
for iterator.Next() {
item := iterator.Item()
if item.IsPrefix {
continue
}
err := project.MoveObject(ctx, bfu.config.Bucket, item.Key, bfu.config.Bucket, prefix+"/"+errPrefix+"/"+item.Key, nil)
if err != nil {
return err
}
}
return iterator.Err()
}
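
For context, here is a minimal sketch (not part of this change) of how a caller could wire up the extracted Upload type. The type, method, and Config field names come from the diff above; the concrete values and the standalone main are illustrative assumptions:

package main

import (
	"context"
	"time"

	"go.uber.org/zap"

	"storj.io/common/storj"
	"storj.io/storj/satellite/gc/bloomfilter"
)

func main() {
	log := zap.NewExample()

	// Config fields referenced by Upload; the values here are made up.
	config := bloomfilter.Config{
		AccessGrant:  "<access grant>", // access grant for the target project
		Bucket:       "gc-bloomfilters", // bucket receiving the zip packs
		ExpireIn:     48 * time.Hour,    // TTL for the uploaded objects
		ZipBatchSize: 500,               // bloom filters per zip pack
	}

	upload := bloomfilter.NewUpload(log, config)

	// Validate settings up front, as Observer.Start now does.
	if err := upload.CheckConfig(); err != nil {
		log.Fatal("invalid bloom filter upload config", zap.Error(err))
	}

	// Normally populated by the ranged-loop observer; an empty map
	// makes UploadBloomFilters return early without touching the bucket.
	retainInfos := map[storj.NodeID]*bloomfilter.RetainInfo{}

	if err := upload.UploadBloomFilters(context.Background(), time.Now(), retainInfos); err != nil {
		log.Fatal("uploading bloom filters failed", zap.Error(err))
	}
}

This mirrors what Observer does after the refactor: NewObserver constructs the Upload, Start delegates validation to CheckConfig, and Finish calls UploadBloomFilters.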