// Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information package testplanet import ( "context" "fmt" "os" "path/filepath" "runtime/pprof" "strconv" "strings" "time" "github.com/zeebo/errs" "go.uber.org/zap" "storj.io/common/memory" "storj.io/common/peertls/extensions" "storj.io/common/peertls/tlsopts" "storj.io/common/storj" "storj.io/private/debug" "storj.io/storj/pkg/revocation" "storj.io/storj/pkg/server" "storj.io/storj/storage/filestore" "storj.io/storj/storagenode" "storj.io/storj/storagenode/bandwidth" "storj.io/storj/storagenode/collector" "storj.io/storj/storagenode/console/consoleserver" "storj.io/storj/storagenode/contact" "storj.io/storj/storagenode/gracefulexit" "storj.io/storj/storagenode/monitor" "storj.io/storj/storagenode/nodestats" "storj.io/storj/storagenode/orders" "storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/piecestore" "storj.io/storj/storagenode/preflight" "storj.io/storj/storagenode/retain" "storj.io/storj/storagenode/storagenodedb" "storj.io/storj/storagenode/trust" ) // StorageNode contains all the processes needed to run a full StorageNode setup. type StorageNode struct { Name string Config storagenode.Config *storagenode.Peer } // Label returns name for debugger. func (system *StorageNode) Label() string { return system.Name } // URL returns the node url as a string. func (system *StorageNode) URL() string { return system.NodeURL().String() } // NodeURL returns the storj.NodeURL. func (system *StorageNode) NodeURL() storj.NodeURL { return storj.NodeURL{ID: system.Peer.ID(), Address: system.Peer.Addr()} } // newStorageNodes initializes storage nodes. func (planet *Planet) newStorageNodes(ctx context.Context, count int, whitelistedSatellites storj.NodeURLs) ([]*StorageNode, error) { var sources []trust.Source for _, u := range whitelistedSatellites { source, err := trust.NewStaticURLSource(u.String()) if err != nil { return nil, err } sources = append(sources, source) } var xs []*StorageNode for i := 0; i < count; i++ { index := i prefix := "storage" + strconv.Itoa(index) log := planet.log.Named(prefix) var system *StorageNode var err error pprof.Do(ctx, pprof.Labels("peer", prefix), func(ctx context.Context) { system, err = planet.newStorageNode(ctx, prefix, index, count, log, sources) }) if err != nil { return nil, err } log.Debug("id=" + system.ID().String() + " addr=" + system.Addr()) xs = append(xs, system) planet.peers = append(planet.peers, newClosablePeer(system)) } return xs, nil } func (planet *Planet) newStorageNode(ctx context.Context, prefix string, index, count int, log *zap.Logger, sources []trust.Source) (*StorageNode, error) { storageDir := filepath.Join(planet.directory, prefix) if err := os.MkdirAll(storageDir, 0700); err != nil { return nil, err } identity, err := planet.NewIdentity() if err != nil { return nil, err } config := storagenode.Config{ Server: server.Config{ Address: "127.0.0.1:0", PrivateAddress: "127.0.0.1:0", Config: tlsopts.Config{ RevocationDBURL: "bolt://" + filepath.Join(storageDir, "revocation.db"), UsePeerCAWhitelist: true, PeerCAWhitelistPath: planet.whitelistPath, PeerIDVersions: "*", Extensions: extensions.Config{ Revocation: false, WhitelistSignedLeaf: false, }, }, }, Debug: debug.Config{ Address: "", }, Preflight: preflight.Config{ LocalTimeCheck: false, }, Operator: storagenode.OperatorConfig{ Email: prefix + "@mail.test", Wallet: "0x" + strings.Repeat("00", 20), WalletFeatures: nil, }, Storage: piecestore.OldConfig{ Path: filepath.Join(storageDir, "pieces/"), AllocatedDiskSpace: 1 * memory.GB, KBucketRefreshInterval: defaultInterval, }, Collector: collector.Config{ Interval: defaultInterval, }, Nodestats: nodestats.Config{ MaxSleep: 0, ReputationSync: defaultInterval, StorageSync: defaultInterval, }, Console: consoleserver.Config{ Address: "127.0.0.1:0", StaticDir: filepath.Join(developmentRoot, "web/storagenode/"), }, Storage2: piecestore.Config{ CacheSyncInterval: defaultInterval, ExpirationGracePeriod: 0, MaxConcurrentRequests: 100, OrderLimitGracePeriod: time.Hour, StreamOperationTimeout: time.Hour, ReportCapacityThreshold: 100 * memory.MB, DeleteQueueSize: 10000, DeleteWorkers: 1, Orders: orders.Config{ SenderInterval: defaultInterval, SenderTimeout: 10 * time.Minute, CleanupInterval: defaultInterval, ArchiveTTL: time.Hour, MaxSleep: 0, Path: filepath.Join(storageDir, "orders"), }, Monitor: monitor.Config{ MinimumDiskSpace: 100 * memory.MB, NotifyLowDiskCooldown: defaultInterval, VerifyDirReadableInterval: defaultInterval, VerifyDirWritableInterval: defaultInterval, }, Trust: trust.Config{ Sources: sources, CachePath: filepath.Join(storageDir, "trust-cache.json"), RefreshInterval: defaultInterval, }, MaxUsedSerialsSize: memory.MiB, }, Pieces: pieces.DefaultConfig, Filestore: filestore.DefaultConfig, Retain: retain.Config{ MaxTimeSkew: 10 * time.Second, Status: retain.Enabled, Concurrency: 5, }, Version: planet.NewVersionConfig(), Bandwidth: bandwidth.Config{ Interval: defaultInterval, }, Contact: contact.Config{ Interval: defaultInterval, }, GracefulExit: gracefulexit.Config{ ChoreInterval: defaultInterval, NumWorkers: 3, NumConcurrentTransfers: 1, MinBytesPerSecond: 128 * memory.B, MinDownloadTimeout: 2 * time.Minute, }, } if planet.config.Reconfigure.StorageNode != nil { planet.config.Reconfigure.StorageNode(index, &config) } newIPCount := planet.config.Reconfigure.UniqueIPCount if newIPCount > 0 { if index >= count-newIPCount { config.Server.Address = fmt.Sprintf("127.0.%d.1:0", index+1) config.Server.PrivateAddress = fmt.Sprintf("127.0.%d.1:0", index+1) } } verisonInfo := planet.NewVersionInfo() var db storagenode.DB db, err = storagenodedb.OpenNew(ctx, log.Named("db"), config.DatabaseConfig()) if err != nil { return nil, err } if err := db.Pieces().CreateVerificationFile(identity.ID); err != nil { return nil, err } if planet.config.Reconfigure.StorageNodeDB != nil { db, err = planet.config.Reconfigure.StorageNodeDB(index, db, planet.log) if err != nil { return nil, err } } revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config) if err != nil { return nil, errs.Wrap(err) } planet.databases = append(planet.databases, revocationDB) peer, err := storagenode.New(log, identity, db, revocationDB, config, verisonInfo, nil) if err != nil { return nil, err } // Mark the peer's PieceDeleter as in testing mode, so it is easy to wait on the deleter peer.Storage2.PieceDeleter.SetupTest() err = db.MigrateToLatest(ctx) if err != nil { return nil, err } planet.databases = append(planet.databases, db) return &StorageNode{ Name: prefix, Config: config, Peer: peer, }, nil }