dissertation-3-evaluation/structure/structure.py
2020-11-16 11:47:22 +00:00

551 lines
17 KiB
Python

import ipaddress
import json
import textwrap
import threading
from enum import Enum
import random
import numpy as np
from typing import List, Optional, Union, Dict, Tuple
class IpMethod(Enum):
Manual = 0
Management = 1
Auto4 = 2
Auto6 = 3
Dhcp4 = 4
Dhcp6 = 5
class Interface:
def __init__(self, method: IpMethod, rate: Optional[int] = None):
self._method: IpMethod
self._node: Optional[Node] = None
self._rate: Optional[int] = None
self._bridge: Optional[Bridge] = None
self._method = method
self._rate = rate
self._address: ipaddress.ip_address = None
def get_method(self):
return self._method
def set_node(self, node):
self._node = node
def get_node(self):
return self._node
def set_bridge(self, bridge):
self._bridge = bridge
def get_bridge(self):
return self._bridge
def set_address(self, addr: ipaddress.ip_address):
self._address = addr
def get_address(self) -> ipaddress.ip_address:
return self._address
def get_rate(self) -> Optional[int]:
return self._rate
def set_rate(self, rate: Optional[int]):
self._rate = rate
class Bridge:
def __init__(self, *interfaces: Interface):
self._interfaces: List[Interface] = []
self._name: str = ''
for interface in interfaces:
self._interfaces.append(interface)
interface.set_bridge(self)
# Generate a random class c private range by default (10.0.0.0)
self._addr: ipaddress.ip_address = None
self._network_iterator: iter = None
self.netmask = 24
self.new_network()
def get_interfaces(self) -> List[Interface]:
return self._interfaces
def set_name(self, name: str):
self._name = name
def get_name(self) -> str:
return self._name
def set_netmask(self, mask: int):
self.netmask = mask
self._network_iterator = ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False).hosts()
def get_ip_address(self) -> ipaddress.ip_address:
return next(self._network_iterator)
def get_network(self) -> ipaddress.ip_network:
return ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False)
def get_network_string(self) -> str:
return str(ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False))
def new_network(self):
self._addr: ipaddress.ip_address = ipaddress.ip_address('10.0.0.0') + random.randint(0, 16777216)
self._network_iterator = ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False).hosts()
class Node:
def __init__(self, interfaces: List[Interface], setup_params: Dict = None):
self._id: Union[int, None] = None
self._interfaces: List[Interface] = interfaces
self._interfaces.append(Interface(IpMethod.Management))
for interface in self._interfaces:
interface.set_node(self)
self.setup_params = {} if setup_params is None else setup_params
def get_interfaces(self):
return self._interfaces
def set_id(self, new_id):
self._id = new_id
def get_id(self):
return self._id
def get_core_count(self) -> int:
return 2
def get_memory_mb(self) -> int:
return 2048
def get_internet_setup(self) -> Optional[str]:
return None
def get_setup(self) -> Optional[str]:
return None
def ssh(self, *args, **kwargs):
raise RuntimeError('ssh not implemented')
class SpeedTestServer(Node):
def __init__(self, clone_interface: Interface = None, **kwargs):
super().__init__([Interface(IpMethod.Manual)], **kwargs)
self.clone_interface = clone_interface
def get_internet_setup(self) -> Optional[str]:
return textwrap.dedent('''
cloud-init status --wait || cloud-init status --long
sleep 2
sudo apt-get install -y iperf3
''')
def get_setup(self) -> Optional[str]:
if self.clone_interface is None:
return None
self.get_interfaces()[0].set_address(self.clone_interface.get_address())
return textwrap.dedent('''
set -e
sudo ip addr flush dev eth0
sudo ip addr add {} dev eth0
sudo ip route add 192.168.1.1 dev eth0
sudo ip route add default via 192.168.1.1 dev eth0
''').format(self.clone_interface.get_address())
def server(self):
self.ssh('iperf3 -s -1 -D', error_stdout=True, error_stderr=True)
def client(self, target, time=30):
if isinstance(target, SpeedTestServer):
target = target.get_interfaces()[0].get_address()
command = 'iperf3 -c {target} -t {time} -O 5 -J'.format(target=target, time=time)
return self.ssh(command, error_stdout=True, error_stderr=True, return_stdout=True)
class RemotePortal(Node):
def __init__(self, interfaces, **kwargs):
super(RemotePortal, self).__init__(interfaces, **kwargs)
self.local_portal = None
def set_local_portal(self, local_portal):
self.local_portal = local_portal
def get_internet_setup(self) -> Optional[str]:
return textwrap.dedent('''
set -e
wget -q https://f001.backblazeb2.com/file/dissertation/binaries/debian/{branch} -O mpbl3p
chmod +x mpbl3p
cloud-init status --wait || cloud-init status --long
''').format(**self.setup_params)
def get_setup(self) -> Optional[str]:
return textwrap.dedent('''
set -e
sudo sysctl -w net.ipv4.ip_forward=1
sudo sysctl -w net.ipv4.conf.eth0.proxy_arp=1
cat << EOF > config.ini
[Host]
PrivateKey = INVALID
[Peer]
PublicKey = INVALID
Method = TCP
LocalHost = {local_host}
LocalPort = 1234
EOF
(nohup sudo ./mpbl3p > mpbl3p.log 2>&1 & echo $! > mpbl3p.pid)
sleep 10
ps $(cat mpbl3p.pid) || cat mpbl3p.log
sudo ip addr add 172.19.152.2/31 dev nc0
sudo ip link set up nc0
sudo ip rule add from all table local priority 20
sudo ip rule del priority 0
sudo ip rule add to {local_host} dport 1234 table local priority 9
sudo ip route flush 10
sudo ip route add table 10 to {local_host} via 172.19.152.3 dev nc0
sudo ip rule add to {local_host} table 10 priority 10
ps $(cat mpbl3p.pid) || cat mpbl3p.log
''').format(
local_host=self.get_interfaces()[0].get_address(),
**self.setup_params,
)
class LocalPortal(Node):
def __init__(self, wan_interfaces: List[Interface], child: Optional[Node], **kwargs):
if child is not None:
lan_interface = Interface(IpMethod.Manual)
Bridge(lan_interface, child.get_interfaces()[0])
super().__init__([*wan_interfaces, lan_interface], **kwargs)
else:
super().__init__(wan_interfaces, **kwargs)
self.remote_portal = None
def set_remote_portal(self, remote_portal):
self.remote_portal = remote_portal
def get_internet_setup(self) -> Optional[str]:
return textwrap.dedent('''
set -e
wget -q https://f001.backblazeb2.com/file/dissertation/binaries/debian/{branch} -O mpbl3p
chmod +x mpbl3p
cloud-init status --wait || cloud-init status --long
''').format(**self.setup_params)
def get_setup(self) -> str:
peer_string = textwrap.dedent('''
[Peer]
PublicKey = INVALID
Method = TCP
LocalHost = {local_host}
RemoteHost = {remote_host}
RemotePort = 1234
''')
peers = '\n\n'.join([peer_string.format(
local_host=x.get_address(),
remote_host=self.remote_portal.get_interfaces()[0].get_address(),
) for x in self.get_interfaces()[:-2]])
policy_routing_string = textwrap.dedent('''
sudo ip route flush {table_number}
sudo ip route add table {table_number} to {network} dev {device}
sudo ip rule add from {local_address} table {table_number} priority {table_number}
''')
policy_routing = '\n\n'.join([policy_routing_string.format(
table_number=i + 10,
device='eth{}'.format(i),
network=iface.get_bridge().get_network_string(),
local_address=iface.get_address(),
) for i, iface in enumerate(self.get_interfaces()[:-2])])
return textwrap.dedent('''
set -e
sudo sysctl -w net.ipv4.conf.all.arp_announce=1
sudo sysctl -w net.ipv4.conf.all.arp_ignore=1
sudo sysctl -w net.ipv4.ip_forward=1
sudo ip addr flush dev {local_interface}
sudo ip addr add 192.168.1.1 dev {local_interface}
{policy_routing}
cat << EOF > config.ini
[Host]
PrivateKey = INVALID
{peers}
EOF
(nohup sudo ./mpbl3p > mpbl3p.log 2>&1 & echo $! > mpbl3p.pid)
sleep 10
ps $(cat mpbl3p.pid) || cat mpbl3p.log
sudo ip addr add 172.19.152.3/31 dev nc0
sudo ip link set up nc0
sudo ip route flush 18
sudo ip route add table 18 default via 172.19.152.2 dev nc0
sudo ip rule add from {remote_host} iif {local_interface} table 18 priority 18
sudo ip route flush 19
sudo ip route add to {remote_host} dev {local_interface} table 19
sudo ip rule add to {remote_host} table 19 priority 19
ps $(cat mpbl3p.pid) || cat mpbl3p.log
''').format(
**self.setup_params,
peers=peers,
policy_routing=policy_routing,
remote_host=self.remote_portal.get_interfaces()[0].get_address(),
local_interface='eth{}'.format(len(self.get_interfaces()) - 2),
)
class StandardTest:
def __init__(
self,
rates: List[int],
events: Dict[float, Tuple[int, int]] = None,
duration: int = 10,
variation_target: float = 0.2,
max_failures: int = 3,
max_attempts: int = 60,
):
self.rates = rates
self.events = events if events is not None else dict()
self.duration = duration
self.variation_target = variation_target
self.max_failures = max_failures
self.max_attempts = max_attempts
def name(self) -> str:
name_builder = ['R{}-{}'.format(*y) for y in enumerate(self.rates)]
name_builder += ['E{}R{}-{}'.format(x, *y) for (x, y) in self.events.items()]
name_builder.append('T{}'.format(self.duration))
return ''.join(name_builder)
class DirectTest(StandardTest):
def __init__(self, rate: int, **kwargs):
super().__init__([rate], **kwargs)
def name(self) -> str:
return 'D{}'.format(super().name())
class StandardIperfResult:
def __init__(self, test: StandardTest, iperf: str, interval_size=1.0):
self.test = test
self.interval_size = interval_size
# list containing an exact time and a value
self.data: List[Tuple[float, float]] = []
self.add_results(iperf)
def add_results(self, iperf: str):
data = json.loads(iperf)
# grab the sum data of all non omitted intervals, excluding any that are smaller than expected
intervals = [
x['sum'] for x in data['intervals'] if
(not x['sum']['omitted']) and (x['sum']['end'] - x['sum']['start'] > self.interval_size / 2)
]
for (time, result) in zip(
[((x['start'] + x['end']) / 2) for x in intervals],
[x['bits_per_second'] for x in intervals],
):
self.data.append((time, result))
def bins(self) -> List[List[Tuple[float, float]]]:
bins: List[List[Tuple[float, float]]] = [[] for _ in np.arange(0, self.test.duration, self.interval_size)]
for time, result in self.data:
index = int(np.round((time - self.interval_size / 2) / self.interval_size))
bins[index].append((time, result))
return bins
def summarise(self) -> Dict[float, float]:
bins = self.bins()
means = [np.mean(x, axis=0)[1] for x in bins]
times = [i + self.interval_size / 2 for i in np.arange(0, self.test.duration, self.interval_size)]
return dict(zip(times, means))
def standard_deviation(self) -> Dict[float, float]:
bins = self.bins()
stds = [np.std(x, axis=0)[1] for x in bins]
times = [i + self.interval_size / 2 for i in np.arange(0, self.test.duration, self.interval_size)]
return dict(zip(times, stds))
def coefficient_variance(self) -> Dict[float, float]:
stds = self.standard_deviation()
means = self.summarise()
return {k: stds[k] / means[k] for k in stds.keys()}
def time_range(self) -> Dict[float, Tuple[float, float]]:
bins = self.bins()
times = [i + self.interval_size / 2 for i in np.arange(0, self.test.duration, self.interval_size)]
ranges = [(-np.min(x, axis=0)[0] + time, np.max(x, axis=0)[0] - time) for (x, time) in zip(bins, times)]
return dict(zip(times, ranges))
def repeat_until_satisfied(reducer, satisfied, initial=None, max_attempts=100, max_failures=3):
val = initial()
i = 0
for i in range(max_attempts):
for j in range(max_failures):
try:
val = reducer(val)
except Exception as e:
print('failed with {}'.format(e))
if j == max_failures - 1:
raise e
if satisfied(val):
return val
raise RuntimeError('too many attempts')
class BaseEnvironment:
def __init__(self, runner, top_level_bridge: Bridge):
self.top_level_bridge = top_level_bridge
self._runner = runner
def __enter__(self):
try:
self._runner.build(self.top_level_bridge)
except Exception as e:
self._runner.teardown()
raise e
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._runner.teardown()
def _test(
self,
test: StandardTest,
inbound_server: SpeedTestServer,
inbound_client: SpeedTestServer,
rated_node: Node,
expected_interfaces: int,
) -> Tuple[StandardIperfResult, StandardIperfResult]:
if len(test.rates) != expected_interfaces:
raise RuntimeError('mismatched number of interfaces')
results = []
for server, client in [(inbound_server, inbound_client), (inbound_client, inbound_server)]:
def test_reducer(old: Optional[StandardIperfResult]) -> StandardIperfResult:
for i, r in enumerate(test.rates):
rated_node.get_interfaces()[i].set_rate(r)
server.server()
for t, (iface, rate) in test.events.items():
threading.Timer(
5 + t,
(lambda s: lambda: s.lp.get_interfaces()[iface].set_rate(rate))(self),
)
iperf = client.client(server, time=test.duration)
if old is None:
return StandardIperfResult(test, iperf)
else:
old.add_results(iperf)
return old
result = repeat_until_satisfied(
test_reducer,
lambda x: max(x.coefficient_variance().values()) < test.variation_target,
max_failures=test.max_failures,
max_attempts=test.max_attempts,
)
results.append(result)
# Return a tuple of (inbound, outbound)
return results[0], results[1]
class StandardEnvironment(BaseEnvironment):
def __init__(self, interfaces: int, runner, setup_params: dict):
self._interfaces = interfaces
self.rp = RemotePortal([Interface(IpMethod.Auto4)], setup_params=setup_params)
self.st = SpeedTestServer()
self.cl = SpeedTestServer(clone_interface=self.rp.get_interfaces()[0])
self.lp = LocalPortal(
[Interface(IpMethod.Auto4) for _ in range(interfaces)],
self.cl,
setup_params=setup_params,
)
self.rp.set_local_portal(self.lp)
self.lp.set_remote_portal(self.rp)
super().__init__(runner, Bridge(
self.st.get_interfaces()[0],
self.rp.get_interfaces()[0],
*self.lp.get_interfaces()[0:interfaces],
))
def test(self, test: StandardTest) -> Tuple[StandardIperfResult, StandardIperfResult]:
return self._test(test, self.st, self.cl, self.lp, self._interfaces)
class DirectEnvironment(BaseEnvironment):
def __init__(self, runner):
self.st1 = SpeedTestServer()
self.st2 = SpeedTestServer()
super().__init__(runner, Bridge(
self.st1.get_interfaces()[0],
self.st2.get_interfaces()[0],
))
def test(self, test: StandardTest) -> Tuple[StandardIperfResult, StandardIperfResult]:
return self._test(test, self.st2, self.st1, self.st2, 1)