a93e47514a
This is part of metaloop refactoring. We plan to remove irreparable at some point but there was not time for it. Now instead refatoring it for segmentloop its just easier to drop it. Later we still need to drop table with migration step. Change-Id: I270e77f119273d39a1ecdcf5e1c37a5662a29ab4
105 lines
2.7 KiB
Go
105 lines
2.7 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package main
|
|
|
|
import (
|
|
"github.com/spf13/cobra"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/common/context2"
|
|
"storj.io/common/errs2"
|
|
"storj.io/private/process"
|
|
"storj.io/private/version"
|
|
"storj.io/storj/private/revocation"
|
|
"storj.io/storj/satellite"
|
|
"storj.io/storj/satellite/metabase"
|
|
"storj.io/storj/satellite/orders"
|
|
"storj.io/storj/satellite/satellitedb"
|
|
)
|
|
|
|
func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
|
|
ctx, _ := process.Ctx(cmd)
|
|
log := zap.L()
|
|
|
|
runCfg.Debug.Address = *process.DebugAddrFlag
|
|
|
|
identity, err := runCfg.Identity.Load()
|
|
if err != nil {
|
|
log.Error("Failed to load identity.", zap.Error(err))
|
|
return errs.New("Failed to load identity: %+v", err)
|
|
}
|
|
|
|
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-repairer"})
|
|
if err != nil {
|
|
return errs.New("Error starting master database: %+v", err)
|
|
}
|
|
defer func() {
|
|
err = errs.Combine(err, db.Close())
|
|
}()
|
|
|
|
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
|
|
if err != nil {
|
|
return errs.New("Error creating metabase connection: %+v", err)
|
|
}
|
|
defer func() {
|
|
err = errs.Combine(err, metabaseDB.Close())
|
|
}()
|
|
|
|
revocationDB, err := revocation.OpenDBFromCfg(ctx, runCfg.Server.Config)
|
|
if err != nil {
|
|
return errs.New("Error creating revocation database: %+v", err)
|
|
}
|
|
defer func() {
|
|
err = errs.Combine(err, revocationDB.Close())
|
|
}()
|
|
|
|
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
|
|
defer func() {
|
|
err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
|
|
}()
|
|
|
|
peer, err := satellite.NewRepairer(
|
|
log,
|
|
identity,
|
|
metabaseDB,
|
|
revocationDB,
|
|
db.RepairQueue(),
|
|
db.Buckets(),
|
|
db.OverlayCache(),
|
|
rollupsWriteCache,
|
|
version.Build,
|
|
&runCfg.Config,
|
|
process.AtomicLevel(cmd),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = peer.Version.Service.CheckVersion(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := process.InitMetricsWithHostname(ctx, log, nil); err != nil {
|
|
log.Warn("Failed to initialize telemetry batcher on repairer", zap.Error(err))
|
|
}
|
|
|
|
err = metabaseDB.CheckVersion(ctx)
|
|
if err != nil {
|
|
log.Error("Failed metabase database version check.", zap.Error(err))
|
|
return errs.New("failed metabase version check: %+v", err)
|
|
}
|
|
|
|
err = db.CheckVersion(ctx)
|
|
if err != nil {
|
|
log.Error("Failed satellite database version check.", zap.Error(err))
|
|
return errs.New("Error checking version for satellitedb: %+v", err)
|
|
}
|
|
|
|
runError := peer.Run(ctx)
|
|
closeError := peer.Close()
|
|
return errs2.IgnoreCanceled(errs.Combine(runError, closeError))
|
|
}
|