import argparse from collections import deque import configparser from copy import deepcopy from dataclasses import dataclass import os import re from typing import ClassVar, Optional try: import pygeodesy from pygeodesy.dms import parseDMS, toDMS, F_SEC from pygeodesy.sphericalTrigonometry import LatLon from pygeodesy.utily import ft2m, m2NM except ModuleNotFoundError as e: print(f"PyGeodesy could not be imported from name {e.name}. Advanced functions not available.") print("Proceeding to attempt build in basic mode.") pygeodesy = None """Endless ATC custom airport file build utility.""" @dataclass class Fix: """Simple class to represent a fix.""" name: str _lat: str _lon: str heading: str pronunciation: str _latlon: LatLon = None runway_heading_true: Optional[float] = None fixes: ClassVar = None _var: ClassVar[float] = 0 _registry: ClassVar = {} @staticmethod def initialize(magvar): Fix.fixes = {} Fix._var = magvar @classmethod @property def special_prefixes(cls): return cls._registry.keys() @classmethod def __init_subclass__(cls, /, name_prefix=None, **kwargs): super().__init_subclass__(**kwargs) if name_prefix is not None: for prefix in name_prefix: cls._registry[prefix] = cls def __new__(cls, name, *args, **kwargs): name_prefix = name[:1] if name_prefix in cls._registry: return object.__new__(cls._registry[name_prefix]) else: return object.__new__(cls) def __init__(self, name, lat=None, lon=None, heading="", pronunciation="", *, latlon=None): self.name = name.strip() self._lat = lat and lat.strip() self._lon = lon and lon.strip() self.heading = heading.strip() self.pronunciation = pronunciation.strip() self._latlon = latlon Fix.fixes[self.name] = self @property def lat(self): if not self._lat: self._lat = toDMS(self._latlon.lat, form=F_SEC, prec=2, sep='.', pos='N', neg='S') return self._lat @property def lon(self): if not self._lon: self._lon = toDMS(self._latlon.lon, form=F_SEC, prec=2, sep='.', pos='E', neg='W') return self._lon @property def short_def(self): """Position definition as string.""" return f"{self.lat}, {self.lon}" @property def minor_def(self): """Definition without holding as string.""" if self.pronunciation: return f"{self.name}, {self.lat}, {self.lon}, {self.pronunciation}".rstrip(", ") return f"{self.name}, {self.lat}, {self.lon}" @property def full_def(self): """Full definition as string.""" if self.pronunciation: return f"{self.name}, {self.lat}, {self.lon}, {self.heading.lstrip('!') or 0}, {self.pronunciation}".rstrip(", ") elif self.heading.lstrip('!'): return f"{self.name}, {self.lat}, {self.lon}, {self.heading.lstrip('!')}" return f"{self.name}, {self.lat}, {self.lon}" @property def latlon(self): if self._latlon: return self._latlon self._generate_latlon() return self._latlon def _generate_latlon(self): try: lat = self._lat lon = self._lon if lat[:1].isalpha(): lat = lat[1:] + lat[:1] if lon[:1].isalpha(): lon = lon[1:] + lon[:1] if lat.find('.') != lat.rfind('.'): lat = parseDMS(lat, sep='.') if lon.find('.') != lon.rfind('.'): lon = parseDMS(lon, sep='.') self._latlon = LatLon(lat, lon) except Exception as e: raise RuntimeError(f"Unable to generate a LatLon for fix {self.name}: {self}") from e def heading_to(self, other, true_heading=False): return self.latlon.initialBearingTo(Fix.fixes[other].latlon) + (0 if true_heading else -Fix._var) def meters_on_heading(self, meters, heading, true_heading=False): if isinstance(heading, str): heading = heading.lstrip('!') match = re.match(r'RWYHDG(?:(?P[+-])(?P\d{2,3}(?:\.\d+)?))?$', heading) if match is not None: if self.runway_heading_true is None: raise RuntimeError( f'tried creating fix {meters}m on {heading} from {self.name} but {self.name} is not derived from a runway.') heading = self.runway_heading_true if match['operation']: if match['operation'] == '+': heading += float(match['offset']) elif match['operation'] == '-': heading -= float(match['offset']) heading %= 360 true_heading = True elif heading.endswith('T'): true_heading = True heading = heading[:-1] heading = float(heading) if not true_heading: heading += Fix._var result = self.latlon.destination(meters, heading) if self.runway_heading_true: result.runway_heading_true = self.runway_heading_true return result def km_on_heading(self, km, heading, true_heading=False): return self.meters_on_heading(km * 1000, heading, true_heading) def nmi_on_heading(self, nmi, heading, true_heading=False): return self.meters_on_heading(nmi * 1852, heading, true_heading) def intersect(self, radial, other_fix, other_radial, radial_true=False, other_radial_true=False): """Calculate the intersect of a line 'A' involving this point and another line 'B'. If `radial` is not a numeric string (optionally ending in T), line 'A' is the line between this point and the point named `radial`. Otherwise, lina 'A' is the line extending from this `Fix` on heading `radial`. If radial ends in 'T', the heading is a true heading. If `other_radial` is not a numeric string (optionally ending in T), line 'B' is the line between the `Fix` named `other_fix` and the `Fix` named `other_radial`. Otherwise, lina 'B' is the line extending from this `Fix` on heading `other_radial`. If radial ends in 'T', the heading is a true heading. Args: `radial`: A heading as a string, optionally ending in 'T', or the name of a `Fix`. `other_fix`: The name of a `Fix` as a string. `other_radial`: A heading as a string, optionally ending in 'T', or the name of a `Fix`. `radial_true`: Whether `radial` is a true heading. Defaults to `False`. `other_radial_true`: Whether `other_radial` is a true heading. Defaults to `False`. """ if radial.endswith('T') and radial[:-1].replace('.', '', 1).isdigit(): radial_true = True radial = radial[:-1] if other_radial.endswith('T') and other_radial[:-1].replace('.', '', 1).isdigit(): other_radial_true = True other_radial = other_radial[:-1] if radial.replace('.', '', 1).isdigit(): radial = float(radial) if not radial_true: radial += Fix._var else: radial = Fix.fixes[radial].latlon if other_radial.replace('.', '', 1).isdigit(): other_radial = float(other_radial) if not other_radial_true: other_radial += Fix._var else: other_radial = Fix.fixes[other_radial].latlon result = self.latlon.intersection(radial, other_fix.latlon, other_radial) if m2NM(self.latlon.distanceTo(result)) > 300: result = result.antipode() return result def intersects(self, radius, other_fix, other_radius): radius = float(radius) other_radius = float(other_radius) return self.latlon.intersections2(radius, other_fix.latlon, other_radius) def intersects_nmi(self, radius, other_fix, other_radius): radius = float(radius) * 1852 other_radius = float(other_radius) * 1852 return self.intersects(radius, other_fix, other_radius) def is_hidden(self): return self.heading.startswith("!") class RunwayFix(Fix, name_prefix="="): def __init__(self, runway_id, runway_designator, runway_heading_true, runway_length_meters, *args, **kwargs): self.runway_id = runway_id self.runway_designator = runway_designator self.runway_heading_true = runway_heading_true self.runway_length_meters = runway_length_meters super().__init__('=' + self.runway_id + self.runway_designator, *args, **kwargs) def reciprocal(self): if self.runway_designator[-1].isalpha(): reciprocal_runway_designator = str((int(self.runway_designator[:-1]) + 18) % 36).rjust(2, '0') if self.runway_designator[-1] in 'Ll': reciprocal_runway_designator += 'R' elif self.runway_designator[-1] in 'Rr': reciprocal_runway_designator += 'L' else: reciprocal_runway_designator += self.runway_designator[-1] else: reciprocal_runway_designator = str((int(self.runway_designator) + 18) % 36).rjust(2, '0') reciprocal_runway_id = '=' + self.runway_id + reciprocal_runway_designator if reciprocal_runway_id in Fix.fixes: return Fix.fixes[reciprocal_runway_id] reciprocal_runway_latlon = self.meters_on_heading(self.runway_length_meters, self.runway_heading_true, True) reciprocal_runway_heading_true = (self.runway_heading_true + 180) % 360 return RunwayFix(self.runway_id, reciprocal_runway_designator, reciprocal_runway_heading_true, self.runway_length_meters, latlon=reciprocal_runway_latlon) @classmethod def from_definition(cls, runway_definition): runway_data = runway_definition.split(',') runway_id = runway_data[0].strip() runway_designator = runway_data[1].strip() runway_lat = runway_data[2] runway_lon = runway_data[3] runway_heading_true = float(runway_data[4].strip()) runway_length_meters = ft2m(float(runway_data[5].strip())) return RunwayFix(runway_id, runway_designator, runway_heading_true, runway_length_meters, lat=runway_lat, lon=runway_lon) def is_hidden(self): return True class RadialDMEFix(Fix, name_prefix="@"): def __init__(self, name, fix=None, distance=None, radial=None, heading="!", pronunciation=""): if fix is None: try: match = re.match(r'@(?P[a-zA-Z0-9]+)(?P\d{3}[Tt]?)D(?P[0-9]+(?:\.[0-9]+)?)', name) fix = match['fix'] distance = float(match['distance']) radial = match['radial'] pronunciation = f"{Fix.fixes[fix].pronunciation} {heading} Radial {distance} D-M-E" except Exception as e: raise RuntimeError(f"failed to create fix from {name}") from e else: name = name.strip('@') fix = fix.strip() distance = float(distance.strip().lstrip('D')) radial = radial.strip() heading = heading.strip() latlon = Fix.fixes[fix].nmi_on_heading(distance, radial) self.runway_heading_true = getattr(latlon, 'runway_heading_true', None) super().__init__(name, heading=heading, pronunciation=pronunciation, latlon=latlon) class RadialIntersectFix(Fix, name_prefix='#+'): def __init__(self, name, fix1=None, radial1=None, fix2=None, radial2=None, heading="!", pronunciation=""): try: if fix1 is None: match = re.match(r'[#+](?P[a-zA-Z0-9]+)(?P\d{3})@?(?P[a-zA-Z0-9]+)(?P\d{3})', name) fix1 = match['fix1'] radial1 = match['radial1'] fix2 = match['fix2'] radial2 = match['radial2'] pronunciation = f'{Fix.fixes[fix1].pronunciation} Radial {"-".join(radial1)} at ' + \ f'{Fix.fixes[fix2].pronunciation} Radial {"-".join(radial2)}' else: name = name.strip('#+') fix1 = fix1.strip() radial1 = radial1.strip() fix2 = fix2.strip() radial2 = radial2.strip() super().__init__(name, heading=heading, pronunciation=pronunciation, latlon=Fix.fixes[fix1].intersect(radial1, Fix.fixes[fix2], radial2)) except Exception as e: raise RuntimeError(f"failed to create fix from {name}") from e class CircleIntersectFix(Fix, name_prefix='&'): def __init__(self, name, fix1=None, radius1=None, fix2=None, radius2=None, direction=None, heading="!", pronunciation=""): try: if fix1 is None: match = re.match(r'&(?P[a-zA-Z0-9]+)D(?P[0-9]+(?:\.[0-9]+)?)&?(?P[a-zA-Z0-9]+)D(?P[0-9]+(?:\.[0-9]+)?)\.(?P[NSWEnswe])', name) fix1 = match['fix1'] radius1 = match['radius1'] fix2 = match['fix2'] radius2 = match['radius2'] direction = match['direction'] else: name = name.strip('%') fix1 = fix1.strip() radius1 = radius1.strip() fix2 = fix2.strip() radius2 = radius2.strip() latlon_1, latlon_2 = Fix.fixes[fix1].intersects_nmi(radius1, Fix.fixes[fix2], radius2) if direction is None: if latlon_1 is not latlon_2: raise ValueError(f'''No selector direction specified for intersection of non-abutting circles. intersect 1: {latlon_1} intersect 2: {latlon_2}''') else: latlon = latlon_1 else: if direction in 'NnSs': if direction in 'Nn': latlon = latlon_1 if latlon_1.lat > latlon_2.lat else latlon_2 else: latlon = latlon_1 if latlon_1.lat < latlon_2.lat else latlon_2 else: if direction in 'Ee': latlon = latlon_1 if latlon_1.lon > latlon_2.lon else latlon_2 else: latlon = latlon_1 if latlon_1.lon < latlon_2.lon else latlon_2 super().__init__(name, heading=heading, pronunciation=pronunciation, latlon=latlon) except Exception as e: raise RuntimeError(f"failed to create fix from {name}") from e @dataclass class Airline: """Simple class to represent an airline declaration.""" callsign: str frequency: int types: str pronunciation: str directions: str callsigns = None use_callsigns = True test_callsigns = False def __init__(self, callsign, frequency, types, *data, gateways=None): """Create an airline from an entry in the airlines= list. `data` should be `(pronunciation, directions)`. If `Airline.callsigns` is defined, `pronunciation` should be omitted from `data`. If `gateways` is provided, `directions` becomes `arrival_gateways`, `departure_gateways`.""" self.frequency = int(frequency.strip()) self.types = types.strip() self.callsign = callsign.strip() try: if Airline.callsigns is not None and (Airline.use_callsigns or Airline.test_callsigns): if '-' not in callsign: self.pronunciation = Airline.callsigns[callsign] else: # if length of key is more than 3, assume it is not registration key = callsign[:callsign.index('-')] self.callsign = callsign.strip('_') # if key length is longer than 3 and key doesn't have match, # we strip '_' to allow for mil callsigns with length <= 3 self.pronunciation = Airline.callsigns.get(key, key.strip('_')) if len(key) > 3 \ else Airline.callsigns.get(key, '0') if Airline.test_callsigns: print(f'{self.callsign}: {self.pronunciation}') if Airline.callsigns is None or not Airline.use_callsigns: self.pronunciation = data[0].strip() data = data[1:] if gateways is not None: arrival_gateways = {gateway.strip() for gateway in data[0].split("/")} departure_gateways = {gateway.strip() for gateway in data[1].split("/")} self.directions = "".join(sorted( {gateways[gateway] for gateway in arrival_gateways | departure_gateways})) else: self.directions = data[0].strip() except Exception as e: raise ValueError(f'''Could not create airline from ({callsign}, {frequency}, {types}, {str(data)}) Callsign pronunciation lookup = {Airline.use_callsigns}''') from e def process_fix_line(line, fixes): """Expands special commands for a fix definition in short format and returns the expanded definition as the result. Substitute "![, ]" in `line` with "lat, lon[, ]" based on `fixes`. Args: `line` (list): A fix definition. `fixes` (dict): A lookup of `Fix`es.""" if line.startswith('!'): try: def_fix, def_sep, def_data = line.lstrip('!').partition(',') def_fix = def_fix.strip() if def_fix[0] in Fix.special_prefixes: if def_fix not in fixes: Fix(def_fix) if def_fix not in fixes: raise KeyError(f'Fix lookup failed: No fix defined with name {def_fix}') return fixes[def_fix].short_def + def_sep + def_data except Exception as e: raise RuntimeError(f'Failed to process line {line}') from e else: return line def process_fix_list(fix_list, fixes): """Expands special commands in a list of fixes in short format and produces an iterable of definitions as the result. Substitute any "![, ]" in `fix_list` with "lat, lon[, ]" based on `fixes`. Args: `fix_list` (list): A list of fix definitions. `fixes` (dict): A lookup of `Fix`es.""" for line in fix_list: yield process_fix_line(line, fixes) def _generate_approach(heading, starting_fix): return {"heading": heading, "beacon": starting_fix, "route": []} def _process_simple_approach_fix_list(fix_list, runway, fixes, tagged_routes, generated_approaches, current_tag, top_level=True): """Inner worker function for `process_approach_fix_list()`. Works the same as `process_fix_list`, but in addition, any fix definitions processed will be added to any generated approaches in `generated_approaches`. Also, any fixes processed will be checked for any approach generator parameters; if found, a new generated approach is added to `generated_approaches`. The route of such generated approach will also be added to `tagged_routes` if tagged in the fix definition. Args: `fix_list` (list): A list of fix definitions. `runway` (str): The runway this approach is for. If `None`, this is a multi-runway approach. `fixes` (dict): A lookup of `Fix`es. `tagged_routes` (dict): A lookup of approach routes that were tagged for lookup. `generated_approaches`: A dict keyed by runway of dicts of parameters to be used to generate derived approaches in post-processing. `generate_approaches`: Whether or not to process any approach generator commands. `debug` (bool): Whether to print debug information.""" for line in fix_list: def_data, _, approach_generator_params = line.rpartition(',') if def_data and ('!' in approach_generator_params or '@' in approach_generator_params): fix_name, _, _ = def_data.partition(',') fix_name = fix_name.lstrip('!') heading, _, tag = approach_generator_params.lstrip(' !').partition('@') if top_level: generated_approach = _generate_approach(heading, fix_name) if tag: generated_approach['tag'] = tag tagged_routes[tag] = [] generated_approaches[runway or current_tag].append(generated_approach) else: def_data = line if line.startswith("@"): print(f'''Warning: An @ reference {line} was found in the wrong place while processing:''', f'''\n{fix_list}\nCheck output for unresolved @.''') try: result = process_fix_line(def_data, fixes) for generated_approach in generated_approaches[runway or current_tag]: generated_approach['route'].append(result) if top_level and 'tag' in generated_approach: tagged_routes[generated_approach['tag']].append(line) except Exception as e: raise RuntimeError(f'''failed to process approach route {fix_list} for runway {runway}''') from e def _process_approach_fix_list(fix_list, runway, fixes, tagged_routes, generated_approaches, current_tag=None, top_level=True, debug=False): """Inner worker function for `process_approach_fix_list`. Args: `fix_list` (list): A list of fix definitions. `runway` (tuple): The runway this approach is for. If `None`, this is a multi-runway approach. Otherwise, this should be a tuple of runway_id, <"," or "">, <" rev" or "">, e.g. the result of running partition on runway=. `fixes` (dict): A lookup of `Fix`es. `tagged_routes` (dict): A dict keyed by runway of dicts of approach routes that were tagged for lookup. `generated_approaches`: A dict keyed by runway of dicts of parameters to be used to generate derived approaches in post-processing. `top_level`: Whether or not to process any approach generator commands. `debug` (bool): Whether to print debug information.""" if debug: print(f"_process_approach_fix_list: processing route: {fix_list}") if fix_list[-1].startswith('@'): following_tags = (tag.strip().lstrip('@') for tag in fix_list[-1].split(',')) _process_simple_approach_fix_list(fix_list[:-1], runway, fixes, tagged_routes[runway], generated_approaches, current_tag, top_level) for generated_approach in generated_approaches[runway or current_tag]: if 'tag' in generated_approach: tagged_routes[runway][generated_approach['tag']].append(fix_list[-1]) del generated_approach['tag'] for following_tag in following_tags: if following_tag.startswith('!'): remove_first_fix = True following_tag = following_tag.removeprefix('!') else: remove_first_fix = False if current_tag is not None and current_tag == following_tag: raise RuntimeError(f'''Unable to build as approach tagged as @{current_tag} is trying to reference itself. The following is the route= contents after the @tag: {fix_list}''') following_tag_runways = [] for route_runway, route_tags in tagged_routes.items(): if runway is not None and runway != route_runway: continue if following_tag in route_tags: following_tag_runways.append(route_runway) if runway is None: try: generated_approaches[route_runway or following_tag] = [ deepcopy(generated_approach) for generated_approach in generated_approaches[current_tag]] except Exception as e: raise RuntimeError(f'{generated_approaches}\n{fix_list}\n{runway}') from e for generated_approach in generated_approaches[route_runway or following_tag]: if 'tag' in generated_approach: del generated_approach['tag'] if not following_tag_runways: raise RuntimeError(f'''Unable to build as approach continuation couldn't be found. There was no approach route tagged {following_tag} for {runway and "runway " + repr(runway) or "any runway"}. The requesting approach route was {fix_list}''') for following_tag_runway in following_tag_runways: following_route = tagged_routes[following_tag_runway][following_tag] if remove_first_fix: following_route = following_route[1:] _process_approach_fix_list(tagged_routes[following_tag_runway][following_tag], following_tag_runway, fixes, tagged_routes, generated_approaches, following_tag, False, debug=debug) else: _process_simple_approach_fix_list(fix_list, runway, fixes, tagged_routes[runway], generated_approaches, current_tag, top_level) return generated_approaches def process_approach_fix_list(fix_list, runway, fixes, tagged_routes, starting_fix, debug=False): """Processes special commands in a list of approach fixes in short format and produces an iterable of definitions as the result. Substitute any "![, ]" in `fix_list` with "lat, lon[, ]" based on `fixes`. If the last item in `fix_list` is "@", substitute in the contents of the approach route tagged . If `runway` is None, Args: `fix_list` (list): A list of fix definitions. `runway` (tuple): The runway this approach is for. If `None`, this is a multi-runway approach. Otherwise, this should be a tuple of runway_id, <"," or "">, <" rev" or "">, e.g. the result of running partition on runway=. `fixes` (dict): A lookup of `Fix`es. `tagged_routes` (dict): A dict keyed by runway of dicts of approach routes that were tagged for lookup. `starting_fix` (str): The name of the starting fix of this approach. `debug` (bool): Whether to print debug information.""" if fix_list: while not fix_list[0]: fix_list = fix_list[1:] if debug: print(f"process_approach_fix_list: processing route: {fix_list}") if runway not in tagged_routes: tagged_routes[runway] = {} tagged_routes_for_runway = tagged_routes[runway] if fix_list[0].startswith('@'): tagged_route = fix_list[2:] if fix_list[0].startswith('@!'): tagged_route = tagged_route[1:] current_tag = fix_list[0].lstrip('@!') tagged_routes_for_runway[current_tag] = tagged_route fix_list = fix_list[1:] else: current_tag = None generated_approaches = {runway or current_tag: []} if starting_fix: generated_approaches[runway or current_tag].append(_generate_approach(fix_list[0], starting_fix)) fix_list = fix_list[1:] return _process_approach_fix_list(fix_list, runway, fixes, tagged_routes, generated_approaches, current_tag, debug=debug) else: raise RuntimeError(f"Tried to process an empty approach route {fix_list} for runway {runway}") def process_departure_fix_list(fix_list, runway, fixes, tagged_routes): """Processes special commands in a list of departure fixes in short format and produces an iterable of definitions as the result, or `None` if there is no result (fix_list was recorded in `tagged_routes`. Substitute any "![, ]" in `fix_list` with "lat, lon[, ]" based on `fixes`. If the first item of fix_list is "@", record the rest of the `fix_list` in `tagged_routes` under `runway` and "name". If the first item of fix_list is "@!", this route is a common route. If the second (first if a tagged departure route) item in `fix_list` is "@", substitute in the contents of the departure route tagged . Args: `fix_list` (list): A list of fix definitions. `runway` (str): The runway this departure is for. `fixes` (dict): A lookup of `Fix`es. `tagged_routes` (dict): A lookup of departure routes.""" if fix_list: if runway not in tagged_routes: tagged_routes[runway] = {} if fix_list[0].startswith('@'): tag = fix_list[0].lstrip('@') tagged_routes[not tag.startswith('!') and runway or None][tag] = fix_list[1:] return None else: return _process_departure_fix_list(fix_list, runway, fixes, tagged_routes) def _process_departure_fix_list(fix_list, runway, fixes, tagged_routes, top_level=True): """Inner worker function for `process_departure_fix_list()`. This produces the actual generator with the route= lines. Args: `fix_list` (list): A list of fix definitions. `runway` (str): The runway this departure is for. `fixes` (dict): A lookup of `Fix`es. `tagged_routes` (dict): A lookup of departure routes.""" if fix_list: try: if top_level: yield fix_list[0] fix_list = fix_list[1:] if fix_list[0].startswith('@'): tag = fix_list[0].lstrip('@') yield from _process_departure_fix_list( tagged_routes[not tag.startswith('!') and runway or None][fix_list[0].lstrip('@')], runway, fixes, tagged_routes, top_level=False) yield from _process_departure_fix_list(fix_list[1:], runway, fixes, tagged_routes, top_level=False) else: yield from process_fix_list(fix_list, fixes) except Exception as e: raise RuntimeError( f"Could not process departure route {fix_list} for runway {runway}" ) from e def process_sids_fix_list(fix_list, fixes): """Processes special commands in a list of fixes in minor format and produces an iterable of definitions as the result. Substitute any "![, ]" in `fix_list` with "lat, lon[, ]" based on `fixes`. Args: `fix_list` (list): A list of fix definitions. `fixes` (dict): A lookup of `Fix`es.""" for line in fix_list: if line.startswith('!'): def_fix, def_sep, def_data = line.lstrip('!').partition(',') yield fixes[def_fix.strip()].minor_def + def_sep + def_data else: yield line def process_repeatable_departure_fix_list(fix_list, runway, fixes, tagged_routes): """`Processes special commands in a list of departure fixes in short format and produces an iterable of n iterables of definitions as the result. If the first item of `fix_list` is *n, n iterables are produced. Otherwise, `n = 1`. See `process_departure_fix_list` for further special commands. Args: `fix_list` (list): A list of fix definitions. `runway` (str): The runway this departure is for. `fixes` (dict): A lookup of `Fix`es. `tagged_routes` (dict): A lookup of departure routes.""" if not fix_list[0]: fix_list = fix_list[1:] if fix_list[0].startswith('*'): result = list(process_departure_fix_list(fix_list[1:], runway, fixes, tagged_routes)) for i in range(int(fix_list[0].removeprefix('*'))): yield result else: yield process_departure_fix_list(fix_list, runway, fixes, tagged_routes) def process_beacons(fixes): """Returns a generator of [airspace] beacons= lines from a list of fixes. Fixes marked not to be exported are omitted.""" for fix in fixes.values(): if not fix.is_hidden(): yield fix.full_def def process_handoffs(handoffs, center): """Processes fix references in [airspace] handoff=. Returns a generator of processed [airspace] handoff= lines.""" for handoff in handoffs.strip().splitlines(): direction, separator, parameters = handoff.partition(',') if direction.startswith('!'): direction = str(int(center.heading_to(direction[1:]))) yield ",".join([direction, parameters]).rstrip(", ") def process_airlines_list(airline_list): """Returns generator of airline declaration strings based on the list of `Airline`s `airline_list`. If frequency is greater than 10, write floor(frequency / 10) 10 frequency declarations and 1x (frequency mod 10) declaration.""" for airline in airline_list: n, r = divmod(airline.frequency, 10) for i in range(n): yield f"{airline.callsign}, 10, {airline.types}, {airline.pronunciation}, {airline.directions}" if r: yield f"{airline.callsign}, {r}, {airline.types}, {airline.pronunciation}, {airline.directions}" def enumerate_routes(route_list, start=1): """built-in `enumerate()` but the enumeration is a string "route#".""" for route in route_list: yield f"route{start}", route start += 1 def process(args, input_file=None, preprocessed_input=None): """Parses an Endless ATC custom airport "source" file and expands certain special commands, then rewrites as a minimized output suitable for use by the game. Refer to argparse help for detailed syntax of special commands. Args: `args`: An `argparse.Namespace`. The command line args from the invoking module. `file`: The file to process. Defaults to `input_file` in `args`.""" # if input_file is None, we were invoked standalone and not from deploy.py if input_file is None: input_file = args.input_file output_file = input_file if args.output_file is None else args.output_file else: output_file = os.path.join(os.path.dirname(input_file), args.output_path, os.path.basename(input_file)) print(f"Building {input_file} to {output_file}") config = configparser.ConfigParser() config.read("common.ini") config.read(os.path.join(os.path.dirname(input_file), "common.ini")) Airline.callsigns = config['expand.callsigns'] if 'expand.callsigns' in config else {} default_gateways = config['expand.gateways'] if 'expand.gateways' in config else {} if 'legacy' not in args or not args.legacy: source = configparser.ConfigParser() if preprocessed_input is None: source.read(input_file) else: source.read_file(preprocessed_input, input_file) # read optional header to be written in output header = '' if 'meta' in source: if 'header' in source['meta']: header = ["# " + line for line in source['meta']['header'].splitlines()] header.extend([ "", f"# This file is generated from the source file {os.path.relpath(input_file, os.path.dirname(output_file))} using expand.py.", "# All comments have been stripped, and edits are not made directly to this file.", "# If you would like to contribute, or see the author's comments, please refer to the source file.", "", ""]) header = "\n".join(header) if 'callsigns' not in source['meta']: Airline.use_callsigns = False else: Airline.use_callsigns = source['meta'].getboolean('callsigns') if args.test_callsigns: Airline.test_callsigns = True # remove meta section so it won't be written in output del source['meta'] airspace = source['airspace'] Fix.initialize(airspace.getfloat('magneticvar')) # add runways to fix database airports = {section: source[section] for section in source if section.startswith('airport')} if pygeodesy: for airport_data in airports.values(): runways = airport_data['runways'].strip().splitlines() for runway_definition in runways: RunwayFix.from_definition(runway_definition).reciprocal() Fix('_CTR', *airspace['center'].split(","), heading='!') # build a fix database from [airspace] beacons= for definition in airspace['beacons'].strip().splitlines(): Fix(*definition.split(",")) airspace['beacons'] = "\n".join(process_beacons(Fix.fixes)) airspace['handoff'] = "\n".join(process_handoffs(airspace['handoff'], Fix.fixes['_CTR'])) airspace['boundary'] = "\n".join( process_fix_list(airspace['boundary'].splitlines(), Fix.fixes)) areas = {section: source[section] for section in source if section.startswith('area')} for area_data in areas.values(): if 'points' in area_data: area_data['points'] = "\n".join( process_fix_list(area_data['points'].splitlines(), Fix.fixes)) if args.draw_all_areas and 'draw' in area_data: del area_data['draw'] if 'position' in area_data: area_position = area_data['position'].strip() if area_position in Fix.fixes: area_data['position'] = Fix.fixes[area_position].short_def if 'labelpos' in area_data: area_position = area_data['labelpos'].strip() if area_position in Fix.fixes: area_data['labelpos'] = Fix.fixes[area_position].short_def # process airport sections for airport_data in airports.values(): gateways = dict((tuple(map(str.strip, gateway.split(","))) for gateway in airport_data['gateways'].strip().splitlines()), **default_gateways) if 'gateways' in airport_data else None if 'airlines' in airport_data: airlines = [Airline(*airline.split(","), gateways=gateways) for airline in airport_data['airlines'].splitlines() if airline] airport_data['airlines'] = "\n".join(process_airlines_list(airlines)) if 'sids' in airport_data: airport_data['sids'] = "\n".join( process_sids_fix_list(airport_data['sids'].splitlines(), Fix.fixes)) # process approach/transition sections approaches = {section: source[section] for section in source if section.startswith('approach') or section.startswith('transition')} tagged_approach_routes = {} generated_approaches = {} highest_app_index = 0 discarded_approaches = deque() for approach_section, approach_data in approaches.items(): approach_runway = approach_data.get('runway', fallback=None) approach_debug = approach_data.getboolean('debug') if approach_runway is not None: approach_runway_id, _, approach_runway_direction = approach_runway.partition(',') approach_runway = approach_runway_id.strip(), approach_runway_direction.strip() else: discarded_approaches.append(approach_section) if 'beacon' in approach_data and ',' not in approach_data['beacon']: approach_beacon = approach_data['beacon'].removeprefix("!") if approach_beacon in Fix.fixes and Fix.fixes[approach_beacon].heading.startswith('!'): approach_data['beacon'] = Fix.fixes[approach_beacon].full_def else: approach_data['beacon'] = approach_beacon else: approach_beacon = approach_data['beacon'] for option in approach_data: if option.startswith('route'): new_generated_approaches = process_approach_fix_list(approach_data[option].splitlines(), approach_runway, Fix.fixes, tagged_approach_routes, approach_beacon, approach_debug) if approach_runway is not None: generated_approach = new_generated_approaches[approach_runway][0] if generated_approach['beacon'] != approach_beacon: raise RuntimeError(f'''Unexpected behaviour during approach processing. A single runway approach generated an approach with a mismatched beacon. Defined beacon was {generated_approach.beacon}, actual beacon was {approach_beacon}.''') approach_data[option] = generated_approach['heading'] + "\n" + "\n".join( generated_approach['route']) new_generated_approaches[approach_runway] = new_generated_approaches[approach_runway][1:] for runway, new_generated_approaches_for_runway in new_generated_approaches.items(): if runway in generated_approaches: generated_approaches[runway].extend(new_generated_approaches_for_runway) elif isinstance(runway, tuple): generated_approaches[runway] = new_generated_approaches_for_runway # find highest approach index as we need to add more approaches for section in source: if section.startswith('approach'): highest_app_index = max(highest_app_index, int(section[8:])) for runway, generated_approaches_for_runway in generated_approaches.items(): for generated_approach in generated_approaches_for_runway: if not generated_approach['heading']: continue if discarded_approaches: section = discarded_approaches.popleft() else: highest_app_index += 1 section = f'approach{highest_app_index}' beacon = generated_approach['beacon'] if Fix.fixes[beacon].heading.startswith('!'): beacon = Fix.fixes[beacon].full_def source[section] = { "runway": ', '.join(runway), "beacon": beacon, "route1": generated_approach['heading'] + "\n" + "\n".join( generated_approach['route']) } # process departure sections departures = [source[section] for section in source if section.startswith('departure')] tagged_departures = {None: {}} for departure_data in departures: departure_runway = departure_data['runway'] departure_runway = departure_runway.partition(',') routes = {int(option.removeprefix('route')): departure_data[option] for option in departure_data if option.startswith('route')} processed_routes = [] for route_index in sorted(routes): processed_routes.extend("\n".join(route) for route in process_repeatable_departure_fix_list( routes[route_index].splitlines(), departure_runway, Fix.fixes, tagged_departures) if route) del departure_data[f'route{route_index}'] departure_data.update(enumerate_routes(processed_routes, start=1)) # write output file if not args.parse_only: with open(output_file, 'w', newline='') as airport_file: airport_file.write(header) source.write(airport_file) # legacy processor in regex. Don't use for new projects. else: pattern = re.compile(r"^(?P\[airport(?P\d*)\])|(?P#!\t(?P[-\w]*), (?P\d*), (?P[\w/d]*, [-\w ]*, [nswe]*))|(?P#!expansionoutput(?P\d+))|(?P#!expansionoutputend)|(?P#!sid(?P[\d]+)x)") result = {'output': []} airport = 0 ignore_lines = False ignore_one_line = False sid_frequency = 0 sid_lines = [] with open(input_file, 'r', newline='') as airport_file: for line in airport_file: match = pattern.match(line) if match: if match['airport_section']: airport = match['airport_id'] if airport not in result: result[airport] = [] elif match['airline_entry']: total_frequency = int(match['airline_frequency']) frequencies = [] while total_frequency > 10: frequencies.append(10) total_frequency -= 10 frequencies.append(total_frequency) for frequency in frequencies: result[airport].append( f"\t{match['airline_code']}, {frequency}, {match['airline_parameters']}\n" ) elif match['result_marker']: result['output'].append(line) for result_line in result[match['result_id']]: result['output'].append(result_line) ignore_lines = True elif match['result_end_marker']: ignore_lines = False elif match['sid_marker']: sid_frequency = int(match['sid_frequency']) - 1 ignore_one_line = True if sid_frequency: if line.isspace(): for _ in range(sid_frequency): result['output'].extend(sid_lines) sid_frequency = 0 sid_lines = [] elif not len(sid_lines): sid_lines.append("\n") else: sid_lines.append(line) if not ignore_lines and not ignore_one_line: result['output'].append(line) if ignore_one_line: ignore_one_line = False if not args.parse_only: with open(output_file, 'w', newline='') as airport_file: airport_file.writelines(result['output']) return output_file if not args.parse_only else None if __name__ == "__main__": parser = argparse.ArgumentParser(description='''Expands certain commands to allow for concise Endless ATC airport source files. \n\n Available functions: \n\n In [airspace] boundary=, [area] points=, or the route= of an [approach/departure/transition], specify "!" instead of lat, lon to substitute the lat, lon from the fix with the corresponding name in [airspace] beacons=. In [airport] sids=, "!" can also be specified, and will become name, lat, lon instead. In [approach/departure/transition], "!" can also be specified in beacon=, where it will become the full fix definition. \n\n In [airport] airlines=, definitions with frequency >10 with be broken down into multiple definitions of frequency 10 or less. \n\n In [airport], you can define gateways=, a list of , direction. If gateways= is defined, the ", "" in [airport] airlines= becomes ", , ", and the directions are used based on the defined gateways. Default gateways can be specified in a common.ini in the same directory as the source file, or in the folder where this tool is located; gateways= still needs to be defined in an [airport] to activate this feature for the [airport]. \n\n In [airport] airlines=, the airline can be omitted if [meta] callsigns= is true. A lookup will be loaded from [expand.callsigns] defined in a common.ini in the same directory as the source file, or in the folder where this tool is located. The former takes precedence. [expand.callsigns] is a list of , . The first item in each line of airlines (stripped of a dash and anything after it, hereafter referred to as the "key") is used to lookup in [expand.callsigns] to obtain the pronunciation. If nothing is found, pronunication defaults to the key if it is longer than 3 characters (assuming it is a military callsign), otherwise it defaults to "0". \n\n In a [approach/transition] route=, specify "@" to "tag" the approach route. Any subsequent [approach] route= can then specify "@" as the last point to chain the approach route tagged as "name" to the end. \n\n In a [departure] route=, specify "@" to "tag" the route. This will remove the departure route from the resulting file (as it is an incomplete departure route). Any subsequent [departure] route= can then specify "@" as the *first* point (second line, after the route name) to chain the departure route tagged as "name" to the beginning. \n\n *n as the first line of a [departure] route= value will repeat that route n times. Obviously, a repeated route= cannot be tagged as it wouldn't make any sense.''') parser.add_argument('input_file') parser.add_argument('output_file', nargs='?') parser.add_argument('-l', '--legacy', action="store_true", help='''Use legacy processing method. Don't use for new projects. \n\n #!expansionoutput can be inserted on its own line in a source file terminated by #!expansionoutputend on a following line. This block, which should remain empty, will be used to write the result of expanding any airline definitions in a #! comment. Any #! definitions with frequency greater are split into entries with max 10 frequency each. \n\n #!sidx can be inserted before any "routex =" declaration in a [departure] section to repeat the route times. This can be used to adjust the distribution of traffic on each SID. Note the numbering of each "route" will not be adjusted. See renumber.py for such operation.''') process(parser.parse_args())