all: add pprof labels for debugger

By using pprof.Labels debugger is able to show service/peer names in
goroutine names.

Change-Id: I5f55253470f7cc7e556f8e8b87f746394e41675f
This commit is contained in:
Egon Elbre 2020-10-29 13:58:36 +02:00
parent 624255e8ba
commit e0dca4042d
14 changed files with 644 additions and 575 deletions

View File

@ -7,6 +7,7 @@ package lifecycle
import (
"context"
"errors"
"runtime/pprof"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
@ -54,7 +55,10 @@ func (group *Group) Run(ctx context.Context, g *errgroup.Group) {
continue
}
g.Go(func() error {
err := item.Run(ctx)
var err error
pprof.Do(ctx, pprof.Labels("name", item.Name), func(ctx context.Context) {
err = item.Run(ctx)
})
if errors.Is(ctx.Err(), context.Canceled) {
err = errs2.IgnoreCanceled(err)
}

View File

@ -12,6 +12,7 @@ import (
"database/sql"
"database/sql/driver"
"errors"
"runtime/pprof"
"time"
"github.com/zeebo/errs"
@ -22,15 +23,21 @@ import (
// Open opens *sql.DB and wraps the implementation with tagging.
func Open(ctx context.Context, driverName, dataSourceName string) (DB, error) {
db, err := sql.Open(driverName, dataSourceName)
var sdb *sql.DB
var err error
pprof.Do(ctx, pprof.Labels("db", driverName), func(ctx context.Context) {
sdb, err = sql.Open(driverName, dataSourceName)
})
if err != nil {
return nil, err
}
err = db.Ping()
err = sdb.PingContext(ctx)
if err != nil {
return nil, err
}
return Wrap(db), nil
return Wrap(sdb), nil
}
// Wrap turns a *sql.DB into a DB-matching interface.

View File

@ -12,6 +12,7 @@ import (
"net"
"os"
"path/filepath"
"runtime/pprof"
"sync"
"time"
@ -34,6 +35,8 @@ const defaultInterval = 15 * time.Second
// Peer represents one of StorageNode or Satellite.
type Peer interface {
Label() string
ID() storj.NodeID
Addr() string
URL() string
@ -121,7 +124,7 @@ func (peer *closablePeer) Close() error {
}
// NewCustom creates a new full system with the specified configuration.
func NewCustom(log *zap.Logger, config Config, satelliteDatabases satellitedbtest.SatelliteDatabases) (*Planet, error) {
func NewCustom(ctx context.Context, log *zap.Logger, config Config, satelliteDatabases satellitedbtest.SatelliteDatabases) (*Planet, error) {
if config.IdentityVersion == nil {
version := storj.LatestIDVersion()
config.IdentityVersion = &version
@ -161,7 +164,7 @@ func NewCustom(log *zap.Logger, config Config, satelliteDatabases satellitedbtes
return nil, errs.Combine(err, planet.Shutdown())
}
planet.Satellites, err = planet.newSatellites(config.SatelliteCount, satelliteDatabases)
planet.Satellites, err = planet.newSatellites(ctx, config.SatelliteCount, satelliteDatabases)
if err != nil {
return nil, errs.Combine(err, planet.Shutdown())
}
@ -171,12 +174,12 @@ func NewCustom(log *zap.Logger, config Config, satelliteDatabases satellitedbtes
whitelistedSatellites = append(whitelistedSatellites, satellite.NodeURL())
}
planet.StorageNodes, err = planet.newStorageNodes(config.StorageNodeCount, whitelistedSatellites)
planet.StorageNodes, err = planet.newStorageNodes(ctx, config.StorageNodeCount, whitelistedSatellites)
if err != nil {
return nil, errs.Combine(err, planet.Shutdown())
}
planet.Uplinks, err = planet.newUplinks("uplink", config.UplinkCount)
planet.Uplinks, err = planet.newUplinks(ctx, "uplink", config.UplinkCount)
if err != nil {
return nil, errs.Combine(err, planet.Shutdown())
}
@ -189,34 +192,42 @@ func (planet *Planet) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
planet.cancel = cancel
planet.run.Go(func() error {
return planet.VersionControl.Run(ctx)
pprof.Do(ctx, pprof.Labels("peer", "version-control"), func(ctx context.Context) {
planet.run.Go(func() error {
return planet.VersionControl.Run(ctx)
})
})
if planet.ReferralManager != nil {
planet.run.Go(func() error {
return planet.ReferralManager.Run(ctx)
pprof.Do(ctx, pprof.Labels("peer", "referral-manager"), func(ctx context.Context) {
planet.run.Go(func() error {
return planet.ReferralManager.Run(ctx)
})
})
}
for i := range planet.peers {
peer := &planet.peers[i]
peer.ctx, peer.cancel = context.WithCancel(ctx)
planet.run.Go(func() error {
defer close(peer.runFinished)
pprof.Do(peer.ctx, pprof.Labels("peer", peer.peer.Label()), func(ctx context.Context) {
planet.run.Go(func() error {
defer close(peer.runFinished)
err := peer.peer.Run(peer.ctx)
return err
err := peer.peer.Run(ctx)
return err
})
})
}
var group errgroup.Group
for _, peer := range planet.StorageNodes {
peer := peer
group.Go(func() error {
peer.Storage2.Monitor.Loop.TriggerWait()
peer.Contact.Chore.TriggerWait(ctx)
return nil
pprof.Do(ctx, pprof.Labels("peer", peer.Label(), "startup", "contact"), func(ctx context.Context) {
group.Go(func() error {
peer.Storage2.Monitor.Loop.TriggerWait()
peer.Contact.Chore.TriggerWait(ctx)
return nil
})
})
}
_ = group.Wait()

View File

@ -5,6 +5,7 @@ package testplanet
import (
"context"
"runtime/pprof"
"strings"
"testing"
@ -52,17 +53,19 @@ func Run(t *testing.T, config Config, test func(t *testing.T, ctx *testcontext.C
planetConfig.Name = t.Name()
}
planet, err := NewCustom(zaptest.NewLogger(t), planetConfig, satelliteDB)
if err != nil {
t.Fatalf("%+v", err)
}
defer ctx.Check(planet.Shutdown)
pprof.Do(ctx, pprof.Labels("planet", planetConfig.Name), func(namedctx context.Context) {
planet, err := NewCustom(namedctx, zaptest.NewLogger(t), planetConfig, satelliteDB)
if err != nil {
t.Fatalf("%+v", err)
}
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
planet.Start(namedctx)
provisionUplinks(ctx, t, planet)
provisionUplinks(namedctx, t, planet)
test(t, ctx, planet)
test(t, ctx, planet)
})
})
}
}

View File

@ -8,6 +8,7 @@ import (
"net"
"os"
"path/filepath"
"runtime/pprof"
"strconv"
"time"
@ -68,6 +69,7 @@ import (
// Satellite contains all the processes needed to run a full Satellite setup.
type Satellite struct {
Name string
Config satellite.Config
Core *satellite.Core
@ -191,6 +193,9 @@ type Satellite struct {
}
}
// Label returns name for debugger.
func (system *Satellite) Label() string { return system.Name }
// ID returns the ID of the Satellite system.
func (system *Satellite) ID() storj.NodeID { return system.API.Identity.ID }
@ -314,374 +319,383 @@ func (system *Satellite) Run(ctx context.Context) (err error) {
func (system *Satellite) PrivateAddr() string { return system.API.Server.PrivateAddr().String() }
// newSatellites initializes satellites.
func (planet *Planet) newSatellites(count int, satelliteDatabases satellitedbtest.SatelliteDatabases) ([]*Satellite, error) {
var xs []*Satellite
defer func() {
for _, x := range xs {
planet.peers = append(planet.peers, newClosablePeer(x))
}
}()
func (planet *Planet) newSatellites(ctx context.Context, count int, databases satellitedbtest.SatelliteDatabases) ([]*Satellite, error) {
var satellites []*Satellite
for i := 0; i < count; i++ {
ctx := context.TODO()
prefix := "satellite" + strconv.Itoa(i)
index := i
prefix := "satellite" + strconv.Itoa(index)
log := planet.log.Named(prefix)
storageDir := filepath.Join(planet.directory, prefix)
if err := os.MkdirAll(storageDir, 0700); err != nil {
return nil, err
}
var system *Satellite
var err error
identity, err := planet.NewIdentity()
pprof.Do(ctx, pprof.Labels("peer", prefix), func(ctx context.Context) {
system, err = planet.newSatellite(ctx, prefix, index, log, databases)
})
if err != nil {
return nil, err
}
db, err := satellitedbtest.CreateMasterDB(ctx, log.Named("db"), planet.config.Name, "S", i, satelliteDatabases.MasterDB)
if err != nil {
return nil, err
}
if planet.config.Reconfigure.SatelliteDB != nil {
var newdb satellite.DB
newdb, err = planet.config.Reconfigure.SatelliteDB(log.Named("db"), i, db)
if err != nil {
return nil, errs.Combine(err, db.Close())
}
db = newdb
}
planet.databases = append(planet.databases, db)
pointerDB, err := satellitedbtest.CreatePointerDB(ctx, log.Named("pointerdb"), planet.config.Name, "P", i, satelliteDatabases.PointerDB)
if err != nil {
return nil, err
}
if planet.config.Reconfigure.SatellitePointerDB != nil {
var newPointerDB metainfo.PointerDB
newPointerDB, err = planet.config.Reconfigure.SatellitePointerDB(log.Named("pointerdb"), i, pointerDB)
if err != nil {
return nil, errs.Combine(err, pointerDB.Close())
}
pointerDB = newPointerDB
}
planet.databases = append(planet.databases, pointerDB)
redis, err := redisserver.Mini()
if err != nil {
return nil, err
}
planet.databases = append(planet.databases, redis)
config := satellite.Config{
Server: server.Config{
Address: "127.0.0.1:0",
PrivateAddress: "127.0.0.1:0",
Config: tlsopts.Config{
RevocationDBURL: "bolt://" + filepath.Join(storageDir, "revocation.db"),
UsePeerCAWhitelist: true,
PeerCAWhitelistPath: planet.whitelistPath,
PeerIDVersions: "latest",
Extensions: extensions.Config{
Revocation: false,
WhitelistSignedLeaf: false,
},
},
},
Debug: debug.Config{
Address: "",
},
Admin: admin.Config{
Address: "127.0.0.1:0",
},
Contact: contact.Config{
Timeout: 1 * time.Minute,
},
Overlay: overlay.Config{
Node: overlay.NodeSelectionConfig{
UptimeCount: 0,
AuditCount: 0,
NewNodeFraction: 1,
OnlineWindow: time.Minute,
DistinctIP: false,
MinimumDiskSpace: 100 * memory.MB,
AuditReputationRepairWeight: 1,
AuditReputationUplinkWeight: 1,
AuditReputationLambda: 0.95,
AuditReputationWeight: 1,
AuditReputationDQ: 0.6,
SuspensionGracePeriod: time.Hour,
SuspensionDQEnabled: true,
},
NodeSelectionCache: overlay.CacheConfig{
Staleness: 3 * time.Minute,
},
UpdateStatsBatchSize: 100,
AuditHistory: overlay.AuditHistoryConfig{
WindowSize: 10 * time.Minute,
TrackingPeriod: time.Hour,
GracePeriod: time.Hour,
OfflineThreshold: 0.6,
},
},
Metainfo: metainfo.Config{
DatabaseURL: "", // not used
MinRemoteSegmentSize: 0, // TODO: fix tests to work with 1024
MaxInlineSegmentSize: 4 * memory.KiB,
MaxSegmentSize: 64 * memory.MiB,
MaxMetadataSize: 2 * memory.KiB,
MaxCommitInterval: 1 * time.Hour,
Overlay: true,
RS: metainfo.RSConfig{
MaxBufferMem: memory.Size(256),
ErasureShareSize: memory.Size(256),
MinThreshold: atLeastOne(planet.config.StorageNodeCount * 1 / 5),
RepairThreshold: atLeastOne(planet.config.StorageNodeCount * 2 / 5),
SuccessThreshold: atLeastOne(planet.config.StorageNodeCount * 3 / 5),
TotalThreshold: atLeastOne(planet.config.StorageNodeCount * 4 / 5),
MinTotalThreshold: (planet.config.StorageNodeCount * 4 / 5),
MaxTotalThreshold: (planet.config.StorageNodeCount * 4 / 5),
Validate: false,
},
Loop: metainfo.LoopConfig{
CoalesceDuration: 1 * time.Second,
ListLimit: 10000,
},
RateLimiter: metainfo.RateLimiterConfig{
Enabled: true,
Rate: 1000,
CacheCapacity: 100,
CacheExpiration: 10 * time.Second,
},
ProjectLimits: metainfo.ProjectLimitConfig{
MaxBuckets: 10,
DefaultMaxUsage: 25 * memory.GB,
DefaultMaxBandwidth: 25 * memory.GB,
},
PieceDeletion: piecedeletion.Config{
MaxConcurrency: 100,
MaxConcurrentPieces: 1000,
MaxPiecesPerBatch: 4000,
MaxPiecesPerRequest: 2000,
DialTimeout: 2 * time.Second,
RequestTimeout: 2 * time.Second,
FailThreshold: 2 * time.Second,
},
ObjectDeletion: objectdeletion.Config{
MaxObjectsPerRequest: 100,
ZombieSegmentsPerRequest: 3,
MaxConcurrentRequests: 100,
},
},
Orders: orders.Config{
Expiration: 7 * 24 * time.Hour,
SettlementBatchSize: 10,
FlushBatchSize: 10,
FlushInterval: defaultInterval,
NodeStatusLogging: true,
WindowEndpointRolloutPhase: orders.WindowEndpointRolloutPhase3,
},
Checker: checker.Config{
Interval: defaultInterval,
IrreparableInterval: defaultInterval,
ReliabilityCacheStaleness: 1 * time.Minute,
},
Payments: paymentsconfig.Config{
StorageTBPrice: "10",
EgressTBPrice: "45",
ObjectPrice: "0.0000022",
StripeCoinPayments: stripecoinpayments.Config{
TransactionUpdateInterval: defaultInterval,
AccountBalanceUpdateInterval: defaultInterval,
ConversionRatesCycleInterval: defaultInterval,
ListingLimit: 100,
},
CouponDuration: 2,
CouponValue: 275,
PaywallProportion: 1,
},
Repairer: repairer.Config{
MaxRepair: 10,
Interval: defaultInterval,
Timeout: 1 * time.Minute, // Repairs can take up to 10 seconds. Leaving room for outliers
DownloadTimeout: 1 * time.Minute,
TotalTimeout: 10 * time.Minute,
MaxBufferMem: 4 * memory.MiB,
MaxExcessRateOptimalThreshold: 0.05,
InMemoryRepair: false,
},
Audit: audit.Config{
MaxRetriesStatDB: 0,
MinBytesPerSecond: 1 * memory.KB,
MinDownloadTimeout: 5 * time.Second,
MaxReverifyCount: 3,
ChoreInterval: defaultInterval,
QueueInterval: defaultInterval,
Slots: 3,
WorkerConcurrency: 2,
},
GarbageCollection: gc.Config{
Interval: defaultInterval,
Enabled: true,
InitialPieces: 10,
FalsePositiveRate: 0.1,
ConcurrentSends: 1,
RunInCore: false,
},
ExpiredDeletion: expireddeletion.Config{
Interval: defaultInterval,
Enabled: true,
},
DBCleanup: dbcleanup.Config{
SerialsInterval: defaultInterval,
},
Tally: tally.Config{
Interval: defaultInterval,
},
Rollup: rollup.Config{
Interval: defaultInterval,
DeleteTallies: false,
},
ReportedRollup: reportedrollup.Config{
Interval: defaultInterval,
},
ProjectBWCleanup: projectbwcleanup.Config{
Interval: defaultInterval,
RetainMonths: 1,
},
LiveAccounting: live.Config{
StorageBackend: "redis://" + redis.Addr() + "?db=0",
BandwidthCacheTTL: 5 * time.Minute,
},
Mail: mailservice.Config{
SMTPServerAddress: "smtp.mail.test:587",
From: "Labs <storj@mail.test>",
AuthType: "simulate",
TemplatePath: filepath.Join(developmentRoot, "web/satellite/static/emails"),
},
Console: consoleweb.Config{
Address: "127.0.0.1:0",
StaticDir: filepath.Join(developmentRoot, "web/satellite"),
AuthToken: "very-secret-token",
AuthTokenSecret: "my-suppa-secret-key",
Config: console.Config{
PasswordCost: console.TestPasswordCost,
DefaultProjectLimit: 5,
},
RateLimit: web.IPRateLimiterConfig{
Duration: 5 * time.Minute,
Burst: 3,
NumLimits: 10,
},
},
Marketing: marketingweb.Config{
Address: "127.0.0.1:0",
StaticDir: filepath.Join(developmentRoot, "web/marketing"),
},
Version: planet.NewVersionConfig(),
GracefulExit: gracefulexit.Config{
Enabled: true,
ChoreBatchSize: 10,
ChoreInterval: defaultInterval,
EndpointBatchSize: 100,
MaxFailuresPerPiece: 5,
MaxInactiveTimeFrame: time.Second * 10,
OverallMaxFailuresPercentage: 10,
RecvTimeout: time.Minute * 1,
MaxOrderLimitSendCount: 3,
NodeMinAgeInMonths: 0,
},
Metrics: metrics.Config{
ChoreInterval: defaultInterval,
},
Downtime: downtime.Config{
DetectionInterval: defaultInterval,
EstimationInterval: defaultInterval,
EstimationBatchSize: 5,
EstimationConcurrencyLimit: 5,
},
}
if planet.ReferralManager != nil {
config.Referrals.ReferralManagerURL = storj.NodeURL{
ID: planet.ReferralManager.Identity().ID,
Address: planet.ReferralManager.Addr().String(),
}
}
if planet.config.Reconfigure.Satellite != nil {
planet.config.Reconfigure.Satellite(log, i, &config)
}
versionInfo := planet.NewVersionInfo()
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
if err != nil {
return xs, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
liveAccounting, err := live.NewCache(log.Named("live-accounting"), config.LiveAccounting)
if err != nil {
return xs, errs.Wrap(err)
}
planet.databases = append(planet.databases, liveAccounting)
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccounting, rollupsWriteCache, versionInfo, &config, nil)
if err != nil {
return xs, err
}
err = db.TestingMigrateToLatest(ctx)
if err != nil {
return nil, err
}
api, err := planet.newAPI(ctx, i, identity, db, pointerDB, config, versionInfo)
if err != nil {
return xs, err
}
adminPeer, err := planet.newAdmin(ctx, i, identity, db, config, versionInfo)
if err != nil {
return xs, err
}
repairerPeer, err := planet.newRepairer(ctx, i, identity, db, pointerDB, config, versionInfo)
if err != nil {
return xs, err
}
gcPeer, err := planet.newGarbageCollection(ctx, i, identity, db, pointerDB, config, versionInfo)
if err != nil {
return xs, err
}
log.Debug("id=" + peer.ID().String() + " addr=" + api.Addr())
system := createNewSystem(log, config, peer, api, repairerPeer, adminPeer, gcPeer)
xs = append(xs, system)
log.Debug("id=" + system.ID().String() + " addr=" + system.Addr())
satellites = append(satellites, system)
planet.peers = append(planet.peers, newClosablePeer(system))
}
return xs, nil
return satellites, nil
}
func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int, log *zap.Logger, databases satellitedbtest.SatelliteDatabases) (*Satellite, error) {
storageDir := filepath.Join(planet.directory, prefix)
if err := os.MkdirAll(storageDir, 0700); err != nil {
return nil, err
}
identity, err := planet.NewIdentity()
if err != nil {
return nil, err
}
db, err := satellitedbtest.CreateMasterDB(ctx, log.Named("db"), planet.config.Name, "S", index, databases.MasterDB)
if err != nil {
return nil, err
}
if planet.config.Reconfigure.SatelliteDB != nil {
var newdb satellite.DB
newdb, err = planet.config.Reconfigure.SatelliteDB(log.Named("db"), index, db)
if err != nil {
return nil, errs.Combine(err, db.Close())
}
db = newdb
}
planet.databases = append(planet.databases, db)
pointerDB, err := satellitedbtest.CreatePointerDB(ctx, log.Named("pointerdb"), planet.config.Name, "P", index, databases.PointerDB)
if err != nil {
return nil, err
}
if planet.config.Reconfigure.SatellitePointerDB != nil {
var newPointerDB metainfo.PointerDB
newPointerDB, err = planet.config.Reconfigure.SatellitePointerDB(log.Named("pointerdb"), index, pointerDB)
if err != nil {
return nil, errs.Combine(err, pointerDB.Close())
}
pointerDB = newPointerDB
}
planet.databases = append(planet.databases, pointerDB)
redis, err := redisserver.Mini(ctx)
if err != nil {
return nil, err
}
planet.databases = append(planet.databases, redis)
config := satellite.Config{
Server: server.Config{
Address: "127.0.0.1:0",
PrivateAddress: "127.0.0.1:0",
Config: tlsopts.Config{
RevocationDBURL: "bolt://" + filepath.Join(storageDir, "revocation.db"),
UsePeerCAWhitelist: true,
PeerCAWhitelistPath: planet.whitelistPath,
PeerIDVersions: "latest",
Extensions: extensions.Config{
Revocation: false,
WhitelistSignedLeaf: false,
},
},
},
Debug: debug.Config{
Address: "",
},
Admin: admin.Config{
Address: "127.0.0.1:0",
},
Contact: contact.Config{
Timeout: 1 * time.Minute,
},
Overlay: overlay.Config{
Node: overlay.NodeSelectionConfig{
UptimeCount: 0,
AuditCount: 0,
NewNodeFraction: 1,
OnlineWindow: time.Minute,
DistinctIP: false,
MinimumDiskSpace: 100 * memory.MB,
AuditReputationRepairWeight: 1,
AuditReputationUplinkWeight: 1,
AuditReputationLambda: 0.95,
AuditReputationWeight: 1,
AuditReputationDQ: 0.6,
SuspensionGracePeriod: time.Hour,
SuspensionDQEnabled: true,
},
NodeSelectionCache: overlay.CacheConfig{
Staleness: 3 * time.Minute,
},
UpdateStatsBatchSize: 100,
AuditHistory: overlay.AuditHistoryConfig{
WindowSize: 10 * time.Minute,
TrackingPeriod: time.Hour,
GracePeriod: time.Hour,
OfflineThreshold: 0.6,
},
},
Metainfo: metainfo.Config{
DatabaseURL: "", // not used
MinRemoteSegmentSize: 0, // TODO: fix tests to work with 1024
MaxInlineSegmentSize: 4 * memory.KiB,
MaxSegmentSize: 64 * memory.MiB,
MaxMetadataSize: 2 * memory.KiB,
MaxCommitInterval: 1 * time.Hour,
Overlay: true,
RS: metainfo.RSConfig{
MaxBufferMem: memory.Size(256),
ErasureShareSize: memory.Size(256),
MinThreshold: atLeastOne(planet.config.StorageNodeCount * 1 / 5),
RepairThreshold: atLeastOne(planet.config.StorageNodeCount * 2 / 5),
SuccessThreshold: atLeastOne(planet.config.StorageNodeCount * 3 / 5),
TotalThreshold: atLeastOne(planet.config.StorageNodeCount * 4 / 5),
MinTotalThreshold: (planet.config.StorageNodeCount * 4 / 5),
MaxTotalThreshold: (planet.config.StorageNodeCount * 4 / 5),
Validate: false,
},
Loop: metainfo.LoopConfig{
CoalesceDuration: 1 * time.Second,
ListLimit: 10000,
},
RateLimiter: metainfo.RateLimiterConfig{
Enabled: true,
Rate: 1000,
CacheCapacity: 100,
CacheExpiration: 10 * time.Second,
},
ProjectLimits: metainfo.ProjectLimitConfig{
MaxBuckets: 10,
DefaultMaxUsage: 25 * memory.GB,
DefaultMaxBandwidth: 25 * memory.GB,
},
PieceDeletion: piecedeletion.Config{
MaxConcurrency: 100,
MaxConcurrentPieces: 1000,
MaxPiecesPerBatch: 4000,
MaxPiecesPerRequest: 2000,
DialTimeout: 2 * time.Second,
RequestTimeout: 2 * time.Second,
FailThreshold: 2 * time.Second,
},
ObjectDeletion: objectdeletion.Config{
MaxObjectsPerRequest: 100,
ZombieSegmentsPerRequest: 3,
MaxConcurrentRequests: 100,
},
},
Orders: orders.Config{
Expiration: 7 * 24 * time.Hour,
SettlementBatchSize: 10,
FlushBatchSize: 10,
FlushInterval: defaultInterval,
NodeStatusLogging: true,
WindowEndpointRolloutPhase: orders.WindowEndpointRolloutPhase3,
},
Checker: checker.Config{
Interval: defaultInterval,
IrreparableInterval: defaultInterval,
ReliabilityCacheStaleness: 1 * time.Minute,
},
Payments: paymentsconfig.Config{
StorageTBPrice: "10",
EgressTBPrice: "45",
ObjectPrice: "0.0000022",
StripeCoinPayments: stripecoinpayments.Config{
TransactionUpdateInterval: defaultInterval,
AccountBalanceUpdateInterval: defaultInterval,
ConversionRatesCycleInterval: defaultInterval,
ListingLimit: 100,
},
CouponDuration: 2,
CouponValue: 275,
PaywallProportion: 1,
},
Repairer: repairer.Config{
MaxRepair: 10,
Interval: defaultInterval,
Timeout: 1 * time.Minute, // Repairs can take up to 10 seconds. Leaving room for outliers
DownloadTimeout: 1 * time.Minute,
TotalTimeout: 10 * time.Minute,
MaxBufferMem: 4 * memory.MiB,
MaxExcessRateOptimalThreshold: 0.05,
InMemoryRepair: false,
},
Audit: audit.Config{
MaxRetriesStatDB: 0,
MinBytesPerSecond: 1 * memory.KB,
MinDownloadTimeout: 5 * time.Second,
MaxReverifyCount: 3,
ChoreInterval: defaultInterval,
QueueInterval: defaultInterval,
Slots: 3,
WorkerConcurrency: 2,
},
GarbageCollection: gc.Config{
Interval: defaultInterval,
Enabled: true,
InitialPieces: 10,
FalsePositiveRate: 0.1,
ConcurrentSends: 1,
RunInCore: false,
},
ExpiredDeletion: expireddeletion.Config{
Interval: defaultInterval,
Enabled: true,
},
DBCleanup: dbcleanup.Config{
SerialsInterval: defaultInterval,
},
Tally: tally.Config{
Interval: defaultInterval,
},
Rollup: rollup.Config{
Interval: defaultInterval,
DeleteTallies: false,
},
ReportedRollup: reportedrollup.Config{
Interval: defaultInterval,
},
ProjectBWCleanup: projectbwcleanup.Config{
Interval: defaultInterval,
RetainMonths: 1,
},
LiveAccounting: live.Config{
StorageBackend: "redis://" + redis.Addr() + "?db=0",
BandwidthCacheTTL: 5 * time.Minute,
},
Mail: mailservice.Config{
SMTPServerAddress: "smtp.mail.test:587",
From: "Labs <storj@mail.test>",
AuthType: "simulate",
TemplatePath: filepath.Join(developmentRoot, "web/satellite/static/emails"),
},
Console: consoleweb.Config{
Address: "127.0.0.1:0",
StaticDir: filepath.Join(developmentRoot, "web/satellite"),
AuthToken: "very-secret-token",
AuthTokenSecret: "my-suppa-secret-key",
Config: console.Config{
PasswordCost: console.TestPasswordCost,
DefaultProjectLimit: 5,
},
RateLimit: web.IPRateLimiterConfig{
Duration: 5 * time.Minute,
Burst: 3,
NumLimits: 10,
},
},
Marketing: marketingweb.Config{
Address: "127.0.0.1:0",
StaticDir: filepath.Join(developmentRoot, "web/marketing"),
},
Version: planet.NewVersionConfig(),
GracefulExit: gracefulexit.Config{
Enabled: true,
ChoreBatchSize: 10,
ChoreInterval: defaultInterval,
EndpointBatchSize: 100,
MaxFailuresPerPiece: 5,
MaxInactiveTimeFrame: time.Second * 10,
OverallMaxFailuresPercentage: 10,
RecvTimeout: time.Minute * 1,
MaxOrderLimitSendCount: 3,
NodeMinAgeInMonths: 0,
},
Metrics: metrics.Config{
ChoreInterval: defaultInterval,
},
Downtime: downtime.Config{
DetectionInterval: defaultInterval,
EstimationInterval: defaultInterval,
EstimationBatchSize: 5,
EstimationConcurrencyLimit: 5,
},
}
if planet.ReferralManager != nil {
config.Referrals.ReferralManagerURL = storj.NodeURL{
ID: planet.ReferralManager.Identity().ID,
Address: planet.ReferralManager.Addr().String(),
}
}
if planet.config.Reconfigure.Satellite != nil {
planet.config.Reconfigure.Satellite(log, index, &config)
}
versionInfo := planet.NewVersionInfo()
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
liveAccounting, err := live.NewCache(log.Named("live-accounting"), config.LiveAccounting)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, liveAccounting)
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccounting, rollupsWriteCache, versionInfo, &config, nil)
if err != nil {
return nil, err
}
err = db.TestingMigrateToLatest(ctx)
if err != nil {
return nil, err
}
api, err := planet.newAPI(ctx, index, identity, db, pointerDB, config, versionInfo)
if err != nil {
return nil, err
}
adminPeer, err := planet.newAdmin(ctx, index, identity, db, config, versionInfo)
if err != nil {
return nil, err
}
repairerPeer, err := planet.newRepairer(ctx, index, identity, db, pointerDB, config, versionInfo)
if err != nil {
return nil, err
}
gcPeer, err := planet.newGarbageCollection(ctx, index, identity, db, pointerDB, config, versionInfo)
if err != nil {
return nil, err
}
return createNewSystem(prefix, log, config, peer, api, repairerPeer, adminPeer, gcPeer), nil
}
// createNewSystem makes a new Satellite System and exposes the same interface from
// before we split out the API. In the short term this will help keep all the tests passing
// without much modification needed. However long term, we probably want to rework this
// so it represents how the satellite will run when it is made up of many prrocesses.
func createNewSystem(log *zap.Logger, config satellite.Config, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection) *Satellite {
func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection) *Satellite {
system := &Satellite{
Name: name,
Config: config,
Core: peer,
API: api,
@ -755,8 +769,8 @@ func createNewSystem(log *zap.Logger, config satellite.Config, peer *satellite.C
return system
}
func (planet *Planet) newAPI(ctx context.Context, count int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, config satellite.Config, versionInfo version.Info) (*satellite.API, error) {
prefix := "satellite-api" + strconv.Itoa(count)
func (planet *Planet) newAPI(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, config satellite.Config, versionInfo version.Info) (*satellite.API, error) {
prefix := "satellite-api" + strconv.Itoa(index)
log := planet.log.Named(prefix)
var err error
@ -778,15 +792,15 @@ func (planet *Planet) newAPI(ctx context.Context, count int, identity *identity.
return satellite.NewAPI(log, identity, db, pointerDB, revocationDB, liveAccounting, rollupsWriteCache, &config, versionInfo, nil)
}
func (planet *Planet) newAdmin(ctx context.Context, count int, identity *identity.FullIdentity, db satellite.DB, config satellite.Config, versionInfo version.Info) (*satellite.Admin, error) {
prefix := "satellite-admin" + strconv.Itoa(count)
func (planet *Planet) newAdmin(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, config satellite.Config, versionInfo version.Info) (*satellite.Admin, error) {
prefix := "satellite-admin" + strconv.Itoa(index)
log := planet.log.Named(prefix)
return satellite.NewAdmin(log, identity, db, versionInfo, &config, nil)
}
func (planet *Planet) newRepairer(ctx context.Context, count int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, config satellite.Config, versionInfo version.Info) (*satellite.Repairer, error) {
prefix := "satellite-repairer" + strconv.Itoa(count)
func (planet *Planet) newRepairer(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, config satellite.Config, versionInfo version.Info) (*satellite.Repairer, error) {
prefix := "satellite-repairer" + strconv.Itoa(index)
log := planet.log.Named(prefix)
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
@ -809,8 +823,8 @@ func (cache rollupsWriteCacheCloser) Close() error {
return cache.RollupsWriteCache.CloseAndFlush(context.TODO())
}
func (planet *Planet) newGarbageCollection(ctx context.Context, count int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, config satellite.Config, versionInfo version.Info) (*satellite.GarbageCollection, error) {
prefix := "satellite-gc" + strconv.Itoa(count)
func (planet *Planet) newGarbageCollection(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, config satellite.Config, versionInfo version.Info) (*satellite.GarbageCollection, error) {
prefix := "satellite-gc" + strconv.Itoa(index)
log := planet.log.Named(prefix)
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)

View File

@ -8,11 +8,13 @@ import (
"fmt"
"os"
"path/filepath"
"runtime/pprof"
"strconv"
"strings"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/peertls/extensions"
@ -41,10 +43,14 @@ import (
// StorageNode contains all the processes needed to run a full StorageNode setup.
type StorageNode struct {
Name string
Config storagenode.Config
*storagenode.Peer
}
// Label returns name for debugger.
func (system *StorageNode) Label() string { return system.Name }
// URL returns the node url as a string.
func (system *StorageNode) URL() string { return system.NodeURL().String() }
@ -54,14 +60,7 @@ func (system *StorageNode) NodeURL() storj.NodeURL {
}
// newStorageNodes initializes storage nodes.
func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.NodeURLs) ([]*StorageNode, error) {
var xs []*StorageNode
defer func() {
for _, x := range xs {
planet.peers = append(planet.peers, newClosablePeer(x))
}
}()
func (planet *Planet) newStorageNodes(ctx context.Context, count int, whitelistedSatellites storj.NodeURLs) ([]*StorageNode, error) {
var sources []trust.Source
for _, u := range whitelistedSatellites {
source, err := trust.NewStaticURLSource(u.String())
@ -71,169 +70,183 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
sources = append(sources, source)
}
var xs []*StorageNode
for i := 0; i < count; i++ {
ctx := context.TODO()
prefix := "storage" + strconv.Itoa(i)
index := i
prefix := "storage" + strconv.Itoa(index)
log := planet.log.Named(prefix)
storageDir := filepath.Join(planet.directory, prefix)
if err := os.MkdirAll(storageDir, 0700); err != nil {
return nil, err
}
identity, err := planet.NewIdentity()
if err != nil {
return nil, err
}
config := storagenode.Config{
Server: server.Config{
Address: "127.0.0.1:0",
PrivateAddress: "127.0.0.1:0",
Config: tlsopts.Config{
RevocationDBURL: "bolt://" + filepath.Join(storageDir, "revocation.db"),
UsePeerCAWhitelist: true,
PeerCAWhitelistPath: planet.whitelistPath,
PeerIDVersions: "*",
Extensions: extensions.Config{
Revocation: false,
WhitelistSignedLeaf: false,
},
},
},
Debug: debug.Config{
Address: "",
},
Preflight: preflight.Config{
LocalTimeCheck: false,
},
Operator: storagenode.OperatorConfig{
Email: prefix + "@mail.test",
Wallet: "0x" + strings.Repeat("00", 20),
},
Storage: piecestore.OldConfig{
Path: filepath.Join(storageDir, "pieces/"),
AllocatedDiskSpace: 1 * memory.GB,
KBucketRefreshInterval: defaultInterval,
},
Collector: collector.Config{
Interval: defaultInterval,
},
Nodestats: nodestats.Config{
MaxSleep: 0,
ReputationSync: defaultInterval,
StorageSync: defaultInterval,
},
Console: consoleserver.Config{
Address: "127.0.0.1:0",
StaticDir: filepath.Join(developmentRoot, "web/storagenode/"),
},
Storage2: piecestore.Config{
CacheSyncInterval: defaultInterval,
ExpirationGracePeriod: 0,
MaxConcurrentRequests: 100,
OrderLimitGracePeriod: time.Hour,
StreamOperationTimeout: time.Hour,
ReportCapacityThreshold: 100 * memory.MB,
DeleteQueueSize: 10000,
DeleteWorkers: 1,
Orders: orders.Config{
SenderInterval: defaultInterval,
SenderTimeout: 10 * time.Minute,
CleanupInterval: defaultInterval,
ArchiveTTL: time.Hour,
MaxSleep: 0,
Path: filepath.Join(storageDir, "orders"),
},
Monitor: monitor.Config{
MinimumDiskSpace: 100 * memory.MB,
NotifyLowDiskCooldown: defaultInterval,
VerifyDirReadableInterval: defaultInterval,
VerifyDirWritableInterval: defaultInterval,
},
Trust: trust.Config{
Sources: sources,
CachePath: filepath.Join(storageDir, "trust-cache.json"),
RefreshInterval: defaultInterval,
},
MaxUsedSerialsSize: memory.MiB,
},
Pieces: pieces.DefaultConfig,
Filestore: filestore.DefaultConfig,
Retain: retain.Config{
MaxTimeSkew: 10 * time.Second,
Status: retain.Enabled,
Concurrency: 5,
},
Version: planet.NewVersionConfig(),
Bandwidth: bandwidth.Config{
Interval: defaultInterval,
},
Contact: contact.Config{
Interval: defaultInterval,
},
GracefulExit: gracefulexit.Config{
ChoreInterval: defaultInterval,
NumWorkers: 3,
NumConcurrentTransfers: 1,
MinBytesPerSecond: 128 * memory.B,
MinDownloadTimeout: 2 * time.Minute,
},
}
if planet.config.Reconfigure.StorageNode != nil {
planet.config.Reconfigure.StorageNode(i, &config)
}
newIPCount := planet.config.Reconfigure.UniqueIPCount
if newIPCount > 0 {
if i >= count-newIPCount {
config.Server.Address = fmt.Sprintf("127.0.%d.1:0", i+1)
config.Server.PrivateAddress = fmt.Sprintf("127.0.%d.1:0", i+1)
}
}
verisonInfo := planet.NewVersionInfo()
var db storagenode.DB
db, err = storagenodedb.OpenNew(ctx, log.Named("db"), config.DatabaseConfig())
if err != nil {
return nil, err
}
if planet.config.Reconfigure.StorageNodeDB != nil {
db, err = planet.config.Reconfigure.StorageNodeDB(i, db, planet.log)
if err != nil {
return nil, err
}
}
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
if err != nil {
return xs, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
peer, err := storagenode.New(log, identity, db, revocationDB, config, verisonInfo, nil)
if err != nil {
return xs, err
}
// Mark the peer's PieceDeleter as in testing mode, so it is easy to wait on the deleter
peer.Storage2.PieceDeleter.SetupTest()
err = db.MigrateToLatest(ctx)
if err != nil {
return nil, err
}
planet.databases = append(planet.databases, db)
log.Debug("id=" + peer.ID().String() + " addr=" + peer.Addr())
xs = append(xs, &StorageNode{
Config: config,
Peer: peer,
var system *StorageNode
var err error
pprof.Do(ctx, pprof.Labels("peer", prefix), func(ctx context.Context) {
system, err = planet.newStorageNode(ctx, prefix, index, count, log, sources)
})
if err != nil {
return nil, err
}
log.Debug("id=" + system.ID().String() + " addr=" + system.Addr())
xs = append(xs, system)
planet.peers = append(planet.peers, newClosablePeer(system))
}
return xs, nil
}
func (planet *Planet) newStorageNode(ctx context.Context, prefix string, index, count int, log *zap.Logger, sources []trust.Source) (*StorageNode, error) {
storageDir := filepath.Join(planet.directory, prefix)
if err := os.MkdirAll(storageDir, 0700); err != nil {
return nil, err
}
identity, err := planet.NewIdentity()
if err != nil {
return nil, err
}
config := storagenode.Config{
Server: server.Config{
Address: "127.0.0.1:0",
PrivateAddress: "127.0.0.1:0",
Config: tlsopts.Config{
RevocationDBURL: "bolt://" + filepath.Join(storageDir, "revocation.db"),
UsePeerCAWhitelist: true,
PeerCAWhitelistPath: planet.whitelistPath,
PeerIDVersions: "*",
Extensions: extensions.Config{
Revocation: false,
WhitelistSignedLeaf: false,
},
},
},
Debug: debug.Config{
Address: "",
},
Preflight: preflight.Config{
LocalTimeCheck: false,
},
Operator: storagenode.OperatorConfig{
Email: prefix + "@mail.test",
Wallet: "0x" + strings.Repeat("00", 20),
},
Storage: piecestore.OldConfig{
Path: filepath.Join(storageDir, "pieces/"),
AllocatedDiskSpace: 1 * memory.GB,
KBucketRefreshInterval: defaultInterval,
},
Collector: collector.Config{
Interval: defaultInterval,
},
Nodestats: nodestats.Config{
MaxSleep: 0,
ReputationSync: defaultInterval,
StorageSync: defaultInterval,
},
Console: consoleserver.Config{
Address: "127.0.0.1:0",
StaticDir: filepath.Join(developmentRoot, "web/storagenode/"),
},
Storage2: piecestore.Config{
CacheSyncInterval: defaultInterval,
ExpirationGracePeriod: 0,
MaxConcurrentRequests: 100,
OrderLimitGracePeriod: time.Hour,
StreamOperationTimeout: time.Hour,
ReportCapacityThreshold: 100 * memory.MB,
DeleteQueueSize: 10000,
DeleteWorkers: 1,
Orders: orders.Config{
SenderInterval: defaultInterval,
SenderTimeout: 10 * time.Minute,
CleanupInterval: defaultInterval,
ArchiveTTL: time.Hour,
MaxSleep: 0,
Path: filepath.Join(storageDir, "orders"),
},
Monitor: monitor.Config{
MinimumDiskSpace: 100 * memory.MB,
NotifyLowDiskCooldown: defaultInterval,
VerifyDirReadableInterval: defaultInterval,
VerifyDirWritableInterval: defaultInterval,
},
Trust: trust.Config{
Sources: sources,
CachePath: filepath.Join(storageDir, "trust-cache.json"),
RefreshInterval: defaultInterval,
},
MaxUsedSerialsSize: memory.MiB,
},
Pieces: pieces.DefaultConfig,
Filestore: filestore.DefaultConfig,
Retain: retain.Config{
MaxTimeSkew: 10 * time.Second,
Status: retain.Enabled,
Concurrency: 5,
},
Version: planet.NewVersionConfig(),
Bandwidth: bandwidth.Config{
Interval: defaultInterval,
},
Contact: contact.Config{
Interval: defaultInterval,
},
GracefulExit: gracefulexit.Config{
ChoreInterval: defaultInterval,
NumWorkers: 3,
NumConcurrentTransfers: 1,
MinBytesPerSecond: 128 * memory.B,
MinDownloadTimeout: 2 * time.Minute,
},
}
if planet.config.Reconfigure.StorageNode != nil {
planet.config.Reconfigure.StorageNode(index, &config)
}
newIPCount := planet.config.Reconfigure.UniqueIPCount
if newIPCount > 0 {
if index >= count-newIPCount {
config.Server.Address = fmt.Sprintf("127.0.%d.1:0", index+1)
config.Server.PrivateAddress = fmt.Sprintf("127.0.%d.1:0", index+1)
}
}
verisonInfo := planet.NewVersionInfo()
var db storagenode.DB
db, err = storagenodedb.OpenNew(ctx, log.Named("db"), config.DatabaseConfig())
if err != nil {
return nil, err
}
if planet.config.Reconfigure.StorageNodeDB != nil {
db, err = planet.config.Reconfigure.StorageNodeDB(index, db, planet.log)
if err != nil {
return nil, err
}
}
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
peer, err := storagenode.New(log, identity, db, revocationDB, config, verisonInfo, nil)
if err != nil {
return nil, err
}
// Mark the peer's PieceDeleter as in testing mode, so it is easy to wait on the deleter
peer.Storage2.PieceDeleter.SetupTest()
err = db.MigrateToLatest(ctx)
if err != nil {
return nil, err
}
planet.databases = append(planet.databases, db)
return &StorageNode{
Name: prefix,
Config: config,
Peer: peer,
}, nil
}

View File

@ -9,6 +9,7 @@ import (
"fmt"
"io"
"io/ioutil"
"runtime/pprof"
"strconv"
"time"
@ -68,10 +69,15 @@ func (project *Project) DialMetainfo(ctx context.Context) (*metainfo.Client, err
}
// newUplinks creates initializes uplinks, requires peer to have at least one satellite.
func (planet *Planet) newUplinks(prefix string, count int) ([]*Uplink, error) {
func (planet *Planet) newUplinks(ctx context.Context, prefix string, count int) ([]*Uplink, error) {
var xs []*Uplink
for i := 0; i < count; i++ {
uplink, err := planet.newUplink(prefix + strconv.Itoa(i))
name := prefix + strconv.Itoa(i)
var uplink *Uplink
var err error
pprof.Do(ctx, pprof.Labels("peer", name), func(ctx context.Context) {
uplink, err = planet.newUplink(ctx, name)
})
if err != nil {
return nil, err
}
@ -82,9 +88,7 @@ func (planet *Planet) newUplinks(prefix string, count int) ([]*Uplink, error) {
}
// newUplink creates a new uplink.
func (planet *Planet) newUplink(name string) (*Uplink, error) {
ctx := context.TODO()
func (planet *Planet) newUplink(ctx context.Context, name string) (*Uplink, error) {
identity, err := planet.NewIdentity()
if err != nil {
return nil, err

View File

@ -21,7 +21,7 @@ func RunDBs(t *testing.T, test func(*testing.T, extensions.RevocationDB, storage
ctx := testcontext.New(t)
defer ctx.Cleanup()
redis, err := redisserver.Mini()
redis, err := redisserver.Mini(ctx)
require.NoError(t, err)
defer ctx.Check(redis.Close)

View File

@ -32,7 +32,7 @@ func TestLiveAccountingCache(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
redis, err := redisserver.Mini()
redis, err := redisserver.Mini(ctx)
require.NoError(t, err)
defer ctx.Check(redis.Close)
@ -75,7 +75,7 @@ func TestRedisCacheConcurrency(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
redis, err := redisserver.Mini()
redis, err := redisserver.Mini(ctx)
require.NoError(t, err)
defer ctx.Check(redis.Close)
@ -162,7 +162,7 @@ func TestGetAllProjectTotals(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
redis, err := redisserver.Mini()
redis, err := redisserver.Mini(ctx)
require.NoError(t, err)
defer ctx.Check(redis.Close)

View File

@ -59,7 +59,7 @@ func TestGraphqlMutation(t *testing.T) {
},
)
redis, err := redisserver.Mini()
redis, err := redisserver.Mini(ctx)
require.NoError(t, err)
defer ctx.Check(redis.Close)

View File

@ -43,7 +43,7 @@ func TestGraphqlQuery(t *testing.T) {
},
)
redis, err := redisserver.Mini()
redis, err := redisserver.Mini(ctx)
require.NoError(t, err)
defer ctx.Check(redis.Close)

View File

@ -4,16 +4,21 @@
package redis
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/testcontext"
"storj.io/storj/storage/redis/redisserver"
"storj.io/storj/storage/testsuite"
)
func TestSuite(t *testing.T) {
redis, err := redisserver.Start()
ctx := testcontext.New(t)
defer ctx.Cleanup()
redis, err := redisserver.Start(ctx)
if err != nil {
t.Fatal(err)
}
@ -36,7 +41,9 @@ func TestInvalidConnection(t *testing.T) {
}
func BenchmarkSuite(b *testing.B) {
redis, err := redisserver.Start()
ctx := context.Background()
redis, err := redisserver.Start(ctx)
if err != nil {
b.Fatal(err)
}

View File

@ -6,6 +6,7 @@ package redisserver
import (
"bufio"
"context"
"errors"
"fmt"
"io"
@ -15,6 +16,7 @@ import (
"os"
"os/exec"
"path/filepath"
"runtime/pprof"
"strconv"
"strings"
"time"
@ -51,17 +53,17 @@ func freeport() (addr string, port int) {
}
// Start starts a redis-server when available, otherwise falls back to miniredis.
func Start() (Server, error) {
server, err := Process()
func Start(ctx context.Context) (Server, error) {
server, err := Process(ctx)
if err != nil {
log.Println("failed to start redis-server: ", err)
return Mini()
return Mini(ctx)
}
return server, err
}
// Process starts a redis-server test process.
func Process() (Server, error) {
func Process(ctx context.Context) (Server, error) {
tmpdir, err := ioutil.TempDir("", "storj-redis")
if err != nil {
return nil, err
@ -159,12 +161,17 @@ func pingServer(addr string) error {
}
// Mini starts miniredis server.
func Mini() (Server, error) {
server, err := miniredis.Run()
func Mini(ctx context.Context) (Server, error) {
var server *miniredis.Miniredis
var err error
pprof.Do(ctx, pprof.Labels("db", "miniredis"), func(ctx context.Context) {
server, err = miniredis.Run()
})
if err != nil {
return nil, err
}
return &miniserver{server}, nil
}

View File

@ -739,4 +739,3 @@ func Schema() map[string]*dbschema.Schema {
"used_serial": &dbschema.Schema{},
}
}