# This file is part of naxalnet. # Copyright (C) 2021 The naxalnet Authors # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ network.py ---------- This submodule manages the systemd-networkd configuration. TODO: Add more details """ from pathlib import Path from dasbus.connection import SystemMessageBus NETWORKD_BUS = "org.freedesktop.network1" NETWORKD_PATH = "/org/freedesktop/network1" class NetworkD: """control systemd-networkd""" def __init__(self, runtime_dir="/run/systemd/network", bus=SystemMessageBus()): self._bus = bus self.proxy_reload() self.variables = {} self.runtime_path = Path(runtime_dir) # Create the runtime directory if it doesn't exist self.runtime_path.mkdir(parents=True, exist_ok=True) def set_vars(self, **variables): """set the variables to replace with str.format""" self.variables = variables def proxy_reload(self) -> None: """reload the proxy""" self.proxy = self._bus.get_proxy(NETWORKD_BUS, NETWORKD_PATH) def reload(self) -> None: """reload the systemd-networkd configuration""" self.proxy.Reload() def add_config(self, name: str) -> None: """add config file to runtime directory and reload networkd""" source = Path(name) destination = self.runtime_path / source.name # Substitute variables in the config text = source.read_text(encoding="utf-8").format(**self.variables) # now write it to a runtime config of the same name destination.write_text(text, encoding="utf-8") self.reload() def is_routable(self) -> bool: """returns true if any interface is routable""" return self.proxy.AddressState == "routable" def delete_interface(self, name: str) -> None: """delete the given interface""" def remove_config(self, name: str) -> None: """ remove the file called 'name' from the runtime dir and reload """ path = self.runtime_path / name path.unlink() self.reload() def remove_all_configs(self) -> None: """ Remove all configs in runtime_path. This will remove all files in runtime_path without checking who put them there. """ for i in self.runtime_path.iterdir(): self.remove_config(i.name)