import concurrent.futures
import ipaddress
import os
import re
import time
from datetime import datetime
from typing import Callable, List, Tuple, Optional, Union
from urllib.parse import quote

import proxmoxer
import paramiko

import structure
import structure.generic


def check_env(*names: str) -> bool:
    """Return True only if every given name is present in ``os.environ``.

    Called with no names, this returns True.
    """
    return all(name in os.environ for name in names)


def bridge_node_search(
        first: structure.generic.Bridge,
        bridge_name_generator: Callable[[structure.generic.Bridge], str],
        node_id_generator: Callable[[structure.generic.Node], int],
) -> Tuple[List[structure.generic.Bridge], List[structure.generic.Node]]:
    """Walk the bridge/node graph reachable from ``first``, naming and numbering it.

    A bridge counts as unvisited while its name is the empty string, and a
    node counts as unvisited while its id is ``None``. Each unvisited bridge
    is given the name returned by ``bridge_name_generator`` and each
    unvisited node the id returned by ``node_id_generator``.

    :param first: bridge to start the traversal from
    :param bridge_name_generator: called once per newly discovered bridge
    :param node_id_generator: called once per newly discovered node
    :return: the bridges and the nodes that were discovered, in discovery order
    """
    bridges: List[structure.generic.Bridge] = []
    nodes: List[structure.generic.Node] = []
    queue: List[structure.generic.Bridge] = [first]

    while queue:
        bridge = queue.pop()
        # a bridge may be queued more than once before it is named
        if bridge.get_name() != '':
            continue
        bridges.append(bridge)
        bridge.set_name(bridge_name_generator(bridge))

        # from this bridge, find all nodes (via all interfaces)
        reachable_nodes: List[structure.generic.Node] = []
        for interface in bridge.get_interfaces():
            node = interface.get_node()
            if node.get_id() is not None:
                continue
            node.set_id(node_id_generator(node))
            reachable_nodes.append(node)
            nodes.append(node)

        # from each node, find all bridges (via all interfaces)
        for node in reachable_nodes:
            for interface in node.get_interfaces():
                bridge = interface.get_bridge()
                if bridge is not None and bridge.get_name() == '':
                    queue.append(bridge)

    return bridges, nodes


class PrintRunner:
    """Runner that only assigns fake names/ids and prints what it found."""

    def __init__(self):
        self._last_bridge: int = 0
        self._last_node_id = 0

    def build(self, bridge: structure.generic.Bridge):
        """Traverse the structure from ``bridge`` and print the bridges and nodes."""
        bridges, nodes = bridge_node_search(bridge, lambda _: self.name_bridge(), lambda _: self.id_node())
        print(bridges)
        print(nodes)

    def teardown(self):
        """Nothing is created by this runner, so there is nothing to remove."""
        pass

    def name_bridge(self) -> str:
        """Return the next sequential fake bridge name (``fake1``, ``fake2``, ...)."""
        self._last_bridge += 1
        return 'fake{}'.format(self._last_bridge)

    def id_node(self) -> int:
        """Return the next sequential node id, starting at 1."""
        self._last_node_id += 1
        return self._last_node_id


