develop #3

Manually merged
JakeHillion merged 4 commits from develop into master 2020-11-08 14:17:33 +00:00
5 changed files with 776 additions and 221 deletions

View File

@ -1,4 +1,13 @@
# Dissertation Evaluation
A Python backing to partially automate producing the graphs for my dissertation using an iPython Notebook and a Proxmox server.
# Git
## Hooks
### pre-commit
Clears the output of the Jupyter notebook to avoid Git churn.
#!/bin/sh
jupyter nbconvert --ClearOutputPreprocessor.enabled=True --inplace evaluation.ipynb
git add evaluation.ipynb

File diff suppressed because one or more lines are too long

View File

@ -1,9 +1,14 @@
import concurrent.futures
import ipaddress
import os
import re
import time
from datetime import datetime
from typing import Callable, List, Tuple
from typing import Callable, List, Tuple, Optional, Union
from urllib.parse import quote
import proxmoxer
import paramiko
import structure
@ -15,7 +20,7 @@ def check_env(*names: str) -> bool:
return True
def bridge_node_dfs(
def bridge_node_search(
first: structure.Bridge,
bridge_name_generator: Callable[[structure.Bridge], str],
node_id_generator: Callable[[structure.Node], int],
@ -59,7 +64,7 @@ class PrintRunner:
self._last_node_id = 0
def build(self, bridge: structure.Bridge):
bridges, nodes = bridge_node_dfs(bridge, lambda _: self.name_bridge(), lambda _: self.id_node())
bridges, nodes = bridge_node_search(bridge, lambda _: self.name_bridge(), lambda _: self.id_node())
print(bridges)
print(nodes)
@ -77,6 +82,8 @@ class PrintRunner:
class ProxmoxRunner:
ssh_timeout = 300
def __init__(
self,
host: str,
@ -92,6 +99,7 @@ class ProxmoxRunner:
management_bridge: str,
management_initial_ip: ipaddress,
management_netmask: int = 24,
verify_ssl: bool = False,
):
@ -111,19 +119,35 @@ class ProxmoxRunner:
self._proxmox_node = node
self._template_id = template_id
self._initial_vm_id = initial_vm_id - 1
self._initial_vm_id = initial_vm_id
self._internet_bridge = internet_bridge
self._internet_bridge = structure.Bridge()
self._internet_bridge.set_name(internet_bridge)
self._management_bridge = management_bridge
self._management_bridge = structure.Bridge()
self._management_bridge.set_name(management_bridge)
self._management_initial_ip = management_initial_ip
self._management_netmask = management_netmask
# generate a single use SSH key (we can use any with Proxmox)
self._private_key = paramiko.RSAKey.generate(3072)
self._client = paramiko.SSHClient()
def build(self, bridge: structure.Bridge):
bridges, nodes = bridge_node_dfs(bridge, lambda x: self._create_bridge(x), lambda x: self._create_node(x))
bridges, nodes = bridge_node_search(bridge, lambda x: self._create_bridge(x), lambda x: self._create_node(x))
self._build_bridges()
for node in nodes:
self._build_node(node)
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
build_futures = [executor.submit(self._build_node, node) for node in nodes]
for future in build_futures:
future.result()
# guarantee that setup is not called until all of the nodes are built
# this means that all will have their final IPs by this point
setup_futures = [executor.submit(self._setup_node, node) for node in nodes]
for future in setup_futures:
future.result()
def _await_task(self, upid, timeout=10):
t1 = datetime.now()
@ -133,8 +157,6 @@ class ProxmoxRunner:
raise TimeoutError
def _create_bridge(self, bridge: structure.Bridge) -> str:
self._last_bridge += 1
while True:
try:
self._proxmox.nodes(self._proxmox_node).network.post(
@ -143,6 +165,7 @@ class ProxmoxRunner:
autostart=1,
comments='Automatically created by Python evaluation',
)
self._last_bridge += 1
break
except proxmoxer.core.ResourceException as e:
if 'interface already exists' in str(e):
@ -150,7 +173,7 @@ class ProxmoxRunner:
else:
raise e
bridge_name = 'vmbr{}'.format(self._last_bridge)
bridge_name = 'vmbr{}'.format(self._last_bridge - 1)
self._created_bridges.append(bridge_name)
return bridge_name
@ -159,14 +182,13 @@ class ProxmoxRunner:
self._await_task(network_task)
def _create_node(self, node: structure.Node) -> int:
self._last_node_id += 1
while True:
try:
clone_task = self._proxmox.nodes(self._proxmox_node).qemu(self._template_id).clone.post(
newid=self._initial_vm_id + self._last_node_id,
name='Diss-{}-Testing'.format(node.__class__.__name__),
)
self._last_node_id += 1
break
except proxmoxer.core.ResourceException as e:
if 'config file already exists' in str(e):
@ -176,13 +198,174 @@ class ProxmoxRunner:
self._await_task(clone_task)
new_id = self._initial_vm_id + self._last_node_id
self._created_nodes.append(new_id)
return new_id
self._created_nodes.append(new_id - 1)
return new_id - 1
def _open_ssh(self, node: structure.Node, interface: structure.Interface = None):
if interface is None:
for iface in node.get_interfaces():
if iface.get_method() == structure.IpMethod.Management:
interface = iface
break
if interface is None:
raise RuntimeError('no management interface available')
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
t1 = datetime.now()
while (datetime.now() - t1).seconds < ProxmoxRunner.ssh_timeout:
try:
client.connect(
hostname=str(interface.get_address()),
username='python',
pkey=self._private_key,
banner_timeout=15,
)
client.set_missing_host_key_policy(paramiko.RejectPolicy)
break
except (paramiko.ssh_exception.AuthenticationException, paramiko.ssh_exception.NoValidConnectionsError):
time.sleep(10)
node.client = client
def _close_ssh(self, node: structure.Node):
node.client.close()
del node.client
def ssh(
self,
node: structure.Node,
command: str,
error_stderr=False,
error_stdout=False,
return_stdout=False,
) -> Union[int, str]:
chan = node.client.get_transport().open_session()
chan.exec_command(command)
exit_status = chan.recv_exit_status()
if exit_status != 0:
if error_stderr and error_stdout:
raise Exception(
'stdout:\n{}\n\nstderr:\n{}\n'.format(chan.recv(2048).decode(), chan.recv_stderr(2048).decode()))
if error_stderr:
raise Exception(chan.recv_stderr(2048).decode())
if error_stdout:
raise Exception(chan.recv(2048).decode())
if return_stdout is not False:
if return_stdout is True:
return chan.makefile().read()
else:
return chan.recv(return_stdout).decode()
return exit_status
def _build_node(self, node: structure.Node):
# Step 1: connect to Internet bridge with DHCP to install packages
# Step 2: connect to management bridge for correct setup
pass
# Step 1: Configure access
self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.put(
ciuser='python',
sshkeys=quote('ssh-rsa ' + self._private_key.get_base64(), ''),
cores=node.get_core_count(),
sockets=1,
memory=node.get_memory_mb(),
)
# Step 2: connect to Internet bridge with DHCP to install packages
if node.get_internet_setup() is not None:
interfaces = node.get_interfaces()
internet_interface = structure.Interface(structure.IpMethod.Dhcp4)
internet_interface.set_bridge(self._internet_bridge)
temp_interfaces = [internet_interface, interfaces[len(interfaces) - 1]]
self._setup_node_interfaces(node, temp_interfaces)
start_task = self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).status.start.post()
self._await_task(start_task)
self._open_ssh(node)
self.ssh(node, node.get_internet_setup(), error_stdout=True, error_stderr=True)
self._close_ssh(node)
stop_task = self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).status.shutdown.post()
self._await_task(stop_task, timeout=20)
# Step 3: connect to management bridge for final setup
self._setup_node_interfaces(node)
start_task = self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).status.start.post()
self._await_task(start_task)
self._open_ssh(node)
node.ssh = (lambda n: lambda *args, **kwargs: self.ssh(n, *args, **kwargs))(node)
def _setup_node_interfaces(self, node: structure.Node, interfaces: List[structure.Interface] = None):
if interfaces is None:
interfaces = node.get_interfaces()
kwargs = dict()
for i in range(len(interfaces)):
interface = interfaces[i]
method = interface.get_method()
if method == structure.IpMethod.Manual:
pass
elif method == structure.IpMethod.Management:
interface.set_bridge(self._management_bridge)
addr = self._management_initial_ip + node.get_id() - self._initial_vm_id
kwargs['ipconfig{}'.format(i)] = 'ip={}/{}'.format(addr, self._management_netmask)
interface.set_address(addr)
elif method == structure.IpMethod.Auto4:
bridge = interface.get_bridge()
addr = bridge.get_ip_address()
kwargs['ipconfig{}'.format(i)] = 'ip={}/{}'.format(addr, bridge.netmask)
interface.set_address(addr)
elif method == structure.IpMethod.Dhcp4:
kwargs['ipconfig{}'.format(i)] = 'ip=dhcp'
else:
raise RuntimeError('not implemented')
kwargs['net{}'.format(i)] = 'model=virtio,bridge={}'.format(interface.get_bridge().get_name())
if interface.get_rate() is not None:
kwargs['net{}'.format(i)] += ',rate={}'.format(interface.get_rate())
def interface_set_rate(iface):
def new_set_rate(rate: Optional[int]):
structure.Interface.set_rate(iface, rate)
self._update_node_interfaces(node)
return new_set_rate
interface.set_rate = interface_set_rate(interface)
self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.put(**kwargs)
def _update_node_interfaces(self, node: structure.Node):
interfaces = node.get_interfaces()
old_config = self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.get()
old_digest = old_config['digest']
old_config = {k: v for (k, v) in old_config.items() if k[:3] == 'net'}
rate_regex = re.compile(r',rate=(\d+(?:\.\d+)?)')
new_config = {'digest': old_digest}
for k, v in old_config.items():
index = int(k[3:])
iface = interfaces[index]
new_config[k] = rate_regex.sub('', v)
if iface.get_rate() is not None:
new_config[k] += ',rate={}'.format(iface.get_rate())
self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.put(**new_config)
def _setup_node(self, node: structure.Node):
if node.get_setup() is not None:
self.ssh(node, node.get_setup(), error_stdout=True, error_stderr=True)
def teardown(self):
for node in self._created_nodes:
@ -193,6 +376,7 @@ class ProxmoxRunner:
for bridge in self._created_bridges:
self._proxmox.nodes(self._proxmox_node).network(bridge).delete()
network_task = self._proxmox.nodes(self._proxmox_node).network.put()
self._await_task(network_task)

