Files
topologyviewer/easymesh.py
Joanna Lien e9e72a1bb9 PPM-2654 Fix crash issue with Haze
Signed-off-by: Frederik Van Bogaert <frederik.vanbogaert@mind.be>
2023-11-22 15:13:45 +01:00

718 lines
23 KiB
Python

# pylint: disable=line-too-long, invalid-name, too-many-instance-attributes, too-few-public-methods, too-many-arguments
"""
This module defines a set of classes to represent WiFi networks, devices and measurements.
Classes:
- Station: Represents a client or an access point connected to a WiFi network.
- BSS: Represents a Basic Service Set (BSS), which is a set of stations controlled by a single access point.
- UnassociatedStation: Represents a WiFi client that is not connected to any access point.
- UnassociatedStationRSSIMeasurement: Represents a measurement of signal strength from an unassociated station.
- Radio: Represents a WiFi radio that is capable of transmitting and receiving WiFi signals.
Enums:
- ORIENTATION: Represents the orientation of an object in 2D space.
Constants:
- mediaType_to_str: A dictionary that maps WiFi media types to their string representations.
Functions:
None
Note: The module requires Python 3.7 or higher.
"""
from typing import List
from enum import Enum
import hashlib
mediaType_to_str = dict([
(0x0, 'Fast Ethernet'),
(0x1, 'Gigabit Ethernet'),
(0x100, 'B 2.4GHz'),
(0x101, 'G 2.4GHz'),
(0x102, 'A 5 GHz'),
(0x103, 'N 2.4 GHz'),
(0x104, 'N 5 GHz'),
(0x105, 'AC 5 GHz'),
(0x106, 'AD 60 GHz'),
(0x107, 'AF'),
(0x108, 'AX'),
(0x200, 'IEEE_1901_WAVELET'),
(0x201, 'IEEE_1901_FFT'),
(0x300, 'MOCA_V1_1'),
(0xffff, 'UNKNOWN_MEDIA')])
class ORIENTATION(Enum):
"""Orientation of a given EasyMesh node in cartesian space.
"""
RIGHT = 0
UP = 1
DOWN = 2
class Station():
"""Class representing a Wi-Fi EasyMesh station.
"""
def __init__(self, path, params) -> None:
self.path = path
self.params = params
self.x = 0
self.y = 0
self.is_steered = False
def get_mac(self) -> str:
"""Get this station's MAC address
Returns:
str: The MAC address of this station.
"""
if 'MACAddress' in self.params:
return self.params['MACAddress']
return ''
def get_hash_mac(self) -> str:
"""Get the MD5 hash of this station's MAC address
Returns:
str: The MD5 hash of this station's MAC address.
"""
return hashlib.md5(self.get_mac().encode()).hexdigest()
def get_steered(self) -> bool:
"""Has this station been steered?
Returns:
bool: True if steered, False otherwise.
"""
return self.is_steered
def set_steered(self, steered: bool) -> None:
"""Set whether or not this station has been steered.
Args:
steered (bool): True if steered, False if not.
"""
self.is_steered = steered
def get_rssi(self) -> int:
"""
Get this station's last signal strength measurement.
Note: this returns the signal strength of this station relative to the Agent that it is connected to.
Returns:
int: The signal strength. -127 if the field is not present.
"""
if 'RSSI' in self.params:
return self.params['RSSI']
if 'SignalStrength' in self.params:
return self.params['SignalStrength']
if 'RCPI' in self.params:
return self.params['RCPI']
# Return -INT8_MAX, assuming dBm
return -127
class BSS():
"""Represents a Wi-Fi EasyMesh Basic Service Set
"""
def __init__(self, path, params) -> None:
self.path = path
self.params = params
self.interface = {}
self.connected_stations: List[Station] = []
self.connected_sta_key = 'STA(s)'
self.params[self.connected_sta_key] = []
def add_connected_station(self, station) -> None:
"""Adds a station to this BSS's list of connected stations.
Args:
station (Station): The station.
"""
self.connected_stations.append(station)
self.params[self.connected_sta_key].append(station.params)
def get_num_connected_stations(self) -> int:
"""Returns the number of stations connected to this BSS.
Returns:
int: The number of stations connected to this BSS.
"""
return len(self.connected_stations)
def get_connected_stations(self) -> List[Station]:
"""Return the list of connected stations.
Returns:
List[Station]: The list of stations connected to this BSS.
"""
return self.connected_stations
def get_bssid(self) -> str:
"""Get this BSS's BSSID.
Returns:
str: The BSSID.
"""
if 'BSSID' in self.params:
return self.params['BSSID']
return ''
def is_vbss(self) -> bool:
"""Is this BSS a Virtual BSS?
Returns:
bool: True if this is a VBSS, false otherwise.
"""
if 'IsVBSS' in self.params:
return self.params['IsVBSS']
return False
class UnassociatedStationRSSIMeasurement():
"""Represents a Wi-Fi EasyMesh Unassociated STA Link Metrics Response payload.
"""
def __init__(self, station: str, signal_strength: int, channel_number: int, timestamp: int, ruid: str):
self.station = station
self.signal_strength = signal_strength
self.channel_number = channel_number
self.timestamp = timestamp
self.ruid = ruid
def get_signal_strength(self) -> int:
"""Get the signal strength of this unassociated station RSSI measurement.
Returns:
int: The signal strength, in dBm
"""
return self.signal_strength
def get_channel_number(self) -> int:
"""Get the channel number this measurement was made on.
"""
return self.channel_number
def get_parent_ruid(self) -> str:
"""The ruid of the Radio that saw this measurement.
Returns:
str: The RUID
"""
return self.ruid
def get_timestamp(self) -> int:
"""Get the timestamp of this measurement. The timestamp is defined in the EasyMesh spec
as the time delta between the Agent requesting link metrics of a radio, and the radio
responding.
Returns:
int: The timestamp of this measurement.
"""
return self.timestamp
class UnassociatedStation():
"""Represents a Wi-Fi EasyMesh unassociated station.
"""
def __init__(self, path, params) -> None:
self.path = path
self.params = params
self.rssi_measurements: List[UnassociatedStationRSSIMeasurement] = []
self.parent_radio = None
def get_mac(self) -> str:
"""Get this unassociated station's MAC address.
Returns:
str: The MAC address of this unassociated station.
"""
if 'MACAddress' in self.params:
return self.params['MACAddress']
return ''
def set_parent_radio(self, radio) -> None:
"""Set the parent radio of this unassociated station. A parent radio is one which can
hear this station even though it is unassociated.
Args:
radio (Radio): The parent radio of this unassociated station.
"""
self.parent_radio = radio
def add_rssi_measurement(self, params) -> None:
"""Adds a link metrics measurement to this unassociated station object.
Args:
params (Dict[str]): The link metrics payload.
"""
rcpi = 0
timestamp = 0
ch_num = 0
if 'SignalStrength' in params:
rcpi = params['SignalStrength']
if 'Timestamp' in params:
timestamp = params['Timestamp']
if 'ChannelNumer' in params:
ch_num = params['ChannelNumber']
self.rssi_measurements.append(UnassociatedStationRSSIMeasurement(self.get_mac(), rcpi, ch_num, timestamp, self.parent_radio.get_ruid()))
def get_rssi_measurements(self):
"""Get the list of this unassociated stations link metrics measurements.
Returns:
List[UnassociatedStationRSSIMeasurement]: The list of link metrics measurements made for this station by it's parent radio.
"""
return self.rssi_measurements
def get_parent_radio(self):
"""Get the radio making unassociated link metrics measurements for this unassociated station.
Returns:
Radio: The parent Radio.
"""
return self.parent_radio
class Radio():
"""Represents a Wi-Fi EasyMesh radio.
"""
def __init__(self, path, params) -> None:
self.path = path
self.params = params
self.bsses: List[BSS] = []
self.bss_key = 'BSS'
self.params[self.bss_key] = []
self.unassociated_stations: List[UnassociatedStation] = []
def add_bss(self, bss) -> None:
"""Add a BSS to this radio's BSS list.
Args:
bss (BSS): The BSS to add.
"""
self.bsses.append(bss)
self.params[self.bss_key].append(bss.params)
def get_ruid(self) -> str:
"""Get the RUID of this radio.
Returns:
str: The RUID.
"""
if 'ID' in self.params:
return self.params['ID']
return ''
def get_bsses(self) -> List[BSS]:
"""Returns all BSSes on this radio.
Returns:
List[BSS]: A list of BSS objects.
"""
return self.bsses
def get_unassociated_stations(self) -> List[UnassociatedStation]:
"""Returns all unassociated stations this radio is listening to.
Returns:
List[UnassociatedStation]: A list of unassociated station objects.
"""
return self.unassociated_stations
def add_unassociated_station(self, station: UnassociatedStation) -> None:
"""Adds an unassociated station to this radio's list.
Args:
station (UnassociatedStation): The unassociated station to add.
"""
self.unassociated_stations.append(station)
def get_rssi_for_sta(self, sta: Station) -> int:
"""Gets the unassociated RSSI that this Radio has heard for a Station.
Args:
sta (Station): The station of interest
Raises:
ValueError: If the station is not being sniffed by this radio.
Returns:
int: The RSSI of the station relative to this radio.
"""
station = self.get_unassociated_station_by_mac(sta.get_mac())
if not station:
raise ValueError(f"Unknown station: {sta.get_mac()}")
return station.get_rssi()
def update_unassociated_sta(self, unassoc_sta: UnassociatedStation, params) -> None:
"""Updates an unassociated station's list of link metrics measurements.
Args:
unassoc_sta (UnassociatedStation): The unassociated station to update.
params (Dict[str]): The link metrics payload.
"""
sta = self.get_unassociated_station_by_mac(unassoc_sta.get_mac())
sta.add_rssi_measurement(params)
def get_unassociated_station_by_mac(self, mac: str) -> UnassociatedStation:
"""Get the unassociated Station object whose MAC is 'mac'
Args:
mac (str): The unassociated STA MAC of interest
Returns:
UnassociatedStation: The station if found, otherwise None.
"""
for station in self.unassociated_stations:
if station.get_mac() == mac:
return station
return None
class Neighbor():
"""Represents a Wi-Fi EasyMesh neighbor.
"""
def __init__(self, path, params) -> None:
self.path = path
self.params = params
class Interface():
"""Represents a Wi-Fi EasyMesh interface object (PHY layer)
"""
def __init__(self, path, params) -> None:
self.path = path
self.params = params
self.neighbors: List[Neighbor] = []
self.neighbors_key = 'neighbors'
self.params[self.neighbors_key] = []
self.children: List[Interface] = []
self.parentAgent: Agent = {}
self.children_key = 'children'
self.params[self.children_key] = []
self.connected_stations: List[Station] = []
self.connected_sta_key = 'STA(s)'
self.params[self.connected_sta_key] = []
if isinstance(self.params["MediaType"], int):
self.params["MediaTypeString"] = mediaType_to_str[self.params["MediaType"]]
self.params["wired"] = self.params["MediaType"]==0x0 or self.params["MediaType"]==0x1
self.params["wireless"] = self.params["MediaType"]>0x1 and self.params["MediaType"]<0x200
else:
self.params["MediaTypeString"] = self.params["MediaType"]
if (self.params["MediaType"] == "IEEE_802_3AB_GIGABIT_ETHERNET"
or self.params["MediaType"] == "IEEE_802_3U_FAST_ETHERNET"):
self.params["wired"] = 1
self.params["wireless"] = 0
else:
self.params["wired"] = 0
self.params["wireless"] = 1
self.x = 0
self.y = 0
self.orientation = ORIENTATION.RIGHT
def add_connected_station(self, station) -> None:
"""Adds a station to this interface's list of connected stations.
Args:
station (Station): The station to add.
"""
self.parentAgent.add_connected_station(station)
self.connected_stations.append(station)
self.params[self.connected_sta_key].append(station.params)
def get_connected_stations(self) -> List[Station]:
"""Get the list of stations connected to this interface.
Returns:
List[Station]: List of station objects.
"""
return self.connected_stations
def add_neighbor(self, neighbor) -> None:
"""Adds a neighbor interface to this interface object.
Args:
neighbor (Interface): The neighbor interface to add.
"""
self.neighbors.append(neighbor)
self.params[self.neighbors_key].append(neighbor.params)
# Sort neighbors by ID
self.neighbors.sort(key=lambda n: n.params["ID"])
self.params[self.neighbors_key].sort(key=lambda n: n["ID"])
def get_neighbors(self) -> List[Neighbor]:
"""Get all neighbors of this interface.
Returns:
List[Neighbor]: A list of interface objects.
"""
return self.neighbors
def add_child(self, interface) -> None:
"""Add a child interface to this interface object.
Args:
interface (Interface): The child interface of this interface object.
"""
# Datamodel/ GL-inet bug
# There will be some links in the datamodel between interfaces of a different media type (Ethernet<>Wireless)
# When this happens, mark both interfaces as the same media type.
if self.params["wireless"] and not interface.params["wireless"]:
interface.params["MediaTypeString"] = self.params["MediaTypeString"]
interface.params["wired"] = self.params["wired"]
interface.params["wireless"] = self.params["wireless"]
elif interface.params["wireless"] and not self.params["wireless"]:
self.params["MediaTypeString"] = interface.params["MediaTypeString"]
self.params["wired"] = interface.params["wired"]
self.params["wireless"] = interface.params["wireless"]
self.children.append(interface)
self.params[self.children_key].append(interface.params)
# Sort children by ID
self.children.sort(key=lambda n: n.params["MACAddress"])
self.params[self.children_key].sort(key=lambda n: n["MACAddress"])
def get_children(self):
"""Get all child interfaces of this interface.
Returns:
List[Interface]: A list of interface objects.
"""
return self.children
def is_child(self, macAddress) -> bool:
"""Check if a given interface is a child of this interface, by MAC address.
Args:
macAddress (str): The MAC address of the interface to check child-ness for.
Returns:
bool: True if the interface key'd by macAddress is a child of this interface, False otherwise.
"""
for iface in self.children:
if iface.params["MACAddress"] == macAddress:
return True
return False
def set_parent_agent(self, agent) -> None:
"""Sets the parent Agent of this interface object.
Args:
agent (Agent): The parent Agent that this interface belongs to.
"""
self.parentAgent = agent
def get_parent_agent(self):
"""Get this interface's owning parent Agent
Returns:
Agent: The parent Agent of this interface.
"""
return self.parentAgent
def get_interface_number(self):
"""Gets this interface's interface number. 0...n for n many interfaces on any Agent.
Returns:
str: The interface number.
"""
return self.path[-2::][0]
def get_hash_id(self) -> str:
"""Get the MD5 hash of this interface object. Hashes it's path member.
Returns:
str: The MD5 hash of this interface object.
"""
return hashlib.md5(self.path.encode()).hexdigest()
def get_mac(self) -> str:
"""Get this interface's MAC address
Returns:
str: The MAC address of this interface.
"""
return self.params['MACAddress']
def has_child_iface(self, mac: str) -> bool:
"""Walk this interface's children looking for a child interface key'd by `mac`
Args:
mac (str): The MAC address to check.
Returns:
bool: True if any interface with MAC address `mac` is a child of this interface, False otherwise.
"""
for child_iface in self.children:
if child_iface.get_mac() == mac:
return True
return False
class Agent():
"""Class representing a Wi-Fi EasyMesh Agent.
"""
def __init__(self, path, params) -> None:
self.path = path
self.params = params
self.isController = False
self.radios: List[Radio] = []
self.radios_key = 'Radios'
self.params[self.radios_key] = []
self.interfaces: List[Interface] = []
self.interfaces_key = 'Interfaces'
self.params[self.interfaces_key] = []
self.children: List[Agent] = []
self.connected_stations: List[Station] = []
self.connected_sta_key = 'STA(s)'
self.x = 0
self.y = 0
def add_child(self, agent) -> None:
"""Add a child to this Agent.
Args:
agent (Agent): The child of this agent.
"""
self.children.append(agent)
# Sort children by ID
self.children.sort(key=lambda n: n.get_id())
def get_children(self):
"""Get this Agent's list of child Agent's.
Returns:
List[Agent]: A list of child Agent objects.
"""
return self.children
def is_child(self, macAddress):
"""Check if there's any Agent with MAC address `macAddress` in this Agent's children list.
Args:
macAddress (str): The MAC address of the Agent to check child-ness for.
Returns:
bool: True if there's any Agent with MAC address `macAddress` in this Agent's children list, False otherwise.
"""
is_child = False
for i in self.interfaces:
if i.is_child(macAddress):
is_child = True
return is_child
def num_children(self):
"""Return the total number of interfaces and stations for this Agent.
Returns:
int: The total number of interfaces and stations for this Agent.
"""
return len(self.get_interfaces()) + len(self.get_connected_stations())
def add_radio(self, radio) -> None:
"""Add a radio object to this Agent.
Args:
radio (Radio): The radio to add.
"""
self.radios.append(radio)
self.params[self.radios_key].append(radio.params)
def add_interface(self, interface) -> None:
"""Add an interface to this Agent.
Args:
interface (Interface): The interface to add.
"""
self.interfaces.append(interface)
self.params[self.interfaces_key].append(interface.params)
self.interfaces.sort(key=lambda n: n.params["wired"])
def sort_interfaces(self) -> None:
"""Sort interfaces by their cartesian coordinates.
"""
def compare_interface_horizontal_coordinates(interface: Interface):
x_coord = self.x
if interface.get_children():
x_coord = interface.get_children()[0].get_parent_agent().x
return x_coord
self.interfaces.sort(key=compare_interface_horizontal_coordinates)
def add_connected_station(self, station) -> None:
"""Add a connected station to this Agent.
Args:
station (Station): The station to add.
"""
self.connected_stations.append(station)
def get_connected_stations(self) -> List[Station]:
"""Get all stations connected to some interface on this Agent.
Returns:
List[Station]: A list of station objects.
"""
return self.connected_stations
def get_id(self) -> str:
"""Get this Agent's ID (MAC address)
Returns:
str: This Agent's ID.
"""
if 'ID' in self.params:
return self.params['ID']
return ''
def get_hash_id(self) -> str:
"""Get the MD5 hash of this Agent's ID
Returns:
str: The MD5 hash of this Agent's ID
"""
return hashlib.md5(self.get_id().encode()).hexdigest()
def get_radios(self) -> List[Radio]:
"""Get all radios hosted on this Agent.
Returns:
List[Radio]: A list of radios.
"""
return self.radios
def get_interfaces(self) -> List[Interface]:
"""Get all interfaces on this Agent, regardless of PHY type.
Returns:
List[Interface]: A list of interface objects.
"""
return self.interfaces
def num_radios(self) -> int:
"""Get the number of radios hosted on this Agent.
Returns:
int: The number of radios hosted on this Agent.
"""
return len(self.radios)
def get_manufacturer(self) -> str:
"""Get the manufacturer of this Agent.
Returns:
str: The manufacturer name of this physical Agent.
"""
if 'ManufacturerModel' in self.params:
return self.params['ManufacturerModel']
return ''
def get_interfaces_by_orientation(self, orientation: ORIENTATION) -> List[Interface]:
"""Get a list of interfaces on this Agent by their orientation.
Args:
orientation (ORIENTATION): The orientation of interest.
Returns:
List[Interface]: A list of interfaces who have orientation `orientation`
"""
interfaces = []
for i in self.interfaces:
if i.orientation == orientation:
interfaces.append(i)
return interfaces