class ProxmoxRunner:
    """Runner that realises a structure as cloned VMs and bridges via the Proxmox API."""

    # seconds to keep retrying the SSH connection in _open_ssh
    ssh_timeout = 300

    def __init__(
            self,
            host: str,
            node: str,
            user: str,
            token_name: str,
            token_value: str,
            template_id: int,
            initial_vm_id: int,
            internet_bridge: str,
            management_bridge: str,
            management_initial_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address],
            management_netmask: int = 24,
            verify_ssl: bool = False,
    ):
        """Create the API client and record the configuration.

        :param host: passed to ``proxmoxer.ProxmoxAPI`` as the host
        :param node: name of the Proxmox node that all API calls target
        :param user: API user
        :param token_name: API token name
        :param token_value: API token value
        :param template_id: id of the VM that new nodes are cloned from
        :param initial_vm_id: first VM id to try when cloning
        :param internet_bridge: name of the existing bridge used for internet setup
        :param management_bridge: name of the existing bridge used for management
        :param management_initial_ip: base address; a node's management address
            is this plus the node's offset from ``initial_vm_id``
        :param management_netmask: prefix length used for management addresses
        :param verify_ssl: passed through to ``proxmoxer.ProxmoxAPI``
        """
        self._last_node_id = 0
        self._created_nodes: List[int] = []

        self._last_bridge = 0
        self._created_bridges: List[str] = []

        self._proxmox = proxmoxer.ProxmoxAPI(
            host,
            user=user,
            token_name=token_name,
            token_value=token_value,
            verify_ssl=verify_ssl,
        )
        self._proxmox_node = node

        self._template_id = template_id
        self._initial_vm_id = initial_vm_id

        self._internet_bridge = structure.generic.Bridge()
        self._internet_bridge.set_name(internet_bridge)

        self._management_bridge = structure.generic.Bridge()
        self._management_bridge.set_name(management_bridge)
        self._management_initial_ip = management_initial_ip
        self._management_netmask = management_netmask

        # generate a single use SSH key (we can use any with Proxmox)
        self._private_key = paramiko.RSAKey.generate(3072)
        self._client = paramiko.SSHClient()

    def build(self, bridge: structure.generic.Bridge):
        """Create every bridge and node reachable from ``bridge``, then build and set them up.

        Nodes are built in a thread pool; setup is only submitted once every
        build has finished.
        """
        bridges, nodes = bridge_node_search(bridge, lambda x: self._create_bridge(x), lambda x: self._create_node(x))

        self._build_bridges(bridges)

        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            build_futures = [executor.submit(self._build_node, node) for node in nodes]
            for future in build_futures:
                future.result(300)

            # guarantee that setup is not called until all of the nodes are built
            # this means that all will have their final IPs by this point
            setup_futures = [executor.submit(self._setup_node, node) for node in nodes]
            for future in setup_futures:
                future.result(300)

    def _await_task(self, upid, timeout=10, poll_interval=0.5):
        """Block until the task ``upid`` reports status ``'stopped'``.

        :param upid: task identifier as returned by a Proxmox API call
        :param timeout: seconds to wait before giving up
        :param poll_interval: seconds to sleep between status requests
        :raises TimeoutError: if the task has not stopped within ``timeout``
        """
        # monotonic clock: unaffected by wall-clock adjustments
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            status = self._proxmox.nodes(self._proxmox_node).tasks(upid).status.get()
            if status['status'] == 'stopped':
                return
            # avoid hammering the API while the task is still running
            time.sleep(poll_interval)
        raise TimeoutError('task {} did not stop within {} seconds'.format(upid, timeout))

    def _create_bridge(self, bridge: structure.generic.Bridge) -> str:
        """Create a new ``vmbrN`` bridge on the Proxmox node and return its name.

        Bridge numbers that are already taken are skipped until one succeeds.
        The ``bridge`` argument is not inspected.
        """
        while True:
            try:
                self._proxmox.nodes(self._proxmox_node).network.post(
                    iface='vmbr{}'.format(self._last_bridge),
                    type='bridge',
                    autostart=1,
                    comments='Automatically created by Python evaluation',
                )
                self._last_bridge += 1
                break
            except proxmoxer.core.ResourceException as e:
                if 'interface already exists' in str(e):
                    # name taken: try the next number
                    self._last_bridge += 1
                else:
                    raise e

        # the counter was already advanced past the bridge that was created
        bridge_name = 'vmbr{}'.format(self._last_bridge - 1)
        self._created_bridges.append(bridge_name)
        return bridge_name

    def _build_bridges(self, bridges: List[structure.generic.Bridge]):
        """Apply the pending network configuration and give bridges non-overlapping networks."""
        network_task = self._proxmox.nodes(self._proxmox_node).network.put()
        self._await_task(network_task)

        existing = []
        for bridge in bridges:
            # ask the bridge for a new network until it no longer clashes
            while any(ipaddress.ip_network(bridge.get_network()).overlaps(x) for x in existing):
                bridge.new_network()
            existing.append(bridge.get_network())

    def _create_node(self, node: structure.generic.Node) -> int:
        """Clone the template into a new VM for ``node`` and return the new VM id.

        VM ids that are already taken are skipped until a clone is accepted.
        """
        while True:
            try:
                clone_task = self._proxmox.nodes(self._proxmox_node).qemu(self._template_id).clone.post(
                    newid=self._initial_vm_id + self._last_node_id,
                    name='Diss-{}-Testing'.format(node.__class__.__name__),
                )
                self._last_node_id += 1
                break
            except proxmoxer.core.ResourceException as e:
                if 'config file already exists' in str(e):
                    # id taken: try the next one
                    self._last_node_id += 1
                else:
                    raise e

        self._await_task(clone_task)
        # the counter was already advanced past the VM that was created
        new_id = self._initial_vm_id + self._last_node_id - 1
        self._created_nodes.append(new_id)
        return new_id

    def _open_ssh(self, node: structure.generic.Node, interface: Optional[structure.generic.Interface] = None):
        """Connect to ``node`` over SSH and store the client as ``node.ssh_client``.

        :param node: node to connect to
        :param interface: interface whose address is used; defaults to the
            node's first interface with the Management method
        :raises RuntimeError: if no interface is given and none is a management interface
        :raises TimeoutError: if no connection succeeds within ``ssh_timeout`` seconds
        """
        if interface is None:
            for iface in node.get_interfaces():
                if iface.get_method() == structure.generic.IpMethod.Management:
                    interface = iface
                    break
            if interface is None:
                raise RuntimeError('no management interface available')

        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy)

        deadline = time.monotonic() + ProxmoxRunner.ssh_timeout
        while time.monotonic() < deadline:
            try:
                client.connect(
                    hostname=str(interface.get_address()),
                    username='python',
                    pkey=self._private_key,
                    banner_timeout=15,
                )
            except (paramiko.ssh_exception.AuthenticationException,
                    paramiko.ssh_exception.NoValidConnectionsError):
                # the connection is retried on these errors until the deadline
                time.sleep(10)
            else:
                client.set_missing_host_key_policy(paramiko.RejectPolicy)
                break
        else:
            # never connected: fail here rather than hand out an unusable client
            client.close()
            raise TimeoutError('could not open SSH to {} within {} seconds'.format(
                interface.get_address(), ProxmoxRunner.ssh_timeout))

        node.ssh_client = client

    def _close_ssh(self, node: structure.generic.Node):
        """Close and remove the SSH client stored on ``node``."""
        node.ssh_client.close()
        del node.ssh_client

    def ssh(
            self,
            node: structure.generic.Node,
            command: str,
            error_stderr=False,
            error_stdout=False,
            return_stdout=False,
    ) -> Union[int, str]:
        """Run ``command`` on ``node`` over its open SSH client.

        :param node: node with an ``ssh_client`` attribute set by ``_open_ssh``
        :param command: command line to execute
        :param error_stderr: on a non-zero exit status, raise with up to 2048 bytes of stderr
        :param error_stdout: on a non-zero exit status, raise with up to 2048 bytes of stdout
        :param return_stdout: ``False`` returns the exit status, ``True`` returns
            everything read from stdout, any other value is used as the number
            of bytes of stdout to receive and return decoded
        :return: the exit status, or stdout as selected by ``return_stdout``
        :raises Exception: on a non-zero exit status when either error flag is set
        """
        chan = node.ssh_client.get_transport().open_session()
        try:
            chan.exec_command(command)
            # NOTE(review): paramiko documents that recv_exit_status can hang if
            # the remote output exceeds the window size before anything is
            # read - confirm commands run here produce only small output.
            exit_status = chan.recv_exit_status()

            if exit_status != 0:
                if error_stderr and error_stdout:
                    raise Exception(
                        'stdout:\n{}\n\nstderr:\n{}\n'.format(chan.recv(2048).decode(),
                                                              chan.recv_stderr(2048).decode()))
                if error_stderr:
                    raise Exception(chan.recv_stderr(2048).decode())
                if error_stdout:
                    raise Exception(chan.recv(2048).decode())

            if return_stdout is not False:
                if return_stdout is True:
                    return chan.makefile().read()
                return chan.recv(return_stdout).decode()

            return exit_status
        finally:
            # release the session channel whether the command succeeded or not
            chan.close()

    def _build_node(self, node: structure.generic.Node):
        """Configure the cloned VM for ``node``, run its internet setup and boot it.

        On return the VM has been started with its final interfaces, an SSH
        client is open on ``node.ssh_client`` and ``node.ssh`` runs commands
        on the node.
        """
        vm = node.get_id()

        # Step 1: Configure access
        self._proxmox.nodes(self._proxmox_node).qemu(vm).config.put(
            ciuser='python',
            sshkeys=quote('ssh-rsa ' + self._private_key.get_base64(), ''),
            cores=node.get_core_count(),
            sockets=1,
            memory=node.get_memory_mb(),
        )

        # Step 2: connect to Internet bridge with DHCP to install packages
        if node.get_internet_setup() is not None:
            interfaces = node.get_interfaces()
            internet_interface = structure.generic.Interface(structure.generic.IpMethod.Dhcp4)
            internet_interface.set_bridge(self._internet_bridge)
            # keep the node's last interface alongside the temporary internet one
            temp_interfaces = [internet_interface, interfaces[-1]]
            self._setup_node_interfaces(node, temp_interfaces)

            start_task = self._proxmox.nodes(self._proxmox_node).qemu(vm).status.start.post()
            self._await_task(start_task)

            self._open_ssh(node)
            try:
                self.ssh(node, node.get_internet_setup(), error_stdout=True, error_stderr=True)
            finally:
                # do not leak the connection if the setup command fails
                self._close_ssh(node)

            stop_task = self._proxmox.nodes(self._proxmox_node).qemu(vm).status.shutdown.post()
            self._await_task(stop_task, timeout=20)

        # Step 3: connect to management bridge for final setup
        self._setup_node_interfaces(node)
        start_task = self._proxmox.nodes(self._proxmox_node).qemu(vm).status.start.post()
        self._await_task(start_task, timeout=20)

        self._open_ssh(node)
        node.ssh = lambda *args, **kwargs: self.ssh(node, *args, **kwargs)

    def _setup_node_interfaces(self, node: structure.generic.Node,
                               interfaces: Optional[List[structure.generic.Interface]] = None):
        """Write the network and IP configuration of ``node``'s VM.

        Each interface's ``set_rate`` is also replaced so that changing the
        rate pushes the new value to Proxmox.

        :param node: node whose VM is configured
        :param interfaces: interfaces to configure, in device order; defaults
            to ``node.get_interfaces()``
        :raises RuntimeError: for an interface method that is not handled
        """
        if interfaces is None:
            interfaces = node.get_interfaces()

        def interface_set_rate(iface):
            # wrap the class-level setter so a rate change is applied to the VM
            def new_set_rate(rate: Optional[int]):
                structure.generic.Interface.set_rate(iface, rate)
                self._update_node_interfaces(node)

            return new_set_rate

        kwargs = dict()
        for i, interface in enumerate(interfaces):
            ip_key = 'ipconfig{}'.format(i)
            net_key = 'net{}'.format(i)

            method = interface.get_method()
            if method == structure.generic.IpMethod.Management:
                interface.set_bridge(self._management_bridge)
                # offset the base address by the VM's position after the initial id
                addr = self._management_initial_ip + node.get_id() - self._initial_vm_id
                kwargs[ip_key] = 'ip={}/{}'.format(addr, self._management_netmask)
                interface.set_address(addr)
            elif method == structure.generic.IpMethod.Auto4 or method == structure.generic.IpMethod.Manual:
                # handle manual the same as auto4 so it doesn't get stuck in DHCP
                bridge = interface.get_bridge()
                addr = bridge.get_ip_address()
                kwargs[ip_key] = 'ip={}/{}'.format(addr, bridge.netmask)
                interface.set_address(addr)
            elif method == structure.generic.IpMethod.Dhcp4:
                kwargs[ip_key] = 'ip=dhcp'
            else:
                raise RuntimeError('not implemented')

            kwargs[net_key] = 'model=virtio,bridge={}'.format(interface.get_bridge().get_name())
            if interface.get_rate() is not None:
                kwargs[net_key] += ',rate={}'.format(interface.get_rate())

            interface.set_rate = interface_set_rate(interface)

        self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.put(**kwargs)

    def _update_node_interfaces(self, node: structure.generic.Node):
        """Rewrite the ``rate`` option of every ``netN`` entry in the VM config.

        The current config is fetched, any existing ``,rate=...`` is stripped
        from each ``netN`` value and the interface's current rate (if set) is
        appended. The fetched digest is sent back with the update.
        """
        interfaces = node.get_interfaces()
        old_config = self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.get()

        rate_regex = re.compile(r',rate=(\d+(?:\.\d+)?)')
        new_config = {'digest': old_config['digest']}
        for k, v in old_config.items():
            if not k.startswith('net'):
                continue
            # the numeric suffix of the key indexes into the node's interfaces
            iface = interfaces[int(k[3:])]
            new_config[k] = rate_regex.sub('', v)
            if iface.get_rate() is not None:
                new_config[k] += ',rate={}'.format(iface.get_rate())

        self._proxmox.nodes(self._proxmox_node).qemu(node.get_id()).config.put(**new_config)

    def _setup_node(self, node: structure.generic.Node):
        """Run the node's setup command over SSH, if it has one."""
        cmd = node.get_setup()
        if cmd is not None:
            self.ssh(node, cmd, error_stdout=True, error_stderr=True)

    def teardown(self):
        """Stop and delete every VM and bridge created by this runner, then reset its counters."""
        for node in self._created_nodes:
            stop_task = self._proxmox.nodes(self._proxmox_node).qemu(node).status.stop.post()
            self._await_task(stop_task)
            delete_task = self._proxmox.nodes(self._proxmox_node).qemu(node).delete()
            self._await_task(delete_task)

        for bridge in self._created_bridges:
            self._proxmox.nodes(self._proxmox_node).network(bridge).delete()
        if self._created_bridges:
            # apply the pending bridge removals
            network_task = self._proxmox.nodes(self._proxmox_node).network.put()
            self._await_task(network_task)

        self._created_nodes = []
        self._last_node_id = 0
        self._created_bridges = []
        self._last_bridge = 0