mirror of
https://git.disroot.org/pranav/pybatmesh.git
synced 2025-01-09 16:27:53 +05:30
added class to wait until routable
I haven't tested this properly yet
This commit is contained in:
parent
28de9570b5
commit
fbf518e14c
@ -30,6 +30,7 @@ examples.
|
|||||||
import subprocess
|
import subprocess
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from dasbus.connection import SystemMessageBus
|
from dasbus.connection import SystemMessageBus
|
||||||
|
from dasbus.loop import EventLoop
|
||||||
|
|
||||||
|
|
||||||
NETWORKD_BUS = "org.freedesktop.network1"
|
NETWORKD_BUS = "org.freedesktop.network1"
|
||||||
@ -118,3 +119,54 @@ class NetworkD:
|
|||||||
"""
|
"""
|
||||||
for i in self.runtime_path.iterdir():
|
for i in self.runtime_path.iterdir():
|
||||||
self.remove_config(i.name)
|
self.remove_config(i.name)
|
||||||
|
|
||||||
|
|
||||||
|
class NetworkLoop(NetworkD):
|
||||||
|
"""Used to wait until a condition is met
|
||||||
|
|
||||||
|
Available methods:
|
||||||
|
|
||||||
|
NetworkLoop.wait_until_routable(timeout=0):
|
||||||
|
return true when the network is routable, or false when timed out
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
# first, initialise the parent object
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.waitfor = None
|
||||||
|
self.wait_function = None
|
||||||
|
self.loop = EventLoop()
|
||||||
|
|
||||||
|
def start_loop(self):
|
||||||
|
"""start the dasbus loop"""
|
||||||
|
self.proxy.PropertiesChanged.connect(self.on_properties_changed)
|
||||||
|
self.loop.run()
|
||||||
|
|
||||||
|
def wait_until_routable(self, timeout=0):
|
||||||
|
"""
|
||||||
|
wait until timeout in milliseconds and returns True when any
|
||||||
|
network interface is shown routable by networkd
|
||||||
|
"""
|
||||||
|
self.wait_for_change("AddressState", self.on_addressstate_change)
|
||||||
|
return self.is_routable()
|
||||||
|
|
||||||
|
def wait_for_change(self, name, function):
|
||||||
|
"""used by the public functions"""
|
||||||
|
self.waitfor = name
|
||||||
|
self.wait_function = function
|
||||||
|
self.start_loop()
|
||||||
|
|
||||||
|
def on_addressstate_change(self):
|
||||||
|
"""quit the loop if the network is routable"""
|
||||||
|
if self.is_routable():
|
||||||
|
self.loop.quit()
|
||||||
|
|
||||||
|
def on_properties_changed(self, bus_interface, data, blah):
|
||||||
|
"""give this function some documentation"""
|
||||||
|
if self.waitfor in data:
|
||||||
|
return self.wait_function()
|
||||||
|
|
||||||
|
def on_timeout(self):
|
||||||
|
"""called by dasbus when a timeout occurs"""
|
||||||
|
self.loop.quit()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user