dissertation-3-evaluation/runners/runners.py

394 lines
14 KiB
Python
Raw Normal View History

2020-11-05 17:25:23 +00:00
import concurrent.futures
import ipaddress
import os
2020-11-05 17:25:23 +00:00
import re
2020-11-04 22:53:36 +00:00
import time
from datetime import datetime
2020-11-05 17:25:23 +00:00
from typing import Callable, List, Tuple, Optional, Union
2020-11-04 22:53:36 +00:00
from urllib.parse import quote
import proxmoxer
2020-11-04 22:53:36 +00:00
import paramiko
import structure
def check_env(*names: str) -> bool:
    """Report whether every named environment variable is set.

    Args:
        *names: environment variable names to look up in ``os.environ``.

    Returns:
        ``True`` when all of the names are present (also when no names are
        given), ``False`` if at least one is missing.
    """
    return all(name in os.environ for name in names)
2020-11-04 22:53:36 +00:00
def bridge_node_search(
    first: structure.Bridge,
    bridge_name_generator: Callable[[structure.Bridge], str],
    node_id_generator: Callable[[structure.Node], int],
) -> Tuple[List[structure.Bridge], List[structure.Node]]:
    """Walk the topology reachable from ``first``, naming bridges and numbering nodes.

    A bridge whose name is the empty string and a node whose id is ``None``
    are treated as not yet visited. Each one is passed to its generator
    exactly once, and the generated value is stored on it, which is also what
    marks it as visited. Pending bridges are taken from the end of the work
    list, so the walk is depth-first.

    Args:
        first: bridge from which the search starts.
        bridge_name_generator: called with each newly found bridge; its result
            is stored with ``set_name``.
        node_id_generator: called with each newly found node; its result is
            stored with ``set_id``.

    Returns:
        The bridges and the nodes found, each in discovery order.
    """
    found_bridges: List[structure.Bridge] = []
    found_nodes: List[structure.Node] = []
    pending: List[structure.Bridge] = [first]

    while pending:
        current = pending.pop()
        if current.get_name() != '':
            # queued more than once, or named before the search began
            continue

        found_bridges.append(current)
        current.set_name(bridge_name_generator(current))

        # every node attached to this bridge that has not been seen before
        fresh_nodes: List[structure.Node] = []
        for bridge_iface in current.get_interfaces():
            candidate = bridge_iface.get_node()
            if candidate.get_id() is None:
                candidate.set_id(node_id_generator(candidate))
                fresh_nodes.append(candidate)
                found_nodes.append(candidate)

        # every still-unnamed bridge attached to one of those nodes
        for fresh in fresh_nodes:
            for node_iface in fresh.get_interfaces():
                neighbour = node_iface.get_bridge()
                if neighbour is not None and neighbour.get_name() == '':
                    pending.append(neighbour)

    return found_bridges, found_nodes
class PrintRunner:
    """Runner that only prints the topology it discovers.

    Bridges are named ``fake1``, ``fake2``, ... and nodes are numbered from 1
    as the search reaches them.
    """

    def __init__(self):
        # most recently issued bridge number and node id (0 = none issued yet)
        self._last_bridge: int = 0
        self._last_node_id = 0

    def build(self, bridge: structure.Bridge):
        """Search the topology from ``bridge`` and print the bridges, then the nodes."""

        def next_bridge_name(_):
            return self.name_bridge()

        def next_node_id(_):
            return self.id_node()

        bridges, nodes = bridge_node_search(bridge, next_bridge_name, next_node_id)
        print(bridges)
        print(nodes)

    def teardown(self):
        """Do nothing; this runner creates nothing that needs removing."""

    def name_bridge(self) -> str:
        """Return the next unused ``fakeN`` bridge name."""
        self._last_bridge = self._last_bridge + 1
        return 'fake{}'.format(self._last_bridge)

    def id_node(self) -> int:
        """Return the next unused node id."""
        self._last_node_id = self._last_node_id + 1
        return self._last_node_id
