Refactored graph generation

- Refactored graph generation
- Added bar graphs and switched most
This commit is contained in:
Jake Hillion 2020-12-12 10:40:29 +00:00
parent ccff05c8d2
commit a9d001e764
11 changed files with 1484 additions and 1028 deletions

2
.gitignore vendored
View File

@ -1,4 +1,4 @@
graphs/
output/
# Created by https://www.toptal.com/developers/gitignore/api/python,intellij+all,dotenv
# Edit at https://www.toptal.com/developers/gitignore?templates=python,intellij+all,dotenv

File diff suppressed because one or more lines are too long

2
graphs/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from .store import TestStore
from .graphs import plot_iperf_results, plot_iperf_results_time

132
graphs/graphs.py Normal file
View File

@ -0,0 +1,132 @@
from itertools import cycle
from typing import Dict
from matplotlib import pyplot as plt
from graphs import TestStore
from structure import StandardTest
def plot_iperf_results(
store: TestStore,
series: Dict[str, StandardTest],
fast_tests: bool,
title: str = None,
direction='both',
error_bars=True,
filename=None,
):
if direction == 'both':
plot_iperf_results(store, series, fast_tests, title, 'inbound', error_bars, filename)
plot_iperf_results(store, series, fast_tests, title, 'outbound', error_bars, filename)
return
if filename in ['png', 'eps']:
filename = 'output/{}{}{}.{}'.format(
'I' if direction == 'inbound' else 'O',
'E' if error_bars else '',
''.join(['S{}-{}'.format(i, x.name()) for (i, x) in enumerate(series.values())]),
filename,
)
print(filename)
series = {
k: (store.get_inbound(v) if direction == 'inbound' else store.get_outbound(v))
for (k, v) in series.items()
}
fig = plt.figure()
axes = fig.add_axes([0, 0, 1, 1])
if title is not None:
axes.set_title(title, pad=20.0 if True in [len(x.test.events) > 0 for x in series.values()] else None)
axes.set_ylabel('Throughput (Mbps)')
for k, v in series.items():
axes.bar(
k,
v.bandwidth_mean() / 1e6,
yerr=1.5 * v.bandwidth_standard_deviation() / 1e6,
width=0.4,
capsize=15,
)
if fast_tests:
fig.text(0.95, 0.05, 'Draft', fontsize=50, color='gray', ha='right', va='bottom', alpha=0.5)
if filename is not None:
fig.savefig(filename, bbox_inches='tight', pad_inches=0.3)
def plot_iperf_results_time(
store: TestStore,
series: Dict[str, StandardTest],
fast_tests: bool,
title: str = None,
direction='both',
error_bars_x=False,
error_bars_y=True,
filename=None,
start_at_zero=True,
):
if direction == 'both':
plot_iperf_results_time(store, series, fast_tests, title, 'outbound', error_bars_x, error_bars_y, filename,
start_at_zero)
plot_iperf_results_time(store, series, fast_tests, title, 'inbound', error_bars_x, error_bars_y, filename,
start_at_zero)
return
if filename in ['png', 'eps']:
filename = 'output/T{}{}{}{}.{}'.format(
'I' if direction == 'inbound' else 'O',
'Ex' if error_bars_x else '',
'Ey' if error_bars_y else '',
''.join(['S{}-{}'.format(i, x.name()) for (i, x) in enumerate(series.values())]),
filename,
)
print(filename)
series = {
k: (store.get_inbound(v) if direction == 'inbound' else store.get_outbound(v))
for (k, v) in series.items()
}
cycol = cycle('brgy')
fig = plt.figure()
axes = fig.add_axes([0, 0, 1, 1])
if title is not None:
axes.set_title(title, pad=20.0 if True in [len(x.test.events) > 0 for x in series.values()] else None)
axes.set_xlabel('Time (s)')
axes.set_ylabel('Throughput (Mbps)')
for k, v in series.items():
data = v.interval_means()
axes.errorbar(
data.keys(),
[x / 1e6 for x in data.values()],
xerr=(
[x[0] for x in v.interval_time_ranges().values()],
[x[1] for x in v.interval_time_ranges().values()]) if error_bars_x else None,
yerr=[x * 1.5 / 1e6 for x in v.interval_standard_deviations().values()] if error_bars_y else None,
capsize=3,
ecolor='grey',
color=next(cycol),
label=k,
)
legend = axes.legend()
if start_at_zero:
axes.set_ylim(bottom=0)
axes.set_xlim(left=0)
if fast_tests:
fig.text(0.95, 0.05, 'Draft', fontsize=50, color='gray', ha='right', va='bottom', alpha=0.5)
if filename is not None:
fig.savefig(filename, bbox_extra_artists=(legend,), bbox_inches='tight', pad_inches=0.3)