View File

@ -2,4 +2,4 @@ from .structure import Node
from .structure import IpMethod, Interface, Bridge
from .structure import SpeedTestServer, LocalServer, RemoteServer
from .structure import SpeedTestServer, LocalPortal, RemotePortal

View File

@ -1,8 +1,11 @@
import ipaddress
import json
import textwrap
from enum import Enum
from typing import List, Optional, Union
import random
from typing import List, Optional, Union, Dict
# Enums
class IpMethod(Enum):
Manual = 0
Management = 1
@ -13,15 +16,16 @@ class IpMethod(Enum):
class Interface:
def __init__(self, method: IpMethod, limit: Optional[int] = None):
def __init__(self, method: IpMethod, rate: Optional[int] = None):
self._method: IpMethod
self._node: Optional[Node] = None
self._limit: Optional[int] = None
self._rate: Optional[int] = None
self._bridge: Optional[Bridge] = None
self._method = method
self._limit = limit
self._rate = rate
self._address: ipaddress.ip_address = None
def get_method(self):
return self._method
@ -38,6 +42,18 @@ class Interface:
def get_bridge(self):
return self._bridge
def set_address(self, addr: ipaddress.ip_address):
self._address = addr
def get_address(self) -> ipaddress.ip_address:
return self._address
def get_rate(self) -> Optional[int]:
return self._rate
def set_rate(self, rate: Optional[int]):
self._rate = rate
class Bridge:
def __init__(self, *interfaces: Interface):
@ -48,6 +64,11 @@ class Bridge:
self._interfaces.append(interface)
interface.set_bridge(self)
# Generate a random class c private range by default (10.0.0.0)
self.netmask = 24
self._addr: ipaddress.ip_address = ipaddress.ip_address('10.0.0.0') + random.randint(0, 16777216)
self._network_iterator = ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False).hosts()
def get_interfaces(self) -> List[Interface]:
return self._interfaces
@ -57,9 +78,19 @@ class Bridge:
def get_name(self) -> str:
return self._name
def set_netmask(self, mask: int):
self.netmask = mask
self._network_iterator = ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False).hosts()
def get_ip_address(self) -> ipaddress.ip_address:
return next(self._network_iterator)
def get_network(self) -> str:
return str(ipaddress.ip_network('{}/{}'.format(self._addr, self.netmask), False))
class Node:
def __init__(self, interfaces: List[Interface]):
def __init__(self, interfaces: List[Interface], setup_params: Dict = None):
self._id: Union[int, None] = None
self._interfaces: List[Interface] = interfaces
self._interfaces.append(Interface(IpMethod.Management))
@ -67,6 +98,8 @@ class Node:
for interface in self._interfaces:
interface.set_node(self)
self.setup_params = {} if setup_params is None else setup_params
def get_interfaces(self):
return self._interfaces
@ -76,22 +109,189 @@ class Node:
def get_id(self):
return self._id
def get_core_count(self) -> int:
return 2
def get_memory_mb(self) -> int:
return 2048
def get_internet_setup(self) -> Optional[str]:
return None
def get_setup(self) -> Optional[str]:
return None
def ssh(self, *args, **kwargs):
raise RuntimeError('ssh not implemented')
class SpeedTestServer(Node):
def server(self):
pass
def client(self, server: Interface):
pass
# Entry method for running the serve with `with speedtest:`
def __enter__(self):
pass
class RemoteServer(Node):
pass
def __exit__(self, exc_type, exc_val, exc_tb):
pass
class LocalServer(Node):
def __init__(self, wan_interfaces: List[Interface], child: Node):
lan_interface = Interface(IpMethod.Manual)
super().__init__([*wan_interfaces, lan_interface])
class RemotePortal(Node):
def __init__(self, interfaces, **kwargs):
super(RemotePortal, self).__init__(interfaces, **kwargs)
Bridge(lan_interface, child.get_interfaces()[0])
self.local_portal = None
def set_local_portal(self, local_portal):
self.local_portal = local_portal
def get_internet_setup(self) -> Optional[str]:
return textwrap.dedent('''
set -e
wget -q http://10.20.0.11/minio-client
chmod +x minio-client
./minio-client alias set s3 http://10.20.0.25:3900 {access_key} {secret_key} || \
./minio-client alias set s3 s3.us-west-001.backblazeb2.com {access_key} {secret_key}
./minio-client cp s3/dissertation/binaries/debian/{branch} mpbl3p
chmod +x mpbl3p
cloud-init status --wait || cloud-init status --long
sudo apt-get install -y iperf3
''').format(**self.setup_params)
def get_setup(self) -> Optional[str]:
return textwrap.dedent('''
set -e
sudo sysctl -w net.ipv4.conf.all.arp_announce=1
sudo sysctl -w net.ipv4.conf.all.arp_ignore=2
cat << EOF > config.ini
[Host]
PrivateKey = INVALID
[Peer]
PublicKey = INVALID
Method = TCP
LocalHost = {local_host}
LocalPort = 1234
EOF
(nohup sudo ./mpbl3p > mpbl3p.log 2>&1 & echo $! > mpbl3p.pid)
sleep 1
sudo ip link set up nc0
sudo ip addr add 172.19.152.2/31 dev nc0
ps $(cat mpbl3p.pid)
''').format(
local_host=self.get_interfaces()[0].get_address(),
**self.setup_params,
)
def speedtest_server(self):
self.ssh('iperf3 -s -1 -D', error_stdout=True, error_stderr=True)
def speedtest_client(self, target, time=30):
command = 'iperf3 -c {target} -t {time} -O 5 -J'.format(target=target, time=time)
out = self.ssh(command, error_stdout=True, error_stderr=True, return_stdout=True)
return json.loads(out)
class LocalPortal(Node):
def __init__(self, wan_interfaces: List[Interface], child: Optional[Node], **kwargs):
if child is not None:
lan_interface = Interface(IpMethod.Manual)
Bridge(lan_interface, child.get_interfaces()[0])
super().__init__([*wan_interfaces, lan_interface], **kwargs)
else:
super().__init__(wan_interfaces, **kwargs)
self.remote_portal = None
def set_remote_portal(self, remote_portal):
self.remote_portal = remote_portal
def get_internet_setup(self) -> Optional[str]:
return textwrap.dedent('''
set -e
wget -q http://10.20.0.11/minio-client
chmod +x minio-client
./minio-client alias set s3 http://10.20.0.25:3900 {access_key} {secret_key} || \
./minio-client alias set s3 s3.us-west-001.backblazeb2.com {access_key} {secret_key}
./minio-client cp s3/dissertation/binaries/debian/{branch} mpbl3p
chmod +x mpbl3p
cloud-init status --wait || cloud-init status --long
sudo apt-get install -y iperf3
''').format(**self.setup_params)
def get_setup(self) -> str:
peer_string = textwrap.dedent('''
[Peer]
PublicKey = INVALID
Method = TCP
LocalHost = {local_host}
RemoteHost = {remote_host}
RemotePort = 1234
''')
peers = '\n\n'.join([peer_string.format(
local_host=x.get_address(),
remote_host=self.remote_portal.get_interfaces()[0].get_address(),
) for x in self.get_interfaces()[:-1]])
policy_routing_string = textwrap.dedent('''
sudo ip route flush {table_number}
sudo ip route add table {table_number} to {network} dev {device}
sudo ip rule add from {local_address} table {table_number} priority {table_number}
''')
policy_routing = '\n\n'.join([policy_routing_string.format(
table_number=i+10,
device='eth{}'.format(i),
network=iface.get_bridge().get_network(),
local_address=iface.get_address(),
) for i, iface in enumerate(self.get_interfaces()[:-1])])
return textwrap.dedent('''
set -e
sudo sysctl -w net.ipv4.conf.all.arp_announce=1
sudo sysctl -w net.ipv4.conf.all.arp_ignore=2
{policy_routing}
cat << EOF > config.ini
[Host]
PrivateKey = INVALID
{peers}
EOF
(nohup sudo ./mpbl3p > mpbl3p.log 2>&1 & echo $! > mpbl3p.pid)
sleep 1
sudo ip link set up nc0
sudo ip addr add 172.19.152.3/31 dev nc0
ps $(cat mpbl3p.pid)
''').format(**self.setup_params, peers=peers, policy_routing=policy_routing)
def speedtest_server(self):
self.ssh('iperf3 -s -1 -D', error_stdout=True, error_stderr=True)
def speedtest_client(self, target, time=30):
command = 'iperf3 -c {target} -t {time} -O 5 -J'.format(target=target, time=time)
out = self.ssh(command, error_stdout=True, error_stderr=True, return_stdout=True)
return json.loads(out)