class ProxmoxRunner:
    """Build a ``structure`` topology as bridges and virtual machines on a Proxmox node.

    ``build`` creates a bridge and a cloned VM for everything reachable from
    the given bridge, configures and boots each VM, and runs its setup command
    over SSH. ``teardown`` stops and deletes every VM and bridge that this
    instance created.
    """

    # Number of seconds _open_ssh keeps retrying before it gives up.
    ssh_timeout = 300

    def __init__(
        self,
        host: str,
        node: str,
        user: str,
        token_name: str,
        token_value: str,
        template_id: int,
        initial_vm_id: int,
        internet_bridge: str,
        management_bridge: str,
        management_initial_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address],
        management_netmask: int = 24,
        verify_ssl: bool = False,
    ):
        """Create the Proxmox API client and record the build settings.

        Args:
            host: passed to ``proxmoxer.ProxmoxAPI`` as the host to talk to.
            node: name of the Proxmox node used in every API path.
            user: API user.
            token_name: API token name.
            token_value: API token value.
            template_id: id of the VM that is cloned for every node.
            initial_vm_id: first VM id tried when cloning; it is also
                subtracted from a node's id to get that node's offset from
                ``management_initial_ip``.
            internet_bridge: name of the bridge used for the temporary DHCP
                interface while a node runs its internet setup command.
            management_bridge: name of the bridge that management interfaces
                are attached to.
            management_initial_ip: base management address; it must support
                adding and subtracting integers (as ``ipaddress`` address
                objects do).
            management_netmask: prefix length written into the management
                interface's IP configuration.
            verify_ssl: passed through to ``proxmoxer.ProxmoxAPI``.
        """
        self._last_node_id = 0
        self._created_nodes: List[int] = []

        self._last_bridge = 0
        self._created_bridges: List[str] = []

        self._proxmox = proxmoxer.ProxmoxAPI(
            host,
            user=user,
            token_name=token_name,
            token_value=token_value,
            verify_ssl=verify_ssl,
        )

        self._proxmox_node = node
        self._template_id = template_id
        self._initial_vm_id = initial_vm_id

        self._internet_bridge = structure.Bridge()
        self._internet_bridge.set_name(internet_bridge)

        self._management_bridge = structure.Bridge()
        self._management_bridge.set_name(management_bridge)
        self._management_initial_ip = management_initial_ip
        self._management_netmask = management_netmask

        # generate a single use SSH key (we can use any with Proxmox)
        self._private_key = paramiko.RSAKey.generate(3072)
        # NOTE(review): nothing in this class reads _client (each node gets
        # its own client in _open_ssh); kept in case code elsewhere uses it.
        self._client = paramiko.SSHClient()

    def build(self, bridge: structure.Bridge):
        """Create, configure and set up everything reachable from ``bridge``.

        Bridges and VM clones are created while the topology is searched.
        Nodes are then built on a pool of worker threads and, once every node
        is built, set up on the same pool. An exception raised by any worker
        is re-raised here.
        """
        bridges, nodes = bridge_node_search(bridge, self._create_bridge, self._create_node)
        self._build_bridges(bridges)

        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            build_futures = [executor.submit(self._build_node, node) for node in nodes]
            for future in build_futures:
                future.result()

            # guarantee that setup is not called until all of the nodes are built
            # this means that all will have their final IPs by this point
            setup_futures = [executor.submit(self._setup_node, node) for node in nodes]
            for future in setup_futures:
                future.result()

    def _await_task(self, upid, timeout=10, poll_interval=0.5):
        """Block until the Proxmox task ``upid`` reports the status ``stopped``.

        Args:
            upid: task identifier as returned by the API call that started it.
            timeout: maximum number of seconds to wait.
            poll_interval: seconds to sleep between status requests, so the
                API is not polled in a tight loop.

        Raises:
            TimeoutError: the task was still running when ``timeout`` expired.
        """
        # A monotonic deadline is unaffected by wall-clock adjustments.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            status = self._proxmox.nodes(self._proxmox_node).tasks(upid).status.get()
            if status['status'] == 'stopped':
                return
            time.sleep(poll_interval)
        raise TimeoutError('task {} did not stop within {} seconds'.format(upid, timeout))

    def _create_bridge(self, bridge: structure.Bridge) -> str:
        """Create the next free ``vmbrN`` bridge on the Proxmox node.

        Indices whose interface already exists are skipped. The created name
        is recorded so that ``teardown`` can delete it.

        Args:
            bridge: the topology bridge being named; it is not inspected here.

        Returns:
            The name of the bridge that was created.

        Raises:
            proxmoxer.core.ResourceException: the API rejected the request for
                a reason other than the interface already existing.
        """
        while True:
            try:
                self._proxmox.nodes(self._proxmox_node).network.post(
                    iface='vmbr{}'.format(self._last_bridge),
                    type='bridge',
                    autostart=1,
                    comments='Automatically created by Python evaluation',
                )
                self._last_bridge += 1
                break
            except proxmoxer.core.ResourceException as e:
                if 'interface already exists' in str(e):
                    # index taken: move on to the next one and retry
                    self._last_bridge += 1
                else:
                    raise

        # _last_bridge now points one past the index that was just created
        bridge_name = 'vmbr{}'.format(self._last_bridge - 1)
        self._created_bridges.append(bridge_name)
        return bridge_name

    def _build_bridges(self, bridges: List[structure.Bridge]):
        """Apply the pending network changes and give every bridge a distinct network.

        After the network reload task finishes, each bridge is asked for a new
        network until its network overlaps none of those already accepted for
        the bridges earlier in the list.
        """
        network_task = self._proxmox.nodes(self._proxmox_node).network.put()
        self._await_task(network_task)

        existing = []
        for bridge in bridges:
            while any(ipaddress.ip_network(bridge.get_network()).overlaps(x) for x in existing):
                bridge.new_network()
            existing.append(bridge.get_network())

    def _create_node(self, node: structure.Node) -> int:
        """Clone the template VM for ``node`` under the next free VM id.

        Ids whose config file already exists are skipped. The clone task is
        awaited and the id is recorded so that ``teardown`` can delete the VM.

        Returns:
            The id of the newly cloned VM.

        Raises:
            proxmoxer.core.ResourceException: the API rejected the clone for a
                reason other than the id already being in use.
            TimeoutError: the clone task did not finish in time.
        """
        while True:
            try:
                clone_task = self._proxmox.nodes(self._proxmox_node).qemu(self._template_id).clone.post(
                    newid=self._initial_vm_id + self._last_node_id,
                    name='Diss-{}-Testing'.format(node.__class__.__name__),
                )
                self._last_node_id += 1
                break
            except proxmoxer.core.ResourceException as e:
                if 'config file already exists' in str(e):
                    # id taken: move on to the next one and retry
                    self._last_node_id += 1
                else:
                    raise

        self._await_task(clone_task)
        # _last_node_id now points one past the offset that was just used
        new_id = self._initial_vm_id + self._last_node_id - 1
        self._created_nodes.append(new_id)
        return new_id

    def _open_ssh(self, node: structure.Node, interface: structure.Interface = None):
        """Open an SSH connection to ``node`` and store it as ``node.ssh_client``.

        Connection attempts that fail authentication or cannot reach the host
        are retried every 10 seconds until ``ProxmoxRunner.ssh_timeout``
        seconds have passed.

        Args:
            node: node to connect to.
            interface: interface whose address is used; defaults to the
                node's first management interface.

        Raises:
            RuntimeError: no interface was given and the node has no
                management interface.
            TimeoutError: no connection could be established in time.
        """
        if interface is None:
            for iface in node.get_interfaces():
                if iface.get_method() == structure.IpMethod.Management:
                    interface = iface
                    break
            if interface is None:
                raise RuntimeError('no management interface available')

        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy)

        deadline = time.monotonic() + ProxmoxRunner.ssh_timeout
        while time.monotonic() < deadline:
            try:
                client.connect(
                    hostname=str(interface.get_address()),
                    username='python',
                    pkey=self._private_key,
                    banner_timeout=15,
                )
            except (paramiko.ssh_exception.AuthenticationException, paramiko.ssh_exception.NoValidConnectionsError):
                time.sleep(10)
            else:
                client.set_missing_host_key_policy(paramiko.RejectPolicy)
                break
        else:
            # Never connected: fail here rather than hand out a dead client
            # that only breaks later, when a command is run on it.
            client.close()
            raise TimeoutError(
                'could not open SSH connection to {} within {} seconds'.format(
                    interface.get_address(), ProxmoxRunner.ssh_timeout))

        node.ssh_client = client

    def _close_ssh(self, node: structure.Node):
        """Close the node's SSH connection and remove ``node.ssh_client``."""
        node.ssh_client.close()
        del node.ssh_client

    def ssh(
        self,
        node: structure.Node,
        command: str,
        error_stderr=False,
        error_stdout=False,
        return_stdout=False,
    ) -> Union[int, str]:
        """Run ``command`` on ``node`` over its open SSH connection.

        Args:
            node: node with an ``ssh_client`` set by ``_open_ssh``.
            command: command line to execute.
            error_stderr: on a non-zero exit status, raise with up to 2048
                bytes of the command's stderr.
            error_stdout: on a non-zero exit status, raise with up to 2048
                bytes of the command's stdout.
            return_stdout: ``False`` to return the exit status, ``True`` to
                return everything read from stdout, or a byte count to return
                at most that many bytes of stdout, decoded.

        Returns:
            The exit status, or the command's stdout when ``return_stdout``
            is set.

        Raises:
            Exception: the command exited non-zero and ``error_stderr`` and/or
                ``error_stdout`` is set.
        """
        chan = node.ssh_client.get_transport().open_session()
        chan.exec_command(command)

        # NOTE(review): output is only read after the command has exited; the
        # paramiko documentation warns this can stall when a command produces
        # a lot of output - confirm the commands used here stay small.
        exit_status = chan.recv_exit_status()
        if exit_status != 0:
            if error_stderr and error_stdout:
                raise Exception(
                    'stdout:\n{}\n\nstderr:\n{}\n'.format(chan.recv(2048).decode(), chan.recv_stderr(2048).decode()))
            if error_stderr:
                raise Exception(chan.recv_stderr(2048).decode())
            if error_stdout:
                raise Exception(chan.recv(2048).decode())

        if return_stdout is not False:
            if return_stdout is True:
                # NOTE(review): unlike the branch below this value is not
                # decoded - presumably bytes; confirm what callers expect.
                return chan.makefile().read()
            else:
                return chan.recv(return_stdout).decode()

        return exit_status

    def _build_node(self, node: structure.Node):
        """Configure and boot the VM for ``node``, leaving an SSH session open.

        If the node has an internet setup command, the VM is first booted with
        a temporary DHCP interface on the internet bridge, the command is run,
        and the VM is shut down again. The VM is then started with its final
        interfaces, and ``node.ssh`` is bound to this runner's ``ssh`` method
        for that node.
        """
        # Step 1: Configure access
        self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.put(
            ciuser='python',
            sshkeys=quote('ssh-rsa ' + self._private_key.get_base64(), ''),
            cores=node.get_core_count(),
            sockets=1,
            memory=node.get_memory_mb(),
        )

        # Step 2: connect to Internet bridge with DHCP to install packages
        if node.get_internet_setup() is not None:
            interfaces = node.get_interfaces()
            internet_interface = structure.Interface(structure.IpMethod.Dhcp4)
            internet_interface.set_bridge(self._internet_bridge)

            # keep the node's last interface alongside the temporary one
            # presumably that is the management interface, which _open_ssh
            # needs - TODO confirm
            temp_interfaces = [internet_interface, interfaces[-1]]
            self._setup_node_interfaces(node, temp_interfaces)

            start_task = self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).status.start.post()
            self._await_task(start_task)

            self._open_ssh(node)
            self.ssh(node, node.get_internet_setup(), error_stdout=True, error_stderr=True)
            self._close_ssh(node)

            stop_task = self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).status.shutdown.post()
            self._await_task(stop_task, timeout=20)

        # Step 3: connect to management bridge for final setup
        self._setup_node_interfaces(node)
        start_task = self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).status.start.post()
        self._await_task(start_task, timeout=20)

        self._open_ssh(node)
        # bind the node now so that node.ssh(...) always targets this node
        node.ssh = (lambda n: lambda *args, **kwargs: self.ssh(n, *args, **kwargs))(node)

    def _setup_node_interfaces(self, node: structure.Node, interfaces: List[structure.Interface] = None):
        """Write the network configuration of ``node``'s VM to Proxmox.

        For each interface an ``ipconfigN`` and a ``netN`` entry are built
        according to its IP method, and the address chosen is stored on the
        interface. Each interface's ``set_rate`` is replaced with a version
        that also pushes the new rate to Proxmox.

        Args:
            node: node whose VM is configured.
            interfaces: interfaces to configure, in device order; defaults to
                all of the node's interfaces.

        Raises:
            RuntimeError: an interface uses an IP method that is not handled.
        """
        if interfaces is None:
            interfaces = node.get_interfaces()

        def interface_set_rate(iface):
            def new_set_rate(rate: Optional[int]):
                structure.Interface.set_rate(iface, rate)
                self._update_node_interfaces(node)

            return new_set_rate

        kwargs = {}
        for i, interface in enumerate(interfaces):
            method = interface.get_method()

            if method == structure.IpMethod.Management:
                interface.set_bridge(self._management_bridge)
                # management address = base address + this VM's offset from the first VM id
                addr = self._management_initial_ip + node.get_id() - self._initial_vm_id
                kwargs['ipconfig{}'.format(i)] = 'ip={}/{}'.format(addr, self._management_netmask)
                interface.set_address(addr)
            elif method == structure.IpMethod.Auto4 or method == structure.IpMethod.Manual:
                # handle manual the same as auto4 so it doesn't get stuck in DHCP
                bridge = interface.get_bridge()
                addr = bridge.get_ip_address()
                kwargs['ipconfig{}'.format(i)] = 'ip={}/{}'.format(addr, bridge.netmask)
                interface.set_address(addr)
            elif method == structure.IpMethod.Dhcp4:
                kwargs['ipconfig{}'.format(i)] = 'ip=dhcp'
            else:
                raise RuntimeError('not implemented')

            kwargs['net{}'.format(i)] = 'model=virtio,bridge={}'.format(interface.get_bridge().get_name())
            if interface.get_rate() is not None:
                kwargs['net{}'.format(i)] += ',rate={}'.format(interface.get_rate())

            interface.set_rate = interface_set_rate(interface)

        self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.put(**kwargs)

    def _update_node_interfaces(self, node: structure.Node):
        """Rewrite the rate limit of every ``netN`` device of ``node``'s VM.

        The current configuration is fetched, any existing ``rate=`` option is
        stripped from each network device, and the interface's current rate
        (if it has one) is appended. The fetched configuration's digest is
        sent back with the update.
        """
        interfaces = node.get_interfaces()

        old_config = self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.get()
        old_digest = old_config['digest']
        old_config = {k: v for (k, v) in old_config.items() if k[:3] == 'net'}

        rate_regex = re.compile(r',rate=(\d+(?:\.\d+)?)')
        new_config = {'digest': old_digest}
        for k, v in old_config.items():
            # the number after 'net' indexes into the node's interfaces
            index = int(k[3:])
            iface = interfaces[index]

            new_config[k] = rate_regex.sub('', v)
            if iface.get_rate() is not None:
                new_config[k] += ',rate={}'.format(iface.get_rate())

        self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.put(**new_config)

    def _setup_node(self, node: structure.Node):
        """Run the node's setup command over SSH, if it has one.

        Raises:
            Exception: the command exited with a non-zero status.
        """
        cmd = node.get_setup()
        if cmd is not None:
            self.ssh(node, cmd, error_stdout=True, error_stderr=True)

    def teardown(self):
        """Stop and delete every VM and bridge created by this runner.

        The network configuration is reloaded after the bridges are removed,
        and the runner's bookkeeping is reset so that it can be reused.
        """
        for node in self._created_nodes:
            stop_task = self._proxmox.nodes(self._proxmox_node).qemu(node).status.stop.post()
            self._await_task(stop_task)
            delete_task = self._proxmox.nodes(self._proxmox_node).qemu(node).delete()
            self._await_task(delete_task)

        for bridge in self._created_bridges:
            self._proxmox.nodes(self._proxmox_node).network(bridge).delete()
        network_task = self._proxmox.nodes(self._proxmox_node).network.put()
        self._await_task(network_task)

        self._created_nodes = []
        self._last_node_id = 0
        self._created_bridges = []
        self._last_bridge = 0