21
graphs/store.py Normal file
View File

@ -0,0 +1,21 @@
from typing import Dict
from structure import IperfResult, StandardTest
class TestStore:
def __init__(self):
self.inbound: Dict[str, IperfResult] = dict()
self.outbound: Dict[str, IperfResult] = dict()
def save_inbound(self, test: StandardTest, result: IperfResult):
self.inbound[test.name()] = result
def save_outbound(self, test: StandardTest, result: IperfResult):
self.outbound[test.name()] = result
def get_inbound(self, test: StandardTest) -> IperfResult:
return self.inbound[test.name()]
def get_outbound(self, test: StandardTest) -> IperfResult:
return self.outbound[test.name()]

View File

@ -11,6 +11,7 @@ import proxmoxer
import paramiko
import structure
import structure.generic
def check_env(*names: str) -> bool:
@ -21,14 +22,14 @@ def check_env(*names: str) -> bool:
def bridge_node_search(
first: structure.Bridge,
bridge_name_generator: Callable[[structure.Bridge], str],
node_id_generator: Callable[[structure.Node], int],
) -> Tuple[List[structure.Bridge], List[structure.Node]]:
bridges: List[structure.Bridge] = []
nodes: List[structure.Node] = []
first: structure.generic.Bridge,
bridge_name_generator: Callable[[structure.generic.Bridge], str],
node_id_generator: Callable[[structure.generic.Node], int],
) -> Tuple[List[structure.generic.Bridge], List[structure.generic.Node]]:
bridges: List[structure.generic.Bridge] = []
nodes: List[structure.generic.Node] = []
queue: List[structure.Bridge] = [first]
queue: List[structure.generic.Bridge] = [first]
while len(queue) > 0:
bridge = queue.pop()
if bridge.get_name() != '':
@ -38,7 +39,7 @@ def bridge_node_search(
bridge.set_name(bridge_name_generator(bridge))
# from this bridge, find all nodes (via all interfaces)
reachable_nodes: List[structure.Node] = []
reachable_nodes: List[structure.generic.Node] = []
for interface in bridge.get_interfaces():
node = interface.get_node()
if node.get_id() is not None:
@ -63,7 +64,7 @@ class PrintRunner:
self._last_bridge: int = 0
self._last_node_id = 0
def build(self, bridge: structure.Bridge):
def build(self, bridge: structure.generic.Bridge):
bridges, nodes = bridge_node_search(bridge, lambda _: self.name_bridge(), lambda _: self.id_node())
print(bridges)
@ -121,10 +122,10 @@ class ProxmoxRunner:
self._template_id = template_id
self._initial_vm_id = initial_vm_id
self._internet_bridge = structure.Bridge()
self._internet_bridge = structure.generic.Bridge()
self._internet_bridge.set_name(internet_bridge)
self._management_bridge = structure.Bridge()
self._management_bridge = structure.generic.Bridge()
self._management_bridge.set_name(management_bridge)
self._management_initial_ip = management_initial_ip
self._management_netmask = management_netmask
@ -133,7 +134,7 @@ class ProxmoxRunner:
self._private_key = paramiko.RSAKey.generate(3072)
self._client = paramiko.SSHClient()
def build(self, bridge: structure.Bridge):
def build(self, bridge: structure.generic.Bridge):
bridges, nodes = bridge_node_search(bridge, lambda x: self._create_bridge(x), lambda x: self._create_node(x))
self._build_bridges(bridges)
@ -156,7 +157,7 @@ class ProxmoxRunner:
return
raise TimeoutError
def _create_bridge(self, bridge: structure.Bridge) -> str:
def _create_bridge(self, bridge: structure.generic.Bridge) -> str:
while True:
try:
self._proxmox.nodes(self._proxmox_node).network.post(
@ -177,7 +178,7 @@ class ProxmoxRunner:
self._created_bridges.append(bridge_name)
return bridge_name
def _build_bridges(self, bridges: List[structure.Bridge]):
def _build_bridges(self, bridges: List[structure.generic.Bridge]):
network_task = self._proxmox.nodes(self._proxmox_node).network.put()
self._await_task(network_task)
@ -187,7 +188,7 @@ class ProxmoxRunner:
bridge.new_network()
existing.append(bridge.get_network())
def _create_node(self, node: structure.Node) -> int:
def _create_node(self, node: structure.generic.Node) -> int:
while True:
try:
clone_task = self._proxmox.nodes(self._proxmox_node).qemu(self._template_id).clone.post(
@ -207,10 +208,10 @@ class ProxmoxRunner:
self._created_nodes.append(new_id - 1)
return new_id - 1
def _open_ssh(self, node: structure.Node, interface: structure.Interface = None):
def _open_ssh(self, node: structure.generic.Node, interface: structure.generic.Interface = None):
if interface is None:
for iface in node.get_interfaces():
if iface.get_method() == structure.IpMethod.Management:
if iface.get_method() == structure.generic.IpMethod.Management:
interface = iface
break
if interface is None:
@ -235,13 +236,13 @@ class ProxmoxRunner:
node.ssh_client = client
def _close_ssh(self, node: structure.Node):
def _close_ssh(self, node: structure.generic.Node):
node.ssh_client.close()
del node.ssh_client
def ssh(
self,
node: structure.Node,
node: structure.generic.Node,
command: str,
error_stderr=False,
error_stdout=False,
@ -269,7 +270,7 @@ class ProxmoxRunner:
return exit_status
def _build_node(self, node: structure.Node):
def _build_node(self, node: structure.generic.Node):
# Step 1: Configure access
self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.put(
ciuser='python',
@ -282,7 +283,7 @@ class ProxmoxRunner:
# Step 2: connect to Internet bridge with DHCP to install packages
if node.get_internet_setup() is not None:
interfaces = node.get_interfaces()
internet_interface = structure.Interface(structure.IpMethod.Dhcp4)
internet_interface = structure.generic.Interface(structure.generic.IpMethod.Dhcp4)
internet_interface.set_bridge(self._internet_bridge)
temp_interfaces = [internet_interface, interfaces[len(interfaces) - 1]]
@ -307,7 +308,7 @@ class ProxmoxRunner:
self._open_ssh(node)
node.ssh = (lambda n: lambda *args, **kwargs: self.ssh(n, *args, **kwargs))(node)
def _setup_node_interfaces(self, node: structure.Node, interfaces: List[structure.Interface] = None):
def _setup_node_interfaces(self, node: structure.generic.Node, interfaces: List[structure.generic.Interface] = None):
if interfaces is None:
interfaces = node.get_interfaces()
@ -316,20 +317,20 @@ class ProxmoxRunner:
interface = interfaces[i]
method = interface.get_method()
if method == structure.IpMethod.Management:
if method == structure.generic.IpMethod.Management:
interface.set_bridge(self._management_bridge)
addr = self._management_initial_ip + node.get_id() - self._initial_vm_id
kwargs['ipconfig{}'.format(i)] = 'ip={}/{}'.format(addr, self._management_netmask)
interface.set_address(addr)
elif method == structure.IpMethod.Auto4 or method == structure.IpMethod.Manual:
elif method == structure.generic.IpMethod.Auto4 or method == structure.generic.IpMethod.Manual:
# handle manual the same as auto4 so it doesn't get stuck in DHCP
bridge = interface.get_bridge()
addr = bridge.get_ip_address()
kwargs['ipconfig{}'.format(i)] = 'ip={}/{}'.format(addr, bridge.netmask)
interface.set_address(addr)
elif method == structure.IpMethod.Dhcp4:
elif method == structure.generic.IpMethod.Dhcp4:
kwargs['ipconfig{}'.format(i)] = 'ip=dhcp'
else:
raise RuntimeError('not implemented')
@ -340,7 +341,7 @@ class ProxmoxRunner:
def interface_set_rate(iface):
def new_set_rate(rate: Optional[int]):
structure.Interface.set_rate(iface, rate)
structure.generic.Interface.set_rate(iface, rate)
self._update_node_interfaces(node)
return new_set_rate
@ -349,7 +350,7 @@ class ProxmoxRunner:
self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.put(**kwargs)
def _update_node_interfaces(self, node: structure.Node):
def _update_node_interfaces(self, node: structure.generic.Node):
interfaces = node.get_interfaces()
old_config = self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.get()
@ -368,7 +369,7 @@ class ProxmoxRunner:
self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.put(**new_config)
def _setup_node(self, node: structure.Node):
def _setup_node(self, node: structure.generic.Node):
if node.get_setup() is not None:
cmd = node.get_setup()
self.ssh(node, cmd, error_stdout=True, error_stderr=True)

View File

@ -1,6 +1,5 @@
from .structure import Node
from .structure import IpMethod, Interface, Bridge
from .generic import IpMethod, Interface, Bridge, Node
from .structure import SpeedTestServer, LocalPortal, RemotePortal
from .structure import StandardEnvironment, StandardTest, StandardIperfResult
from .tests import StandardTest, DirectTest, IperfResult
from .environments import StandardEnvironment, DirectEnvironment, BaseEnvironment

116
structure/environments.py Normal file
View File

@ -0,0 +1,116 @@
import threading
from typing import Tuple, Optional
from structure import Bridge, StandardTest, SpeedTestServer, Node, IperfResult, RemotePortal, Interface, \
IpMethod, LocalPortal
from structure.tests import repeat_until_satisfied
class BaseEnvironment:
def __init__(self, runner, top_level_bridge: Bridge):
self.top_level_bridge = top_level_bridge
self._runner = runner
def __enter__(self):
try:
self._runner.build(self.top_level_bridge)
except Exception as e:
self._runner.teardown()
raise e
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._runner.teardown()
def _test(
self,
test: StandardTest,
inbound_server: SpeedTestServer,
inbound_client: SpeedTestServer,
rated_node: Node,
expected_interfaces: int,
) -> Tuple[IperfResult, IperfResult]:
if len(test.rates) != expected_interfaces:
raise RuntimeError('mismatched number of interfaces')
results = []
for server, client in [(inbound_server, inbound_client), (inbound_client, inbound_server)]:
def test_reducer(old: Optional[IperfResult]) -> IperfResult:
for i, r in enumerate(test.rates):
rated_node.get_interfaces()[i].set_rate(r)
server.server()
for t, (iface, rate) in test.events.items():
threading.Timer(
6 + t,
(lambda n: lambda: n.get_interfaces()[iface].set_rate(rate))(rated_node),
)
iperf = client.client(server, time=test.duration)
if old is None:
return IperfResult(test, iperf)
else:
old.add_results(iperf)
return old
def test_satisfier(val: IperfResult) -> bool:
if val.num_tests < 3:
return False
return val.bandwidth_coefficient_variance() < test.bandwidth_variation_target and False not in \
[x < test.interval_variation_target for x in val.interval_coefficient_variances().values()]
result = repeat_until_satisfied(
test_reducer,
test_satisfier,
max_failures=test.max_failures,
max_attempts=test.max_attempts,
)
results.append(result)
# Return a tuple of (inbound, outbound)
return results[0], results[1]
def test(self, test: StandardTest) -> Tuple[IperfResult, IperfResult]:
raise RuntimeError('not implemented')
class StandardEnvironment(BaseEnvironment):
def __init__(self, interfaces: int, runner, setup_params: dict):
self._interfaces = interfaces
self.rp = RemotePortal([Interface(IpMethod.Auto4)], setup_params=setup_params)
self.st = SpeedTestServer()
self.cl = SpeedTestServer(clone_interface=self.rp.get_interfaces()[0])
self.lp = LocalPortal(
[Interface(IpMethod.Auto4) for _ in range(interfaces)],
self.cl,
setup_params=setup_params,
)
self.rp.set_local_portal(self.lp)
self.lp.set_remote_portal(self.rp)
super().__init__(runner, Bridge(
self.st.get_interfaces()[0],
self.rp.get_interfaces()[0],
*self.lp.get_interfaces()[0:interfaces],
))
def test(self, test: StandardTest) -> Tuple[IperfResult, IperfResult]:
return self._test(test, self.st, self.cl, self.lp, self._interfaces)
class DirectEnvironment(BaseEnvironment):
def __init__(self, runner):
self.st1 = SpeedTestServer()
self.st2 = SpeedTestServer()
super().__init__(runner, Bridge(
self.st1.get_interfaces()[0],
self.st2.get_interfaces()[0],
))
def test(self, test: StandardTest) -> Tuple[IperfResult, IperfResult]:
return self._test(test, self.st2, self.st1, self.st2, 1)

132
structure/generic.py Normal file
View File

@ -0,0 +1,132 @@
import ipaddress
import random
from enum import Enum
from typing import Optional, List, Dict, Union
class IpMethod(Enum):
Manual = 0
Management = 1
Auto4 = 2
Auto6 = 3
Dhcp4 = 4
Dhcp6 = 5
class Interface:
def __init__(self, method: IpMethod, rate: Optional[int] = None):
self._method: IpMethod
self._node: Optional[Node] = None
self._rate: Optional[int] = None
self._bridge: Optional[Bridge] = None
self._method = method
self._rate = rate
self._address: ipaddress.ip_address = None
def get_method(self):
return self._method
def set_node(self, node):
self._node = node
def get_node(self):
return self._node
def set_bridge(self, bridge):
self._bridge = bridge
def get_bridge(self):
return self._bridge
def set_address(self, addr: ipaddress.ip_address):
self._address = addr
def get_address(self) -> ipaddress.ip_address:
return self._address
def get_rate(self) -> Optional[int]:
return self._rate
def set_rate(self, rate: Optional[int]):
self._rate = rate
class Bridge:
def __init__(self, *interfaces: Interface):
self._interfaces: List[Interface] = []
self._name: str = ''
for interface in interfaces:
self._interfaces.append(interface)
interface.set_bridge(self)
# Generate a random class c private range by default (10.0.0.0)
self._addr: ipaddress.ip_address = None
self._network_iterator: iter = None
self.netmask = 24
self.new_network()
def get_interfaces(self) -> List[Interface]:
return self._interfaces
def set_name(self, name: str):
self._name = name
def get_name(self) -> str:
return self._name
def set_netmask(self, mask: int):
self.netmask = mask
self._network_iterator = ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False).hosts()
def get_ip_address(self) -> ipaddress.ip_address:
return next(self._network_iterator)
def get_network(self) -> ipaddress.ip_network:
return ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False)
def get_network_string(self) -> str:
return str(ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False))
def new_network(self):
self._addr: ipaddress.ip_address = ipaddress.ip_address('10.0.0.0') + random.randint(0, 16777216)
self._network_iterator = ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False).hosts()
class Node:
def __init__(self, interfaces: List[Interface], setup_params: Dict = None):
self._id: Union[int, None] = None
self._interfaces: List[Interface] = interfaces
self._interfaces.append(Interface(IpMethod.Management))
for interface in self._interfaces:
interface.set_node(self)
self.setup_params = {} if setup_params is None else setup_params
def get_interfaces(self):
return self._interfaces
def set_id(self, new_id):
self._id = new_id
def get_id(self):
return self._id
def get_core_count(self) -> int:
return 2
def get_memory_mb(self) -> int:
return 2048
def get_internet_setup(self) -> Optional[str]:
return None
def get_setup(self) -> Optional[str]:
return None
def ssh(self, *args, **kwargs):
raise RuntimeError('ssh not implemented')

View File

@ -1,140 +1,8 @@
import ipaddress
import json
import textwrap
import threading
from enum import Enum
import random
import numpy as np
from typing import List, Optional, Union, Dict, Tuple
from typing import List, Optional
class IpMethod(Enum):
Manual = 0
Management = 1
Auto4 = 2
Auto6 = 3
Dhcp4 = 4
Dhcp6 = 5
class Interface:
def __init__(self, method: IpMethod, rate: Optional[int] = None):
self._method: IpMethod
self._node: Optional[Node] = None
self._rate: Optional[int] = None
self._bridge: Optional[Bridge] = None
self._method = method
self._rate = rate
self._address: ipaddress.ip_address = None
def get_method(self):
return self._method
def set_node(self, node):
self._node = node
def get_node(self):
return self._node
def set_bridge(self, bridge):
self._bridge = bridge
def get_bridge(self):
return self._bridge
def set_address(self, addr: ipaddress.ip_address):
self._address = addr
def get_address(self) -> ipaddress.ip_address:
return self._address
def get_rate(self) -> Optional[int]:
return self._rate
def set_rate(self, rate: Optional[int]):
self._rate = rate
class Bridge:
def __init__(self, *interfaces: Interface):
self._interfaces: List[Interface] = []
self._name: str = ''
for interface in interfaces:
self._interfaces.append(interface)
interface.set_bridge(self)
# Generate a random class c private range by default (10.0.0.0)
self._addr: ipaddress.ip_address = None
self._network_iterator: iter = None
self.netmask = 24
self.new_network()
def get_interfaces(self) -> List[Interface]:
return self._interfaces
def set_name(self, name: str):
self._name = name
def get_name(self) -> str:
return self._name
def set_netmask(self, mask: int):
self.netmask = mask
self._network_iterator = ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False).hosts()
def get_ip_address(self) -> ipaddress.ip_address:
return next(self._network_iterator)
def get_network(self) -> ipaddress.ip_network:
return ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False)
def get_network_string(self) -> str:
return str(ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False))
def new_network(self):
self._addr: ipaddress.ip_address = ipaddress.ip_address('10.0.0.0') + random.randint(0, 16777216)
self._network_iterator = ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False).hosts()
class Node:
def __init__(self, interfaces: List[Interface], setup_params: Dict = None):
self._id: Union[int, None] = None
self._interfaces: List[Interface] = interfaces
self._interfaces.append(Interface(IpMethod.Management))
for interface in self._interfaces:
interface.set_node(self)
self.setup_params = {} if setup_params is None else setup_params
def get_interfaces(self):
return self._interfaces
def set_id(self, new_id):
self._id = new_id
def get_id(self):
return self._id
def get_core_count(self) -> int:
return 2
def get_memory_mb(self) -> int:
return 2048
def get_internet_setup(self) -> Optional[str]:
return None
def get_setup(self) -> Optional[str]:
return None
def ssh(self, *args, **kwargs):
raise RuntimeError('ssh not implemented')
from structure import IpMethod, Interface, Bridge, Node
class SpeedTestServer(Node):
@ -343,223 +211,3 @@ class LocalPortal(Node):
remote_host=self.remote_portal.get_interfaces()[0].get_address(),
local_interface='eth{}'.format(len(self.get_interfaces()) - 2),
)
class StandardTest:
def __init__(
self,
rates: List[int],
events: Dict[float, Tuple[int, int]] = None,
duration: int = 10,
variation_target: float = 0.2,
max_failures: int = 3,
max_attempts: int = 60,
):
self.rates = rates
self.events = events if events is not None else dict()
self.duration = duration
self.variation_target = variation_target
self.max_failures = max_failures
self.max_attempts = max_attempts
def name(self) -> str:
name_builder = ['R{}-{}'.format(*y) for y in enumerate(self.rates)]
name_builder += ['E{}R{}-{}'.format(x, *y) for (x, y) in self.events.items()]
name_builder.append('T{}'.format(self.duration))
return ''.join(name_builder)
class DirectTest(StandardTest):
def __init__(self, rate: int, **kwargs):
super().__init__([rate], **kwargs)
def name(self) -> str:
return 'D{}'.format(super().name())
class StandardIperfResult:
def __init__(self, test: StandardTest, iperf: str, interval_size=2.0):
self.test = test
self.interval_size = interval_size
# list containing an exact time and a value
self.data: List[Tuple[float, float]] = []
self.num_tests = 0
self.add_results(iperf)
def add_results(self, iperf: str):
data = json.loads(iperf)
# grab the sum data of all non omitted intervals, excluding any that are smaller than expected
intervals = [
x['sum'] for x in data['intervals'] if
(not x['sum']['omitted']) and (x['sum']['end'] - x['sum']['start'] > self.interval_size / 2)
]
for (time, result) in zip(
[((x['start'] + x['end']) / 2) for x in intervals],
[x['bits_per_second'] for x in intervals],
):
self.data.append((time, result))
self.num_tests += 1
def bins(self) -> List[List[Tuple[float, float]]]:
bins: List[List[Tuple[float, float]]] = [[] for _ in np.arange(0, self.test.duration, self.interval_size)]
for time, result in self.data:
index = int(np.round((time - self.interval_size / 2) / self.interval_size))
bins[index].append((time, result))
return bins
def summarise(self) -> Dict[float, float]:
bins = self.bins()
means = [np.mean(x, axis=0)[1] for x in bins]
times = [i + self.interval_size / 2 for i in np.arange(0, self.test.duration, self.interval_size)]
return dict(zip(times, means))
def standard_deviation(self) -> Dict[float, float]:
bins = self.bins()
stds = [np.std(x, axis=0)[1] for x in bins]
times = [i + self.interval_size / 2 for i in np.arange(0, self.test.duration, self.interval_size)]
return dict(zip(times, stds))
def coefficient_variance(self) -> Dict[float, float]:
stds = self.standard_deviation()
means = self.summarise()
return {k: stds[k] / means[k] for k in stds.keys()}
def time_range(self) -> Dict[float, Tuple[float, float]]:
bins = self.bins()
times = [i + self.interval_size / 2 for i in np.arange(0, self.test.duration, self.interval_size)]
ranges = [(-np.min(x, axis=0)[0] + time, np.max(x, axis=0)[0] - time) for (x, time) in zip(bins, times)]
return dict(zip(times, ranges))
def repeat_until_satisfied(reducer, satisfied, initial=None, max_attempts=100, max_failures=3):
val = initial
for i in range(max_attempts):
for j in range(max_failures):
try:
val = reducer(val)
break
except Exception as e:
print('failed with {}'.format(e))
if j == max_failures - 1:
raise e
if satisfied(val):
return val
raise RuntimeError('too many attempts')
class BaseEnvironment:
def __init__(self, runner, top_level_bridge: Bridge):
self.top_level_bridge = top_level_bridge
self._runner = runner
def __enter__(self):
try:
self._runner.build(self.top_level_bridge)
except Exception as e:
self._runner.teardown()
raise e
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._runner.teardown()
def _test(
self,
test: StandardTest,
inbound_server: SpeedTestServer,
inbound_client: SpeedTestServer,
rated_node: Node,
expected_interfaces: int,
) -> Tuple[StandardIperfResult, StandardIperfResult]:
if len(test.rates) != expected_interfaces:
raise RuntimeError('mismatched number of interfaces')
results = []
for server, client in [(inbound_server, inbound_client), (inbound_client, inbound_server)]:
def test_reducer(old: Optional[StandardIperfResult]) -> StandardIperfResult:
for i, r in enumerate(test.rates):
rated_node.get_interfaces()[i].set_rate(r)
server.server()
for t, (iface, rate) in test.events.items():
threading.Timer(
6 + t,
(lambda n: lambda: n.get_interfaces()[iface].set_rate(rate))(rated_node),
)
iperf = client.client(server, time=test.duration)
if old is None:
return StandardIperfResult(test, iperf)
else:
old.add_results(iperf)
return old
def test_satisfier(val: StandardIperfResult) -> bool:
if val.num_tests < 3:
return False
return False not in [x < test.variation_target for x in val.coefficient_variance().values()]
result = repeat_until_satisfied(
test_reducer,
test_satisfier,
max_failures=test.max_failures,
max_attempts=test.max_attempts,
)
results.append(result)
# Return a tuple of (inbound, outbound)
return results[0], results[1]
class StandardEnvironment(BaseEnvironment):
def __init__(self, interfaces: int, runner, setup_params: dict):
self._interfaces = interfaces
self.rp = RemotePortal([Interface(IpMethod.Auto4)], setup_params=setup_params)
self.st = SpeedTestServer()
self.cl = SpeedTestServer(clone_interface=self.rp.get_interfaces()[0])
self.lp = LocalPortal(
[Interface(IpMethod.Auto4) for _ in range(interfaces)],
self.cl,
setup_params=setup_params,
)
self.rp.set_local_portal(self.lp)
self.lp.set_remote_portal(self.rp)
super().__init__(runner, Bridge(
self.st.get_interfaces()[0],
self.rp.get_interfaces()[0],
*self.lp.get_interfaces()[0:interfaces],
))
def test(self, test: StandardTest) -> Tuple[StandardIperfResult, StandardIperfResult]:
return self._test(test, self.st, self.cl, self.lp, self._interfaces)
class DirectEnvironment(BaseEnvironment):
def __init__(self, runner):
self.st1 = SpeedTestServer()
self.st2 = SpeedTestServer()
super().__init__(runner, Bridge(
self.st1.get_interfaces()[0],
self.st2.get_interfaces()[0],
))
def test(self, test: StandardTest) -> Tuple[StandardIperfResult, StandardIperfResult]:
return self._test(test, self.st2, self.st1, self.st2, 1)

