Refactor pb.Node protobuf (#1785)

This commit is contained in:
Kaloyan Raev 2019-04-22 12:07:50 +03:00 committed by GitHub
parent 123bf291f2
commit 8fc5fe1d6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 708 additions and 1234 deletions

View File

@ -17,6 +17,7 @@ import (
"storj.io/storj/internal/version"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/peertls/tlsopts"
"storj.io/storj/pkg/server"
@ -128,17 +129,19 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver
return nil, errs.Combine(err, peer.Close())
}
self := pb.Node{
Id: peer.ID(),
Type: pb.NodeType_BOOTSTRAP,
Address: &pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: config.ExternalAddress,
self := &overlay.NodeDossier{
Node: pb.Node{
Id: peer.ID(),
Address: &pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: config.ExternalAddress,
},
},
Metadata: &pb.NodeMetadata{
Type: pb.NodeType_BOOTSTRAP,
Operator: pb.NodeOperator{
Wallet: config.Operator.Wallet,
},
Version: pbVersion,
Version: *pbVersion,
}
kdb, ndb := peer.DB.RoutingTable()
@ -149,7 +152,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver
peer.Transport = peer.Transport.WithObservers(peer.Kademlia.RoutingTable)
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, peer.Transport, peer.Kademlia.RoutingTable, config)
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), peer.Transport, peer.Kademlia.RoutingTable, config)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -251,7 +254,7 @@ func (peer *Peer) Close() error {
func (peer *Peer) ID() storj.NodeID { return peer.Identity.ID }
// Local returns the peer local node info.
func (peer *Peer) Local() pb.Node { return peer.Kademlia.RoutingTable.Local() }
func (peer *Peer) Local() overlay.NodeDossier { return peer.Kademlia.RoutingTable.Local() }
// Addr returns the public address.
func (peer *Peer) Addr() string { return peer.Server.Addr().String() }

View File

@ -116,11 +116,6 @@ func cmdAdd(cmd *cobra.Command, args []string) (err error) {
Transport: 0,
Address: a,
},
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: 2000000000,
FreeDisk: 2000000000,
},
Type: pb.NodeType_STORAGE,
})
if err != nil {
return err

View File

@ -63,7 +63,7 @@ import (
type Peer interface {
ID() storj.NodeID
Addr() string
Local() pb.Node
Local() overlay.NodeDossier
Run(context.Context) error
Close() error
@ -224,13 +224,13 @@ func NewCustom(log *zap.Logger, config Config) (*Planet, error) {
// init Satellites
for _, satellite := range planet.Satellites {
if len(satellite.Kademlia.Service.GetBootstrapNodes()) == 0 {
satellite.Kademlia.Service.SetBootstrapNodes([]pb.Node{planet.Bootstrap.Local()})
satellite.Kademlia.Service.SetBootstrapNodes([]pb.Node{planet.Bootstrap.Local().Node})
}
}
// init storage nodes
for _, storageNode := range planet.StorageNodes {
if len(storageNode.Kademlia.Service.GetBootstrapNodes()) == 0 {
storageNode.Kademlia.Service.SetBootstrapNodes([]pb.Node{planet.Bootstrap.Local()})
storageNode.Kademlia.Service.SetBootstrapNodes([]pb.Node{planet.Bootstrap.Local().Node})
}
}
@ -258,10 +258,11 @@ func (planet *Planet) Start(ctx context.Context) {
planet.Bootstrap.Kademlia.Service.WaitForBootstrap()
for _, peer := range planet.Satellites {
for _, peer := range planet.StorageNodes {
peer.Kademlia.Service.WaitForBootstrap()
}
for _, peer := range planet.StorageNodes {
for _, peer := range planet.Satellites {
peer.Kademlia.Service.WaitForBootstrap()
}
@ -280,7 +281,7 @@ func (planet *Planet) Reconnect(ctx context.Context) {
for _, storageNode := range planet.StorageNodes {
storageNode := storageNode
group.Go(func() error {
_, err := storageNode.Kademlia.Service.Ping(ctx, planet.Bootstrap.Local())
_, err := storageNode.Kademlia.Service.Ping(ctx, planet.Bootstrap.Local().Node)
if err != nil {
log.Error("storage node did not find bootstrap", zap.Error(err))
}
@ -292,7 +293,7 @@ func (planet *Planet) Reconnect(ctx context.Context) {
satellite := satellite
group.Go(func() error {
for _, storageNode := range planet.StorageNodes {
_, err := satellite.Kademlia.Service.Ping(ctx, storageNode.Local())
_, err := satellite.Kademlia.Service.Ping(ctx, storageNode.Local().Node)
if err != nil {
log.Error("satellite did not find storage node", zap.Error(err))
}

View File

@ -38,18 +38,18 @@ func TestBasic(t *testing.T) {
}
// ping a satellite
_, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.Satellites[0].Local())
_, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.Satellites[0].Local().Node)
require.NoError(t, err)
// ping a storage node
_, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.StorageNodes[1].Local())
_, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.StorageNodes[1].Local().Node)
require.NoError(t, err)
err = planet.StopPeer(planet.StorageNodes[1])
require.NoError(t, err)
// ping a stopped storage node
_, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.StorageNodes[1].Local())
_, err = planet.StorageNodes[0].Kademlia.Service.Ping(ctx, planet.StorageNodes[1].Local().Node)
require.Error(t, err)
// wait a bit to see whether some failures occur

View File

@ -64,6 +64,10 @@ func Run(t *testing.T, config Config, test func(t *testing.T, ctx *testcontext.C
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
// make sure nodes are refreshed in db
planet.Satellites[0].Discovery.Service.Refresh.TriggerWait()
test(t, ctx, planet)
})
}

View File

@ -66,8 +66,7 @@ func (planet *Planet) newUplink(name string, storageNodeCount int) (*Uplink, err
uplink.Transport = transport.NewClient(tlsOpts)
uplink.Info = pb.Node{
Id: uplink.Identity.ID,
Type: pb.NodeType_UPLINK,
Id: uplink.Identity.ID,
Address: &pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: "",
@ -137,7 +136,7 @@ func (uplink *Uplink) DialMetainfo(ctx context.Context, destination Peer, apikey
func (uplink *Uplink) DialPiecestore(ctx context.Context, destination Peer) (*piecestore.Client, error) {
node := destination.Local()
conn, err := uplink.Transport.DialNode(ctx, &node)
conn, err := uplink.Transport.DialNode(ctx, &node.Node)
if err != nil {
return nil, err
}

View File

@ -21,196 +21,177 @@ import (
)
func TestUploadDownload(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
expectedData := make([]byte, 1*memory.MiB)
_, err := rand.Read(expectedData)
assert.NoError(t, err)
planet, err := testplanet.New(t, 1, 6, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
assert.NoError(t, err)
planet.Start(ctx)
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path")
assert.NoError(t, err)
expectedData := make([]byte, 1*memory.MiB)
_, err = rand.Read(expectedData)
assert.NoError(t, err)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
assert.NoError(t, err)
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path")
assert.NoError(t, err)
assert.Equal(t, expectedData, data)
assert.Equal(t, expectedData, data)
})
}
func TestDownloadWithSomeNodesOffline(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// first, upload some remote data
ul := planet.Uplinks[0]
satellite := planet.Satellites[0]
planet, err := testplanet.New(t, 1, 5, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
// stop discovery service so that we do not get a race condition when we delete nodes from overlay cache
satellite.Discovery.Service.Discovery.Stop()
planet.Start(ctx)
// first, upload some remote data
ul := planet.Uplinks[0]
satellite := planet.Satellites[0]
// stop discovery service so that we do not get a race condition when we delete nodes from overlay cache
satellite.Discovery.Service.Discovery.Stop()
testData := make([]byte, 1*memory.MiB)
_, err = rand.Read(testData)
require.NoError(t, err)
err = ul.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
MinThreshold: 2,
RepairThreshold: 3,
SuccessThreshold: 4,
MaxThreshold: 5,
}, "testbucket", "test/path", testData)
require.NoError(t, err)
// get a remote segment from pointerdb
pdb := satellite.Metainfo.Service
listResponse, _, err := pdb.List("", "", "", true, 0, 0)
require.NoError(t, err)
var path string
var pointer *pb.Pointer
for _, v := range listResponse {
path = v.GetPath()
pointer, err = pdb.Get(path)
testData := make([]byte, 1*memory.MiB)
_, err := rand.Read(testData)
require.NoError(t, err)
if pointer.GetType() == pb.Pointer_REMOTE {
break
}
}
// calculate how many storagenodes to kill
redundancy := pointer.GetRemote().GetRedundancy()
remotePieces := pointer.GetRemote().GetRemotePieces()
minReq := redundancy.GetMinReq()
numPieces := len(remotePieces)
toKill := numPieces - int(minReq)
err = ul.UploadWithConfig(ctx, satellite, &uplink.RSConfig{
MinThreshold: 2,
RepairThreshold: 3,
SuccessThreshold: 4,
MaxThreshold: 5,
}, "testbucket", "test/path", testData)
require.NoError(t, err)
nodesToKill := make(map[storj.NodeID]bool)
for i, piece := range remotePieces {
if i >= toKill {
continue
}
nodesToKill[piece.NodeId] = true
}
// get a remote segment from pointerdb
pdb := satellite.Metainfo.Service
listResponse, _, err := pdb.List("", "", "", true, 0, 0)
require.NoError(t, err)
for _, node := range planet.StorageNodes {
if nodesToKill[node.ID()] {
err = planet.StopPeer(node)
require.NoError(t, err)
// mark node as offline in overlay cache
_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
var path string
var pointer *pb.Pointer
for _, v := range listResponse {
path = v.GetPath()
pointer, err = pdb.Get(path)
require.NoError(t, err)
if pointer.GetType() == pb.Pointer_REMOTE {
break
}
}
}
// we should be able to download data without any of the original nodes
newData, err := ul.Download(ctx, satellite, "testbucket", "test/path")
require.NoError(t, err)
require.Equal(t, testData, newData)
// calculate how many storagenodes to kill
redundancy := pointer.GetRemote().GetRedundancy()
remotePieces := pointer.GetRemote().GetRemotePieces()
minReq := redundancy.GetMinReq()
numPieces := len(remotePieces)
toKill := numPieces - int(minReq)
nodesToKill := make(map[storj.NodeID]bool)
for i, piece := range remotePieces {
if i >= toKill {
continue
}
nodesToKill[piece.NodeId] = true
}
for _, node := range planet.StorageNodes {
if nodesToKill[node.ID()] {
err = planet.StopPeer(node)
require.NoError(t, err)
// mark node as offline in overlay cache
_, err = satellite.Overlay.Service.UpdateUptime(ctx, node.ID(), false)
require.NoError(t, err)
}
}
// we should be able to download data without any of the original nodes
newData, err := ul.Download(ctx, satellite, "testbucket", "test/path")
require.NoError(t, err)
require.Equal(t, testData, newData)
})
}
func TestUploadDownloadOneUplinksInParallel(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
dataToUpload := make([][]byte, 5)
for i := 0; i < len(dataToUpload); i++ {
dataToUpload[i] = make([]byte, 100*memory.KiB.Int()+(i*100*memory.KiB.Int()))
_, err := rand.Read(dataToUpload[i])
require.NoError(t, err)
}
planet, err := testplanet.New(t, 1, 6, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
var group errgroup.Group
for i, data := range dataToUpload {
index := strconv.Itoa(i)
uplink := planet.Uplinks[0]
satellite := planet.Satellites[0]
planet.Start(ctx)
dataToUpload := make([][]byte, 5)
for i := 0; i < len(dataToUpload); i++ {
dataToUpload[i] = make([]byte, 100*memory.KiB.Int()+(i*100*memory.KiB.Int()))
_, err := rand.Read(dataToUpload[i])
data := data
group.Go(func() error {
return uplink.Upload(ctx, satellite, "testbucket"+index, "test/path"+index, data)
})
}
err := group.Wait()
require.NoError(t, err)
}
var group errgroup.Group
for i, data := range dataToUpload {
index := strconv.Itoa(i)
uplink := planet.Uplinks[0]
satellite := planet.Satellites[0]
for i, data := range dataToUpload {
index := strconv.Itoa(i)
uplink := planet.Uplinks[0]
satellite := planet.Satellites[0]
data := data
group.Go(func() error {
return uplink.Upload(ctx, satellite, "testbucket"+index, "test/path"+index, data)
})
}
err = group.Wait()
require.NoError(t, err)
for i, data := range dataToUpload {
index := strconv.Itoa(i)
uplink := planet.Uplinks[0]
satellite := planet.Satellites[0]
expectedData := data
group.Go(func() error {
data, err := uplink.Download(ctx, satellite, "testbucket"+index, "test/path"+index)
require.Equal(t, expectedData, data)
return err
})
}
err = group.Wait()
require.NoError(t, err)
expectedData := data
group.Go(func() error {
data, err := uplink.Download(ctx, satellite, "testbucket"+index, "test/path"+index)
require.Equal(t, expectedData, data)
return err
})
}
err = group.Wait()
require.NoError(t, err)
})
}
func TestUploadDownloadMultipleUplinksInParallel(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
numberOfUplinks := 5
planet, err := testplanet.New(t, 1, 6, numberOfUplinks)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: numberOfUplinks,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
dataToUpload := make([][]byte, numberOfUplinks)
for i := 0; i < len(dataToUpload); i++ {
dataToUpload[i] = make([]byte, 100*memory.KiB.Int()+(i*100*memory.KiB.Int()))
_, err := rand.Read(dataToUpload[i])
require.NoError(t, err)
}
dataToUpload := make([][]byte, numberOfUplinks)
for i := 0; i < len(dataToUpload); i++ {
dataToUpload[i] = make([]byte, 100*memory.KiB.Int()+(i*100*memory.KiB.Int()))
_, err := rand.Read(dataToUpload[i])
var group errgroup.Group
for i, data := range dataToUpload {
index := strconv.Itoa(i)
uplink := planet.Uplinks[i]
satellite := planet.Satellites[0]
data := data
group.Go(func() error {
return uplink.Upload(ctx, satellite, "testbucket"+index, "test/path"+index, data)
})
}
err := group.Wait()
require.NoError(t, err)
}
var group errgroup.Group
for i, data := range dataToUpload {
index := strconv.Itoa(i)
uplink := planet.Uplinks[i]
satellite := planet.Satellites[0]
for i, data := range dataToUpload {
index := strconv.Itoa(i)
uplink := planet.Uplinks[i]
satellite := planet.Satellites[0]
data := data
group.Go(func() error {
return uplink.Upload(ctx, satellite, "testbucket"+index, "test/path"+index, data)
})
}
err = group.Wait()
require.NoError(t, err)
for i, data := range dataToUpload {
index := strconv.Itoa(i)
uplink := planet.Uplinks[i]
satellite := planet.Satellites[0]
expectedData := data
group.Go(func() error {
data, err := uplink.Download(ctx, satellite, "testbucket"+index, "test/path"+index)
require.Equal(t, expectedData, data)
return err
})
}
err = group.Wait()
require.NoError(t, err)
expectedData := data
group.Go(func() error {
data, err := uplink.Download(ctx, satellite, "testbucket"+index, "test/path"+index)
require.Equal(t, expectedData, data)
return err
})
}
err = group.Wait()
require.NoError(t, err)
})
}

View File

@ -55,6 +55,5 @@ func MockNode(s string) *pb.Node {
id := NodeIDFromString(s)
var node pb.Node
node.Id = id
node.Type = pb.NodeType_STORAGE
return &node
}

View File

@ -11,7 +11,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/testcontext"
@ -20,51 +19,39 @@ import (
)
type testConfig struct {
planetCfg *testplanet.Config
uplinkCfg Config
}
func testPlanetWithLibUplink(t *testing.T, cfg testConfig, encKey *storj.Key,
testFunc func(*testing.T, *testcontext.Context, *testplanet.Planet, *Project)) {
if cfg.planetCfg == nil {
cfg.planetCfg = &testplanet.Config{SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1}
}
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// we only use testUplink for the free API key, until such time
// as testplanet makes it easy to get another way :D
testUplink := planet.Uplinks[0]
satellite := planet.Satellites[0]
cfg.uplinkCfg.Volatile.TLS.SkipPeerCAWhitelist = true
ctx := testcontext.New(t)
defer ctx.Cleanup()
apiKey, err := ParseAPIKey(testUplink.APIKey[satellite.ID()])
if err != nil {
t.Fatalf("could not parse API key from testplanet: %v", err)
}
uplink, err := NewUplink(ctx, &cfg.uplinkCfg)
if err != nil {
t.Fatalf("could not create new Uplink object: %v", err)
}
defer ctx.Check(uplink.Close)
var projectOptions ProjectOptions
projectOptions.Volatile.EncryptionKey = encKey
proj, err := uplink.OpenProject(ctx, satellite.Addr(), apiKey, &projectOptions)
if err != nil {
t.Fatalf("could not open project from libuplink under testplanet: %v", err)
}
defer ctx.Check(proj.Close)
planet, err := testplanet.NewCustom(zaptest.NewLogger(t), *cfg.planetCfg)
if err != nil {
t.Fatal(err)
}
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
// we only use testUplink for the free API key, until such time
// as testplanet makes it easy to get another way :D
testUplink := planet.Uplinks[0]
satellite := planet.Satellites[0]
cfg.uplinkCfg.Volatile.TLS.SkipPeerCAWhitelist = true
apiKey, err := ParseAPIKey(testUplink.APIKey[satellite.ID()])
if err != nil {
t.Fatalf("could not parse API key from testplanet: %v", err)
}
uplink, err := NewUplink(ctx, &cfg.uplinkCfg)
if err != nil {
t.Fatalf("could not create new Uplink object: %v", err)
}
defer ctx.Check(uplink.Close)
var projectOptions ProjectOptions
projectOptions.Volatile.EncryptionKey = encKey
proj, err := uplink.OpenProject(ctx, satellite.Addr(), apiKey, &projectOptions)
if err != nil {
t.Fatalf("could not open project from libuplink under testplanet: %v", err)
}
defer ctx.Check(proj.Close)
testFunc(t, ctx, planet, proj)
testFunc(t, ctx, planet, proj)
})
}
func simpleEncryptionAccess(encKey string) (access EncryptionAccess) {

View File

@ -183,7 +183,6 @@ func (d *defaultDownloader) getShare(ctx context.Context, limit *pb.AddressedOrd
conn, err := d.transport.DialNode(timedCtx, &pb.Node{
Id: storageNodeID,
Address: limit.GetStorageNodeAddress(),
Type: pb.NodeType_STORAGE,
})
if err != nil {
return Share{}, err

View File

@ -7,6 +7,7 @@ import (
"context"
"time"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
@ -14,7 +15,7 @@ import (
// DHT is the interface for the DHT in the Storj network
type DHT interface {
FindNear(ctx context.Context, start storj.NodeID, limit int, restrictions ...pb.Restriction) ([]*pb.Node, error)
FindNear(ctx context.Context, start storj.NodeID, limit int) ([]*pb.Node, error)
Bootstrap(ctx context.Context) error
Ping(ctx context.Context, node pb.Node) (pb.Node, error)
FindNode(ctx context.Context, ID storj.NodeID) (pb.Node, error)
@ -24,11 +25,11 @@ type DHT interface {
// RoutingTable contains information on nodes we have locally
type RoutingTable interface {
// local params
Local() pb.Node
Local() overlay.NodeDossier
K() int
CacheSize() int
GetBucketIds() (storage.Keys, error)
FindNear(id storj.NodeID, limit int, restrictions ...pb.Restriction) ([]*pb.Node, error)
FindNear(id storj.NodeID, limit int) ([]*pb.Node, error)
ConnectionSuccess(node *pb.Node) error
ConnectionFailed(node *pb.Node) error
// these are for refreshing

View File

@ -147,10 +147,6 @@ func (discovery *Discovery) refresh(ctx context.Context) error {
if err != nil {
discovery.log.Error("could not update node uptime in cache", zap.String("ID", ping.Id.String()), zap.Error(err))
}
err = discovery.cache.Put(ctx, ping.Id, ping)
if err != nil {
discovery.log.Error("could not put node into cache", zap.String("ID", ping.Id.String()), zap.Error(err))
}
// update wallet with correct info
info, err := discovery.kad.FetchInfo(ctx, node.Node)
@ -161,7 +157,7 @@ func (discovery *Discovery) refresh(ctx context.Context) error {
_, err = discovery.cache.UpdateNodeInfo(ctx, ping.Id, info)
if err != nil {
discovery.log.Warn("could not update node operator", zap.String("ID", ping.GetAddress().String()))
discovery.log.Warn("could not update node info", zap.String("ID", ping.GetAddress().String()))
}
}

View File

@ -41,7 +41,6 @@ func (c Config) BootstrapNodes() []pb.Node {
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: c.BootstrapAddr,
},
Type: pb.NodeType_BOOTSTRAP,
})
}
return nodes

View File

@ -13,6 +13,7 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
"storj.io/storj/pkg/peertls/tlsopts"
"storj.io/storj/internal/memory"
@ -51,7 +52,7 @@ func TestDialer(t *testing.T) {
for _, peer := range peers {
peer := peer
group.Go(func() error {
pinged, err := dialer.PingNode(ctx, peer.Local())
pinged, err := dialer.PingNode(ctx, peer.Local().Node)
var pingErr error
if !pinged {
pingErr = fmt.Errorf("ping to %s should have succeeded", peer.ID())
@ -71,7 +72,7 @@ func TestDialer(t *testing.T) {
defer ctx.Check(group.Wait)
group.Go(func() error {
ident, err := dialer.FetchPeerIdentity(ctx, planet.Satellites[0].Local())
ident, err := dialer.FetchPeerIdentity(ctx, planet.Satellites[0].Local().Node)
if err != nil {
return fmt.Errorf("failed to fetch peer identity")
}
@ -107,7 +108,7 @@ func TestDialer(t *testing.T) {
peer.Local().Type.DPanicOnInvalid("test client peer")
target.Local().Type.DPanicOnInvalid("test client target")
results, err := dialer.Lookup(ctx, self.Local(), peer.Local(), target.Local())
results, err := dialer.Lookup(ctx, self.Local().Node, peer.Local().Node, target.Local().Node)
if err != nil {
return errs.Combine(errTag, err)
}
@ -147,7 +148,7 @@ func TestDialer(t *testing.T) {
group.Go(func() error {
errTag := fmt.Errorf("invalid lookup peer:%s target:%s", peer.ID(), target)
peer.Local().Type.DPanicOnInvalid("peer info")
results, err := dialer.Lookup(ctx, self.Local(), peer.Local(), pb.Node{Id: target, Type: pb.NodeType_STORAGE})
results, err := dialer.Lookup(ctx, self.Local().Node, peer.Local().Node, pb.Node{Id: target})
if err != nil {
return errs.Combine(errTag, err)
}
@ -201,7 +202,7 @@ func TestSlowDialerHasTimeout(t *testing.T) {
for _, peer := range peers {
peer := peer
group.Go(func() error {
_, err := dialer.PingNode(ctx, peer.Local())
_, err := dialer.PingNode(ctx, peer.Local().Node)
require.Error(t, err, context.DeadlineExceeded)
require.True(t, transport.Error.Has(err))
@ -233,7 +234,7 @@ func TestSlowDialerHasTimeout(t *testing.T) {
defer ctx.Check(group.Wait)
group.Go(func() error {
_, err := dialer.FetchPeerIdentity(ctx, planet.Satellites[0].Local())
_, err := dialer.FetchPeerIdentity(ctx, planet.Satellites[0].Local().Node)
require.Error(t, err, context.DeadlineExceeded)
require.True(t, transport.Error.Has(err))
@ -275,7 +276,7 @@ func TestSlowDialerHasTimeout(t *testing.T) {
peer.Local().Type.DPanicOnInvalid("test client peer")
target.Local().Type.DPanicOnInvalid("test client target")
_, err := dialer.Lookup(ctx, self.Local(), peer.Local(), target.Local())
_, err := dialer.Lookup(ctx, self.Local().Node, peer.Local().Node, target.Local().Node)
require.Error(t, err, context.DeadlineExceeded, errTag)
require.True(t, transport.Error.Has(err), errTag)

View File

@ -84,20 +84,9 @@ func (endpoint *Endpoint) RequestInfo(ctx context.Context, req *pb.InfoRequest)
self := endpoint.service.Local()
return &pb.InfoResponse{
Type: self.GetType(),
Operator: &pb.NodeOperator{
Email: self.GetMetadata().GetEmail(),
Wallet: self.GetMetadata().GetWallet(),
},
Capacity: &pb.NodeCapacity{
FreeBandwidth: self.GetRestrictions().GetFreeBandwidth(),
FreeDisk: self.GetRestrictions().GetFreeDisk(),
},
Version: &pb.NodeVersion{
Version: self.GetVersion().GetVersion(),
CommitHash: self.GetVersion().GetCommitHash(),
Timestamp: self.GetVersion().GetTimestamp(),
Release: self.GetVersion().GetRelease(),
},
Type: self.Type,
Operator: &self.Operator,
Capacity: &self.Capacity,
Version: &self.Version,
}, nil
}

View File

@ -71,11 +71,8 @@ func (srv *Inspector) FindNear(ctx context.Context, req *pb.FindNearRequest) (*p
// PingNode sends a PING RPC to the provided node ID in the Kad network.
func (srv *Inspector) PingNode(ctx context.Context, req *pb.PingNodeRequest) (*pb.PingNodeResponse, error) {
self := srv.dht.Local()
_, err := srv.dht.Ping(ctx, pb.Node{
Id: req.Id,
Type: self.Type,
Id: req.Id,
Address: &pb.NodeAddress{
Address: req.Address,
},
@ -119,12 +116,9 @@ func (srv *Inspector) DumpNodes(ctx context.Context, req *pb.DumpNodesRequest) (
// NodeInfo sends a PING RPC to a node and returns its local info.
func (srv *Inspector) NodeInfo(ctx context.Context, req *pb.NodeInfoRequest) (*pb.NodeInfoResponse, error) {
self := srv.dht.Local()
info, err := srv.dht.FetchInfo(ctx, pb.Node{
Id: req.Id,
Address: req.Address,
Type: self.Type,
})
if err != nil {
return &pb.NodeInfoResponse{}, err

View File

@ -15,6 +15,7 @@ import (
"storj.io/storj/internal/sync2"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
@ -60,7 +61,7 @@ type Kademlia struct {
}
// NewService returns a newly configured Kademlia instance
func NewService(log *zap.Logger, self pb.Node, transport transport.Client, rt *RoutingTable, config Config) (*Kademlia, error) {
func NewService(log *zap.Logger, transport transport.Client, rt *RoutingTable, config Config) (*Kademlia, error) {
k := &Kademlia{
log: log,
alpha: config.Alpha,
@ -110,9 +111,9 @@ func (k *Kademlia) Queried() {
}
// FindNear returns all nodes from a starting node up to a maximum limit
// stored in the local routing table limiting the result by the specified restrictions
func (k *Kademlia) FindNear(ctx context.Context, start storj.NodeID, limit int, restrictions ...pb.Restriction) ([]*pb.Node, error) {
return k.routingTable.FindNear(start, limit, restrictions...)
// stored in the local routing table.
func (k *Kademlia) FindNear(ctx context.Context, start storj.NodeID, limit int) ([]*pb.Node, error) {
return k.routingTable.FindNear(start, limit)
}
// GetBucketIds returns a storage.Keys type of bucket ID's in the Kademlia instance
@ -120,8 +121,8 @@ func (k *Kademlia) GetBucketIds() (storage.Keys, error) {
return k.routingTable.GetBucketIds()
}
// Local returns the local nodes ID
func (k *Kademlia) Local() pb.Node {
// Local returns the local node
func (k *Kademlia) Local() overlay.NodeDossier {
return k.routingTable.Local()
}
@ -276,7 +277,7 @@ func (k *Kademlia) lookup(ctx context.Context, ID storj.NodeID, isBootstrap bool
return pb.Node{}, err
}
}
lookup := newPeerDiscovery(k.log, k.routingTable.Local(), nodes, k.dialer, ID, discoveryOptions{
lookup := newPeerDiscovery(k.log, k.routingTable.Local().Node, nodes, k.dialer, ID, discoveryOptions{
concurrency: k.alpha, retries: defaultRetries, bootstrap: isBootstrap, bootstrapNodes: k.bootstrapNodes,
})
target, err := lookup.Run(ctx)
@ -407,88 +408,3 @@ func randomIDInRange(start, end bucketID) (storj.NodeID, error) {
}
return randID, nil
}
// Restrict is used to limit nodes returned that don't match the miniumum storage requirements
func Restrict(r pb.Restriction, n []*pb.Node) []*pb.Node {
oper := r.GetOperand()
op := r.GetOperator()
val := r.GetValue()
var comp int64
results := []*pb.Node{}
for _, v := range n {
switch oper {
case pb.Restriction_FREE_BANDWIDTH:
comp = v.GetRestrictions().GetFreeBandwidth()
case pb.Restriction_FREE_DISK:
comp = v.GetRestrictions().GetFreeDisk()
}
switch op {
case pb.Restriction_EQ:
if comp == val {
results = append(results, v)
continue
}
case pb.Restriction_LT:
if comp < val {
results = append(results, v)
continue
}
case pb.Restriction_LTE:
if comp <= val {
results = append(results, v)
continue
}
case pb.Restriction_GT:
if comp > val {
results = append(results, v)
continue
}
case pb.Restriction_GTE:
if comp >= val {
results = append(results, v)
continue
}
}
}
return results
}
func meetsRestrictions(rs []pb.Restriction, n pb.Node) bool {
for _, r := range rs {
oper := r.GetOperand()
op := r.GetOperator()
val := r.GetValue()
var comp int64
switch oper {
case pb.Restriction_FREE_BANDWIDTH:
comp = n.GetRestrictions().GetFreeBandwidth()
case pb.Restriction_FREE_DISK:
comp = n.GetRestrictions().GetFreeDisk()
}
switch op {
case pb.Restriction_EQ:
if comp != val {
return false
}
case pb.Restriction_LT:
if comp >= val {
return false
}
case pb.Restriction_LTE:
if comp > val {
return false
}
case pb.Restriction_GT:
if comp <= val {
return false
}
case pb.Restriction_GTE:
if comp < val {
return false
}
}
}
return true
}

View File

@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
@ -38,13 +39,12 @@ func TestRequestInfo(t *testing.T) {
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
node := planet.StorageNodes[0]
info, err := planet.Satellites[0].Kademlia.Service.FetchInfo(ctx, node.Local())
info, err := planet.Satellites[0].Kademlia.Service.FetchInfo(ctx, node.Local().Node)
require.NoError(t, err)
require.Equal(t, node.Local().Type, info.GetType())
require.Equal(t, node.Local().Metadata.GetEmail(), info.GetOperator().GetEmail())
require.Equal(t, node.Local().Metadata.GetWallet(), info.GetOperator().GetWallet())
require.Equal(t, node.Local().Restrictions.GetFreeDisk(), info.GetCapacity().GetFreeDisk())
require.Equal(t, node.Local().Restrictions.GetFreeBandwidth(), info.GetCapacity().GetFreeBandwidth())
require.Empty(t, cmp.Diff(node.Local().Operator, *info.GetOperator(), cmp.Comparer(pb.Equal)))
require.Empty(t, cmp.Diff(node.Local().Capacity, *info.GetCapacity(), cmp.Comparer(pb.Equal)))
require.Empty(t, cmp.Diff(node.Local().Version, *info.GetVersion(), cmp.Comparer(pb.Equal)))
})
}
@ -69,14 +69,7 @@ func TestPingTimeout(t *testing.T) {
slowClient := network.NewClient(self.Transport)
require.NotNil(t, slowClient)
node := pb.Node{
Id: self.ID(),
Address: &pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
},
}
newService, err := kademlia.NewService(zaptest.NewLogger(t), node, slowClient, routingTable, kademlia.Config{})
newService, err := kademlia.NewService(zaptest.NewLogger(t), slowClient, routingTable, kademlia.Config{})
require.NoError(t, err)
target := pb.Node{

View File

@ -23,6 +23,7 @@ import (
"storj.io/storj/internal/testidentity"
"storj.io/storj/internal/teststorj"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/peertls/tlsopts"
"storj.io/storj/pkg/storj"
@ -65,7 +66,7 @@ func TestNewKademlia(t *testing.T) {
}
for i, v := range cases {
kad, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, v.bn, v.addr, nil, v.id, ctx.Dir(strconv.Itoa(i)), defaultAlpha)
kad, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, v.bn, v.addr, pb.NodeOperator{}, v.id, ctx.Dir(strconv.Itoa(i)), defaultAlpha)
require.NoError(t, err)
assert.Equal(t, v.expectedErr, err)
assert.Equal(t, kad.bootstrapNodes, v.bn)
@ -88,14 +89,14 @@ func TestPeerDiscovery(t *testing.T) {
targetServer, _, targetID, targetAddress := startTestNodeServer(ctx)
defer targetServer.GracefulStop()
bootstrapNodes := []pb.Node{{Id: bootID.ID, Address: &pb.NodeAddress{Address: bootAddress}, Type: pb.NodeType_STORAGE}}
metadata := &pb.NodeMetadata{
bootstrapNodes := []pb.Node{{Id: bootID.ID, Address: &pb.NodeAddress{Address: bootAddress}}}
operator := pb.NodeOperator{
Wallet: "OperatorWallet",
}
k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrapNodes, testAddress, metadata, testID, ctx.Dir("test"), defaultAlpha)
k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrapNodes, testAddress, operator, testID, ctx.Dir("test"), defaultAlpha)
require.NoError(t, err)
rt := k.routingTable
assert.Equal(t, rt.Local().Metadata.Wallet, "OperatorWallet")
assert.Equal(t, rt.Local().Operator.Wallet, "OperatorWallet")
defer ctx.Check(k.Close)
@ -105,7 +106,7 @@ func TestPeerDiscovery(t *testing.T) {
expectedErr error
}{
{target: func() storj.NodeID {
mockBootServer.returnValue = []*pb.Node{{Id: targetID.ID, Type: pb.NodeType_STORAGE, Address: &pb.NodeAddress{Address: targetAddress}}}
mockBootServer.returnValue = []*pb.Node{{Id: targetID.ID, Address: &pb.NodeAddress{Address: targetAddress}}}
return targetID.ID
}(),
expected: &pb.Node{},
@ -130,14 +131,14 @@ func TestBootstrap(t *testing.T) {
defer clean()
defer s.GracefulStop()
n1, s1, clean1 := testNode(ctx, "2", t, []pb.Node{bn.routingTable.self})
n1, s1, clean1 := testNode(ctx, "2", t, []pb.Node{bn.routingTable.self.Node})
defer clean1()
defer s1.GracefulStop()
err := n1.Bootstrap(ctx)
require.NoError(t, err)
n2, s2, clean2 := testNode(ctx, "3", t, []pb.Node{bn.routingTable.self})
n2, s2, clean2 := testNode(ctx, "3", t, []pb.Node{bn.routingTable.self.Node})
defer clean2()
defer s2.GracefulStop()
@ -160,7 +161,7 @@ func testNode(ctx *testcontext.Context, name string, t *testing.T, bn []pb.Node)
// new kademlia
logger := zaptest.NewLogger(t)
k, err := newKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), nil, fid, ctx.Dir(name), defaultAlpha)
k, err := newKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), pb.NodeOperator{}, fid, ctx.Dir(name), defaultAlpha)
require.NoError(t, err)
s := NewEndpoint(logger, k, k.routingTable)
@ -243,7 +244,7 @@ func TestFindNear(t *testing.T) {
bootstrap := []pb.Node{{Id: fid2.ID, Address: &pb.NodeAddress{Address: lis.Addr().String()}}}
k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrap,
lis.Addr().String(), nil, fid, ctx.Dir("kademlia"), defaultAlpha)
lis.Addr().String(), pb.NodeOperator{}, fid, ctx.Dir("kademlia"), defaultAlpha)
require.NoError(t, err)
defer ctx.Check(k.Close)
@ -251,15 +252,14 @@ func TestFindNear(t *testing.T) {
nodes := []*pb.Node{}
newNode := func(id string, bw, disk int64) pb.Node {
nodeID := teststorj.NodeIDFromString(id)
restriction := &pb.NodeRestrictions{FreeBandwidth: bw, FreeDisk: disk}
n := &pb.Node{Id: nodeID, Restrictions: restriction, Type: pb.NodeType_STORAGE}
n := &pb.Node{Id: nodeID}
nodes = append(nodes, n)
err = k.routingTable.ConnectionSuccess(n)
require.NoError(t, err)
return *n
}
nodeIDA := newNode("AAAAA", 1, 4)
nodeIDB := newNode("BBBBB", 2, 3)
newNode("BBBBB", 2, 3)
newNode("CCCCC", 3, 2)
newNode("DDDDD", 4, 1)
require.Len(t, nodes, 4)
@ -271,23 +271,12 @@ func TestFindNear(t *testing.T) {
restrictions []pb.Restriction
expected []*pb.Node
}{
{testID: "one", target: nodeIDB.Id, limit: 2, expected: nodes[2:],
restrictions: []pb.Restriction{
{Operator: pb.Restriction_GT, Operand: pb.Restriction_FREE_BANDWIDTH, Value: int64(2)},
},
},
{testID: "two", target: nodeIDA.Id, limit: 3, expected: nodes[3:],
restrictions: []pb.Restriction{
{Operator: pb.Restriction_GT, Operand: pb.Restriction_FREE_BANDWIDTH, Value: int64(2)},
{Operator: pb.Restriction_LT, Operand: pb.Restriction_FREE_DISK, Value: int64(2)},
},
},
{testID: "three", target: nodeIDA.Id, limit: 4, expected: nodes, restrictions: []pb.Restriction{}},
}
for _, c := range cases {
t.Run(c.testID, func(t *testing.T) {
ns, err := k.FindNear(ctx, c.target, c.limit, c.restrictions...)
ns, err := k.FindNear(ctx, c.target, c.limit)
require.NoError(t, err)
assert.Equal(t, len(c.expected), len(ns))
for _, e := range c.expected {
@ -303,100 +292,6 @@ func TestFindNear(t *testing.T) {
}
}
func TestMeetsRestrictions(t *testing.T) {
cases := []struct {
testID string
r []pb.Restriction
n pb.Node
expect bool
}{
{testID: "pass one",
r: []pb.Restriction{
{
Operator: pb.Restriction_EQ,
Operand: pb.Restriction_FREE_BANDWIDTH,
Value: int64(1),
},
},
n: pb.Node{
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: int64(1),
},
},
expect: true,
},
{testID: "pass multiple",
r: []pb.Restriction{
{
Operator: pb.Restriction_LTE,
Operand: pb.Restriction_FREE_BANDWIDTH,
Value: int64(2),
},
{
Operator: pb.Restriction_GTE,
Operand: pb.Restriction_FREE_DISK,
Value: int64(2),
},
},
n: pb.Node{
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: int64(1),
FreeDisk: int64(3),
},
},
expect: true,
},
{testID: "fail one",
r: []pb.Restriction{
{
Operator: pb.Restriction_LT,
Operand: pb.Restriction_FREE_BANDWIDTH,
Value: int64(2),
},
{
Operator: pb.Restriction_GT,
Operand: pb.Restriction_FREE_DISK,
Value: int64(2),
},
},
n: pb.Node{
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: int64(2),
FreeDisk: int64(3),
},
},
expect: false,
},
{testID: "fail multiple",
r: []pb.Restriction{
{
Operator: pb.Restriction_LT,
Operand: pb.Restriction_FREE_BANDWIDTH,
Value: int64(2),
},
{
Operator: pb.Restriction_GT,
Operand: pb.Restriction_FREE_DISK,
Value: int64(2),
},
},
n: pb.Node{
Restrictions: &pb.NodeRestrictions{
FreeBandwidth: int64(2),
FreeDisk: int64(2),
},
},
expect: false,
},
}
for _, c := range cases {
t.Run(c.testID, func(t *testing.T) {
result := meetsRestrictions(c.r, c.n)
assert.Equal(t, c.expect, result)
})
}
}
func startTestNodeServer(ctx *testcontext.Context) (*grpc.Server, *mockNodesServer, *identity.FullIdentity, string) {
lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
@ -512,12 +407,14 @@ func (mn *mockNodesServer) RequestInfo(ctx context.Context, req *pb.InfoRequest)
}
// newKademlia returns a newly configured Kademlia instance
func newKademlia(log *zap.Logger, nodeType pb.NodeType, bootstrapNodes []pb.Node, address string, metadata *pb.NodeMetadata, identity *identity.FullIdentity, path string, alpha int) (*Kademlia, error) {
self := pb.Node{
Id: identity.ID,
func newKademlia(log *zap.Logger, nodeType pb.NodeType, bootstrapNodes []pb.Node, address string, operator pb.NodeOperator, identity *identity.FullIdentity, path string, alpha int) (*Kademlia, error) {
self := &overlay.NodeDossier{
Node: pb.Node{
Id: identity.ID,
Address: &pb.NodeAddress{Address: address},
},
Type: nodeType,
Address: &pb.NodeAddress{Address: address},
Metadata: metadata,
Operator: operator,
}
rt, err := NewRoutingTable(log, self, teststore.New(), teststore.New(), nil)
@ -538,7 +435,7 @@ func newKademlia(log *zap.Logger, nodeType pb.NodeType, bootstrapNodes []pb.Node
Alpha: alpha,
}
kad, err := NewService(log, self, transportClient, rt, kadConfig)
kad, err := NewService(log, transportClient, rt, kadConfig)
if err != nil {
return nil, err
}

View File

@ -88,13 +88,7 @@ func (lookup *peerDiscovery) Run(ctx context.Context) (target *pb.Node, err erro
lookup.cond.Wait()
}
lookup.cond.L.Unlock()
var nodeType pb.NodeType
if target != nil {
nodeType = target.Type
nodeType.DPanicOnInvalid("Peer Discovery Run")
}
next.Type.DPanicOnInvalid("next")
neighbors, err := lookup.dialer.Lookup(ctx, lookup.self, *next, pb.Node{Id: lookup.target, Type: nodeType})
neighbors, err := lookup.dialer.Lookup(ctx, lookup.self, *next, pb.Node{Id: lookup.target})
if err != nil && !isDone(ctx) {
// TODO: reenable retry after fixing logic

View File

@ -14,6 +14,7 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
@ -55,7 +56,7 @@ type RoutingTableConfig struct {
// RoutingTable implements the RoutingTable interface
type RoutingTable struct {
log *zap.Logger
self pb.Node
self *overlay.NodeDossier
kadBucketDB storage.KeyValueStore
nodeBucketDB storage.KeyValueStore
transport *pb.NodeTransport
@ -68,9 +69,7 @@ type RoutingTable struct {
}
// NewRoutingTable returns a newly configured instance of a RoutingTable
func NewRoutingTable(logger *zap.Logger, localNode pb.Node, kdb, ndb storage.KeyValueStore, config *RoutingTableConfig) (*RoutingTable, error) {
localNode.Type.DPanicOnInvalid("new routing table")
func NewRoutingTable(logger *zap.Logger, localNode *overlay.NodeDossier, kdb, ndb storage.KeyValueStore, config *RoutingTableConfig) (*RoutingTable, error) {
if config == nil || config.BucketSize == 0 || config.ReplacementCacheSize == 0 {
// TODO: handle this more nicely
config = &RoutingTableConfig{
@ -94,7 +93,7 @@ func NewRoutingTable(logger *zap.Logger, localNode pb.Node, kdb, ndb storage.Key
bucketSize: config.BucketSize,
rcBucketSize: config.ReplacementCacheSize,
}
ok, err := rt.addNode(&localNode)
ok, err := rt.addNode(&localNode.Node)
if !ok || err != nil {
return nil, RoutingErr.New("could not add localNode to routing table: %s", err)
}
@ -106,11 +105,20 @@ func (rt *RoutingTable) Close() error {
return nil
}
// Local returns the local nodes ID
func (rt *RoutingTable) Local() pb.Node {
// Local returns the local node
func (rt *RoutingTable) Local() overlay.NodeDossier {
rt.mutex.Lock()
defer rt.mutex.Unlock()
return rt.self
return *rt.self
}
// UpdateSelf updates the local node with the provided info
func (rt *RoutingTable) UpdateSelf(capacity *pb.NodeCapacity) {
rt.mutex.Lock()
defer rt.mutex.Unlock()
if capacity != nil {
rt.self.Capacity = *capacity
}
}
// K returns the currently configured maximum of nodes to store in a bucket
@ -173,7 +181,7 @@ func (rt *RoutingTable) DumpNodes() ([]*pb.Node, error) {
// FindNear returns the node corresponding to the provided nodeID
// returns all Nodes (excluding self) closest via XOR to the provided nodeID up to the provided limit
func (rt *RoutingTable) FindNear(target storj.NodeID, limit int, restrictions ...pb.Restriction) ([]*pb.Node, error) {
func (rt *RoutingTable) FindNear(target storj.NodeID, limit int) ([]*pb.Node, error) {
closestNodes := make([]*pb.Node, 0, limit+1)
err := rt.iterateNodes(storj.NodeID{}, func(newID storj.NodeID, protoNode []byte) error {
newPos := len(closestNodes)
@ -185,14 +193,12 @@ func (rt *RoutingTable) FindNear(target storj.NodeID, limit int, restrictions ..
if err != nil {
return err
}
if meetsRestrictions(restrictions, newNode) {
closestNodes = append(closestNodes, &newNode)
if newPos != len(closestNodes) { //reorder
copy(closestNodes[newPos+1:], closestNodes[newPos:])
closestNodes[newPos] = &newNode
if len(closestNodes) > limit {
closestNodes = closestNodes[:limit]
}
closestNodes = append(closestNodes, &newNode)
if newPos != len(closestNodes) { //reorder
copy(closestNodes[newPos+1:], closestNodes[newPos:])
closestNodes[newPos] = &newNode
if len(closestNodes) > limit {
closestNodes = closestNodes[:limit]
}
}
}
@ -201,25 +207,6 @@ func (rt *RoutingTable) FindNear(target storj.NodeID, limit int, restrictions ..
return closestNodes, Error.Wrap(err)
}
// UpdateSelf updates a node on the routing table
func (rt *RoutingTable) UpdateSelf(node *pb.Node) error {
// TODO: replace UpdateSelf with UpdateRestrictions and UpdateAddress
rt.mutex.Lock()
if node.Id != rt.self.Id {
rt.mutex.Unlock()
return RoutingErr.New("self does not have a matching node id")
}
rt.self = *node
rt.seen[node.Id] = node
rt.mutex.Unlock()
if err := rt.updateNode(node); err != nil {
return RoutingErr.New("could not update node %s", err)
}
return nil
}
// ConnectionSuccess updates or adds a node to the routing table when
// a successful connection is made to the node on the network
func (rt *RoutingTable) ConnectionSuccess(node *pb.Node) error {
@ -228,8 +215,6 @@ func (rt *RoutingTable) ConnectionSuccess(node *pb.Node) error {
return nil
}
node.Type.DPanicOnInvalid("connection success")
rt.mutex.Lock()
rt.seen[node.Id] = node
rt.mutex.Unlock()
@ -255,7 +240,6 @@ func (rt *RoutingTable) ConnectionSuccess(node *pb.Node) error {
// ConnectionFailed removes a node from the routing table when
// a connection fails for the node on the network
func (rt *RoutingTable) ConnectionFailed(node *pb.Node) error {
node.Type.DPanicOnInvalid("connection failed")
err := rt.removeNode(node)
if err != nil {
return RoutingErr.New("could not remove node %s", err)

View File

@ -16,6 +16,7 @@ import (
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/teststorj"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
@ -29,7 +30,7 @@ type routingTableOpts struct {
}
// newTestRoutingTable returns a newly configured instance of a RoutingTable
func newTestRoutingTable(localNode pb.Node, opts routingTableOpts) (*RoutingTable, error) {
func newTestRoutingTable(local *overlay.NodeDossier, opts routingTableOpts) (*RoutingTable, error) {
if opts.bucketSize == 0 {
opts.bucketSize = 6
}
@ -37,7 +38,7 @@ func newTestRoutingTable(localNode pb.Node, opts routingTableOpts) (*RoutingTabl
opts.cacheSize = 2
}
rt := &RoutingTable{
self: localNode,
self: local,
kadBucketDB: storelogger.New(zap.L().Named("rt.kad"), teststore.New()),
nodeBucketDB: storelogger.New(zap.L().Named("rt.node"), teststore.New()),
transport: &defaultTransport,
@ -50,7 +51,7 @@ func newTestRoutingTable(localNode pb.Node, opts routingTableOpts) (*RoutingTabl
bucketSize: opts.bucketSize,
rcBucketSize: opts.cacheSize,
}
ok, err := rt.addNode(&localNode)
ok, err := rt.addNode(&local.Node)
if !ok || err != nil {
return nil, RoutingErr.New("could not add localNode to routing table: %s", err)
}
@ -61,9 +62,9 @@ func createRoutingTableWith(localNodeID storj.NodeID, opts routingTableOpts) *Ro
if localNodeID == (storj.NodeID{}) {
panic("empty local node id")
}
localNode := pb.Node{Id: localNodeID}
local := &overlay.NodeDossier{Node: pb.Node{Id: localNodeID}}
rt, err := newTestRoutingTable(localNode, opts)
rt, err := newTestRoutingTable(local, opts)
if err != nil {
panic(err)
}

View File

@ -145,8 +145,8 @@ func TestConnectionSuccess(t *testing.T) {
id2 := teststorj.NodeIDFromString("BB")
address1 := &pb.NodeAddress{Address: "a"}
address2 := &pb.NodeAddress{Address: "b"}
node1 := &pb.Node{Id: id, Address: address1, Type: pb.NodeType_STORAGE}
node2 := &pb.Node{Id: id2, Address: address2, Type: pb.NodeType_STORAGE}
node1 := &pb.Node{Id: id, Address: address1}
node2 := &pb.Node{Id: id2, Address: address2}
cases := []struct {
testID string
node *pb.Node
@ -177,52 +177,12 @@ func TestConnectionSuccess(t *testing.T) {
}
}
func TestUpdateSelf(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
id := teststorj.NodeIDFromString("AA")
rt := createRoutingTable(id)
defer ctx.Check(rt.Close)
address := &pb.NodeAddress{Address: "a"}
node := &pb.Node{Id: id, Address: address, Type: pb.NodeType_STORAGE}
cases := []struct {
testID string
node *pb.Node
id storj.NodeID
address *pb.NodeAddress
}{
{testID: "Update Node",
node: node,
id: id,
address: address,
},
}
for _, c := range cases {
t.Run(c.testID, func(t *testing.T) {
newNode := c.node
restrictions := &pb.NodeRestrictions{
FreeBandwidth: 10,
}
newNode.Restrictions = restrictions
err := rt.UpdateSelf(newNode)
assert.NoError(t, err)
v, err := rt.nodeBucketDB.Get(c.id.Bytes())
assert.NoError(t, err)
n, err := unmarshalNodes([]storage.Value{v})
assert.NoError(t, err)
assert.Equal(t, c.address.Address, n[0].Address.Address)
assert.Equal(t, newNode.Restrictions.GetFreeBandwidth(), n[0].Restrictions.GetFreeBandwidth())
})
}
}
func TestConnectionFailed(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
id := teststorj.NodeIDFromString("AA")
node := &pb.Node{Id: id, Type: pb.NodeType_STORAGE}
node := &pb.Node{Id: id}
rt := createRoutingTable(id)
defer ctx.Check(rt.Close)
err := rt.ConnectionFailed(node)

View File

@ -9,6 +9,7 @@ import (
"time"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
@ -121,7 +122,7 @@ func (t *Table) ConnectionFailed(node *pb.Node) error {
// FindNear will return up to limit nodes in the routing table ordered by
// kademlia xor distance from the given id.
func (t *Table) FindNear(id storj.NodeID, limit int, restrictions ...pb.Restriction) ([]*pb.Node, error) {
func (t *Table) FindNear(id storj.NodeID, limit int) ([]*pb.Node, error) {
t.mu.Lock()
defer t.mu.Unlock()
@ -147,8 +148,8 @@ func (t *Table) FindNear(id storj.NodeID, limit int, restrictions ...pb.Restrict
return rv, nil
}
// Local returns the local nodes ID
func (t *Table) Local() pb.Node {
// Local returns the local node
func (t *Table) Local() overlay.NodeDossier {
// the routing table has no idea what the right address of ourself is,
// so this is the wrong place to get this information. we could return
// our own id only?

View File

@ -310,20 +310,14 @@ func getBucketNames(bucketList storj.BucketList) []string {
}
func runTest(t *testing.T, test func(context.Context, *testplanet.Planet, *kvmetainfo.DB, buckets.Store, streams.Store)) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
db, buckets, streams, err := newMetainfoParts(planet)
require.NoError(t, err)
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
db, buckets, streams, err := newMetainfoParts(planet)
require.NoError(t, err)
test(ctx, planet, db, buckets, streams)
test(ctx, planet, db, buckets, streams)
})
}
func newMetainfoParts(planet *testplanet.Planet) (*kvmetainfo.DB, buckets.Store, streams.Store, error) {

View File

@ -51,13 +51,13 @@ type DB interface {
List(ctx context.Context, cursor storj.NodeID, limit int) ([]*NodeDossier, error)
// Paginate will page through the database nodes
Paginate(ctx context.Context, offset int64, limit int) ([]*NodeDossier, bool, error)
// Update updates node information
Update(ctx context.Context, value *pb.Node) error
// CreateStats initializes the stats for node.
CreateStats(ctx context.Context, nodeID storj.NodeID, initial *NodeStats) (stats *NodeStats, err error)
// FindInvalidNodes finds a subset of storagenodes that have stats below provided reputation requirements.
FindInvalidNodes(ctx context.Context, nodeIDs storj.NodeIDList, maxStats *NodeStats) (invalid storj.NodeIDList, err error)
// Update updates node address
UpdateAddress(ctx context.Context, value *pb.Node) error
// UpdateStats all parts of single storagenode's stats.
UpdateStats(ctx context.Context, request *UpdateRequest) (stats *NodeStats, err error)
// UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version.
@ -288,7 +288,7 @@ func (cache *Cache) GetAll(ctx context.Context, ids storj.NodeIDList) (_ []*Node
return cache.db.GetAll(ctx, ids)
}
// Put adds a node id and proto definition into the overlay cache and stat db
// Put adds a node id and proto definition into the overlay cache
func (cache *Cache) Put(ctx context.Context, nodeID storj.NodeID, value pb.Node) (err error) {
defer mon.Task()(&ctx)(&err)
@ -300,7 +300,7 @@ func (cache *Cache) Put(ctx context.Context, nodeID storj.NodeID, value pb.Node)
if nodeID != value.Id {
return errors.New("invalid request")
}
return cache.db.Update(ctx, &value)
return cache.db.UpdateAddress(ctx, &value)
}
// Create adds a new stats entry for node.

View File

@ -146,11 +146,11 @@ func testRandomizedSelection(t *testing.T, reputable bool) {
for i := 0; i < totalNodes; i++ {
newID := storj.NodeID{}
_, _ = rand.Read(newID[:])
err := cache.Update(ctx, &pb.Node{
Id: newID,
Type: pb.NodeType_STORAGE,
Restrictions: &pb.NodeRestrictions{},
Reputation: &pb.NodeStats{},
err := cache.UpdateAddress(ctx, &pb.Node{Id: newID})
require.NoError(t, err)
_, err = cache.UpdateNodeInfo(ctx, newID, &pb.InfoResponse{
Type: pb.NodeType_STORAGE,
Capacity: &pb.NodeCapacity{},
})
require.NoError(t, err)
_, err = cache.UpdateUptime(ctx, newID, true)

View File

@ -46,10 +46,10 @@ func (srv *Inspector) GetStats(ctx context.Context, req *pb.GetStatsRequest) (*p
}
return &pb.GetStatsResponse{
AuditCount: node.GetReputation().GetAuditCount(),
AuditRatio: node.GetReputation().GetAuditSuccessRatio(),
UptimeCount: node.GetReputation().GetUptimeCount(),
UptimeRatio: node.GetReputation().GetUptimeRatio(),
AuditCount: node.Reputation.AuditCount,
AuditRatio: node.Reputation.AuditSuccessRatio,
UptimeCount: node.Reputation.UptimeCount,
UptimeRatio: node.Reputation.UptimeRatio,
}, nil
}

View File

@ -71,7 +71,7 @@ func TestNodeSelection(t *testing.T) {
// ensure all storagenodes are in overlay service
for _, storageNode := range planet.StorageNodes {
err = satellite.Overlay.Service.Put(ctx, storageNode.ID(), storageNode.Local())
err = satellite.Overlay.Service.Put(ctx, storageNode.ID(), storageNode.Local().Node)
assert.NoError(t, err)
}

View File

@ -52,7 +52,7 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
UptimeSuccessCount: currUptimeSuccess,
}
err := cache.Update(ctx, &pb.Node{Id: nodeID})
err := cache.UpdateAddress(ctx, &pb.Node{Id: nodeID})
require.NoError(t, err)
stats, err := cache.CreateStats(ctx, nodeID, nodeStats)
@ -105,7 +105,7 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
UptimeSuccessCount: tt.uptimeSuccessCount,
}
err := cache.Update(ctx, &pb.Node{Id: tt.nodeID})
err := cache.UpdateAddress(ctx, &pb.Node{Id: tt.nodeID})
require.NoError(t, err)
_, err = cache.CreateStats(ctx, tt.nodeID, nodeStats)
@ -133,7 +133,7 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
{ // TestUpdateOperator
nodeID := storj.NodeID{10}
err := cache.Update(ctx, &pb.Node{Id: nodeID})
err := cache.UpdateAddress(ctx, &pb.Node{Id: nodeID})
require.NoError(t, err)
update, err := cache.UpdateNodeInfo(ctx, nodeID, &pb.InfoResponse{

View File

@ -3,13 +3,11 @@
package pb
import (
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
timestamp "github.com/golang/protobuf/ptypes/timestamp"
math "math"
)
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
import _ "github.com/gogo/protobuf/gogoproto"
import timestamp "github.com/golang/protobuf/ptypes/timestamp"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
@ -40,7 +38,6 @@ var NodeType_name = map[int32]string{
3: "UPLINK",
4: "BOOTSTRAP",
}
var NodeType_value = map[string]int32{
"INVALID": 0,
"SATELLITE": 1,
@ -52,9 +49,8 @@ var NodeType_value = map[string]int32{
func (x NodeType) String() string {
return proto.EnumName(NodeType_name, int32(x))
}
func (NodeType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_0c843d59d2d938e7, []int{0}
return fileDescriptor_node_52aec12f51af5891, []int{0}
}
// NodeTransport is an enum of possible transports for the overlay network
@ -67,7 +63,6 @@ const (
var NodeTransport_name = map[int32]string{
0: "TCP_TLS_GRPC",
}
var NodeTransport_value = map[string]int32{
"TCP_TLS_GRPC": 0,
}
@ -75,38 +70,26 @@ var NodeTransport_value = map[string]int32{
func (x NodeTransport) String() string {
return proto.EnumName(NodeTransport_name, int32(x))
}
func (NodeTransport) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_0c843d59d2d938e7, []int{1}
return fileDescriptor_node_52aec12f51af5891, []int{1}
}
// TODO move statdb.Update() stuff out of here
// Node represents a node in the overlay network
// Node is info for a updating a single storagenode, used in the Update rpc calls
type Node struct {
Id NodeID `protobuf:"bytes,1,opt,name=id,proto3,customtype=NodeID" json:"id"`
Address *NodeAddress `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"`
Type NodeType `protobuf:"varint,3,opt,name=type,proto3,enum=node.NodeType" json:"type,omitempty"`
Restrictions *NodeRestrictions `protobuf:"bytes,4,opt,name=restrictions,proto3" json:"restrictions,omitempty"`
Reputation *NodeStats `protobuf:"bytes,5,opt,name=reputation,proto3" json:"reputation,omitempty"`
Metadata *NodeMetadata `protobuf:"bytes,6,opt,name=metadata,proto3" json:"metadata,omitempty"`
LatencyList []int64 `protobuf:"varint,7,rep,packed,name=latency_list,json=latencyList,proto3" json:"latency_list,omitempty"`
AuditSuccess bool `protobuf:"varint,8,opt,name=audit_success,json=auditSuccess,proto3" json:"audit_success,omitempty"`
IsUp bool `protobuf:"varint,9,opt,name=is_up,json=isUp,proto3" json:"is_up,omitempty"`
UpdateLatency bool `protobuf:"varint,10,opt,name=update_latency,json=updateLatency,proto3" json:"update_latency,omitempty"`
UpdateAuditSuccess bool `protobuf:"varint,11,opt,name=update_audit_success,json=updateAuditSuccess,proto3" json:"update_audit_success,omitempty"`
UpdateUptime bool `protobuf:"varint,12,opt,name=update_uptime,json=updateUptime,proto3" json:"update_uptime,omitempty"`
Version *NodeVersion `protobuf:"bytes,13,opt,name=version,proto3" json:"version,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Id NodeID `protobuf:"bytes,1,opt,name=id,proto3,customtype=NodeID" json:"id"`
Address *NodeAddress `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Node) Reset() { *m = Node{} }
func (m *Node) String() string { return proto.CompactTextString(m) }
func (*Node) ProtoMessage() {}
func (*Node) Descriptor() ([]byte, []int) {
return fileDescriptor_0c843d59d2d938e7, []int{0}
return fileDescriptor_node_52aec12f51af5891, []int{0}
}
func (m *Node) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Node.Unmarshal(m, b)
@ -114,8 +97,8 @@ func (m *Node) XXX_Unmarshal(b []byte) error {
func (m *Node) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Node.Marshal(b, m, deterministic)
}
func (m *Node) XXX_Merge(src proto.Message) {
xxx_messageInfo_Node.Merge(m, src)
func (dst *Node) XXX_Merge(src proto.Message) {
xxx_messageInfo_Node.Merge(dst, src)
}
func (m *Node) XXX_Size() int {
return xxx_messageInfo_Node.Size(m)
@ -133,83 +116,6 @@ func (m *Node) GetAddress() *NodeAddress {
return nil
}
func (m *Node) GetType() NodeType {
if m != nil {
return m.Type
}
return NodeType_INVALID
}
func (m *Node) GetRestrictions() *NodeRestrictions {
if m != nil {
return m.Restrictions
}
return nil
}
func (m *Node) GetReputation() *NodeStats {
if m != nil {
return m.Reputation
}
return nil
}
func (m *Node) GetMetadata() *NodeMetadata {
if m != nil {
return m.Metadata
}
return nil
}
func (m *Node) GetLatencyList() []int64 {
if m != nil {
return m.LatencyList
}
return nil
}
func (m *Node) GetAuditSuccess() bool {
if m != nil {
return m.AuditSuccess
}
return false
}
func (m *Node) GetIsUp() bool {
if m != nil {
return m.IsUp
}
return false
}
func (m *Node) GetUpdateLatency() bool {
if m != nil {
return m.UpdateLatency
}
return false
}
func (m *Node) GetUpdateAuditSuccess() bool {
if m != nil {
return m.UpdateAuditSuccess
}
return false
}
func (m *Node) GetUpdateUptime() bool {
if m != nil {
return m.UpdateUptime
}
return false
}
func (m *Node) GetVersion() *NodeVersion {
if m != nil {
return m.Version
}
return nil
}
// NodeAddress contains the information needed to communicate with a node on the network
type NodeAddress struct {
Transport NodeTransport `protobuf:"varint,1,opt,name=transport,proto3,enum=node.NodeTransport" json:"transport,omitempty"`
@ -223,7 +129,7 @@ func (m *NodeAddress) Reset() { *m = NodeAddress{} }
func (m *NodeAddress) String() string { return proto.CompactTextString(m) }
func (*NodeAddress) ProtoMessage() {}
func (*NodeAddress) Descriptor() ([]byte, []int) {
return fileDescriptor_0c843d59d2d938e7, []int{1}
return fileDescriptor_node_52aec12f51af5891, []int{1}
}
func (m *NodeAddress) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NodeAddress.Unmarshal(m, b)
@ -231,8 +137,8 @@ func (m *NodeAddress) XXX_Unmarshal(b []byte) error {
func (m *NodeAddress) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_NodeAddress.Marshal(b, m, deterministic)
}
func (m *NodeAddress) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeAddress.Merge(m, src)
func (dst *NodeAddress) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeAddress.Merge(dst, src)
}
func (m *NodeAddress) XXX_Size() int {
return xxx_messageInfo_NodeAddress.Size(m)
@ -278,7 +184,7 @@ func (m *NodeStats) Reset() { *m = NodeStats{} }
func (m *NodeStats) String() string { return proto.CompactTextString(m) }
func (*NodeStats) ProtoMessage() {}
func (*NodeStats) Descriptor() ([]byte, []int) {
return fileDescriptor_0c843d59d2d938e7, []int{2}
return fileDescriptor_node_52aec12f51af5891, []int{2}
}
func (m *NodeStats) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NodeStats.Unmarshal(m, b)
@ -286,8 +192,8 @@ func (m *NodeStats) XXX_Unmarshal(b []byte) error {
func (m *NodeStats) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_NodeStats.Marshal(b, m, deterministic)
}
func (m *NodeStats) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeStats.Merge(m, src)
func (dst *NodeStats) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeStats.Merge(dst, src)
}
func (m *NodeStats) XXX_Size() int {
return xxx_messageInfo_NodeStats.Size(m)
@ -374,7 +280,7 @@ func (m *NodeOperator) Reset() { *m = NodeOperator{} }
func (m *NodeOperator) String() string { return proto.CompactTextString(m) }
func (*NodeOperator) ProtoMessage() {}
func (*NodeOperator) Descriptor() ([]byte, []int) {
return fileDescriptor_0c843d59d2d938e7, []int{3}
return fileDescriptor_node_52aec12f51af5891, []int{3}
}
func (m *NodeOperator) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NodeOperator.Unmarshal(m, b)
@ -382,8 +288,8 @@ func (m *NodeOperator) XXX_Unmarshal(b []byte) error {
func (m *NodeOperator) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_NodeOperator.Marshal(b, m, deterministic)
}
func (m *NodeOperator) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeOperator.Merge(m, src)
func (dst *NodeOperator) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeOperator.Merge(dst, src)
}
func (m *NodeOperator) XXX_Size() int {
return xxx_messageInfo_NodeOperator.Size(m)
@ -421,7 +327,7 @@ func (m *NodeCapacity) Reset() { *m = NodeCapacity{} }
func (m *NodeCapacity) String() string { return proto.CompactTextString(m) }
func (*NodeCapacity) ProtoMessage() {}
func (*NodeCapacity) Descriptor() ([]byte, []int) {
return fileDescriptor_0c843d59d2d938e7, []int{4}
return fileDescriptor_node_52aec12f51af5891, []int{4}
}
func (m *NodeCapacity) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NodeCapacity.Unmarshal(m, b)
@ -429,8 +335,8 @@ func (m *NodeCapacity) XXX_Unmarshal(b []byte) error {
func (m *NodeCapacity) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_NodeCapacity.Marshal(b, m, deterministic)
}
func (m *NodeCapacity) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeCapacity.Merge(m, src)
func (dst *NodeCapacity) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeCapacity.Merge(dst, src)
}
func (m *NodeCapacity) XXX_Size() int {
return xxx_messageInfo_NodeCapacity.Size(m)
@ -468,7 +374,7 @@ func (m *NodeMetadata) Reset() { *m = NodeMetadata{} }
func (m *NodeMetadata) String() string { return proto.CompactTextString(m) }
func (*NodeMetadata) ProtoMessage() {}
func (*NodeMetadata) Descriptor() ([]byte, []int) {
return fileDescriptor_0c843d59d2d938e7, []int{5}
return fileDescriptor_node_52aec12f51af5891, []int{5}
}
func (m *NodeMetadata) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NodeMetadata.Unmarshal(m, b)
@ -476,8 +382,8 @@ func (m *NodeMetadata) XXX_Unmarshal(b []byte) error {
func (m *NodeMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_NodeMetadata.Marshal(b, m, deterministic)
}
func (m *NodeMetadata) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeMetadata.Merge(m, src)
func (dst *NodeMetadata) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeMetadata.Merge(dst, src)
}
func (m *NodeMetadata) XXX_Size() int {
return xxx_messageInfo_NodeMetadata.Size(m)
@ -515,7 +421,7 @@ func (m *NodeRestrictions) Reset() { *m = NodeRestrictions{} }
func (m *NodeRestrictions) String() string { return proto.CompactTextString(m) }
func (*NodeRestrictions) ProtoMessage() {}
func (*NodeRestrictions) Descriptor() ([]byte, []int) {
return fileDescriptor_0c843d59d2d938e7, []int{6}
return fileDescriptor_node_52aec12f51af5891, []int{6}
}
func (m *NodeRestrictions) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NodeRestrictions.Unmarshal(m, b)
@ -523,8 +429,8 @@ func (m *NodeRestrictions) XXX_Unmarshal(b []byte) error {
func (m *NodeRestrictions) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_NodeRestrictions.Marshal(b, m, deterministic)
}
func (m *NodeRestrictions) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeRestrictions.Merge(m, src)
func (dst *NodeRestrictions) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeRestrictions.Merge(dst, src)
}
func (m *NodeRestrictions) XXX_Size() int {
return xxx_messageInfo_NodeRestrictions.Size(m)
@ -564,7 +470,7 @@ func (m *NodeVersion) Reset() { *m = NodeVersion{} }
func (m *NodeVersion) String() string { return proto.CompactTextString(m) }
func (*NodeVersion) ProtoMessage() {}
func (*NodeVersion) Descriptor() ([]byte, []int) {
return fileDescriptor_0c843d59d2d938e7, []int{7}
return fileDescriptor_node_52aec12f51af5891, []int{7}
}
func (m *NodeVersion) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NodeVersion.Unmarshal(m, b)
@ -572,8 +478,8 @@ func (m *NodeVersion) XXX_Unmarshal(b []byte) error {
func (m *NodeVersion) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_NodeVersion.Marshal(b, m, deterministic)
}
func (m *NodeVersion) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeVersion.Merge(m, src)
func (dst *NodeVersion) XXX_Merge(src proto.Message) {
xxx_messageInfo_NodeVersion.Merge(dst, src)
}
func (m *NodeVersion) XXX_Size() int {
return xxx_messageInfo_NodeVersion.Size(m)
@ -613,8 +519,6 @@ func (m *NodeVersion) GetRelease() bool {
}
func init() {
proto.RegisterEnum("node.NodeType", NodeType_name, NodeType_value)
proto.RegisterEnum("node.NodeTransport", NodeTransport_name, NodeTransport_value)
proto.RegisterType((*Node)(nil), "node.Node")
proto.RegisterType((*NodeAddress)(nil), "node.NodeAddress")
proto.RegisterType((*NodeStats)(nil), "node.NodeStats")
@ -623,62 +527,58 @@ func init() {
proto.RegisterType((*NodeMetadata)(nil), "node.NodeMetadata")
proto.RegisterType((*NodeRestrictions)(nil), "node.NodeRestrictions")
proto.RegisterType((*NodeVersion)(nil), "node.NodeVersion")
proto.RegisterEnum("node.NodeType", NodeType_name, NodeType_value)
proto.RegisterEnum("node.NodeTransport", NodeTransport_name, NodeTransport_value)
}
func init() { proto.RegisterFile("node.proto", fileDescriptor_0c843d59d2d938e7) }
func init() { proto.RegisterFile("node.proto", fileDescriptor_node_52aec12f51af5891) }
var fileDescriptor_0c843d59d2d938e7 = []byte{
// 828 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x94, 0xcf, 0x6e, 0xdb, 0x46,
0x10, 0xc6, 0x4d, 0x91, 0x96, 0xc4, 0xd1, 0x9f, 0x32, 0x6b, 0x23, 0x20, 0x5c, 0xb4, 0x56, 0x14,
0x14, 0x15, 0x52, 0x40, 0x76, 0xdd, 0x4b, 0x53, 0xf4, 0x22, 0xcb, 0x6e, 0x2a, 0x94, 0xb5, 0x8d,
0x15, 0xed, 0x43, 0x2e, 0xc4, 0x8a, 0x5c, 0x4b, 0x8b, 0x50, 0x22, 0xc1, 0x5d, 0x36, 0xd0, 0xbb,
0xe4, 0x81, 0xfa, 0x0c, 0x3d, 0xe4, 0xdc, 0xc7, 0x28, 0x76, 0x97, 0x14, 0x45, 0x17, 0x2d, 0x5a,
0x20, 0x37, 0xee, 0x37, 0xbf, 0x9d, 0x19, 0x70, 0xbe, 0x1d, 0x80, 0x4d, 0x12, 0xd1, 0x71, 0x9a,
0x25, 0x22, 0x41, 0x96, 0xfc, 0x3e, 0x81, 0x65, 0xb2, 0x4c, 0xb4, 0x72, 0x72, 0xba, 0x4c, 0x92,
0x65, 0x4c, 0xcf, 0xd4, 0x69, 0x91, 0x3f, 0x9e, 0x09, 0xb6, 0xa6, 0x5c, 0x90, 0x75, 0xaa, 0x81,
0xe1, 0x07, 0x0b, 0xac, 0x9b, 0x24, 0xa2, 0xe8, 0x4b, 0x68, 0xb0, 0xc8, 0x35, 0x06, 0xc6, 0xa8,
0x7b, 0xd9, 0xff, 0xfd, 0xe3, 0xe9, 0xc1, 0x1f, 0x1f, 0x4f, 0x9b, 0x32, 0x32, 0xbb, 0xc2, 0x0d,
0x16, 0xa1, 0x6f, 0xa0, 0x45, 0xa2, 0x28, 0xa3, 0x9c, 0xbb, 0x8d, 0x81, 0x31, 0xea, 0x5c, 0x3c,
0x1b, 0xab, 0xca, 0x12, 0x99, 0xe8, 0x00, 0x2e, 0x09, 0x34, 0x04, 0x4b, 0x6c, 0x53, 0xea, 0x9a,
0x03, 0x63, 0xd4, 0xbf, 0xe8, 0x57, 0xa4, 0xbf, 0x4d, 0x29, 0x56, 0x31, 0xf4, 0x03, 0x74, 0x33,
0xca, 0x45, 0xc6, 0x42, 0xc1, 0x92, 0x0d, 0x77, 0x2d, 0x95, 0xf5, 0x79, 0xc5, 0xe2, 0xbd, 0x28,
0xae, 0xb1, 0xe8, 0x0c, 0x20, 0xa3, 0x69, 0x2e, 0x88, 0x3c, 0xba, 0x87, 0xea, 0xe6, 0x67, 0xd5,
0xcd, 0xb9, 0x20, 0x82, 0xe3, 0x3d, 0x04, 0x8d, 0xa1, 0xbd, 0xa6, 0x82, 0x44, 0x44, 0x10, 0xb7,
0xa9, 0x70, 0x54, 0xe1, 0xbf, 0x16, 0x11, 0xbc, 0x63, 0xd0, 0x0b, 0xe8, 0xc6, 0x44, 0xd0, 0x4d,
0xb8, 0x0d, 0x62, 0xc6, 0x85, 0xdb, 0x1a, 0x98, 0x23, 0x13, 0x77, 0x0a, 0xcd, 0x63, 0x5c, 0xa0,
0x97, 0xd0, 0x23, 0x79, 0xc4, 0x44, 0xc0, 0xf3, 0x30, 0x94, 0xbf, 0xa5, 0x3d, 0x30, 0x46, 0x6d,
0xdc, 0x55, 0xe2, 0x5c, 0x6b, 0xe8, 0x08, 0x0e, 0x19, 0x0f, 0xf2, 0xd4, 0xb5, 0x55, 0xd0, 0x62,
0xfc, 0x3e, 0x45, 0x5f, 0x41, 0x3f, 0x4f, 0x23, 0x22, 0x68, 0x50, 0xe4, 0x73, 0x41, 0x45, 0x7b,
0x5a, 0xf5, 0xb4, 0x88, 0xce, 0xe1, 0xb8, 0xc0, 0xea, 0x75, 0x3a, 0x0a, 0x46, 0x3a, 0x36, 0xd9,
0xaf, 0xf6, 0x12, 0x8a, 0x14, 0x41, 0x9e, 0xca, 0x41, 0xbb, 0x5d, 0xdd, 0x92, 0x16, 0xef, 0x95,
0x26, 0x07, 0xf9, 0x1b, 0xcd, 0xb8, 0xfc, 0x71, 0xbd, 0xa7, 0x83, 0x7c, 0xd0, 0x01, 0x5c, 0x12,
0xc3, 0xb7, 0xd0, 0xd9, 0x1b, 0x30, 0xfa, 0x16, 0x6c, 0x91, 0x91, 0x0d, 0x4f, 0x93, 0x4c, 0x28,
0xaf, 0xf4, 0x2f, 0x8e, 0xf6, 0x86, 0x5b, 0x86, 0x70, 0x45, 0x21, 0xb7, 0xee, 0x1b, 0x7b, 0x67,
0x92, 0xe1, 0x9f, 0x26, 0xd8, 0xbb, 0x69, 0xa1, 0xaf, 0xa1, 0x25, 0x13, 0x05, 0xff, 0x68, 0xc2,
0xa6, 0x0c, 0xcf, 0x22, 0xf4, 0x05, 0x40, 0x39, 0x9a, 0xd7, 0xe7, 0x2a, 0xa7, 0x89, 0xed, 0x42,
0x79, 0x7d, 0x8e, 0xc6, 0x70, 0x54, 0xfb, 0x5d, 0x41, 0x26, 0x1d, 0xa0, 0x9c, 0x68, 0xe0, 0x67,
0xfb, 0xc3, 0xc1, 0x32, 0x20, 0x27, 0xad, 0x7f, 0x56, 0x01, 0x5a, 0x0a, 0xec, 0x68, 0x4d, 0x23,
0xa7, 0xd0, 0xd1, 0x29, 0xc3, 0x24, 0xdf, 0x08, 0x65, 0x37, 0x13, 0x83, 0x92, 0xa6, 0x52, 0xf9,
0x7b, 0x4d, 0x0d, 0x36, 0x15, 0x58, 0xab, 0xa9, 0xf9, 0xaa, 0xa6, 0x06, 0x5b, 0x0a, 0x2c, 0x6a,
0x6a, 0x44, 0x0d, 0x5f, 0x21, 0xf5, 0x9c, 0x6d, 0x85, 0x22, 0x1d, 0xab, 0x25, 0xf5, 0xe0, 0x38,
0x26, 0x5c, 0x36, 0xb9, 0x11, 0x24, 0xac, 0xec, 0x62, 0xab, 0x21, 0x9f, 0x8c, 0xf5, 0x26, 0x18,
0x97, 0x9b, 0x60, 0xec, 0x97, 0x9b, 0x00, 0x23, 0x79, 0x6f, 0xaa, 0xaf, 0x95, 0x56, 0x7a, 0x9a,
0xed, 0x91, 0xb0, 0x38, 0xcf, 0xa8, 0x72, 0xea, 0x7f, 0xcf, 0xf6, 0x93, 0xbe, 0x35, 0xfc, 0x11,
0xba, 0x72, 0x8a, 0xb7, 0x29, 0xcd, 0x88, 0x48, 0x32, 0x74, 0x0c, 0x87, 0x74, 0x4d, 0x58, 0xac,
0x46, 0x6d, 0x63, 0x7d, 0x40, 0xcf, 0xa1, 0xf9, 0x9e, 0xc4, 0x31, 0x15, 0x85, 0x53, 0x8a, 0xd3,
0x10, 0xeb, 0xdb, 0x53, 0x92, 0x92, 0x90, 0x89, 0xad, 0x7c, 0x3f, 0x8f, 0x19, 0xa5, 0xc1, 0x82,
0x6c, 0xa2, 0xf7, 0x2c, 0x12, 0x2b, 0x95, 0xc6, 0xc4, 0x3d, 0xa9, 0x5e, 0x96, 0x22, 0xfa, 0x1c,
0x6c, 0x85, 0x45, 0x8c, 0xbf, 0x2b, 0x7c, 0xd2, 0x96, 0xc2, 0x15, 0xe3, 0xef, 0xca, 0x8e, 0xca,
0xa7, 0xff, 0x3f, 0x3b, 0x7a, 0x00, 0xe7, 0xe9, 0x86, 0xfa, 0x24, 0x5d, 0x7d, 0x30, 0xf4, 0x7b,
0x2b, 0xde, 0xa1, 0x7c, 0x3c, 0xe5, 0x5b, 0xd5, 0x7d, 0x95, 0x47, 0xe9, 0xc9, 0x30, 0x59, 0xaf,
0x99, 0x08, 0x56, 0x84, 0xaf, 0x8a, 0xf6, 0x40, 0x4b, 0x3f, 0x13, 0xbe, 0x42, 0xdf, 0x83, 0xbd,
0xdb, 0xf5, 0xca, 0xfd, 0xff, 0x3e, 0xb5, 0x0a, 0x96, 0x45, 0x33, 0x1a, 0x53, 0xc2, 0xa9, 0x7a,
0x0c, 0x6d, 0x5c, 0x1e, 0x5f, 0xdd, 0x40, 0xbb, 0x5c, 0xe2, 0xa8, 0x03, 0xad, 0xd9, 0xcd, 0xc3,
0xc4, 0x9b, 0x5d, 0x39, 0x07, 0xa8, 0x07, 0xf6, 0x7c, 0xe2, 0x5f, 0x7b, 0xde, 0xcc, 0xbf, 0x76,
0x0c, 0x19, 0x9b, 0xfb, 0xb7, 0x78, 0xf2, 0xe6, 0xda, 0x69, 0x20, 0x80, 0xe6, 0xfd, 0x9d, 0x37,
0xbb, 0xf9, 0xc5, 0x31, 0x25, 0x77, 0x79, 0x7b, 0xeb, 0xcf, 0x7d, 0x3c, 0xb9, 0x73, 0xac, 0x57,
0x2f, 0xa0, 0x57, 0xdb, 0x1b, 0xc8, 0x81, 0xae, 0x3f, 0xbd, 0x0b, 0x7c, 0x6f, 0x1e, 0xbc, 0xc1,
0x77, 0x53, 0xe7, 0xe0, 0xd2, 0x7a, 0xdb, 0x48, 0x17, 0x8b, 0xa6, 0xea, 0xf8, 0xbb, 0xbf, 0x02,
0x00, 0x00, 0xff, 0xff, 0x8f, 0x51, 0xee, 0x90, 0xed, 0x06, 0x00, 0x00,
var fileDescriptor_node_52aec12f51af5891 = []byte{
// 723 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x94, 0xdd, 0x6e, 0xea, 0x46,
0x10, 0xc7, 0x63, 0x20, 0x80, 0x87, 0x0f, 0x39, 0x1b, 0x54, 0xa1, 0x54, 0x2d, 0x09, 0x52, 0xd5,
0x28, 0x95, 0x48, 0x9a, 0xde, 0x34, 0x52, 0x6f, 0x80, 0xa4, 0x29, 0x2d, 0x05, 0xb4, 0xb8, 0xb9,
0xc8, 0x8d, 0xb5, 0xd8, 0x1b, 0x58, 0xc5, 0xd8, 0x96, 0x77, 0xdd, 0x88, 0x77, 0xe9, 0x03, 0xf5,
0x19, 0x7a, 0x91, 0xbb, 0x4a, 0xe7, 0x31, 0x8e, 0xf6, 0xc3, 0x21, 0x28, 0x3a, 0x47, 0x8a, 0x74,
0xee, 0x98, 0x99, 0xdf, 0xfc, 0x67, 0x66, 0x3d, 0x03, 0x40, 0x14, 0x07, 0xb4, 0x97, 0xa4, 0xb1,
0x88, 0x51, 0x49, 0xfe, 0x3e, 0x82, 0x65, 0xbc, 0x8c, 0xb5, 0xe7, 0xa8, 0xb3, 0x8c, 0xe3, 0x65,
0x48, 0xcf, 0x95, 0xb5, 0xc8, 0x1e, 0xce, 0x05, 0x5b, 0x53, 0x2e, 0xc8, 0x3a, 0xd1, 0x40, 0xf7,
0x7f, 0x0b, 0x4a, 0x93, 0x38, 0xa0, 0xe8, 0x5b, 0x28, 0xb0, 0xa0, 0x6d, 0x1d, 0x5b, 0xa7, 0xf5,
0x41, 0xf3, 0xdf, 0xe7, 0xce, 0xde, 0x7f, 0xcf, 0x9d, 0xb2, 0x8c, 0x8c, 0xae, 0x71, 0x81, 0x05,
0xe8, 0x07, 0xa8, 0x90, 0x20, 0x48, 0x29, 0xe7, 0xed, 0xc2, 0xb1, 0x75, 0x5a, 0xbb, 0x3c, 0xe8,
0xa9, 0xca, 0x12, 0xe9, 0xeb, 0x00, 0xce, 0x89, 0xdf, 0x4b, 0xd5, 0xa2, 0xd3, 0xc4, 0x25, 0xb1,
0x49, 0x28, 0xae, 0xa7, 0x94, 0x8b, 0x94, 0xf9, 0x82, 0xc5, 0x11, 0xc7, 0x90, 0xd2, 0x24, 0x13,
0x44, 0x1a, 0xb8, 0xba, 0xa6, 0x82, 0x04, 0x44, 0x10, 0x5c, 0x0f, 0x89, 0xa0, 0x91, 0xbf, 0xf1,
0x42, 0xc6, 0x05, 0x6e, 0x90, 0x2c, 0x60, 0xc2, 0xe3, 0x99, 0xef, 0x4b, 0xd5, 0x7d, 0xc6, 0xbd,
0x2c, 0xc1, 0xcd, 0x2c, 0x09, 0x88, 0xa0, 0x9e, 0x41, 0x71, 0xcb, 0xd8, 0xbb, 0x70, 0xc3, 0x78,
0xb3, 0x44, 0x4e, 0x8a, 0x2b, 0x7f, 0xd3, 0x94, 0xb3, 0x38, 0xea, 0xde, 0x43, 0xed, 0x55, 0xa7,
0xe8, 0x47, 0xb0, 0x45, 0x4a, 0x22, 0x9e, 0xc4, 0xa9, 0x50, 0x43, 0x37, 0x2f, 0x0f, 0xb7, 0xf3,
0xb8, 0x79, 0x08, 0x6f, 0x29, 0xd4, 0xde, 0x7d, 0x00, 0xfb, 0x65, 0xda, 0xee, 0x87, 0x22, 0xd8,
0x32, 0x6d, 0x2e, 0x88, 0xe0, 0xe8, 0x7b, 0xa8, 0x48, 0x21, 0xef, 0x93, 0xaf, 0x59, 0x96, 0xe1,
0x51, 0x80, 0xbe, 0x01, 0xc8, 0xc7, 0xbe, 0xba, 0x50, 0x9a, 0x45, 0x6c, 0x1b, 0xcf, 0xd5, 0x05,
0xea, 0xc1, 0xe1, 0xce, 0x68, 0x5e, 0x2a, 0x5f, 0xad, 0x5d, 0x3c, 0xb6, 0x4e, 0x2d, 0x7c, 0xa0,
0x42, 0x73, 0x33, 0xb4, 0x0c, 0xa0, 0x13, 0xa8, 0xeb, 0xa1, 0x0d, 0x58, 0x52, 0x60, 0xcd, 0x3c,
0x84, 0x42, 0x3a, 0x50, 0xd3, 0x92, 0x7e, 0x9c, 0x45, 0xa2, 0xbd, 0xaf, 0x4a, 0x82, 0x72, 0x0d,
0xa5, 0xe7, 0x6d, 0x4d, 0x0d, 0x96, 0x15, 0xb8, 0x53, 0x53, 0xf3, 0xdb, 0x9a, 0x1a, 0xac, 0x28,
0xd0, 0xd4, 0xd4, 0xc8, 0x05, 0xb4, 0x0c, 0xb2, 0xab, 0x59, 0x55, 0x28, 0xd2, 0xb1, 0x1d, 0xd1,
0x31, 0xb4, 0x42, 0xc2, 0x65, 0x93, 0x91, 0x20, 0xfe, 0x4b, 0x2f, 0x6d, 0x5b, 0xad, 0xdd, 0x51,
0x4f, 0xaf, 0x74, 0x2f, 0x5f, 0xe9, 0x9e, 0x9b, 0xaf, 0x34, 0x46, 0x32, 0x6f, 0xa8, 0xd3, 0x8c,
0xe4, 0x1b, 0xb5, 0x07, 0xc2, 0xc2, 0x2c, 0xa5, 0x6d, 0x78, 0x97, 0xda, 0xaf, 0x3a, 0xab, 0xfb,
0x0b, 0xd4, 0xe5, 0x57, 0x9c, 0x26, 0x34, 0x25, 0x22, 0x4e, 0x51, 0x0b, 0xf6, 0xe9, 0x9a, 0xb0,
0x50, 0x7d, 0x6a, 0x1b, 0x6b, 0x03, 0x7d, 0x05, 0xe5, 0x27, 0x12, 0x86, 0x54, 0x98, 0x4d, 0x31,
0x56, 0x17, 0xeb, 0xec, 0x21, 0x49, 0x88, 0xcf, 0xc4, 0x06, 0x7d, 0x07, 0xcd, 0x87, 0x94, 0x52,
0x6f, 0x41, 0xa2, 0xe0, 0x89, 0x05, 0x62, 0xa5, 0x64, 0x8a, 0xb8, 0x21, 0xbd, 0x83, 0xdc, 0x89,
0xbe, 0x06, 0x5b, 0x61, 0x01, 0xe3, 0x8f, 0x66, 0x4f, 0xaa, 0xd2, 0x71, 0xcd, 0xf8, 0x63, 0xde,
0xd1, 0x9f, 0xe6, 0x94, 0xde, 0xd9, 0xd1, 0x1d, 0x38, 0x32, 0x1b, 0xbf, 0x3a, 0xd1, 0x2f, 0xd2,
0xd5, 0x3f, 0x96, 0xbe, 0xb7, 0x3b, 0x7d, 0x7e, 0xf2, 0x78, 0xcc, 0x25, 0x9a, 0xbe, 0x72, 0x53,
0xee, 0xa4, 0x1f, 0xaf, 0xd7, 0x4c, 0x78, 0x2b, 0xc2, 0x57, 0xa6, 0x3d, 0xd0, 0xae, 0xdf, 0x08,
0x5f, 0xa1, 0x9f, 0xc1, 0x7e, 0xf9, 0xd3, 0x52, 0xdb, 0xff, 0xf9, 0xaf, 0xb6, 0x85, 0x65, 0xd1,
0x94, 0x86, 0x94, 0x70, 0xaa, 0x8e, 0xa1, 0x8a, 0x73, 0xf3, 0x6c, 0x02, 0x55, 0x75, 0xe7, 0x9b,
0x84, 0xa2, 0x1a, 0x54, 0x46, 0x93, 0xbb, 0xfe, 0x78, 0x74, 0xed, 0xec, 0xa1, 0x06, 0xd8, 0xf3,
0xbe, 0x7b, 0x33, 0x1e, 0x8f, 0xdc, 0x1b, 0xc7, 0x92, 0xb1, 0xb9, 0x3b, 0xc5, 0xfd, 0xdb, 0x1b,
0xa7, 0x80, 0x00, 0xca, 0x7f, 0xcd, 0xc6, 0xa3, 0xc9, 0x1f, 0x4e, 0x51, 0x72, 0x83, 0xe9, 0xd4,
0x9d, 0xbb, 0xb8, 0x3f, 0x73, 0x4a, 0x67, 0x27, 0xd0, 0xd8, 0xf9, 0xdf, 0x40, 0x0e, 0xd4, 0xdd,
0xe1, 0xcc, 0x73, 0xc7, 0x73, 0xef, 0x16, 0xcf, 0x86, 0xce, 0xde, 0xa0, 0x74, 0x5f, 0x48, 0x16,
0x8b, 0xb2, 0xea, 0xf8, 0xa7, 0x8f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x92, 0xae, 0xb2, 0xdb, 0xb6,
0x05, 0x00, 0x00,
}

View File

@ -15,17 +15,8 @@ import "google/protobuf/timestamp.proto";
message Node {
bytes id = 1 [(gogoproto.customtype) = "NodeID", (gogoproto.nullable) = false];
NodeAddress address = 2;
NodeType type = 3;
NodeRestrictions restrictions = 4;
NodeStats reputation = 5;
NodeMetadata metadata = 6;
repeated int64 latency_list = 7;
bool audit_success = 8;
bool is_up = 9;
bool update_latency = 10;
bool update_audit_success = 11;
bool update_uptime = 12;
NodeVersion version = 13;
reserved 3 to 13;
reserved "type", "restrictions", "reputation", "metadata", "latency_list", "audit_success", "is_up", "update_latency", "update_audit_success", "update_uptime", "version";
}
// NodeType is an enum of possible node types

View File

@ -25,29 +25,15 @@ func NodesToIDs(nodes []*Node) storj.NodeIDList {
// with gogo's customtype extension.
// (see https://github.com/gogo/protobuf/issues/147)
func CopyNode(src *Node) (dst *Node) {
src.Type.DPanicOnInvalid("copy node")
node := Node{Id: storj.NodeID{}}
copy(node.Id[:], src.Id[:])
if src.Address != nil {
node.Address = &NodeAddress{
Transport: src.Address.Transport,
Address: src.Address.Address,
}
}
if src.Metadata != nil {
node.Metadata = &NodeMetadata{
Email: src.Metadata.Email,
Wallet: src.Metadata.Wallet,
}
}
if src.Restrictions != nil {
node.Restrictions = &NodeRestrictions{
FreeBandwidth: src.Restrictions.FreeBandwidth,
FreeDisk: src.Restrictions.FreeDisk,
}
}
node.Type = src.Type
return &node
}

View File

@ -136,7 +136,7 @@ func TestOptions_ServerOption_Peer_CA_Whitelist(t *testing.T) {
transportClient := transport.NewClient(opts)
conn, err := transportClient.DialNode(ctx, &target, dialOption)
conn, err := transportClient.DialNode(ctx, &target.Node, dialOption)
assert.NotNil(t, conn)
assert.NoError(t, err)
})

View File

@ -51,7 +51,6 @@ func NewClient(tc transport.Client, memoryLimit int) Client {
}
func (ec *ecClient) newPSClient(ctx context.Context, n *pb.Node) (*piecestore.Client, error) {
n.Type.DPanicOnInvalid("new ps client")
conn, err := ec.transport.DialNode(ctx, n)
if err != nil {
return nil, err
@ -124,7 +123,6 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, r
successfulNodes[info.i] = &pb.Node{
Id: limits[info.i].GetLimit().StorageNodeId,
Address: limits[info.i].GetStorageNodeAddress(),
Type: pb.NodeType_STORAGE,
}
successfulHashes[info.i] = info.hash
@ -240,7 +238,6 @@ func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit
successfulNodes[info.i] = &pb.Node{
Id: limits[info.i].GetLimit().StorageNodeId,
Address: limits[info.i].GetStorageNodeAddress(),
Type: pb.NodeType_STORAGE,
}
successfulHashes[info.i] = info.hash
@ -290,7 +287,6 @@ func (ec *ecClient) putPiece(ctx, parent context.Context, limit *pb.AddressedOrd
ps, err := ec.newPSClient(ctx, &pb.Node{
Id: storageNodeID,
Address: limit.GetStorageNodeAddress(),
Type: pb.NodeType_STORAGE,
})
if err != nil {
zap.S().Debugf("Failed dialing for putting piece %s to node %s: %v", pieceID, storageNodeID, err)
@ -385,7 +381,6 @@ func (ec *ecClient) Delete(ctx context.Context, limits []*pb.AddressedOrderLimit
ps, err := ec.newPSClient(ctx, &pb.Node{
Id: limit.StorageNodeId,
Address: addressedLimit.GetStorageNodeAddress(),
Type: pb.NodeType_STORAGE,
})
if err != nil {
zap.S().Errorf("Failed dialing for deleting piece %s from node %s: %v", limit.PieceId, limit.StorageNodeId, err)
@ -467,7 +462,6 @@ func (lr *lazyPieceRanger) Range(ctx context.Context, offset, length int64) (io.
ps, err := lr.newPSClientHelper(ctx, &pb.Node{
Id: lr.limit.GetLimit().StorageNodeId,
Address: lr.limit.GetStorageNodeAddress(),
Type: pb.NodeType_STORAGE,
})
if err != nil {
return nil, err

View File

@ -248,7 +248,6 @@ func makeRemotePointer(nodes []*pb.Node, hashes []*pb.PieceHash, rs eestream.Red
if nodes[i] == nil {
continue
}
nodes[i].Type.DPanicOnInvalid("makeremotepointer")
remotePieces = append(remotePieces, &pb.RemotePiece{
PieceNum: int32(i),
NodeId: nodes[i].Id,

View File

@ -58,9 +58,6 @@ func NewClientWithTimeout(tlsOpts *tlsopts.Options, requestTimeout time.Duration
func (transport *Transport) DialNode(ctx context.Context, node *pb.Node, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
defer mon.Task()(&ctx)(&err)
if node != nil {
node.Type.DPanicOnInvalid("transport dial node")
}
if node.Address == nil || node.Address.Address == "" {
return nil, Error.New("no address")
}

View File

@ -61,14 +61,12 @@ func TestDialNode(t *testing.T) {
{
Id: storj.NodeID{},
Address: nil,
Type: pb.NodeType_STORAGE,
},
{
Id: storj.NodeID{},
Address: &pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
},
Type: pb.NodeType_STORAGE,
},
{
Id: storj.NodeID{123},
@ -76,7 +74,6 @@ func TestDialNode(t *testing.T) {
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: "127.0.0.1:100",
},
Type: pb.NodeType_STORAGE,
},
{
Id: storj.NodeID{},
@ -84,7 +81,6 @@ func TestDialNode(t *testing.T) {
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: planet.StorageNodes[1].Addr(),
},
Type: pb.NodeType_STORAGE,
},
}
@ -106,7 +102,6 @@ func TestDialNode(t *testing.T) {
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: planet.StorageNodes[1].Addr(),
},
Type: pb.NodeType_STORAGE,
}
timedCtx, cancel := context.WithTimeout(ctx, time.Second)
@ -126,7 +121,6 @@ func TestDialNode(t *testing.T) {
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: planet.StorageNodes[1].Addr(),
},
Type: pb.NodeType_STORAGE,
}
dialOption, err := opts.DialOption(target.Id)
@ -149,7 +143,6 @@ func TestDialNode(t *testing.T) {
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: planet.StorageNodes[1].Addr(),
},
Type: pb.NodeType_STORAGE,
}
timedCtx, cancel := context.WithTimeout(ctx, time.Second)
@ -231,7 +224,6 @@ func TestDialNode_BadServerCertificate(t *testing.T) {
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: planet.StorageNodes[1].Addr(),
},
Type: pb.NodeType_STORAGE,
}
timedCtx, cancel := context.WithTimeout(ctx, time.Second)

View File

@ -22,85 +22,80 @@ import (
)
func TestInspectorStats(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplink := planet.Uplinks[0]
testData := make([]byte, 1*memory.MiB)
_, err := rand.Read(testData)
require.NoError(t, err)
planet, err := testplanet.New(t, 1, 6, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
bucket := "testbucket"
planet.Start(ctx)
err = uplink.Upload(ctx, planet.Satellites[0], bucket, "test/path", testData)
require.NoError(t, err)
uplink := planet.Uplinks[0]
testData := make([]byte, 1*memory.MiB)
_, err = rand.Read(testData)
require.NoError(t, err)
healthEndpoint := planet.Satellites[0].Inspector.Endpoint
bucket := "testbucket"
err = uplink.Upload(ctx, planet.Satellites[0], bucket, "test/path", testData)
require.NoError(t, err)
healthEndpoint := planet.Satellites[0].Inspector.Endpoint
// Get path of random segment we just uploaded and check the health
_ = planet.Satellites[0].Metainfo.Database.Iterate(storage.IterateOptions{Recurse: true},
func(it storage.Iterator) error {
var item storage.ListItem
for it.Next(&item) {
if bytes.Contains(item.Key, []byte(fmt.Sprintf("%s/", bucket))) {
break
}
}
fullPath := storj.SplitPath(item.Key.String())
require.Falsef(t, len(fullPath) < 4, "Could not retrieve a full path from pointerdb")
projectID := fullPath[0]
bucket := fullPath[2]
encryptedPath := strings.Join(fullPath[3:], "/")
{ // Test Segment Health Request
req := &pb.SegmentHealthRequest{
ProjectId: []byte(projectID),
EncryptedPath: []byte(encryptedPath),
Bucket: []byte(bucket),
SegmentIndex: -1,
// Get path of random segment we just uploaded and check the health
_ = planet.Satellites[0].Metainfo.Database.Iterate(storage.IterateOptions{Recurse: true},
func(it storage.Iterator) error {
var item storage.ListItem
for it.Next(&item) {
if bytes.Contains(item.Key, []byte(fmt.Sprintf("%s/", bucket))) {
break
}
}
resp, err := healthEndpoint.SegmentHealth(ctx, req)
require.NoError(t, err)
fullPath := storj.SplitPath(item.Key.String())
require.Falsef(t, len(fullPath) < 4, "Could not retrieve a full path from pointerdb")
redundancy, err := eestream.NewRedundancyStrategyFromProto(resp.GetRedundancy())
require.NoError(t, err)
projectID := fullPath[0]
bucket := fullPath[2]
encryptedPath := strings.Join(fullPath[3:], "/")
require.Equal(t, 4, redundancy.TotalCount())
require.True(t, bytes.Equal([]byte("l"), resp.GetHealth().GetSegment()))
}
{ // Test Segment Health Request
req := &pb.SegmentHealthRequest{
ProjectId: []byte(projectID),
EncryptedPath: []byte(encryptedPath),
Bucket: []byte(bucket),
SegmentIndex: -1,
}
{ // Test Object Health Request
objectHealthReq := &pb.ObjectHealthRequest{
ProjectId: []byte(projectID),
EncryptedPath: []byte(encryptedPath),
Bucket: []byte(bucket),
StartAfterSegment: 0,
EndBeforeSegment: 0,
Limit: 0,
resp, err := healthEndpoint.SegmentHealth(ctx, req)
require.NoError(t, err)
redundancy, err := eestream.NewRedundancyStrategyFromProto(resp.GetRedundancy())
require.NoError(t, err)
require.Equal(t, 4, redundancy.TotalCount())
require.True(t, bytes.Equal([]byte("l"), resp.GetHealth().GetSegment()))
}
resp, err := healthEndpoint.ObjectHealth(ctx, objectHealthReq)
require.NoError(t, err)
segments := resp.GetSegments()
require.Len(t, segments, 1)
{ // Test Object Health Request
objectHealthReq := &pb.ObjectHealthRequest{
ProjectId: []byte(projectID),
EncryptedPath: []byte(encryptedPath),
Bucket: []byte(bucket),
StartAfterSegment: 0,
EndBeforeSegment: 0,
Limit: 0,
}
resp, err := healthEndpoint.ObjectHealth(ctx, objectHealthReq)
require.NoError(t, err)
redundancy, err := eestream.NewRedundancyStrategyFromProto(resp.GetRedundancy())
require.NoError(t, err)
segments := resp.GetSegments()
require.Len(t, segments, 1)
require.Equal(t, 4, redundancy.TotalCount())
require.True(t, bytes.Equal([]byte("l"), segments[0].GetSegment()))
}
redundancy, err := eestream.NewRedundancyStrategyFromProto(resp.GetRedundancy())
require.NoError(t, err)
return nil
},
)
require.Equal(t, 4, redundancy.TotalCount())
require.True(t, bytes.Equal([]byte("l"), segments[0].GetSegment()))
}
return nil
},
)
})
}

View File

@ -160,68 +160,63 @@ func TestServiceList(t *testing.T) {
}
func TestCommitSegment(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx)
require.NoError(t, err)
apiKey := console.APIKeyFromBytes([]byte(projects[0].Name)).String()
planet, err := testplanet.New(t, 1, 6, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx)
require.NoError(t, err)
apiKey := console.APIKeyFromBytes([]byte(projects[0].Name)).String()
metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
{
// error if pointer is nil
_, err = metainfo.CommitSegment(ctx, "bucket", "path", -1, nil, []*pb.OrderLimit2{})
require.Error(t, err)
}
{
// error if bucket contains slash
_, err = metainfo.CommitSegment(ctx, "bucket/storj", "path", -1, &pb.Pointer{}, []*pb.OrderLimit2{})
require.Error(t, err)
}
{
// error if number of remote pieces is lower then repair threshold
redundancy := &pb.RedundancyScheme{
MinReq: 1,
RepairThreshold: 2,
SuccessThreshold: 4,
Total: 6,
ErasureShareSize: 10,
}
addresedLimits, rootPieceID, err := metainfo.CreateSegment(ctx, "bucket", "path", -1, redundancy, 1000, time.Now())
metainfo, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
// create number of pieces below repair threshold
usedForPieces := addresedLimits[:redundancy.RepairThreshold-1]
pieces := make([]*pb.RemotePiece, len(usedForPieces))
for i, limit := range usedForPieces {
pieces[i] = &pb.RemotePiece{
PieceNum: int32(i),
NodeId: limit.Limit.StorageNodeId,
{
// error if pointer is nil
_, err = metainfo.CommitSegment(ctx, "bucket", "path", -1, nil, []*pb.OrderLimit2{})
require.Error(t, err)
}
{
// error if bucket contains slash
_, err = metainfo.CommitSegment(ctx, "bucket/storj", "path", -1, &pb.Pointer{}, []*pb.OrderLimit2{})
require.Error(t, err)
}
{
// error if number of remote pieces is lower then repair threshold
redundancy := &pb.RedundancyScheme{
MinReq: 1,
RepairThreshold: 2,
SuccessThreshold: 4,
Total: 6,
ErasureShareSize: 10,
}
}
pointer := &pb.Pointer{
Type: pb.Pointer_REMOTE,
Remote: &pb.RemoteSegment{
RootPieceId: rootPieceID,
Redundancy: redundancy,
RemotePieces: pieces,
},
}
addresedLimits, rootPieceID, err := metainfo.CreateSegment(ctx, "bucket", "path", -1, redundancy, 1000, time.Now())
require.NoError(t, err)
limits := make([]*pb.OrderLimit2, len(addresedLimits))
for i, addresedLimit := range addresedLimits {
limits[i] = addresedLimit.Limit
// create number of pieces below repair threshold
usedForPieces := addresedLimits[:redundancy.RepairThreshold-1]
pieces := make([]*pb.RemotePiece, len(usedForPieces))
for i, limit := range usedForPieces {
pieces[i] = &pb.RemotePiece{
PieceNum: int32(i),
NodeId: limit.Limit.StorageNodeId,
}
}
pointer := &pb.Pointer{
Type: pb.Pointer_REMOTE,
Remote: &pb.RemoteSegment{
RootPieceId: rootPieceID,
Redundancy: redundancy,
RemotePieces: pieces,
},
}
limits := make([]*pb.OrderLimit2, len(addresedLimits))
for i, addresedLimit := range addresedLimits {
limits[i] = addresedLimit.Limit
}
_, err = metainfo.CommitSegment(ctx, "bucket", "path", -1, pointer, limits)
require.Error(t, err)
require.Contains(t, err.Error(), "Number of valid pieces is lower then repair threshold")
}
_, err = metainfo.CommitSegment(ctx, "bucket", "path", -1, pointer, limits)
require.Error(t, err)
require.Contains(t, err.Error(), "Number of valid pieces is lower then repair threshold")
}
})
}

View File

@ -9,10 +9,12 @@ import (
"time"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/pkg/storj"
"storj.io/storj/uplink"
)
func TestSendingReceivingOrders(t *testing.T) {
@ -20,7 +22,7 @@ func TestSendingReceivingOrders(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
planet.Satellites[0].Audit.Service.Loop.Stop()
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Sender.Loop.Pause()
}
@ -29,7 +31,8 @@ func TestSendingReceivingOrders(t *testing.T) {
_, err := rand.Read(expectedData)
require.NoError(t, err)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
redundancy := noLongTailRedundancy(planet)
err = planet.Uplinks[0].UploadWithConfig(ctx, planet.Satellites[0], &redundancy, "testbucket", "test/path", expectedData)
require.NoError(t, err)
sumBeforeSend := 0
@ -65,6 +68,7 @@ func TestUnableToSendOrders(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
planet.Satellites[0].Audit.Service.Loop.Stop()
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Sender.Loop.Pause()
}
@ -73,7 +77,8 @@ func TestUnableToSendOrders(t *testing.T) {
_, err := rand.Read(expectedData)
require.NoError(t, err)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
redundancy := noLongTailRedundancy(planet)
err = planet.Uplinks[0].UploadWithConfig(ctx, planet.Satellites[0], &redundancy, "testbucket", "test/path", expectedData)
require.NoError(t, err)
sumBeforeSend := 0
@ -112,6 +117,7 @@ func TestUploadDownloadBandwidth(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
hourBeforeTest := time.Now().UTC().Add(-time.Hour)
planet.Satellites[0].Audit.Service.Loop.Stop()
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Sender.Loop.Pause()
}
@ -120,7 +126,8 @@ func TestUploadDownloadBandwidth(t *testing.T) {
_, err := rand.Read(expectedData)
require.NoError(t, err)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
redundancy := noLongTailRedundancy(planet)
err = planet.Uplinks[0].UploadWithConfig(ctx, planet.Satellites[0], &redundancy, "testbucket", "test/path", expectedData)
require.NoError(t, err)
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path")
@ -161,3 +168,9 @@ func TestUploadDownloadBandwidth(t *testing.T) {
}
})
}
func noLongTailRedundancy(planet *testplanet.Planet) uplink.RSConfig {
redundancy := planet.Uplinks[0].GetConfig(planet.Satellites[0]).RS
redundancy.SuccessThreshold = redundancy.MaxThreshold
return redundancy
}

View File

@ -500,10 +500,6 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, repairer
limits := make([]*pb.AddressedOrderLimit, totalPieces)
var pieceNum int
for _, node := range newNodes {
if node != nil {
node.Type.DPanicOnInvalid("order service put repair order limits")
}
for pieceNum < totalPieces && getOrderLimits[pieceNum] != nil {
pieceNum++
}

View File

@ -247,16 +247,18 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve
return nil, errs.Combine(err, peer.Close())
}
self := pb.Node{
Id: peer.ID(),
Type: pb.NodeType_SATELLITE,
Address: &pb.NodeAddress{
Address: config.ExternalAddress,
self := &overlay.NodeDossier{
Node: pb.Node{
Id: peer.ID(),
Address: &pb.NodeAddress{
Address: config.ExternalAddress,
},
},
Metadata: &pb.NodeMetadata{
Type: pb.NodeType_SATELLITE,
Operator: pb.NodeOperator{
Wallet: config.Operator.Wallet,
},
Version: pbVersion,
Version: *pbVersion,
}
{ // setup routing table
@ -283,7 +285,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve
peer.Transport = peer.Transport.WithObservers(peer.Kademlia.RoutingTable)
}
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, peer.Transport, peer.Kademlia.RoutingTable, config)
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), peer.Transport, peer.Kademlia.RoutingTable, config)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -620,7 +622,7 @@ func (peer *Peer) Close() error {
func (peer *Peer) ID() storj.NodeID { return peer.Identity.ID }
// Local returns the peer local node info.
func (peer *Peer) Local() pb.Node { return peer.Kademlia.RoutingTable.Local() }
func (peer *Peer) Local() overlay.NodeDossier { return peer.Kademlia.RoutingTable.Local() }
// Addr returns the public address.
func (peer *Peer) Addr() string { return peer.Server.Addr().String() }

View File

@ -745,14 +745,14 @@ func (m *lockedOverlayCache) SelectStorageNodes(ctx context.Context, count int,
return m.db.SelectStorageNodes(ctx, count, criteria)
}
// Update updates node information
func (m *lockedOverlayCache) Update(ctx context.Context, value *pb.Node) error {
// Update updates node address
func (m *lockedOverlayCache) UpdateAddress(ctx context.Context, value *pb.Node) error {
m.Lock()
defer m.Unlock()
return m.db.Update(ctx, value)
return m.db.UpdateAddress(ctx, value)
}
// UpdateOperator updates the email and wallet for a given node ID for satellite payments.
// UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version.
func (m *lockedOverlayCache) UpdateNodeInfo(ctx context.Context, node storj.NodeID, nodeInfo *pb.InfoResponse) (stats *overlay.NodeDossier, err error) {
m.Lock()
defer m.Unlock()

View File

@ -220,8 +220,8 @@ func (cache *overlaycache) Paginate(ctx context.Context, offset int64, limit int
return infos, more, nil
}
// Update updates node information
func (cache *overlaycache) Update(ctx context.Context, info *pb.Node) (err error) {
// Update updates node address
func (cache *overlaycache) UpdateAddress(ctx context.Context, info *pb.Node) (err error) {
if info == nil || info.Id.IsZero() {
return overlay.ErrEmptyNode
}
@ -239,64 +239,30 @@ func (cache *overlaycache) Update(ctx context.Context, info *pb.Node) (err error
}
if err != nil {
metadata := info.Metadata
if metadata == nil {
metadata = &pb.NodeMetadata{}
}
restrictions := info.Restrictions
if restrictions == nil {
restrictions = &pb.NodeRestrictions{
FreeBandwidth: -1,
FreeDisk: -1,
}
}
reputation := info.Reputation
if reputation == nil {
reputation = &pb.NodeStats{}
}
ver := info.Version
var semver version.SemVer
var verTime time.Time
if ver == nil {
ver = &pb.NodeVersion{}
} else {
parsed, err := version.NewSemVer(ver.Version)
if err == nil {
semver = *parsed
}
verTime, err = ptypes.Timestamp(ver.Timestamp)
if err != nil {
verTime = time.Time{}
}
}
// add the node to DB for first time
_, err = tx.Create_Node(
ctx,
dbx.Node_Id(info.Id.Bytes()),
dbx.Node_Address(address.Address),
dbx.Node_Protocol(int(address.Transport)),
dbx.Node_Type(int(info.Type)),
dbx.Node_Email(metadata.Email),
dbx.Node_Wallet(metadata.Wallet),
dbx.Node_FreeBandwidth(restrictions.FreeBandwidth),
dbx.Node_FreeDisk(restrictions.FreeDisk),
dbx.Node_Major(semver.Major),
dbx.Node_Minor(semver.Minor),
dbx.Node_Patch(semver.Patch),
dbx.Node_Hash(ver.CommitHash),
dbx.Node_Timestamp(verTime),
dbx.Node_Release(ver.Release),
dbx.Node_Latency90(reputation.Latency_90),
dbx.Node_AuditSuccessCount(reputation.AuditSuccessCount),
dbx.Node_TotalAuditCount(reputation.AuditCount),
dbx.Node_AuditSuccessRatio(reputation.AuditSuccessRatio),
dbx.Node_UptimeSuccessCount(reputation.UptimeSuccessCount),
dbx.Node_TotalUptimeCount(reputation.UptimeCount),
dbx.Node_UptimeRatio(reputation.UptimeRatio),
dbx.Node_Type(int(pb.NodeType_INVALID)),
dbx.Node_Email(""),
dbx.Node_Wallet(""),
dbx.Node_FreeBandwidth(-1),
dbx.Node_FreeDisk(-1),
dbx.Node_Major(0),
dbx.Node_Minor(0),
dbx.Node_Patch(0),
dbx.Node_Hash(""),
dbx.Node_Timestamp(time.Time{}),
dbx.Node_Release(false),
dbx.Node_Latency90(0),
dbx.Node_AuditSuccessCount(0),
dbx.Node_TotalAuditCount(0),
dbx.Node_AuditSuccessRatio(0),
dbx.Node_UptimeSuccessCount(0),
dbx.Node_TotalUptimeCount(0),
dbx.Node_UptimeRatio(0),
dbx.Node_LastContactSuccess(time.Now()),
dbx.Node_LastContactFailure(time.Time{}),
)
@ -305,31 +271,10 @@ func (cache *overlaycache) Update(ctx context.Context, info *pb.Node) (err error
}
} else {
update := dbx.Node_Update_Fields{
// TODO: should we be able to update node type?
Address: dbx.Node_Address(address.Address),
Protocol: dbx.Node_Protocol(int(address.Transport)),
}
if info.Reputation != nil {
update.Latency90 = dbx.Node_Latency90(info.Reputation.Latency_90)
update.AuditSuccessRatio = dbx.Node_AuditSuccessRatio(info.Reputation.AuditSuccessRatio)
update.UptimeRatio = dbx.Node_UptimeRatio(info.Reputation.UptimeRatio)
update.TotalAuditCount = dbx.Node_TotalAuditCount(info.Reputation.AuditCount)
update.AuditSuccessCount = dbx.Node_AuditSuccessCount(info.Reputation.AuditSuccessCount)
update.TotalUptimeCount = dbx.Node_TotalUptimeCount(info.Reputation.UptimeCount)
update.UptimeSuccessCount = dbx.Node_UptimeSuccessCount(info.Reputation.UptimeSuccessCount)
}
if info.Metadata != nil {
update.Email = dbx.Node_Email(info.Metadata.Email)
update.Wallet = dbx.Node_Wallet(info.Metadata.Wallet)
}
if info.Restrictions != nil {
update.FreeBandwidth = dbx.Node_FreeBandwidth(info.Restrictions.FreeBandwidth)
update.FreeDisk = dbx.Node_FreeDisk(info.Restrictions.FreeDisk)
}
_, err := tx.Update_Node_By_Id(ctx, dbx.Node_Id(info.Id.Bytes()), update)
if err != nil {
return Error.Wrap(errs.Combine(err, tx.Rollback()))
@ -513,6 +458,9 @@ func (cache *overlaycache) UpdateNodeInfo(ctx context.Context, nodeID storj.Node
var updateFields dbx.Node_Update_Fields
if nodeInfo != nil {
if nodeInfo.GetType() != pb.NodeType_INVALID {
updateFields.Type = dbx.Node_Type(int(nodeInfo.GetType()))
}
if nodeInfo.GetOperator() != nil {
updateFields.Wallet = dbx.Node_Wallet(nodeInfo.GetOperator().GetWallet())
updateFields.Email = dbx.Node_Email(nodeInfo.GetOperator().GetEmail())
@ -524,11 +472,11 @@ func (cache *overlaycache) UpdateNodeInfo(ctx context.Context, nodeID storj.Node
if nodeInfo.GetVersion() != nil {
semVer, err := version.NewSemVer(nodeInfo.GetVersion().GetVersion())
if err != nil {
return &overlay.NodeDossier{}, errs.New("unable to convert version to semVer")
return nil, errs.New("unable to convert version to semVer")
}
pbts, err := ptypes.Timestamp(nodeInfo.GetVersion().GetTimestamp())
if err != nil {
return &overlay.NodeDossier{}, errs.New("unable to convert version timestamp")
return nil, errs.New("unable to convert version timestamp")
}
updateFields.Major = dbx.Node_Major(semVer.Major)
updateFields.Minor = dbx.Node_Minor(semVer.Minor)
@ -623,7 +571,6 @@ func convertDBNode(info *dbx.Node) (*overlay.NodeDossier, error) {
Address: info.Address,
Transport: pb.NodeTransport(info.Protocol),
},
Type: pb.NodeType(info.Type),
},
Type: pb.NodeType(info.Type),
Operator: pb.NodeOperator{
@ -653,10 +600,6 @@ func convertDBNode(info *dbx.Node) (*overlay.NodeDossier, error) {
},
}
if time.Now().Sub(info.LastContactSuccess) < 1*time.Hour && info.LastContactSuccess.After(info.LastContactFailure) {
node.IsUp = true
}
return node, nil
}

View File

@ -29,6 +29,8 @@ func TestInspectorStats(t *testing.T) {
planet.Start(ctx)
planet.Satellites[0].Discovery.Service.Refresh.TriggerWait()
var availableBandwidth int64
var availableSpace int64
for _, storageNode := range planet.StorageNodes {
@ -94,48 +96,43 @@ func TestInspectorStats(t *testing.T) {
func TestInspectorDashboard(t *testing.T) {
testStartedTime := time.Now()
ctx := testcontext.New(t)
defer ctx.Cleanup()
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
for _, storageNode := range planet.StorageNodes {
response, err := storageNode.Storage2.Inspector.Dashboard(ctx, &pb.DashboardRequest{})
require.NoError(t, err)
planet, err := testplanet.New(t, 1, 6, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
assert.True(t, response.Uptime.Nanos > 0)
assert.Equal(t, storageNode.ID(), response.NodeId)
assert.Equal(t, storageNode.Addr(), response.ExternalAddress)
assert.NotNil(t, response.Stats)
}
planet.Start(ctx)
for _, storageNode := range planet.StorageNodes {
response, err := storageNode.Storage2.Inspector.Dashboard(ctx, &pb.DashboardRequest{})
expectedData := make([]byte, 100*memory.KiB)
_, err := rand.Read(expectedData)
require.NoError(t, err)
assert.True(t, response.Uptime.Nanos > 0)
assert.Equal(t, storageNode.ID(), response.NodeId)
assert.Equal(t, storageNode.Addr(), response.ExternalAddress)
assert.NotNil(t, response.Stats)
}
expectedData := make([]byte, 100*memory.KiB)
_, err = rand.Read(expectedData)
require.NoError(t, err)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
require.NoError(t, err)
for _, storageNode := range planet.StorageNodes {
response, err := storageNode.Storage2.Inspector.Dashboard(ctx, &pb.DashboardRequest{})
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
require.NoError(t, err)
lastPinged, err := ptypes.Timestamp(response.LastPinged)
assert.NoError(t, err)
assert.True(t, lastPinged.After(testStartedTime))
for _, storageNode := range planet.StorageNodes {
response, err := storageNode.Storage2.Inspector.Dashboard(ctx, &pb.DashboardRequest{})
require.NoError(t, err)
lastQueried, err := ptypes.Timestamp(response.LastQueried)
assert.NoError(t, err)
assert.True(t, lastQueried.After(testStartedTime))
lastPinged, err := ptypes.Timestamp(response.LastPinged)
assert.NoError(t, err)
assert.True(t, lastPinged.After(testStartedTime))
assert.True(t, response.Uptime.Nanos > 0)
assert.Equal(t, storageNode.ID(), response.NodeId)
assert.Equal(t, storageNode.Addr(), response.ExternalAddress)
assert.Equal(t, int64(len(planet.StorageNodes)+len(planet.Satellites)), response.NodeConnections)
assert.NotNil(t, response.Stats)
}
lastQueried, err := ptypes.Timestamp(response.LastQueried)
assert.NoError(t, err)
assert.True(t, lastQueried.After(testStartedTime))
assert.True(t, response.Uptime.Nanos > 0)
assert.Equal(t, storageNode.ID(), response.NodeId)
assert.Equal(t, storageNode.Addr(), response.ExternalAddress)
assert.Equal(t, int64(len(planet.StorageNodes)+len(planet.Satellites)), response.NodeConnections)
assert.NotNil(t, response.Stats)
}
})
}

View File

@ -127,17 +127,10 @@ func (service *Service) updateNodeInformation(ctx context.Context) error {
return Error.Wrap(err)
}
self := service.routingTable.Local()
self.Restrictions = &pb.NodeRestrictions{
service.routingTable.UpdateSelf(&pb.NodeCapacity{
FreeBandwidth: service.allocatedBandwidth - usedBandwidth,
FreeDisk: service.allocatedDiskSpace - usedSpace,
}
// Update the routing table with latest restrictions
if err := service.routingTable.UpdateSelf(&self); err != nil {
return Error.Wrap(err)
}
})
return nil
}

View File

@ -17,46 +17,41 @@ import (
)
func TestMonitor(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
var freeBandwidth int64
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Monitor.Loop.Pause()
planet, err := testplanet.New(t, 1, 6, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
info, err := storageNode.Kademlia.Service.FetchInfo(ctx, storageNode.Local().Node)
require.NoError(t, err)
planet.Start(ctx)
var freeBandwidth int64
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Monitor.Loop.Pause()
info, err := storageNode.Kademlia.Service.FetchInfo(ctx, storageNode.Local())
require.NoError(t, err)
// assume that all storage nodes have the same initial values
freeBandwidth = info.Capacity.FreeBandwidth
}
expectedData := make([]byte, 100*memory.KiB)
_, err = rand.Read(expectedData)
require.NoError(t, err)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
require.NoError(t, err)
nodeAssertions := 0
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Monitor.Loop.TriggerWait()
info, err := storageNode.Kademlia.Service.FetchInfo(ctx, storageNode.Local())
require.NoError(t, err)
stats, err := storageNode.Storage2.Inspector.Stats(ctx, &pb.StatsRequest{})
require.NoError(t, err)
if stats.UsedSpace > 0 {
assert.Equal(t, freeBandwidth-stats.UsedBandwidth, info.Capacity.FreeBandwidth)
nodeAssertions++
// assume that all storage nodes have the same initial values
freeBandwidth = info.Capacity.FreeBandwidth
}
}
assert.NotZero(t, nodeAssertions, "No storage node were verifed")
expectedData := make([]byte, 100*memory.KiB)
_, err := rand.Read(expectedData)
require.NoError(t, err)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
require.NoError(t, err)
nodeAssertions := 0
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Monitor.Loop.TriggerWait()
info, err := storageNode.Kademlia.Service.FetchInfo(ctx, storageNode.Local().Node)
require.NoError(t, err)
stats, err := storageNode.Storage2.Inspector.Stats(ctx, &pb.StatsRequest{})
require.NoError(t, err)
if stats.UsedSpace > 0 {
assert.Equal(t, freeBandwidth-stats.UsedBandwidth, info.Capacity.FreeBandwidth)
nodeAssertions++
}
}
assert.NotZero(t, nodeAssertions, "No storage node were verifed")
})
}

View File

@ -15,6 +15,7 @@ import (
"storj.io/storj/pkg/auth/signing"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/peertls/tlsopts"
"storj.io/storj/pkg/piecestore/psserver"
@ -153,17 +154,19 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver
return nil, errs.Combine(err, peer.Close())
}
self := pb.Node{
Id: peer.ID(),
Type: pb.NodeType_STORAGE,
Address: &pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: config.ExternalAddress,
self := &overlay.NodeDossier{
Node: pb.Node{
Id: peer.ID(),
Address: &pb.NodeAddress{
Transport: pb.NodeTransport_TCP_TLS_GRPC,
Address: config.ExternalAddress,
},
},
Metadata: &pb.NodeMetadata{
Type: pb.NodeType_STORAGE,
Operator: pb.NodeOperator{
Wallet: config.Operator.Wallet,
},
Version: pbVersion,
Version: *pbVersion,
}
kdb, ndb := peer.DB.RoutingTable()
@ -174,7 +177,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config, ver
peer.Transport = peer.Transport.WithObservers(peer.Kademlia.RoutingTable)
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, peer.Transport, peer.Kademlia.RoutingTable, config)
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), peer.Transport, peer.Kademlia.RoutingTable, config)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -315,7 +318,7 @@ func (peer *Peer) Close() error {
func (peer *Peer) ID() storj.NodeID { return peer.Identity.ID }
// Local returns the peer local node info.
func (peer *Peer) Local() pb.Node { return peer.Kademlia.RoutingTable.Local() }
func (peer *Peer) Local() overlay.NodeDossier { return peer.Kademlia.RoutingTable.Local() }
// Addr returns the public address.
func (peer *Peer) Addr() string { return peer.Server.Addr().String() }

View File

@ -26,70 +26,65 @@ import (
)
func TestUploadAndPartialDownload(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
expectedData := make([]byte, 100*memory.KiB)
_, err := rand.Read(expectedData)
require.NoError(t, err)
planet, err := testplanet.New(t, 1, 6, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
assert.NoError(t, err)
planet.Start(ctx)
var totalDownload int64
for _, tt := range []struct {
offset, size int64
}{
{0, 1510},
{1513, 1584},
{13581, 4783},
} {
if piecestore.DefaultConfig.InitialStep < tt.size {
t.Fatal("test expects initial step to be larger than size to download")
}
totalDownload += piecestore.DefaultConfig.InitialStep
expectedData := make([]byte, 100*memory.KiB)
_, err = rand.Read(expectedData)
require.NoError(t, err)
download, err := planet.Uplinks[0].DownloadStream(ctx, planet.Satellites[0], "testbucket", "test/path")
require.NoError(t, err)
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
assert.NoError(t, err)
pos, err := download.Seek(tt.offset, io.SeekStart)
require.NoError(t, err)
assert.Equal(t, pos, tt.offset)
var totalDownload int64
for _, tt := range []struct {
offset, size int64
}{
{0, 1510},
{1513, 1584},
{13581, 4783},
} {
if piecestore.DefaultConfig.InitialStep < tt.size {
t.Fatal("test expects initial step to be larger than size to download")
data := make([]byte, tt.size)
n, err := io.ReadFull(download, data)
require.NoError(t, err)
assert.Equal(t, int(tt.size), n)
assert.Equal(t, expectedData[tt.offset:tt.offset+tt.size], data)
require.NoError(t, download.Close())
}
totalDownload += piecestore.DefaultConfig.InitialStep
download, err := planet.Uplinks[0].DownloadStream(ctx, planet.Satellites[0], "testbucket", "test/path")
var totalBandwidthUsage bandwidth.Usage
for _, storagenode := range planet.StorageNodes {
usage, err := storagenode.DB.Bandwidth().Summary(ctx, time.Now().Add(-10*time.Hour), time.Now().Add(10*time.Hour))
require.NoError(t, err)
totalBandwidthUsage.Add(usage)
}
err = planet.Uplinks[0].Delete(ctx, planet.Satellites[0], "testbucket", "test/path")
require.NoError(t, err)
_, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path")
require.Error(t, err)
pos, err := download.Seek(tt.offset, io.SeekStart)
require.NoError(t, err)
assert.Equal(t, pos, tt.offset)
data := make([]byte, tt.size)
n, err := io.ReadFull(download, data)
require.NoError(t, err)
assert.Equal(t, int(tt.size), n)
assert.Equal(t, expectedData[tt.offset:tt.offset+tt.size], data)
require.NoError(t, download.Close())
}
var totalBandwidthUsage bandwidth.Usage
for _, storagenode := range planet.StorageNodes {
usage, err := storagenode.DB.Bandwidth().Summary(ctx, time.Now().Add(-10*time.Hour), time.Now().Add(10*time.Hour))
require.NoError(t, err)
totalBandwidthUsage.Add(usage)
}
err = planet.Uplinks[0].Delete(ctx, planet.Satellites[0], "testbucket", "test/path")
require.NoError(t, err)
_, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path")
require.Error(t, err)
// check rough limits for the upload and download
totalUpload := int64(len(expectedData))
t.Log(totalUpload, totalBandwidthUsage.Put, int64(len(planet.StorageNodes))*totalUpload)
assert.True(t, totalUpload < totalBandwidthUsage.Put && totalBandwidthUsage.Put < int64(len(planet.StorageNodes))*totalUpload)
t.Log(totalDownload, totalBandwidthUsage.Get, int64(len(planet.StorageNodes))*totalDownload)
assert.True(t, totalBandwidthUsage.Get < int64(len(planet.StorageNodes))*totalDownload)
// check rough limits for the upload and download
totalUpload := int64(len(expectedData))
t.Log(totalUpload, totalBandwidthUsage.Put, int64(len(planet.StorageNodes))*totalUpload)
assert.True(t, totalUpload < totalBandwidthUsage.Put && totalBandwidthUsage.Put < int64(len(planet.StorageNodes))*totalUpload)
t.Log(totalDownload, totalBandwidthUsage.Get, int64(len(planet.StorageNodes))*totalDownload)
assert.True(t, totalBandwidthUsage.Get < int64(len(planet.StorageNodes))*totalDownload)
})
}
func TestUpload(t *testing.T) {