381c1e1257
This helps to cleanup the cmd folder a bit. Change-Id: I24025c3dbfd35966325d7d5aaa95cd9a1176a8b7
244 lines
5.8 KiB
Go
244 lines
5.8 KiB
Go
// Copyright (C) 2021 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"github.com/jackc/pgx/v4"
|
|
"github.com/spacemonkeygo/monkit/v3"
|
|
"github.com/spf13/cobra"
|
|
flag "github.com/spf13/pflag"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/common/uuid"
|
|
"storj.io/private/dbutil/pgutil"
|
|
"storj.io/private/process"
|
|
"storj.io/storj/satellite/metabase"
|
|
)
|
|
|
|
// mon is the monkit scope of this package; the functions below use it to
// instrument their execution via mon.Task.
var mon = monkit.Package()
|
|
|
|
var (
	// rootCmd is the top-level command; init attaches the subcommands to it
	// and main executes it.
	rootCmd = &cobra.Command{
		Use:   "metainfo-orphaned-segments",
		Short: "metainfo-orphaned-segments",
	}

	// reportCmd is the "report" subcommand, handled by reportCommand.
	reportCmd = &cobra.Command{
		Use:   "report",
		Short: "report metainfo-orphaned-segments",
		RunE:  reportCommand,
	}

	// deleteCmd is the "delete" subcommand, handled by deleteCommand.
	deleteCmd = &cobra.Command{
		Use:   "delete",
		Short: "delete metainfo-orphaned-segments",
		RunE:  deleteCommand,
	}

	// config receives the flag values of both subcommands (see init).
	config Config
)
|
|
|
|
func init() {
|
|
rootCmd.AddCommand(reportCmd)
|
|
rootCmd.AddCommand(deleteCmd)
|
|
|
|
config.BindFlags(reportCmd.Flags())
|
|
config.BindFlags(deleteCmd.Flags())
|
|
}
|
|
|
|
// Config defines configuration for cleanup.
type Config struct {
	// MetabaseDB is the connection URL for the metabase database.
	MetabaseDB string
	// LoopBatchSize is the batch size handed to the segment and object loops.
	LoopBatchSize int
	// DeleteBatchSize is the maximum number of stream ids sent in one delete query.
	DeleteBatchSize int
	// Cockroach is bound to the "cockroach" flag ("metabase is on CRDB").
	Cockroach bool
}
|
|
|
|
// BindFlags registers the configuration flags on the given flagset; parsed
// values are stored in the fields of config.
//
// Inside the body the parameter shadows the imported flag package.
func (config *Config) BindFlags(flag *flag.FlagSet) {
	flag.StringVar(&config.MetabaseDB, "metabasedb", "", "connection URL for MetabaseDB")
	flag.IntVar(&config.LoopBatchSize, "loop-batch-size", 10000, "number of objects to process at once")
	flag.IntVar(&config.DeleteBatchSize, "delete-batch-size", 100, "number of entries to delete with single query")
	flag.BoolVar(&config.Cockroach, "cockroach", true, "metabase is on CRDB")
}
|
|
|
|
// VerifyFlags verifies whether the values provided are valid.
|
|
func (config *Config) VerifyFlags() error {
|
|
var errlist errs.Group
|
|
if config.MetabaseDB == "" {
|
|
errlist.Add(errors.New("flag '--metabasedb' is not set"))
|
|
}
|
|
return errlist.Err()
|
|
}
|
|
|
|
func reportCommand(cmd *cobra.Command, args []string) error {
|
|
if err := config.VerifyFlags(); err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx, _ := process.Ctx(cmd)
|
|
log := zap.L()
|
|
|
|
return Report(ctx, log, config)
|
|
}
|
|
|
|
func deleteCommand(cmd *cobra.Command, args []string) error {
|
|
if err := config.VerifyFlags(); err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx, _ := process.Ctx(cmd)
|
|
log := zap.L()
|
|
|
|
return Delete(ctx, log, config)
|
|
}
|
|
|
|
// main hands rootCmd to process.Exec, which runs the selected subcommand.
func main() {
	process.Exec(rootCmd)
}
|
|
|
|
// Report finds and reports orphaned segments, nothing is deleted.
|
|
func Report(ctx context.Context, log *zap.Logger, config Config) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
ids, err := findOrphanedSegments(ctx, log, config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Info("orphaned segments stream ids (number of existing segments can be bigger)",
|
|
zap.Int("count", len(ids)))
|
|
for _, id := range ids {
|
|
log.Info("StreamID", zap.String("id", id.String()))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Delete finds and deletes orphaned segments.
|
|
func Delete(ctx context.Context, log *zap.Logger, config Config) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
ids, err := findOrphanedSegments(ctx, log, config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rawMetabaseDB, err := pgx.Connect(ctx, config.MetabaseDB)
|
|
if err != nil {
|
|
return errs.New("unable to connect %q: %w", config.MetabaseDB, err)
|
|
}
|
|
defer func() { err = errs.Combine(err, rawMetabaseDB.Close(ctx)) }()
|
|
|
|
log.Info("orphaned segments stream ids (number of existing segments can be bigger)", zap.Int("count", len(ids)))
|
|
log.Info("starting deletion")
|
|
|
|
for len(ids) > 0 {
|
|
batch := config.DeleteBatchSize
|
|
if batch > len(ids) {
|
|
batch = len(ids)
|
|
}
|
|
err = sendDelete(ctx, rawMetabaseDB, ids[:batch])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ids = ids[batch:]
|
|
|
|
log.Info("deleted", zap.Int("count", batch))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func findOrphanedSegments(ctx context.Context, log *zap.Logger, config Config) (_ []uuid.UUID, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), config.MetabaseDB, metabase.Config{ApplicationName: "metabase-orphaned-segments"})
|
|
if err != nil {
|
|
return nil, errs.New("unable to connect %q: %w", config.MetabaseDB, err)
|
|
}
|
|
defer func() { err = errs.Combine(err, metabaseDB.Close()) }()
|
|
|
|
startingTime, err := metabaseDB.Now(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
streamIDs := make(map[uuid.UUID]struct{})
|
|
numberOfElements := 0
|
|
err = metabaseDB.IterateLoopSegments(ctx, metabase.IterateLoopSegments{
|
|
BatchSize: config.LoopBatchSize,
|
|
AsOfSystemTime: startingTime,
|
|
}, func(ctx context.Context, it metabase.LoopSegmentsIterator) error {
|
|
var entry metabase.LoopSegmentEntry
|
|
for it.Next(ctx, &entry) {
|
|
// avoid segments created after starting processing
|
|
if entry.CreatedAt.After(startingTime) {
|
|
continue
|
|
}
|
|
|
|
streamIDs[entry.StreamID] = struct{}{}
|
|
|
|
if numberOfElements%100000 == 0 {
|
|
log.Info("segments iterated", zap.Int("segments", numberOfElements))
|
|
}
|
|
|
|
numberOfElements++
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
numberOfElements = 0
|
|
err = metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{
|
|
BatchSize: config.LoopBatchSize,
|
|
AsOfSystemTime: startingTime,
|
|
}, func(ctx context.Context, it metabase.LoopObjectsIterator) error {
|
|
var entry metabase.LoopObjectEntry
|
|
for it.Next(ctx, &entry) {
|
|
// avoid objects created after starting processing
|
|
if entry.CreatedAt.After(startingTime) {
|
|
continue
|
|
}
|
|
|
|
delete(streamIDs, entry.StreamID)
|
|
|
|
if numberOfElements%100000 == 0 {
|
|
log.Info("objects iterated", zap.Int("objects", numberOfElements))
|
|
}
|
|
|
|
numberOfElements++
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ids := make([]uuid.UUID, 0, len(streamIDs))
|
|
for id := range streamIDs {
|
|
ids = append(ids, id)
|
|
}
|
|
|
|
return ids, nil
|
|
}
|
|
|
|
func sendDelete(ctx context.Context, conn *pgx.Conn, ids []uuid.UUID) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if len(ids) == 0 {
|
|
return nil
|
|
}
|
|
|
|
_, err = conn.Exec(ctx, "DELETE FROM segments WHERE stream_id = ANY($1)", pgutil.UUIDArray(ids))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|