137
structure/tests.py Normal file
View File

@ -0,0 +1,137 @@
import json
from typing import List, Dict, Tuple
import numpy as np
def repeat_until_satisfied(reducer, satisfied, initial=None, max_attempts=100, max_failures=3):
val = initial
for i in range(max_attempts):
for j in range(max_failures):
try:
val = reducer(val)
break
except Exception as e:
print('failed with {}'.format(e))
if j == max_failures - 1:
raise e
if satisfied(val):
return val
raise RuntimeError('too many attempts')
class StandardTest:
def __init__(
self,
rates: List[int],
events: Dict[float, Tuple[int, int]] = None,
duration: int = 10,
interval_variation_target: float = np.inf,
bandwidth_variation_target: float = np.inf,
max_failures: int = 3,
max_attempts: int = 60,
):
self.rates = rates
self.events = events if events is not None else dict()
self.duration = duration
self.interval_variation_target = interval_variation_target
self.bandwidth_variation_target = bandwidth_variation_target
self.max_failures = max_failures
self.max_attempts = max_attempts
def name(self) -> str:
name_builder = ['R{}-{}'.format(*y) for y in enumerate(self.rates)]
name_builder += ['E{}R{}-{}'.format(x, *y) for (x, y) in self.events.items()]
name_builder.append('T{}'.format(self.duration))
return ''.join(name_builder)
class DirectTest(StandardTest):
def __init__(self, rate: int, **kwargs):
super().__init__([rate], **kwargs)
def name(self) -> str:
return 'D{}'.format(super().name())
class IperfResult:
def __init__(self, test: StandardTest, iperf: str, interval_size=2.0):
self.test = test
self.interval_size = interval_size
# list containing an exact time and a value
self.interval_data: List[Tuple[float, float]] = []
# list containing the overall data transferred and the time taken
self.bandwidth_data: List[float] = []
self.num_tests = 0
self.add_results(iperf)
def add_results(self, iperf: str):
data = json.loads(iperf)
# grab the overall bandwidth
self.bandwidth_data.append(data['end']['sum_sent']['bits_per_second'])
# grab the sum data of all non omitted intervals, excluding any that are smaller than expected
intervals = [
x['sum'] for x in data['intervals'] if
(not x['sum']['omitted']) and (x['sum']['end'] - x['sum']['start'] > self.interval_size / 2)
]
for (time, result) in zip(
[((x['start'] + x['end']) / 2) for x in intervals],
[x['bits_per_second'] for x in intervals],
):
self.interval_data.append((time, result))
self.num_tests += 1
def bins(self) -> List[List[Tuple[float, float]]]:
bins: List[List[Tuple[float, float]]] = [[] for _ in np.arange(0, self.test.duration, self.interval_size)]
for time, result in self.interval_data:
index = int(np.round((time - self.interval_size / 2) / self.interval_size))
bins[index].append((time, result))
return bins
def interval_means(self) -> Dict[float, float]:
bins = self.bins()
means = [np.mean(x, axis=0)[1] for x in bins]
times = [i + self.interval_size / 2 for i in np.arange(0, self.test.duration, self.interval_size)]
return dict(zip(times, means))
def interval_standard_deviations(self) -> Dict[float, float]:
bins = self.bins()
stds = [np.std(x, axis=0)[1] for x in bins]
times = [i + self.interval_size / 2 for i in np.arange(0, self.test.duration, self.interval_size)]
return dict(zip(times, stds))
def interval_coefficient_variances(self) -> Dict[float, float]:
stds = self.interval_standard_deviations()
means = self.interval_means()
return {k: stds[k] / means[k] for k in stds.keys()}
def interval_time_ranges(self) -> Dict[float, Tuple[float, float]]:
bins = self.bins()
times = [i + self.interval_size / 2 for i in np.arange(0, self.test.duration, self.interval_size)]
ranges = [(-np.min(x, axis=0)[0] + time, np.max(x, axis=0)[0] - time) for (x, time) in zip(bins, times)]
return dict(zip(times, ranges))
def bandwidth_mean(self):
return np.mean(self.bandwidth_data)
def bandwidth_standard_deviation(self):
return np.std(self.bandwidth_data)
def bandwidth_coefficient_variance(self):
return self.bandwidth_standard_deviation() / self.bandwidth_mean()