dissertation-3-evaluation/runners/runners.py
2020-12-12 17:15:09 +00:00

396 lines
14 KiB
Python

import concurrent.futures
import ipaddress
import os
import re
import time
from datetime import datetime
from typing import Callable, List, Tuple, Optional, Union
from urllib.parse import quote
import proxmoxer
import paramiko
import structure
import structure.generic
def check_env(*names: str) -> bool:
    """Report whether every named environment variable is defined.

    :param names: names to look up in ``os.environ``.
    :return: ``True`` when all names are present (including when no names
        are given), ``False`` if at least one is missing.
    """
    return all(name in os.environ for name in names)
def bridge_node_search(
        first: structure.generic.Bridge,
        bridge_name_generator: Callable[[structure.generic.Bridge], str],
        node_id_generator: Callable[[structure.generic.Node], int],
) -> Tuple[List[structure.generic.Bridge], List[structure.generic.Node]]:
    """Walk the topology reachable from ``first``, labelling what is found.

    A bridge counts as unvisited while its name is the empty string, and a
    node while its ID is ``None``. Each unvisited bridge is named with
    ``bridge_name_generator`` and each unvisited node is given an ID from
    ``node_id_generator``; both generators receive the object being labelled.

    :param first: bridge to start the search from.
    :param bridge_name_generator: produces the name for a newly found bridge.
    :param node_id_generator: produces the ID for a newly found node.
    :return: the bridges and the nodes that were labelled, in discovery order.
    """
    found_bridges: List[structure.generic.Bridge] = []
    found_nodes: List[structure.generic.Node] = []
    # Used as a stack: the most recently discovered bridge is expanded first.
    pending: List[structure.generic.Bridge] = [first]

    while pending:
        current = pending.pop()
        if current.get_name() != '':
            # already labelled (possibly queued more than once)
            continue
        found_bridges.append(current)
        current.set_name(bridge_name_generator(current))

        # every node attached to this bridge that has not been seen yet
        fresh_nodes: List[structure.generic.Node] = []
        for attached in (iface.get_node() for iface in current.get_interfaces()):
            if attached.get_id() is None:
                attached.set_id(node_id_generator(attached))
                fresh_nodes.append(attached)
        found_nodes.extend(fresh_nodes)

        # every still-unnamed bridge those nodes are attached to
        for member in fresh_nodes:
            neighbours = (iface.get_bridge() for iface in member.get_interfaces())
            pending.extend(
                candidate for candidate in neighbours
                if candidate is not None and candidate.get_name() == ''
            )

    return found_bridges, found_nodes
class PrintRunner:
    """Runner that only labels a topology and prints it.

    Bridges are named ``fake1``, ``fake2``, ... and nodes are numbered from 1;
    nothing is created anywhere.
    """

    def __init__(self):
        # Most recently issued bridge number / node ID (0 = none issued yet).
        self._last_bridge: int = 0
        self._last_node_id = 0

    def build(self, bridge: structure.generic.Bridge):
        """Label everything reachable from ``bridge`` and print the result."""

        # bridge_node_search passes the object being labelled; the fake
        # names and IDs do not depend on it.
        def next_name(_bridge):
            return self.name_bridge()

        def next_id(_node):
            return self.id_node()

        found_bridges, found_nodes = bridge_node_search(bridge, next_name, next_id)
        print(found_bridges)
        print(found_nodes)

    def teardown(self):
        """Nothing was created, so there is nothing to remove."""
        pass

    def name_bridge(self) -> str:
        """Return the next unused fake bridge name."""
        index = self._last_bridge + 1
        self._last_bridge = index
        return 'fake{}'.format(index)

    def id_node(self) -> int:
        """Return the next unused fake node ID."""
        next_id = self._last_node_id + 1
        self._last_node_id = next_id
        return next_id
