storj/private/testplanet/storagenode.go

211 lines
5.6 KiB
Go
Raw Normal View History

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information
package testplanet
import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/zeebo/errs"
"storj.io/common/memory"
"storj.io/common/peertls/extensions"
"storj.io/common/peertls/tlsopts"
"storj.io/common/storj"
"storj.io/storj/pkg/revocation"
"storj.io/storj/pkg/server"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/bandwidth"
"storj.io/storj/storagenode/collector"
"storj.io/storj/storagenode/console/consoleserver"
"storj.io/storj/storagenode/contact"
"storj.io/storj/storagenode/gracefulexit"
"storj.io/storj/storagenode/monitor"
"storj.io/storj/storagenode/nodestats"
"storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/piecestore"
"storj.io/storj/storagenode/preflight"
"storj.io/storj/storagenode/retain"
"storj.io/storj/storagenode/storagenodedb"
"storj.io/storj/storagenode/trust"
)
// newStorageNodes initializes storage nodes
func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.NodeURLs) ([]*storagenode.Peer, error) {
var xs []*storagenode.Peer
defer func() {
for _, x := range xs {
planet.peers = append(planet.peers, newClosablePeer(x))
}
}()
var sources []trust.Source
for _, u := range whitelistedSatellites {
source, err := trust.NewStaticURLSource(u.String())
if err != nil {
return nil, err
}
sources = append(sources, source)
}
for i := 0; i < count; i++ {
prefix := "storage" + strconv.Itoa(i)
log := planet.log.Named(prefix)
storageDir := filepath.Join(planet.directory, prefix)
if err := os.MkdirAll(storageDir, 0700); err != nil {
return nil, err
}
identity, err := planet.NewIdentity()
if err != nil {
return nil, err
}
config := storagenode.Config{
Server: server.Config{
Address: "127.0.0.1:0",
PrivateAddress: "127.0.0.1:0",
Config: tlsopts.Config{
RevocationDBURL: "bolt://" + filepath.Join(storageDir, "revocation.db"),
UsePeerCAWhitelist: true,
PeerCAWhitelistPath: planet.whitelistPath,
PeerIDVersions: "*",
Extensions: extensions.Config{
Revocation: false,
WhitelistSignedLeaf: false,
},
},
},
Preflight: preflight.Config{
LocalTimeCheck: false,
},
Operator: storagenode.OperatorConfig{
Email: prefix + "@mail.test",
Wallet: "0x" + strings.Repeat("00", 20),
},
Storage: piecestore.OldConfig{
Path: filepath.Join(storageDir, "pieces/"),
AllocatedDiskSpace: 1 * memory.GB,
AllocatedBandwidth: memory.TB,
KBucketRefreshInterval: defaultInterval,
},
Collector: collector.Config{
Interval: defaultInterval,
},
Nodestats: nodestats.Config{
MaxSleep: 0,
ReputationSync: defaultInterval,
StorageSync: defaultInterval,
},
Console: consoleserver.Config{
Address: "127.0.0.1:0",
StaticDir: filepath.Join(developmentRoot, "web/storagenode/"),
},
Storage2: piecestore.Config{
CacheSyncInterval: defaultInterval,
ExpirationGracePeriod: 0,
MaxConcurrentRequests: 100,
OrderLimitGracePeriod: time.Hour,
StreamOperationTimeout: time.Hour,
Orders: orders.Config{
SenderInterval: defaultInterval,
SenderTimeout: 10 * time.Minute,
CleanupInterval: defaultInterval,
ArchiveTTL: time.Hour,
MaxSleep: 0,
},
Monitor: monitor.Config{
MinimumBandwidth: 100 * memory.MB,
MinimumDiskSpace: 100 * memory.MB,
},
Trust: trust.Config{
Sources: sources,
CachePath: filepath.Join(storageDir, "trust-cache.json"),
RefreshInterval: defaultInterval,
},
},
Retain: retain.Config{
MaxTimeSkew: 10 * time.Second,
storagenode/retain: fix concurrency issues (#2828) * nicer flags * fix concurrency * add concurrent workers * initialize things * fix tests * close retain service * ensure we don't have workers working on the same satellite * ensure things compile * fix other compilation issues: * concurrency changes ran this with `go test -count=1000` and it passed all of them. - we add a closed channel so that we can select on it with context cancellation. - we put a once in so we only close the channel once. - every time the queue/running state changes, we have to broadcast because we may want to wake up N pending Wait calls or other concurrent workers. - because we broadcast, we don't need to do the polling in Wait anymore. - ensure Run doesn't start multiple times so that we don't have to worry about concurrent Close with multiple Runs. - hold the lock while we start workers so that a concurrent Close with Run can't decide that there's nothing started and exit and then have Run start things. - make sure to poll the closed/context channels through loops or at the start of Run calls in case Close happens first. - these polls should be under a mutex because they have a default case which makes it possible to schedule such that Close hasn't executed the channel close so it starts more work. - cancel a local Run context when it's going to exit to make sure that any retainPieces calls have a canceled context. - hopefully enough comments to both check my work and help readers digest what's going on. Change-Id: Ida0e226a7e01e8ae64fa2c59dd5a84b04bccfbd7 * use the retain error class Change-Id: I1511eaef135f98afd57b878e997e4c8a0d11cafc * concurrency fixes again - forgot to update the gc test to use the old Wait api. - we need to drop the lock while we wait for the workers to exit, because they may be blocked on the condition variable - additionally, we need to broadcast when we close the signal channel because the state changed: they want to wake up and exit. Change-Id: I4204699792275260cd912f29aa73720f7d9b14b5 * undo my misguided rename Change-Id: I6baffe1eb0434e260212c485bbcc01bed3250881 * remove pollInterval * format paragraph more nicely * move skew calculation into retain pieces
2019-08-28 21:35:25 +01:00
Status: retain.Enabled,
Concurrency: 5,
},
Version: planet.NewVersionConfig(),
Bandwidth: bandwidth.Config{
Interval: defaultInterval,
},
Contact: contact.Config{
Interval: defaultInterval,
},
GracefulExit: gracefulexit.Config{
ChoreInterval: defaultInterval,
NumWorkers: 3,
NumConcurrentTransfers: 1,
MinBytesPerSecond: 128 * memory.B,
MinDownloadTimeout: 2 * time.Minute,
},
}
if planet.config.Reconfigure.StorageNode != nil {
planet.config.Reconfigure.StorageNode(i, &config)
}
newIPCount := planet.config.Reconfigure.UniqueIPCount
if newIPCount > 0 {
if i >= count-newIPCount {
2019-06-24 16:33:18 +01:00
config.Server.Address = fmt.Sprintf("127.0.%d.1:0", i+1)
config.Server.PrivateAddress = fmt.Sprintf("127.0.%d.1:0", i+1)
}
}
verisonInfo := planet.NewVersionInfo()
storageConfig := storagenodedb.Config{
Remove Kademlia dependencies from Satellite and Storagenode (#2966) What: cmd/inspector/main.go: removes kad commands internal/testplanet/planet.go: Waits for contact chore to finish satellite/contact/nodesservice.go: creates an empty nodes service implementation satellite/contact/service.go: implements Local and FetchInfo methods & adds external address config value satellite/discovery/service.go: replaces kad.FetchInfo with contact.FetchInfo in Refresh() & removes Discover() satellite/peer.go: sets up contact service and endpoints storagenode/console/service.go: replaces nodeID with contact.Local() storagenode/contact/chore.go: replaces routing table with contact service storagenode/contact/nodesservice.go: creates empty implementation for ping and request info nodes service & implements RequestInfo method storagenode/contact/service.go: creates a service to return the local node and update its own capacity storagenode/monitor/monitor.go: uses contact service in place of routing table storagenode/operator.go: moves operatorconfig from kad into its own setup storagenode/peer.go: sets up contact service, chore, pingstats and endpoints satellite/overlay/config.go: changes NodeSelectionConfig.OnlineWindow default to 4hr to allow for accurate repair selection Removes kademlia setups in: cmd/storagenode/main.go cmd/storj-sim/network.go internal/testplane/planet.go internal/testplanet/satellite.go internal/testplanet/storagenode.go satellite/peer.go scripts/test-sim-backwards.sh scripts/testdata/satellite-config.yaml.lock storagenode/inspector/inspector.go storagenode/peer.go storagenode/storagenodedb/database.go Why: Replacing Kademlia Please describe the tests: • internal/testplanet/planet_test.go: TestBasic: assert that the storagenode can check in with the satellite without any errors TestContact: test that all nodes get inserted into both satellites' overlay cache during testplanet setup • satellite/contact/contact_test.go: TestFetchInfo: Tests that the FetchInfo method returns the correct info • storagenode/contact/contact_test.go: TestNodeInfoUpdated: tests that the contact chore updates the node information TestRequestInfoEndpoint: tests that the Request info endpoint returns the correct info Please describe the performance impact: Node discovery should be at least slightly more performant since each node connects directly to each satellite and no longer needs to wait for bootstrapping. It probably won't be faster in real time on start up since each node waits a random amount of time (less than 1 hr) to initialize its first connection (jitter).
2019-09-19 20:56:34 +01:00
Storage: config.Storage.Path,
Info: filepath.Join(config.Storage.Path, "piecestore.db"),
Info2: filepath.Join(config.Storage.Path, "info.db"),
Pieces: config.Storage.Path,
}
var db storagenode.DB
db, err = storagenodedb.New(log.Named("db"), storageConfig)
if err != nil {
return nil, err
}
if planet.config.Reconfigure.NewStorageNodeDB != nil {
db, err = planet.config.Reconfigure.NewStorageNodeDB(i, db, planet.log)
if err != nil {
return nil, err
}
}
revocationDB, err := revocation.NewDBFromCfg(config.Server.Config)
if err != nil {
return xs, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
peer, err := storagenode.New(log, identity, db, revocationDB, config, verisonInfo)
if err != nil {
return xs, err
}
err = db.CreateTables(context.TODO())
if err != nil {
return nil, err
}
planet.databases = append(planet.databases, db)
log.Debug("id=" + peer.ID().String() + " addr=" + peer.Addr())
xs = append(xs, peer)
}
return xs, nil
}