storj/private/testplanet/satellite.go

551 lines
16 KiB
Go
Raw Normal View History

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information
package testplanet
import (
"context"
"fmt"
"net"
"os"
"path/filepath"
"strconv"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/peertls/extensions"
"storj.io/storj/pkg/peertls/tlsopts"
"storj.io/storj/pkg/revocation"
"storj.io/storj/pkg/rpc"
"storj.io/storj/pkg/server"
"storj.io/storj/pkg/storj"
"storj.io/storj/private/dbutil"
"storj.io/storj/private/dbutil/pgutil/pgtest"
"storj.io/storj/private/dbutil/tempdb"
"storj.io/storj/private/errs2"
"storj.io/storj/private/memory"
"storj.io/storj/private/version"
versionchecker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/accounting/live"
"storj.io/storj/satellite/accounting/rollup"
"storj.io/storj/satellite/accounting/tally"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/console/consoleweb"
"storj.io/storj/satellite/contact"
"storj.io/storj/satellite/dbcleanup"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/inspector"
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/marketingweb"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metrics"
"storj.io/storj/satellite/nodestats"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/checker"
"storj.io/storj/satellite/repair/irreparable"
"storj.io/storj/satellite/repair/repairer"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/satellite/vouchers"
)
// SatelliteSystem contains all the processes needed to run a full Satellite setup
type SatelliteSystem struct {
Core *satellite.Core
API *satellite.API
Repairer *satellite.Repairer
Log *zap.Logger
Identity *identity.FullIdentity
DB satellite.DB
Dialer rpc.Dialer
Server *server.Server
Version *versionchecker.Service
Contact struct {
Service *contact.Service
Endpoint *contact.Endpoint
}
Overlay struct {
DB overlay.DB
Service *overlay.Service
Inspector *overlay.Inspector
}
Metainfo struct {
Database metainfo.PointerDB
Service *metainfo.Service
Endpoint2 *metainfo.Endpoint
Loop *metainfo.Loop
}
Inspector struct {
Endpoint *inspector.Endpoint
}
Orders struct {
Endpoint *orders.Endpoint
Service *orders.Service
}
Repair struct {
Checker *checker.Checker
Repairer *repairer.Service
Inspector *irreparable.Inspector
}
Audit struct {
Queue *audit.Queue
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
Reporter *audit.Reporter
}
GarbageCollection struct {
Service *gc.Service
}
DBCleanup struct {
Chore *dbcleanup.Chore
}
Accounting struct {
Tally *tally.Service
Rollup *rollup.Service
ProjectUsage *accounting.Service
}
LiveAccounting struct {
Cache accounting.Cache
}
Mail struct {
Service *mailservice.Service
}
Vouchers struct {
Endpoint *vouchers.Endpoint
}
Console struct {
Listener net.Listener
Service *console.Service
Endpoint *consoleweb.Server
}
Marketing struct {
Listener net.Listener
Endpoint *marketingweb.Server
}
NodeStats struct {
Endpoint *nodestats.Endpoint
}
GracefulExit struct {
Chore *gracefulexit.Chore
Endpoint *gracefulexit.Endpoint
}
Metrics struct {
Chore *metrics.Chore
}
}
// ID returns the ID of the Satellite system.
func (system *SatelliteSystem) ID() storj.NodeID { return system.API.Identity.ID }
// Local returns the peer local node info from the Satellite system API.
func (system *SatelliteSystem) Local() overlay.NodeDossier { return system.API.Contact.Service.Local() }
// Addr returns the public address from the Satellite system API.
func (system *SatelliteSystem) Addr() string { return system.API.Server.Addr().String() }
// URL returns the storj.NodeURL from the Satellite system API.
func (system *SatelliteSystem) URL() storj.NodeURL {
return storj.NodeURL{ID: system.API.ID(), Address: system.API.Addr()}
}
// Close closes all the subsystems in the Satellite system
func (system *SatelliteSystem) Close() error {
return errs.Combine(system.API.Close(), system.Core.Close(), system.Repairer.Close())
}
// Run runs all the subsystems in the Satellite system
func (system *SatelliteSystem) Run(ctx context.Context) (err error) {
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
return errs2.IgnoreCanceled(system.Core.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.API.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.Repairer.Run(ctx))
})
return group.Wait()
}
// PrivateAddr returns the private address from the Satellite system API.
func (system *SatelliteSystem) PrivateAddr() string { return system.API.Server.PrivateAddr().String() }
// newSatellites initializes satellites
func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
var xs []*SatelliteSystem
defer func() {
for _, x := range xs {
planet.peers = append(planet.peers, newClosablePeer(x))
}
}()
for i := 0; i < count; i++ {
prefix := "satellite" + strconv.Itoa(i)
log := planet.log.Named(prefix)
storageDir := filepath.Join(planet.directory, prefix)
if err := os.MkdirAll(storageDir, 0700); err != nil {
return nil, err
}
identity, err := planet.NewIdentity()
if err != nil {
return nil, err
}
var db satellite.DB
if planet.config.Reconfigure.NewSatelliteDB != nil {
db, err = planet.config.Reconfigure.NewSatelliteDB(log.Named("db"), i)
} else {
// TODO: This is analogous to the way we worked prior to the advent of OpenUnique,
// but it seems wrong. Tests that use planet.Start() instead of testplanet.Run()
// will not get run against both types of DB.
connStr := *pgtest.ConnStr
if *pgtest.CrdbConnStr != "" {
connStr = *pgtest.CrdbConnStr
}
var tempDB *dbutil.TempDatabase
tempDB, err = tempdb.OpenUnique(connStr, fmt.Sprintf("%s.%d", planet.id, i))
if err != nil {
return nil, err
}
db, err = satellitedbtest.CreateMasterDBOnTopOf(log.Named("db"), tempDB)
}
if err != nil {
return nil, err
}
var pointerDB metainfo.PointerDB
if planet.config.Reconfigure.NewSatellitePointerDB != nil {
pointerDB, err = planet.config.Reconfigure.NewSatellitePointerDB(log.Named("pointerdb"), i)
} else {
pointerDB, err = metainfo.NewStore(log.Named("pointerdb"), "bolt://"+filepath.Join(storageDir, "pointers.db"))
}
if err != nil {
return nil, err
}
config := satellite.Config{
Server: server.Config{
Address: "127.0.0.1:0",
PrivateAddress: "127.0.0.1:0",
Config: tlsopts.Config{
RevocationDBURL: "bolt://" + filepath.Join(storageDir, "revocation.db"),
UsePeerCAWhitelist: true,
PeerCAWhitelistPath: planet.whitelistPath,
PeerIDVersions: "latest",
Extensions: extensions.Config{
Revocation: false,
WhitelistSignedLeaf: false,
},
},
},
Overlay: overlay.Config{
Node: overlay.NodeSelectionConfig{
UptimeCount: 0,
AuditCount: 0,
NewNodePercentage: 0,
OnlineWindow: time.Minute,
DistinctIP: false,
AuditReputationRepairWeight: 1,
AuditReputationUplinkWeight: 1,
AuditReputationAlpha0: 1,
AuditReputationBeta0: 0,
AuditReputationLambda: 0.95,
AuditReputationWeight: 1,
AuditReputationDQ: 0.6,
UptimeReputationRepairWeight: 1,
UptimeReputationUplinkWeight: 1,
UptimeReputationAlpha0: 2,
UptimeReputationBeta0: 0,
UptimeReputationLambda: 0.99,
UptimeReputationWeight: 1,
UptimeReputationDQ: 0.6,
},
UpdateStatsBatchSize: 100,
},
Metainfo: metainfo.Config{
DatabaseURL: "", // not used
MinRemoteSegmentSize: 0, // TODO: fix tests to work with 1024
MaxInlineSegmentSize: 8000,
MaxCommitInterval: 1 * time.Hour,
Overlay: true,
RS: metainfo.RSConfig{
MaxSegmentSize: 64 * memory.MiB,
MaxBufferMem: memory.Size(256),
ErasureShareSize: memory.Size(256),
MinThreshold: (planet.config.StorageNodeCount * 1 / 5),
RepairThreshold: (planet.config.StorageNodeCount * 2 / 5),
SuccessThreshold: (planet.config.StorageNodeCount * 3 / 5),
MinTotalThreshold: (planet.config.StorageNodeCount * 4 / 5),
MaxTotalThreshold: (planet.config.StorageNodeCount * 4 / 5),
Validate: false,
},
Loop: metainfo.LoopConfig{
CoalesceDuration: 1 * time.Second,
},
},
Orders: orders.Config{
Expiration: 7 * 24 * time.Hour,
},
Checker: checker.Config{
Interval: defaultInterval,
IrreparableInterval: defaultInterval,
ReliabilityCacheStaleness: 1 * time.Minute,
},
Repairer: repairer.Config{
[V3-1927] Repairer uploads to max threshold instead of success… (#2423) * pkg/datarepair: Add test to check num upload pieces Add a new test for ensuring the number of pieces that the repair process upload when a segment is injured. * satellite/orders: Don't create "put order limits" over total Repair must not create "put order limits" more than the total count. * pkg/datarepair: Update upload repair pieces test Update the test which checks the number of pieces which are uploaded during a repair for using the same excess over the success threshold value than the implementation. * satellites/orders: Limit repair put order for not being total Limit the number of put orders to be used by repair for only uploading pieces to a % excess over the successful threshold. * pkg/datarepair: Change DataRepair test to pass again Make some changes in the DataRepair test to make pass again after the repair upload repaired pieces only until a % excess over success threshold. Also update the steps description of the DataRepair test after it has been changed, to match on what's now, besides to leave it more generic for avoiding having to update it on minimal future refactorings. * satellite: Make repair excess optimal threshold configurable Add a new configuration parameter to the satellite for being able to configure the percentage excess over the optimal threshold, used for determining how many pieces should be repaired/uploaded, rather than having the value hard coded. * repairer: Add configurable param to segments/repairer Add a new parameters to the segment/repairer to calculate the maximum number of excess nodes, based on the optimal threshold, that repaired pieces can be uploaded. This new parameter has been added for not returning more nodes than the number of upload orders for data repair satellite service calculate for repairing pieces. * pkg/storage/ec: Update log message in clien.Repair * satellite: Update configuration lock file
2019-07-11 23:44:47 +01:00
MaxRepair: 10,
Interval: time.Hour,
Timeout: 1 * time.Minute, // Repairs can take up to 10 seconds. Leaving room for outliers
DownloadTimeout: 1 * time.Minute,
[V3-1927] Repairer uploads to max threshold instead of success… (#2423) * pkg/datarepair: Add test to check num upload pieces Add a new test for ensuring the number of pieces that the repair process upload when a segment is injured. * satellite/orders: Don't create "put order limits" over total Repair must not create "put order limits" more than the total count. * pkg/datarepair: Update upload repair pieces test Update the test which checks the number of pieces which are uploaded during a repair for using the same excess over the success threshold value than the implementation. * satellites/orders: Limit repair put order for not being total Limit the number of put orders to be used by repair for only uploading pieces to a % excess over the successful threshold. * pkg/datarepair: Change DataRepair test to pass again Make some changes in the DataRepair test to make pass again after the repair upload repaired pieces only until a % excess over success threshold. Also update the steps description of the DataRepair test after it has been changed, to match on what's now, besides to leave it more generic for avoiding having to update it on minimal future refactorings. * satellite: Make repair excess optimal threshold configurable Add a new configuration parameter to the satellite for being able to configure the percentage excess over the optimal threshold, used for determining how many pieces should be repaired/uploaded, rather than having the value hard coded. * repairer: Add configurable param to segments/repairer Add a new parameters to the segment/repairer to calculate the maximum number of excess nodes, based on the optimal threshold, that repaired pieces can be uploaded. This new parameter has been added for not returning more nodes than the number of upload orders for data repair satellite service calculate for repairing pieces. * pkg/storage/ec: Update log message in clien.Repair * satellite: Update configuration lock file
2019-07-11 23:44:47 +01:00
MaxBufferMem: 4 * memory.MiB,
MaxExcessRateOptimalThreshold: 0.05,
},
Audit: audit.Config{
MaxRetriesStatDB: 0,
MinBytesPerSecond: 1 * memory.KB,
MinDownloadTimeout: 5 * time.Second,
MaxReverifyCount: 3,
ChoreInterval: defaultInterval,
QueueInterval: defaultInterval,
Slots: 3,
WorkerConcurrency: 1,
},
GarbageCollection: gc.Config{
Interval: defaultInterval,
Enabled: true,
InitialPieces: 10,
FalsePositiveRate: 0.1,
ConcurrentSends: 1,
},
DBCleanup: dbcleanup.Config{
SerialsInterval: defaultInterval,
},
Tally: tally.Config{
Interval: defaultInterval,
},
Rollup: rollup.Config{
Interval: defaultInterval,
MaxAlphaUsage: 25 * memory.GB,
DeleteTallies: false,
},
Mail: mailservice.Config{
SMTPServerAddress: "smtp.mail.test:587",
From: "Labs <storj@mail.test>",
AuthType: "simulate",
TemplatePath: filepath.Join(developmentRoot, "web/satellite/static/emails"),
},
Console: consoleweb.Config{
Address: "127.0.0.1:0",
StaticDir: filepath.Join(developmentRoot, "web/satellite"),
PasswordCost: console.TestPasswordCost,
AuthTokenSecret: "my-suppa-secret-key",
},
Marketing: marketingweb.Config{
Address: "127.0.0.1:0",
StaticDir: filepath.Join(developmentRoot, "web/marketing"),
},
Version: planet.NewVersionConfig(),
GracefulExit: gracefulexit.Config{
Enabled: true,
ChoreBatchSize: 10,
ChoreInterval: defaultInterval,
EndpointBatchSize: 100,
MaxFailuresPerPiece: 5,
MaxInactiveTimeFrame: time.Second * 10,
OverallMaxFailuresPercentage: 10,
RecvTimeout: time.Minute * 1,
MaxOrderLimitSendCount: 3,
},
Metrics: metrics.Config{
ChoreInterval: defaultInterval,
},
}
if planet.ReferralManager != nil {
config.Referrals.ReferralManagerURL = storj.NodeURL{
ID: planet.ReferralManager.Identity().ID,
Address: planet.ReferralManager.Addr().String(),
}
}
if planet.config.Reconfigure.Satellite != nil {
planet.config.Reconfigure.Satellite(log, i, &config)
}
versionInfo := planet.NewVersionInfo()
revocationDB, err := revocation.NewDBFromCfg(config.Server.Config)
if err != nil {
return xs, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
liveAccountingCache, err := live.NewCache(log.Named("live-accounting"), config.LiveAccounting)
if err != nil {
return xs, errs.Wrap(err)
}
planet.databases = append(planet.databases, liveAccountingCache)
peer, err := satellite.New(log, identity, db, pointerDB, revocationDB, liveAccountingCache, versionInfo, &config)
if err != nil {
return xs, err
}
err = db.CreateTables()
if err != nil {
return nil, err
}
planet.databases = append(planet.databases, db)
planet.databases = append(planet.databases, pointerDB)
api, err := planet.newAPI(i, identity, db, pointerDB, config, versionInfo)
if err != nil {
return xs, err
}
repairerPeer, err := planet.newRepairer(i, identity, db, pointerDB, config, versionInfo)
if err != nil {
return xs, err
}
log.Debug("id=" + peer.ID().String() + " addr=" + api.Addr())
system := createNewSystem(log, peer, api, repairerPeer)
xs = append(xs, system)
}
return xs, nil
}
// createNewSystem makes a new Satellite System and exposes the same interface from
// before we split out the API. In the short term this will help keep all the tests passing
// without much modification needed. However long term, we probably want to rework this
// so it represents how the satellite will run when it is made up of many prrocesses.
func createNewSystem(log *zap.Logger, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer) *SatelliteSystem {
system := &SatelliteSystem{
Core: peer,
API: api,
Repairer: repairerPeer,
}
system.Log = log
system.Identity = peer.Identity
system.DB = api.DB
system.Dialer = api.Dialer
system.Contact.Service = api.Contact.Service
system.Contact.Endpoint = api.Contact.Endpoint
system.Overlay.DB = api.Overlay.DB
system.Overlay.Service = api.Overlay.Service
system.Overlay.Inspector = api.Overlay.Inspector
system.Metainfo.Database = api.Metainfo.Database
system.Metainfo.Service = peer.Metainfo.Service
system.Metainfo.Endpoint2 = api.Metainfo.Endpoint2
system.Metainfo.Loop = peer.Metainfo.Loop
system.Inspector.Endpoint = api.Inspector.Endpoint
system.Orders.Endpoint = api.Orders.Endpoint
system.Orders.Service = peer.Orders.Service
system.Repair.Checker = peer.Repair.Checker
system.Repair.Repairer = repairerPeer.Repairer
system.Repair.Inspector = api.Repair.Inspector
system.Audit.Queue = peer.Audit.Queue
system.Audit.Worker = peer.Audit.Worker
system.Audit.Chore = peer.Audit.Chore
system.Audit.Verifier = peer.Audit.Verifier
system.Audit.Reporter = peer.Audit.Reporter
system.GarbageCollection.Service = peer.GarbageCollection.Service
system.DBCleanup.Chore = peer.DBCleanup.Chore
system.Accounting.Tally = peer.Accounting.Tally
system.Accounting.Rollup = peer.Accounting.Rollup
system.Accounting.ProjectUsage = peer.Accounting.ProjectUsage
system.Marketing.Listener = api.Marketing.Listener
system.Marketing.Endpoint = api.Marketing.Endpoint
system.GracefulExit.Chore = peer.GracefulExit.Chore
system.GracefulExit.Endpoint = api.GracefulExit.Endpoint
system.Metrics.Chore = peer.Metrics.Chore
return system
}
func (planet *Planet) newAPI(count int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, config satellite.Config, versionInfo version.Info) (*satellite.API, error) {
prefix := "satellite-api" + strconv.Itoa(count)
log := planet.log.Named(prefix)
var err error
revocationDB, err := revocation.NewDBFromCfg(config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
liveAccounting, err := live.NewCache(log.Named("live-accounting"), config.LiveAccounting)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, liveAccounting)
return satellite.NewAPI(log, identity, db, pointerDB, revocationDB, liveAccounting, &config, versionInfo)
}
func (planet *Planet) newRepairer(count int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, config satellite.Config,
versionInfo version.Info) (*satellite.Repairer, error) {
prefix := "satellite-repairer" + strconv.Itoa(count)
log := planet.log.Named(prefix)
revocationDB, err := revocation.NewDBFromCfg(config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
return satellite.NewRepairer(log, identity, pointerDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.Orders(), versionInfo, &config)
}