3146ad7f2e
Previously we were exposing the testing facilities via interface casting the necessary parts, however, when things are not part of the main satellite.DB interface they need to be manually propagated. Rather than relying on using hidden methods lets expose things as long as they don't create a direct dependency to the database driver. Change-Id: I2eb7d8b60f4b64de1320c2d32581f7be267c0f57
160 lines
4.7 KiB
Go
160 lines
4.7 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package main
|
|
|
|
import (
|
|
"strings"
|
|
|
|
"github.com/spf13/cobra"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/common/context2"
|
|
"storj.io/private/process"
|
|
"storj.io/private/version"
|
|
"storj.io/storj/private/revocation"
|
|
"storj.io/storj/satellite"
|
|
"storj.io/storj/satellite/accounting"
|
|
"storj.io/storj/satellite/accounting/live"
|
|
"storj.io/storj/satellite/metabase"
|
|
"storj.io/storj/satellite/orders"
|
|
"storj.io/storj/satellite/satellitedb"
|
|
)
|
|
|
|
func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
|
|
ctx, _ := process.Ctx(cmd)
|
|
log := zap.L()
|
|
|
|
runCfg.Debug.Address = *process.DebugAddrFlag
|
|
|
|
identity, err := runCfg.Identity.Load()
|
|
if err != nil {
|
|
log.Error("Failed to load identity.", zap.Error(err))
|
|
return errs.New("Failed to load identity: %+v", err)
|
|
}
|
|
|
|
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{
|
|
ApplicationName: "satellite-api",
|
|
APIKeysLRUOptions: runCfg.APIKeysLRUOptions(),
|
|
RevocationLRUOptions: runCfg.RevocationLRUOptions(),
|
|
})
|
|
if err != nil {
|
|
return errs.New("Error starting master database on satellite api: %+v", err)
|
|
}
|
|
defer func() {
|
|
err = errs.Combine(err, db.Close())
|
|
}()
|
|
|
|
for _, migration := range strings.Split(runCfg.DatabaseOptions.MigrationUnsafe, ",") {
|
|
switch migration {
|
|
case fullMigration:
|
|
err = db.MigrateToLatest(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case snapshotMigration:
|
|
log.Info("MigrationUnsafe using latest snapshot. It's not for production", zap.String("db", "master"))
|
|
err = db.Testing().TestMigrateToLatest(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case testDataCreation:
|
|
err := createTestData(ctx, db)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case noMigration:
|
|
// noop
|
|
default:
|
|
return errs.New("unsupported migration type: %s, please try one of the: %s", migration, strings.Join(migrationTypes, ","))
|
|
}
|
|
}
|
|
|
|
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Config.Metainfo.DatabaseURL,
|
|
runCfg.Config.Metainfo.Metabase("satellite-api"))
|
|
if err != nil {
|
|
return errs.New("Error creating metabase connection on satellite api: %+v", err)
|
|
}
|
|
defer func() {
|
|
err = errs.Combine(err, metabaseDB.Close())
|
|
}()
|
|
|
|
for _, migration := range strings.Split(runCfg.DatabaseOptions.MigrationUnsafe, ",") {
|
|
switch migration {
|
|
case fullMigration:
|
|
err = metabaseDB.MigrateToLatest(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case snapshotMigration:
|
|
log.Info("MigrationUnsafe using latest snapshot. It's not for production", zap.String("db", "master"))
|
|
err = metabaseDB.TestMigrateToLatest(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case noMigration, testDataCreation:
|
|
// noop
|
|
default:
|
|
return errs.New("unsupported migration type: %s, please try one of the: %s", migration, strings.Join(migrationTypes, ","))
|
|
}
|
|
}
|
|
|
|
revocationDB, err := revocation.OpenDBFromCfg(ctx, runCfg.Config.Server.Config)
|
|
if err != nil {
|
|
return errs.New("Error creating revocation database on satellite api: %+v", err)
|
|
}
|
|
defer func() {
|
|
err = errs.Combine(err, revocationDB.Close())
|
|
}()
|
|
|
|
accountingCache, err := live.OpenCache(ctx, log.Named("live-accounting"), runCfg.LiveAccounting)
|
|
if err != nil {
|
|
if !accounting.ErrSystemOrNetError.Has(err) || accountingCache == nil {
|
|
return errs.New("Error instantiating live accounting cache: %w", err)
|
|
}
|
|
|
|
log.Warn("Unable to connect to live accounting cache. Verify connection",
|
|
zap.Error(err),
|
|
)
|
|
}
|
|
defer func() {
|
|
err = errs.Combine(err, accountingCache.Close())
|
|
}()
|
|
|
|
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), runCfg.Orders.FlushBatchSize)
|
|
defer func() {
|
|
err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
|
|
}()
|
|
|
|
peer, err := satellite.NewAPI(log, identity, db, metabaseDB, revocationDB, accountingCache, rollupsWriteCache, &runCfg.Config, version.Build, process.AtomicLevel(cmd))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = peer.Version.Service.CheckVersion(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := process.InitMetricsWithHostname(ctx, log, nil); err != nil {
|
|
log.Warn("Failed to initialize telemetry batcher on satellite api", zap.Error(err))
|
|
}
|
|
|
|
err = metabaseDB.CheckVersion(ctx)
|
|
if err != nil {
|
|
log.Error("Failed metabase database version check.", zap.Error(err))
|
|
return errs.New("failed metabase version check: %+v", err)
|
|
}
|
|
|
|
err = db.CheckVersion(ctx)
|
|
if err != nil {
|
|
log.Error("Failed satellite database version check.", zap.Error(err))
|
|
return errs.New("Error checking version for satellitedb: %+v", err)
|
|
}
|
|
|
|
runError := peer.Run(ctx)
|
|
closeError := peer.Close()
|
|
return errs.Combine(runError, closeError)
|
|
}
|