diff --git a/evaluation.ipynb b/evaluation.ipynb index 92e287d..2b80dce 100644 --- a/evaluation.ipynb +++ b/evaluation.ipynb @@ -6,7 +6,7 @@ "source": [ "# Project Evaluation\n", "\n", - "This file interfaces with a Proxmox server to automatically generate VM structures and graphs for testing the\n", + "This file interfaces with a Proxmox server to automatically generate VM structures and plots for testing the\n", "success criteria of my project." ] }, @@ -31,10 +31,12 @@ "import os\n", "import ipaddress\n", "import threading\n", + "from typing import Dict\n", "\n", "import runners\n", "from structure import Bridge, Interface, IpMethod\n", "from structure import RemotePortal, LocalPortal, SpeedTestServer\n", + "from structure import StandardEnvironment, StandardTest, IperfResult\n", "\n", "%load_ext dotenv\n", "%dotenv" @@ -84,8 +86,23 @@ " 'branch': os.getenv('TARGET_BRANCH'),\n", "}\n", "\n", - "directionInbound = {}\n", - "directionOutbound = {}" + "directionInbound: Dict[str, IperfResult] = {}\n", + "directionOutbound: Dict[str, IperfResult] = {}\n", + "\n", + "def run_and_save_test(env: StandardEnvironment, test: StandardTest):\n", + " (directionInbound[test.name()], directionOutbound[test.name()]) = env.test(test)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "### Direct Server to Server Testing" ] }, { @@ -124,6 +141,48 @@ " runner.teardown()" ] }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "### Local Portal with 2 Interfaces" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "pycharm": { + "is_executing": true, + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "with StandardEnvironment(2, runner, setup_params) as env:\n", + " run_and_save_test(env, StandardTest([1,1]))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + 
"metadata": { + "pycharm": { + "is_executing": true, + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "from pprint import pprint\n", + "pprint(directionInbound[StandardTest([1,1]).name()].standard_deviation())" + ] + }, { "cell_type": "code", "execution_count": null, @@ -196,6 +255,15 @@ " runner.teardown()" ] }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false + }, + "source": [ + "### Local Portal with 3 Interfaces" + ] + }, { "cell_type": "code", "execution_count": null, @@ -241,6 +309,15 @@ " runner.teardown()" ] }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false + }, + "source": [ + "### Local Portal with 4 Interfaces" + ] + }, { "cell_type": "code", "execution_count": null, @@ -428,27 +505,8 @@ " },\n", " 'Network Slow',\n", " events={0: 'Y=2', 15: 'Y=1', 30: 'Y=2'}\n", - ")" + ")\n" ] - }, - { - "cell_type": "markdown", - "metadata": { - "pycharm": { - "name": "#%% md\n" - } - }, - "source": [ - "## Criteria\n", - "This section automatically verifies some criteria with assertions." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { diff --git a/structure/__init__.py b/structure/__init__.py index d8de3bf..2b1f1a2 100644 --- a/structure/__init__.py +++ b/structure/__init__.py @@ -3,3 +3,4 @@ from .structure import Node from .structure import IpMethod, Interface, Bridge from .structure import SpeedTestServer, LocalPortal, RemotePortal +from .structure import StandardEnvironment, StandardTest, IperfResult diff --git a/structure/structure.py b/structure/structure.py index 488a0bc..47a7cf6 100644 --- a/structure/structure.py +++ b/structure/structure.py @@ -1,9 +1,12 @@ import ipaddress import json import textwrap +import threading from enum import Enum import random -from typing import List, Optional, Union, Dict + +import numpy as np +from typing import List, Optional, Union, Dict, Tuple class IpMethod(Enum): @@ -160,8 +163,7 @@ class SpeedTestServer(Node): target = target.get_interfaces()[0].get_address() command = 'iperf3 -c {target} -t {time} -O 5 -J'.format(target=target, time=time) - out = self.ssh(command, error_stdout=True, error_stderr=True, return_stdout=True) - return json.loads(out) + return self.ssh(command, error_stdout=True, error_stderr=True, return_stdout=True) class RemotePortal(Node): @@ -210,7 +212,10 @@ class RemotePortal(Node): (nohup sudo ./mpbl3p > mpbl3p.log 2>&1 & echo $! 
> mpbl3p.pid) - sleep 1 + sleep 10 + + ps $(cat mpbl3p.pid) || cat mpbl3p.log + sudo ip addr add 172.19.152.2/31 dev nc0 sudo ip link set up nc0 @@ -223,7 +228,7 @@ class RemotePortal(Node): sudo ip route add table 10 to {local_host} via 172.19.152.3 dev nc0 sudo ip rule add to {local_host} table 10 priority 10 - ps $(cat mpbl3p.pid) + ps $(cat mpbl3p.pid) || cat mpbl3p.log ''').format( local_host=self.get_interfaces()[0].get_address(), **self.setup_params, @@ -284,7 +289,7 @@ class LocalPortal(Node): ''') policy_routing = '\n\n'.join([policy_routing_string.format( - table_number=i+10, + table_number=i + 10, device='eth{}'.format(i), network=iface.get_bridge().get_network(), local_address=iface.get_address(), @@ -312,7 +317,10 @@ class LocalPortal(Node): (nohup sudo ./mpbl3p > mpbl3p.log 2>&1 & echo $! > mpbl3p.pid) - sleep 1 + sleep 10 + + ps $(cat mpbl3p.pid) || cat mpbl3p.log + sudo ip addr add 172.19.152.3/31 dev nc0 sudo ip link set up nc0 @@ -324,11 +332,174 @@ class LocalPortal(Node): sudo ip route add to {remote_host} dev {local_interface} table 19 sudo ip rule add to {remote_host} table 19 priority 19 - ps $(cat mpbl3p.pid) + ps $(cat mpbl3p.pid) || cat mpbl3p.log ''').format( **self.setup_params, peers=peers, policy_routing=policy_routing, remote_host=self.remote_portal.get_interfaces()[0].get_address(), - local_interface='eth{}'.format(len(self.get_interfaces())-2), + local_interface='eth{}'.format(len(self.get_interfaces()) - 2), ) + + +class StandardTest: + def __init__( + self, + rates: List[int], + events: Dict[float, Tuple[int, int]] = None, + duration: int = 30, + variation_target: float = 0.4, + max_failures: int = 3, + max_attempts: int = 30, + ): + self.rates = rates + self.events = events if events is not None else dict() + self.duration = duration + + self.variation_target = variation_target + self.max_failures = max_failures + self.max_attempts = max_attempts + + def name(self) -> str: + name_builder = ['R{}-{}'.format(*y) for y in 
enumerate(self.rates)] + name_builder += ['E{}R{}-{}'.format(x, *y) for (x, y) in self.events.items()] + name_builder.append('D{}'.format(self.duration)) + return ''.join(name_builder) + + +class IperfResult: + def __init__(self, iperf: str, interval_size=1.0, duration=30): + self.interval_size = interval_size + self.duration = duration + + # list containing an exact time and a value + self.data: List[Tuple[float, float]] = [] + + self.add_results(iperf) + + def add_results(self, iperf: str): + data = json.loads(iperf) + # grab the sum data of all non omitted intervals, excluding any that are smaller than expected + intervals = [ + x['sum'] for x in data['intervals'] if + (not x['sum']['omitted']) and (x['sum']['end'] - x['sum']['start'] > self.interval_size / 2) + ] + + for (time, result) in zip( + [((x['start'] + x['end']) / 2) for x in intervals], + [x['bits_per_second'] for x in intervals], + ): + self.data.append((time, result)) + + def bins(self) -> List[List[Tuple[float, float]]]: + # Binning phase + bins: List[List[Tuple[float, float]]] = [[] for _ in np.arange(0, self.duration, self.interval_size)] + + for time, result in self.data: + index = int((time - self.interval_size / 2) / self.interval_size) + bins[index].append((time, result)) + + return bins + + def summarise(self) -> Dict[float, float]: + bins = self.bins() + means = [np.mean(x, axis=0)[1] for x in bins] + times = [i + self.interval_size / 2 for i in np.arange(0, self.duration, self.interval_size)] + return dict(zip(times, means)) + + def standard_deviation(self) -> Dict[float, float]: + bins = self.bins() + stds = [np.std(x, axis=0)[1] for x in bins] + times = [i + self.interval_size / 2 for i in np.arange(0, self.duration, self.interval_size)] + return dict(zip(times, stds)) + + def coefficient_variance(self) -> Dict[float, float]: + stds = self.standard_deviation() + means = self.summarise() + + return {k: stds[k] / means[k] for k in stds.keys()} + + def time_range(self) -> Dict[float, 
Tuple[float, float]]: + bins = self.bins() + times = [i + self.interval_size / 2 for i in np.arange(0, self.duration, self.interval_size)] + ranges = [(np.min(x, axis=0)[0] - time, np.max(x, axis=0)[0] - time) for (x, time) in zip(bins, times)] + return dict(zip(times, ranges)) + + + class StandardEnvironment: + def __init__(self, interfaces: int, runner, setup_params: dict): + self._interfaces = interfaces + self._runner = runner + + self.rp = RemotePortal([Interface(IpMethod.Auto4)], setup_params=setup_params) + + self.st = SpeedTestServer() + self.cl = SpeedTestServer(clone_interface=self.rp.get_interfaces()[0]) + + self.lp = LocalPortal( + [Interface(IpMethod.Auto4) for _ in range(interfaces)], + self.cl, + setup_params=setup_params, + ) + + self.rp.set_local_portal(self.lp) + self.lp.set_remote_portal(self.rp) + + self.top_level_bridge = Bridge( + self.st.get_interfaces()[0], + self.rp.get_interfaces()[0], + *self.lp.get_interfaces()[0:interfaces], + ) + + def __enter__(self): + try: + self._runner.build(self.top_level_bridge) + except Exception as e: + self._runner.teardown() + raise e + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._runner.teardown() + + def test(self, test: StandardTest) -> Tuple[IperfResult, IperfResult]: + if len(test.rates) != self._interfaces: + raise RuntimeError('mismatched number of interfaces') + + for i, r in enumerate(test.rates): + self.lp.get_interfaces()[i].set_rate(r) + + results = [] + for server, client in [(self.cl, self.st), (self.st, self.cl)]: + result: Optional[IperfResult] = None + + for i in range(test.max_attempts): + if i > 2 and max(result.coefficient_variance().values()) < test.variation_target: + break + + for j in range(test.max_failures): + try: + server.server() + + for t, (iface, rate) in test.events.items(): + threading.Timer(5 + t, lambda idx=iface, rt=rate: self.lp.get_interfaces()[idx].set_rate(rt)).start() + + iperf = client.client(server, time=test.duration) + if result is None: + result = 
IperfResult(iperf) + else: + result.add_results(iperf) + + break + except Exception as e: + print('failed with {}'.format(e)) + if j == test.max_failures - 1: + raise e + + if max(result.coefficient_variance().values()) > test.variation_target: + raise RuntimeError('too many attempts') + + results.append(result) + + # Return a tuple of (inbound, outbound) + return results[0], results[1]