// File: storj/private/testplanet/storagenode.go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information
package testplanet
import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/zeebo/errs"
"storj.io/common/memory"
"storj.io/common/peertls/extensions"
"storj.io/common/peertls/tlsopts"
"storj.io/common/storj"
"storj.io/private/debug"
"storj.io/storj/pkg/revocation"
"storj.io/storj/pkg/server"
"storj.io/storj/storage/filestore"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/bandwidth"
"storj.io/storj/storagenode/collector"
"storj.io/storj/storagenode/console/consoleserver"
"storj.io/storj/storagenode/contact"
"storj.io/storj/storagenode/gracefulexit"
"storj.io/storj/storagenode/monitor"
"storj.io/storj/storagenode/nodestats"
"storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/piecestore"
"storj.io/storj/storagenode/preflight"
"storj.io/storj/storagenode/retain"
"storj.io/storj/storagenode/storagenodedb"
"storj.io/storj/storagenode/trust"
)
// StorageNode contains all the processes needed to run a full StorageNode setup.
//
// It pairs the configuration a node was built from with the running peer.
// The peer is embedded, so its methods (for example ID and Addr) are
// available directly on StorageNode.
type StorageNode struct {
// Config is the configuration that was passed to storagenode.New for this node.
Config storagenode.Config
// Peer is the storage node peer constructed from Config.
*storagenode.Peer
}
// URL returns the string form of the node's storj.NodeURL, as produced by
// NodeURL().String().
func (system *StorageNode) URL() string {
	nodeURL := system.NodeURL()
	return nodeURL.String()
}
// NodeURL builds a storj.NodeURL from the embedded peer's ID and address.
func (system *StorageNode) NodeURL() storj.NodeURL {
	var nodeURL storj.NodeURL
	nodeURL.ID = system.Peer.ID()
	nodeURL.Address = system.Peer.Addr()
	return nodeURL
}
// newStorageNodes initializes storage nodes.
func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.NodeURLs) ([]*StorageNode, error) {
var xs []*StorageNode
defer func() {
for _, x := range xs {
planet.peers = append(planet.peers, newClosablePeer(x))
}
}()
var sources []trust.Source
for _, u := range whitelistedSatellites {
source, err := trust.NewStaticURLSource(u.String())
if err != nil {
return nil, err
}
sources = append(sources, source)
}
for i := 0; i < count; i++ {
prefix := "storage" + strconv.Itoa(i)
log := planet.log.Named(prefix)
storageDir := filepath.Join(planet.directory, prefix)
if err := os.MkdirAll(storageDir, 0700); err != nil {
return nil, err
}
identity, err := planet.NewIdentity()
if err != nil {
return nil, err
}
config := storagenode.Config{
Server: server.Config{
Address: "127.0.0.1:0",
PrivateAddress: "127.0.0.1:0",
Config: tlsopts.Config{
RevocationDBURL: "bolt://" + filepath.Join(storageDir, "revocation.db"),
UsePeerCAWhitelist: true,
PeerCAWhitelistPath: planet.whitelistPath,
PeerIDVersions: "*",
Extensions: extensions.Config{
Revocation: false,
WhitelistSignedLeaf: false,
},
},
},
Debug: debug.Config{
Address: "",
},
Preflight: preflight.Config{
LocalTimeCheck: false,
},
Operator: storagenode.OperatorConfig{
Email: prefix + "@mail.test",
Wallet: "0x" + strings.Repeat("00", 20),
},
Storage: piecestore.OldConfig{
Path: filepath.Join(storageDir, "pieces/"),
AllocatedDiskSpace: 1 * memory.GB,
KBucketRefreshInterval: defaultInterval,
},
Collector: collector.Config{
Interval: defaultInterval,
},
Nodestats: nodestats.Config{
MaxSleep: 0,
ReputationSync: defaultInterval,
StorageSync: defaultInterval,
},
Console: consoleserver.Config{
Address: "127.0.0.1:0",
StaticDir: filepath.Join(developmentRoot, "web/storagenode/"),
},
Storage2: piecestore.Config{
CacheSyncInterval: defaultInterval,
ExpirationGracePeriod: 0,
MaxConcurrentRequests: 100,
OrderLimitGracePeriod: time.Hour,
StreamOperationTimeout: time.Hour,
ReportCapacityThreshold: 100 * memory.MB,
Orders: orders.Config{
SenderInterval: defaultInterval,
SenderTimeout: 10 * time.Minute,
CleanupInterval: defaultInterval,
ArchiveTTL: time.Hour,
MaxSleep: 0,
},
Monitor: monitor.Config{
MinimumDiskSpace: 100 * memory.MB,
NotifyLowDiskCooldown: defaultInterval,
},
Trust: trust.Config{
Sources: sources,
CachePath: filepath.Join(storageDir, "trust-cache.json"),
RefreshInterval: defaultInterval,
},
},
Pieces: pieces.DefaultConfig,
Filestore: filestore.DefaultConfig,
Retain: retain.Config{
MaxTimeSkew: 10 * time.Second,
storagenode/retain: fix concurrency issues (#2828) * nicer flags * fix concurrency * add concurrent workers * initialize things * fix tests * close retain service * ensure we don't have workers working on the same satellite * ensure things compile * fix other compilation issues: * concurrency changes ran this with `go test -count=1000` and it passed all of them. - we add a closed channel so that we can select on it with context cancellation. - we put a once in so we only close the channel once. - every time the queue/running state changes, we have to broadcast because we may want to wake up N pending Wait calls or other concurrent workers. - because we broadcast, we don't need to do the polling in Wait anymore. - ensure Run doesn't start multiple times so that we don't have to worry about concurrent Close with multiple Runs. - hold the lock while we start workers so that a concurrent Close with Run can't decide that there's nothing started and exit and then have Run start things. - make sure to poll the closed/context channels through loops or at the start of Run calls in case Close happens first. - these polls should be under a mutex because they have a default case which makes it possible to schedule such that Close hasn't executed the channel close so it starts more work. - cancel a local Run context when it's going to exit to make sure that any retainPieces calls have a canceled context. - hopefully enough comments to both check my work and help readers digest what's going on. Change-Id: Ida0e226a7e01e8ae64fa2c59dd5a84b04bccfbd7 * use the retain error class Change-Id: I1511eaef135f98afd57b878e997e4c8a0d11cafc * concurrency fixes again - forgot to update the gc test to use the old Wait api. - we need to drop the lock while we wait for the workers to exit, because they may be blocked on the condition variable - additionally, we need to broadcast when we close the signal channel because the state changed: they want to wake up and exit. 
Change-Id: I4204699792275260cd912f29aa73720f7d9b14b5 * undo my misguided rename Change-Id: I6baffe1eb0434e260212c485bbcc01bed3250881 * remove pollInterval * format paragraph more nicely * move skew calculation into retain pieces
2019-08-28 21:35:25 +01:00
Status: retain.Enabled,
Concurrency: 5,
},
Version: planet.NewVersionConfig(),
Bandwidth: bandwidth.Config{
Interval: defaultInterval,
},
Contact: contact.Config{
Interval: defaultInterval,
},
GracefulExit: gracefulexit.Config{
ChoreInterval: defaultInterval,
NumWorkers: 3,
NumConcurrentTransfers: 1,
MinBytesPerSecond: 128 * memory.B,
MinDownloadTimeout: 2 * time.Minute,
},
}
if planet.config.Reconfigure.StorageNode != nil {
planet.config.Reconfigure.StorageNode(i, &config)
}
newIPCount := planet.config.Reconfigure.UniqueIPCount
if newIPCount > 0 {
if i >= count-newIPCount {
2019-06-24 16:33:18 +01:00
config.Server.Address = fmt.Sprintf("127.0.%d.1:0", i+1)
config.Server.PrivateAddress = fmt.Sprintf("127.0.%d.1:0", i+1)
}
}
verisonInfo := planet.NewVersionInfo()
var db storagenode.DB
db, err = storagenodedb.New(log.Named("db"), config.DatabaseConfig())
if err != nil {
return nil, err
}
if planet.config.Reconfigure.StorageNodeDB != nil {
db, err = planet.config.Reconfigure.StorageNodeDB(i, db, planet.log)
if err != nil {
return nil, err
}
}
revocationDB, err := revocation.NewDBFromCfg(config.Server.Config)
if err != nil {
return xs, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
peer, err := storagenode.New(log, identity, db, revocationDB, config, verisonInfo)
if err != nil {
return xs, err
}
// Mark the peer's PieceDeleter as in testing mode, so it is easy to wait on the deleter
peer.Storage2.PieceDeleter.SetupTest()
err = db.MigrateToLatest(context.TODO())
if err != nil {
return nil, err
}
planet.databases = append(planet.databases, db)
log.Debug("id=" + peer.ID().String() + " addr=" + peer.Addr())
xs = append(xs, &StorageNode{
Config: config,
Peer: peer,
})
}
return xs, nil
}