diff --git a/satellite/gc/bloomfilter/observer.go b/satellite/gc/bloomfilter/observer.go index 136796767..c3301f6eb 100644 --- a/satellite/gc/bloomfilter/observer.go +++ b/satellite/gc/bloomfilter/observer.go @@ -4,22 +4,17 @@ package bloomfilter import ( - "archive/zip" "context" - "strconv" "time" "github.com/zeebo/errs" "go.uber.org/zap" "storj.io/common/bloomfilter" - "storj.io/common/pb" "storj.io/common/storj" - "storj.io/storj/satellite/internalpb" "storj.io/storj/satellite/metabase/rangedloop" "storj.io/storj/satellite/metabase/segmentloop" "storj.io/storj/satellite/overlay" - "storj.io/uplink" ) // Observer implements a rangedloop observer to collect bloom filters for the garbage collection. @@ -28,6 +23,7 @@ import ( type Observer struct { log *zap.Logger config Config + upload *Upload overlay overlay.DB // The following fields are reset for each loop. @@ -44,19 +40,18 @@ var _ (rangedloop.Observer) = (*Observer)(nil) func NewObserver(log *zap.Logger, config Config, overlay overlay.DB) *Observer { return &Observer{ log: log, - config: config, overlay: overlay, + upload: NewUpload(log, config), + config: config, } } // Start is called at the beginning of each segment loop. func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error) { defer mon.Task()(&ctx)(&err) - switch { - case obs.config.AccessGrant == "": - return errs.New("Access Grant is not set") - case obs.config.Bucket == "": - return errs.New("Bucket is not set") + + if err := obs.upload.CheckConfig(); err != nil { + return err } obs.log.Debug("collecting bloom filters started") @@ -124,173 +119,9 @@ func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err // Finish uploads the bloom filters. func (obs *Observer) Finish(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) - if err := obs.uploadBloomFilters(ctx, obs.latestCreationTime, obs.retainInfos); err != nil { + if err := obs.upload.UploadBloomFilters(ctx, obs.latestCreationTime, obs.retainInfos); err != nil { return err } obs.log.Debug("collecting bloom filters finished") return nil } - -// uploadBloomFilters stores a zipfile with multiple bloom filters in a bucket. -func (obs *Observer) uploadBloomFilters(ctx context.Context, latestCreationDate time.Time, retainInfos map[storj.NodeID]*RetainInfo) (err error) { - defer mon.Task()(&ctx)(&err) - - if len(retainInfos) == 0 { - return nil - } - - prefix := time.Now().Format(time.RFC3339) - - expirationTime := time.Now().Add(obs.config.ExpireIn) - - accessGrant, err := uplink.ParseAccess(obs.config.AccessGrant) - if err != nil { - return err - } - - project, err := uplink.OpenProject(ctx, accessGrant) - if err != nil { - return err - } - defer func() { - // do cleanup in case of any error while uploading bloom filters - if err != nil { - // TODO should we drop whole bucket if cleanup will fail - err = errs.Combine(err, obs.cleanup(ctx, project, prefix)) - } - err = errs.Combine(err, project.Close()) - }() - - _, err = project.EnsureBucket(ctx, obs.config.Bucket) - if err != nil { - return err - } - - // TODO move it before segment loop is started - o := uplink.ListObjectsOptions{ - Prefix: prefix + "/", - } - iterator := project.ListObjects(ctx, obs.config.Bucket, &o) - for iterator.Next() { - if iterator.Item().IsPrefix { - continue - } - - obs.log.Warn("target bucket was not empty, stop operation and wait for next execution", zap.String("bucket", obs.config.Bucket)) - return nil - } - - infos := make([]internalpb.RetainInfo, 0, obs.config.ZipBatchSize) - batchNumber := 0 - for nodeID, info := range retainInfos { - infos = append(infos, internalpb.RetainInfo{ - Filter: info.Filter.Bytes(), - // because bloom filters should be created from immutable database - // snapshot we are using latest segment creation date - CreationDate: latestCreationDate, - PieceCount: int64(info.Count), - StorageNodeId: nodeID, - }) - - if len(infos) == obs.config.ZipBatchSize { - err = obs.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos) - if err != nil { - return err - } - - infos = infos[:0] - batchNumber++ - } - } - - // upload rest of infos if any - if err := obs.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos); err != nil { - return err - } - - // update LATEST file - upload, err := project.UploadObject(ctx, obs.config.Bucket, LATEST, nil) - if err != nil { - return err - } - _, err = upload.Write([]byte(prefix)) - if err != nil { - return err - } - - return upload.Commit() -} - -// uploadPack uploads single zip pack with multiple bloom filters. -func (obs *Observer) uploadPack(ctx context.Context, project *uplink.Project, prefix string, batchNumber int, expirationTime time.Time, infos []internalpb.RetainInfo) (err error) { - defer mon.Task()(&ctx)(&err) - - if len(infos) == 0 { - return nil - } - - upload, err := project.UploadObject(ctx, obs.config.Bucket, prefix+"/bloomfilters-"+strconv.Itoa(batchNumber)+".zip", &uplink.UploadOptions{ - Expires: expirationTime, - }) - if err != nil { - return err - } - - zipWriter := zip.NewWriter(upload) - defer func() { - err = errs.Combine(err, zipWriter.Close()) - if err != nil { - err = errs.Combine(err, upload.Abort()) - } else { - err = upload.Commit() - } - }() - - for _, info := range infos { - retainInfoBytes, err := pb.Marshal(&info) - if err != nil { - return err - } - - writer, err := zipWriter.Create(info.StorageNodeId.String()) - if err != nil { - return err - } - - write, err := writer.Write(retainInfoBytes) - if err != nil { - return err - } - if len(retainInfoBytes) != write { - return errs.New("not whole bloom filter was written") - } - } - - return nil -} - -// cleanup moves all objects from root location to unique prefix. Objects will be deleted -// automatically when expires. -func (obs *Observer) cleanup(ctx context.Context, project *uplink.Project, prefix string) (err error) { - defer mon.Task()(&ctx)(&err) - - errPrefix := "upload-error-" + time.Now().Format(time.RFC3339) - o := uplink.ListObjectsOptions{ - Prefix: prefix + "/", - } - iterator := project.ListObjects(ctx, obs.config.Bucket, &o) - - for iterator.Next() { - item := iterator.Item() - if item.IsPrefix { - continue - } - - err := project.MoveObject(ctx, obs.config.Bucket, item.Key, obs.config.Bucket, prefix+"/"+errPrefix+"/"+item.Key, nil) - if err != nil { - return err - } - } - - return iterator.Err() -} diff --git a/satellite/gc/bloomfilter/service.go b/satellite/gc/bloomfilter/service.go index 3b017f955..f14c48a67 100644 --- a/satellite/gc/bloomfilter/service.go +++ b/satellite/gc/bloomfilter/service.go @@ -22,9 +22,6 @@ import ( "storj.io/uplink" ) -// LATEST is the name of the file that contains the most recently completed bloomfilter generation prefix. -const LATEST = "LATEST" - var mon = monkit.Package() // Config contains configurable values for garbage collection. diff --git a/satellite/gc/bloomfilter/upload.go b/satellite/gc/bloomfilter/upload.go new file mode 100644 index 000000000..35bb512b1 --- /dev/null +++ b/satellite/gc/bloomfilter/upload.go @@ -0,0 +1,211 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. + +package bloomfilter + +import ( + "archive/zip" + "context" + "strconv" + "time" + + "github.com/zeebo/errs" + "go.uber.org/zap" + + "storj.io/common/pb" + "storj.io/common/storj" + "storj.io/storj/satellite/internalpb" + "storj.io/uplink" +) + +// LATEST is the name of the file that contains the most recently completed bloomfilter generation prefix. +const LATEST = "LATEST" + +// Upload is used to upload bloom filters to specified bucket. +type Upload struct { + log *zap.Logger + config Config +} + +// NewUpload creates new upload for bloom filters. +func NewUpload(log *zap.Logger, config Config) *Upload { + return &Upload{ + log: log, + config: config, + } +} + +// CheckConfig check configuration values. +func (bfu *Upload) CheckConfig() error { + switch { + case bfu.config.AccessGrant == "": + return errs.New("Access Grant is not set") + case bfu.config.Bucket == "": + return errs.New("Bucket is not set") + } + return nil +} + +// UploadBloomFilters stores a zipfile with multiple bloom filters in a bucket. +func (bfu *Upload) UploadBloomFilters(ctx context.Context, latestCreationDate time.Time, retainInfos map[storj.NodeID]*RetainInfo) (err error) { + defer mon.Task()(&ctx)(&err) + + if len(retainInfos) == 0 { + return nil + } + + prefix := time.Now().Format(time.RFC3339) + + expirationTime := time.Now().Add(bfu.config.ExpireIn) + + accessGrant, err := uplink.ParseAccess(bfu.config.AccessGrant) + if err != nil { + return err + } + + project, err := uplink.OpenProject(ctx, accessGrant) + if err != nil { + return err + } + defer func() { + // do cleanup in case of any error while uploading bloom filters + if err != nil { + // TODO should we drop whole bucket if cleanup will fail + err = errs.Combine(err, bfu.cleanup(ctx, project, prefix)) + } + err = errs.Combine(err, project.Close()) + }() + + _, err = project.EnsureBucket(ctx, bfu.config.Bucket) + if err != nil { + return err + } + + // TODO move it before segment loop is started + o := uplink.ListObjectsOptions{ + Prefix: prefix + "/", + } + iterator := project.ListObjects(ctx, bfu.config.Bucket, &o) + for iterator.Next() { + if iterator.Item().IsPrefix { + continue + } + + bfu.log.Warn("target bucket was not empty, stop operation and wait for next execution", zap.String("bucket", bfu.config.Bucket)) + return nil + } + + infos := make([]internalpb.RetainInfo, 0, bfu.config.ZipBatchSize) + batchNumber := 0 + for nodeID, info := range retainInfos { + infos = append(infos, internalpb.RetainInfo{ + Filter: info.Filter.Bytes(), + // because bloom filters should be created from immutable database + // snapshot we are using latest segment creation date + CreationDate: latestCreationDate, + PieceCount: int64(info.Count), + StorageNodeId: nodeID, + }) + + if len(infos) == bfu.config.ZipBatchSize { + err = bfu.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos) + if err != nil { + return err + } + + infos = infos[:0] + batchNumber++ + } + } + + // upload rest of infos if any + if err := bfu.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos); err != nil { + return err + } + + // update LATEST file + upload, err := project.UploadObject(ctx, bfu.config.Bucket, LATEST, nil) + if err != nil { + return err + } + _, err = upload.Write([]byte(prefix)) + if err != nil { + return err + } + + return upload.Commit() +} + +// uploadPack uploads single zip pack with multiple bloom filters. +func (bfu *Upload) uploadPack(ctx context.Context, project *uplink.Project, prefix string, batchNumber int, expirationTime time.Time, infos []internalpb.RetainInfo) (err error) { + defer mon.Task()(&ctx)(&err) + + if len(infos) == 0 { + return nil + } + + upload, err := project.UploadObject(ctx, bfu.config.Bucket, prefix+"/bloomfilters-"+strconv.Itoa(batchNumber)+".zip", &uplink.UploadOptions{ + Expires: expirationTime, + }) + if err != nil { + return err + } + + zipWriter := zip.NewWriter(upload) + defer func() { + err = errs.Combine(err, zipWriter.Close()) + if err != nil { + err = errs.Combine(err, upload.Abort()) + } else { + err = upload.Commit() + } + }() + + for _, info := range infos { + retainInfoBytes, err := pb.Marshal(&info) + if err != nil { + return err + } + + writer, err := zipWriter.Create(info.StorageNodeId.String()) + if err != nil { + return err + } + + write, err := writer.Write(retainInfoBytes) + if err != nil { + return err + } + if len(retainInfoBytes) != write { + return errs.New("not whole bloom filter was written") + } + } + + return nil +} + +// cleanup moves all objects from root location to unique prefix. Objects will be deleted +// automatically when expires. +func (bfu *Upload) cleanup(ctx context.Context, project *uplink.Project, prefix string) (err error) { + defer mon.Task()(&ctx)(&err) + + errPrefix := "upload-error-" + time.Now().Format(time.RFC3339) + o := uplink.ListObjectsOptions{ + Prefix: prefix + "/", + } + iterator := project.ListObjects(ctx, bfu.config.Bucket, &o) + + for iterator.Next() { + item := iterator.Item() + if item.IsPrefix { + continue + } + + err := project.MoveObject(ctx, bfu.config.Bucket, item.Key, bfu.config.Bucket, prefix+"/"+errPrefix+"/"+item.Key, nil) + if err != nil { + return err + } + } + + return iterator.Err() +}