class ProxmoxRunner:
    """Build and tear down an evaluation topology on one Proxmox node.

    Bridges are created as ``vmbrN`` network interfaces and nodes as clones of
    a template VM. Everything created is tracked so that ``teardown`` can
    remove it again.
    """

    # Seconds to keep retrying the SSH connection to a freshly started VM.
    ssh_timeout = 300

    def __init__(
            self,
            host: str,
            node: str,
            user: str,
            token_name: str,
            token_value: str,
            template_id: int,
            initial_vm_id: int,
            internet_bridge: str,
            management_bridge: str,
            management_initial_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address],
            management_netmask: int = 24,
            verify_ssl: bool = False,
    ):
        """Connect to the Proxmox API and prepare a single-use SSH key.

        :param host: Proxmox API host.
        :param node: name of the Proxmox node all requests are sent to.
        :param user: API user.
        :param token_name: API token name.
        :param token_value: API token secret.
        :param template_id: VM ID of the template every node is cloned from.
        :param initial_vm_id: first VM ID tried for a clone.
        :param internet_bridge: name of the existing bridge used while a
            node runs its internet setup command.
        :param management_bridge: name of the existing bridge that
            management interfaces are attached to.
        :param management_initial_ip: management address of the VM whose ID
            is ``initial_vm_id``; other VMs are offset by their ID difference.
        :param management_netmask: prefix length of the management network.
        :param verify_ssl: whether to verify the API's TLS certificate.
        """
        self._last_node_id = 0
        self._created_nodes: List[int] = []

        self._last_bridge = 0
        self._created_bridges: List[str] = []

        self._proxmox = proxmoxer.ProxmoxAPI(
            host,
            user=user,
            token_name=token_name,
            token_value=token_value,
            verify_ssl=verify_ssl,
        )

        self._proxmox_node = node
        self._template_id = template_id
        self._initial_vm_id = initial_vm_id

        self._internet_bridge = structure.generic.Bridge()
        self._internet_bridge.set_name(internet_bridge)

        self._management_bridge = structure.generic.Bridge()
        self._management_bridge.set_name(management_bridge)
        self._management_initial_ip = management_initial_ip
        self._management_netmask = management_netmask

        # generate a single use SSH key (we can use any with Proxmox)
        self._private_key = paramiko.RSAKey.generate(3072)
        self._client = paramiko.SSHClient()

    @property
    def _node_api(self):
        """API resource for the configured Proxmox node."""
        return self._proxmox.nodes(self._proxmox_node)

    def _vm_api(self, vm_id: int):
        """API resource for the VM ``vm_id`` on the configured node."""
        return self._node_api.qemu(vm_id)

    def build(self, bridge: structure.generic.Bridge):
        """Create, configure and set up everything reachable from ``bridge``.

        Bridges and VM clones are created one at a time during the search;
        nodes are then built, and afterwards set up, on up to 8 threads.

        :param bridge: any bridge of the topology to build.
        """
        bridges, nodes = bridge_node_search(bridge, self._create_bridge, self._create_node)

        self._build_bridges(bridges)

        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            build_futures = [executor.submit(self._build_node, node) for node in nodes]
            for future in build_futures:
                future.result(300)

            # guarantee that setup is not called until all of the nodes are built
            # this means that all will have their final IPs by this point
            setup_futures = [executor.submit(self._setup_node, node) for node in nodes]
            for future in setup_futures:
                future.result(300)

    def _await_task(self, upid, timeout=10, poll_interval=0.5):
        """Block until the Proxmox task ``upid`` reports status ``stopped``.

        Only the task's status is inspected, not whether it succeeded.

        :param upid: task identifier returned by the API.
        :param timeout: maximum number of seconds to wait.
        :param poll_interval: seconds to sleep between status requests.
        :raises TimeoutError: if the task has not stopped in time.
        """
        # monotonic clock: unaffected by wall-clock adjustments
        deadline = time.monotonic() + timeout
        while True:
            if self._node_api.tasks(upid).status.get()['status'] == 'stopped':
                return
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError('Proxmox task {} did not finish within {}s'.format(upid, timeout))
            # sleep between polls instead of hammering the API
            time.sleep(min(poll_interval, remaining))

    def _create_bridge(self, bridge: structure.generic.Bridge) -> str:
        """Create the next free ``vmbrN`` interface and return its name.

        Names that already exist on the Proxmox node are skipped.

        :param bridge: bridge being labelled (not inspected here).
        :return: name of the interface that was created.
        """
        while True:
            bridge_name = 'vmbr{}'.format(self._last_bridge)
            try:
                self._node_api.network.post(
                    iface=bridge_name,
                    type='bridge',
                    autostart=1,
                    comments='Automatically created by Python evaluation',
                )
            except proxmoxer.core.ResourceException as e:
                if 'interface already exists' not in str(e):
                    raise
                # name is taken: try the next index
                self._last_bridge += 1
                continue
            self._last_bridge += 1
            break

        self._created_bridges.append(bridge_name)
        return bridge_name

    def _build_bridges(self, bridges: List[structure.generic.Bridge]):
        """Apply pending network changes and give each bridge a distinct network.

        :param bridges: bridges whose networks must not overlap each other.
        """
        network_task = self._node_api.network.put()
        self._await_task(network_task)

        # Normalised to network objects so overlaps() always receives one,
        # whatever form get_network() returns.
        claimed = []
        for bridge in bridges:
            while any(ipaddress.ip_network(bridge.get_network()).overlaps(net) for net in claimed):
                bridge.new_network()
            claimed.append(ipaddress.ip_network(bridge.get_network()))

    def _create_node(self, node: structure.generic.Node) -> int:
        """Clone the template into the next free VM ID and return that ID.

        IDs whose config file already exists are skipped.

        :param node: node being labelled; its class name goes into the VM name.
        :return: VM ID of the new clone.
        :raises TimeoutError: if the clone task does not finish in time.
        """
        while True:
            vm_id = self._initial_vm_id + self._last_node_id
            try:
                clone_task = self._vm_api(self._template_id).clone.post(
                    newid=vm_id,
                    name='Diss-{}-Testing'.format(node.__class__.__name__),
                )
            except proxmoxer.core.ResourceException as e:
                if 'config file already exists' not in str(e):
                    raise
                # ID is taken: try the next one
                self._last_node_id += 1
                continue
            self._last_node_id += 1
            break

        self._await_task(clone_task)
        self._created_nodes.append(vm_id)
        return vm_id

    def _open_ssh(self, node: structure.generic.Node, interface: Optional[structure.generic.Interface] = None):
        """Connect to ``node`` over SSH and store the client on ``node.ssh_client``.

        Connection attempts are retried every 10 seconds until one succeeds
        or ``ssh_timeout`` seconds have passed.

        :param node: node to connect to.
        :param interface: interface whose address is used; defaults to the
            node's first management interface.
        :raises RuntimeError: if no interface is given and the node has no
            management interface.
        :raises TimeoutError: if no connection could be made in time.
        """
        if interface is None:
            interface = next(
                (
                    iface for iface in node.get_interfaces()
                    if iface.get_method() == structure.generic.IpMethod.Management
                ),
                None,
            )
        if interface is None:
            raise RuntimeError('no management interface available')

        client = paramiko.SSHClient()
        # the VM is new, so its host key cannot be known yet
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy)

        deadline = time.monotonic() + self.ssh_timeout
        while True:
            try:
                client.connect(
                    hostname=str(interface.get_address()),
                    username='python',
                    pkey=self._private_key,
                    banner_timeout=15,
                )
                break
            except (
                    paramiko.ssh_exception.AuthenticationException,
                    paramiko.ssh_exception.NoValidConnectionsError,
            ) as e:
                if time.monotonic() >= deadline:
                    # never hand out a client that is not connected
                    client.close()
                    raise TimeoutError(
                        'could not open SSH to {} within {}s'.format(interface.get_address(), self.ssh_timeout)
                    ) from e
                time.sleep(10)

        client.set_missing_host_key_policy(paramiko.RejectPolicy)
        node.ssh_client = client

    def _close_ssh(self, node: structure.generic.Node):
        """Close and remove the SSH client stored on ``node``."""
        node.ssh_client.close()
        del node.ssh_client

    def ssh(
            self,
            node: structure.generic.Node,
            command: str,
            error_stderr=False,
            error_stdout=False,
            return_stdout=False,
    ) -> Union[int, str, bytes]:
        """Run ``command`` on ``node`` over its open SSH connection.

        :param node: node with an ``ssh_client`` set by ``_open_ssh``.
        :param command: command line to execute.
        :param error_stderr: on a non-zero exit status, raise with the first
            2048 bytes of stderr.
        :param error_stdout: on a non-zero exit status, raise with the first
            2048 bytes of stdout (combined with stderr if both flags are set).
        :param return_stdout: ``True`` to return all of stdout as bytes, or
            a byte count to return that much of stdout decoded to ``str``.
        :return: the exit status, unless ``return_stdout`` selects stdout.
        :raises Exception: non-zero exit status with an error flag set.
        """
        chan = node.ssh_client.get_transport().open_session()
        try:
            chan.exec_command(command)

            stdout_chunks: List[bytes] = []
            stderr_chunks: List[bytes] = []

            def drain() -> bool:
                """Move buffered output into the chunk lists; report if any was read."""
                drained = False
                while chan.recv_ready():
                    chunk = chan.recv(32768)
                    if not chunk:
                        break
                    stdout_chunks.append(chunk)
                    drained = True
                while chan.recv_stderr_ready():
                    chunk = chan.recv_stderr(32768)
                    if not chunk:
                        break
                    stderr_chunks.append(chunk)
                    drained = True
                return drained

            # Output must be consumed while the command runs: waiting for the
            # exit status first can block forever once the remote side has
            # filled the channel window with unread output.
            while not chan.exit_status_ready():
                if not drain():
                    time.sleep(0.05)
            drain()
            exit_status = chan.recv_exit_status()
        finally:
            chan.close()

        stdout = b''.join(stdout_chunks)
        stderr = b''.join(stderr_chunks)

        def head(data: bytes) -> str:
            # truncation may split a multi-byte character, so never decode strictly
            return data[:2048].decode(errors='replace')

        if exit_status != 0:
            if error_stderr and error_stdout:
                raise Exception('stdout:\n{}\n\nstderr:\n{}\n'.format(head(stdout), head(stderr)))
            if error_stderr:
                raise Exception(head(stderr))
            if error_stdout:
                raise Exception(head(stdout))

        if return_stdout is not False:
            if return_stdout is True:
                return stdout
            return stdout[:return_stdout].decode(errors='replace')
        return exit_status

    def _build_node(self, node: structure.generic.Node):
        """Configure a cloned VM, run its internet setup and leave it running.

        On return the VM is started with its final interfaces, an SSH
        connection is open and ``node.ssh`` runs commands on the node.

        :param node: node whose ID is the VM ID of its clone.
        """
        vm = self._vm_api(node.get_id())

        # Step 1: Configure access
        vm.config.put(
            ciuser='python',
            # passed URL-encoded, including '/' (safe='')
            sshkeys=quote('ssh-rsa ' + self._private_key.get_base64(), ''),
            cores=node.get_core_count(),
            sockets=1,
            memory=node.get_memory_mb(),
        )

        # Step 2: connect to Internet bridge with DHCP to install packages
        internet_setup = node.get_internet_setup()
        if internet_setup is not None:
            internet_interface = structure.generic.Interface(structure.generic.IpMethod.Dhcp4)
            internet_interface.set_bridge(self._internet_bridge)
            # The node's last interface is kept alongside the temporary one;
            # presumably it is the management interface _open_ssh connects to.
            # NOTE(review): this configures net1, which step 3 only
            # overwrites when the node has at least two interfaces.
            temp_interfaces = [internet_interface, node.get_interfaces()[-1]]

            self._setup_node_interfaces(node, temp_interfaces)

            start_task = vm.status.start.post()
            self._await_task(start_task)

            self._open_ssh(node)
            try:
                self.ssh(node, internet_setup, error_stdout=True, error_stderr=True)
            finally:
                # do not leak the connection when the setup command fails
                self._close_ssh(node)

            stop_task = vm.status.shutdown.post()
            self._await_task(stop_task, timeout=20)

        # Step 3: connect to management bridge for final setup
        self._setup_node_interfaces(node)

        start_task = vm.status.start.post()
        self._await_task(start_task, timeout=20)

        self._open_ssh(node)

        def run_on_node(*args, **kwargs):
            return self.ssh(node, *args, **kwargs)

        node.ssh = run_on_node

    def _setup_node_interfaces(
            self,
            node: structure.generic.Node,
            interfaces: Optional[List[structure.generic.Interface]] = None,
    ):
        """Write ``netN`` / ``ipconfigN`` VM settings for the given interfaces.

        Each interface also has its ``set_rate`` replaced by a wrapper that
        pushes the new rate to Proxmox.

        :param node: node whose VM configuration is updated.
        :param interfaces: interfaces to configure, in device order;
            defaults to all of the node's interfaces.
        :raises RuntimeError: for an IP method that is not handled.
        """
        if interfaces is None:
            interfaces = node.get_interfaces()

        def rate_setter(iface):
            # binds this interface so the wrapper updates the right object
            def new_set_rate(rate: Optional[int]):
                structure.generic.Interface.set_rate(iface, rate)
                self._update_node_interfaces(node)

            return new_set_rate

        kwargs = {}
        for i, interface in enumerate(interfaces):
            method = interface.get_method()

            if method == structure.generic.IpMethod.Management:
                interface.set_bridge(self._management_bridge)
                # offset from the first management address by the VM ID offset
                addr = self._management_initial_ip + node.get_id() - self._initial_vm_id
                kwargs['ipconfig{}'.format(i)] = 'ip={}/{}'.format(addr, self._management_netmask)
                interface.set_address(addr)
            elif method in (structure.generic.IpMethod.Auto4, structure.generic.IpMethod.Manual):
                # handle manual the same as auto4 so it doesn't get stuck in DHCP
                bridge = interface.get_bridge()
                addr = bridge.get_ip_address()
                kwargs['ipconfig{}'.format(i)] = 'ip={}/{}'.format(addr, bridge.netmask)
                interface.set_address(addr)
            elif method == structure.generic.IpMethod.Dhcp4:
                kwargs['ipconfig{}'.format(i)] = 'ip=dhcp'
            else:
                raise RuntimeError('not implemented')

            kwargs['net{}'.format(i)] = 'model=virtio,bridge={}'.format(interface.get_bridge().get_name())
            if interface.get_rate() is not None:
                kwargs['net{}'.format(i)] += ',rate={}'.format(interface.get_rate())

            interface.set_rate = rate_setter(interface)

        self._vm_api(node.get_id()).config.put(**kwargs)

    def _update_node_interfaces(self, node: structure.generic.Node):
        """Rewrite the ``rate=`` option of the VM's ``net*`` settings.

        The config digest read beforehand is sent back with the update.

        :param node: node whose interfaces carry the desired rates.
        """
        interfaces = node.get_interfaces()
        vm_config = self._vm_api(node.get_id()).config

        current = vm_config.get()
        rate_regex = re.compile(r',rate=(\d+(?:\.\d+)?)')

        new_config = {'digest': current['digest']}
        for key, value in current.items():
            if key[:3] != 'net':
                continue
            # assumes every netN entry corresponds to interfaces[N]
            iface = interfaces[int(key[3:])]

            # drop any existing rate, then add the current one if set
            new_config[key] = rate_regex.sub('', value)
            if iface.get_rate() is not None:
                new_config[key] += ',rate={}'.format(iface.get_rate())

        vm_config.put(**new_config)

    def _setup_node(self, node: structure.generic.Node):
        """Run the node's setup command over SSH, if it has one."""
        cmd = node.get_setup()
        if cmd is not None:
            self.ssh(node, cmd, error_stdout=True, error_stderr=True)

    def teardown(self):
        """Stop and delete every created VM, then remove every created bridge.

        Each VM or bridge is dropped from the tracking lists as soon as it
        has been deleted, so calling ``teardown`` again after a failure does
        not try to delete it a second time.
        """
        for vm_id in list(self._created_nodes):
            stop_task = self._vm_api(vm_id).status.stop.post()
            self._await_task(stop_task)
            delete_task = self._vm_api(vm_id).delete()
            self._await_task(delete_task)
            self._created_nodes.remove(vm_id)

        bridges_deleted = False
        for bridge in list(self._created_bridges):
            self._node_api.network(bridge).delete()
            self._created_bridges.remove(bridge)
            bridges_deleted = True

        if bridges_deleted:
            # apply the pending interface removals
            network_task = self._node_api.network.put()
            self._await_task(network_task)

        self._created_nodes = []
        self._last_node_id = 0
        self._created_bridges = []
        self._last_bridge = 0