"""Endless ATC custom airport file build utility."""

import argparse
from collections import namedtuple
import configparser
from dataclasses import dataclass
import os
import re


@dataclass
class Fix:
    """Simple class to represent a fix.

    All constructor arguments are strings and are stored stripped of
    surrounding whitespace. A `heading` starting with "!" marks a fix that is
    left out of the written [airspace] beacons= list (see `process_beacons`).
    """
    name: str
    lat: str
    lon: str
    heading: str
    pronunciation: str

    def __init__(self, name, lat, lon, heading, pronunciation):
        self.name = name.strip()
        self.lat = lat.strip()
        self.lon = lon.strip()
        self.heading = heading.strip()
        self.pronunciation = pronunciation.strip()

    @property
    def short_def(self):
        """Position definition as string."""
        return f"{self.lat}, {self.lon}"

    @property
    def minor_def(self):
        """Definition without holding as string."""
        return f"{self.name}, {self.lat}, {self.lon}, {self.pronunciation}"

    @property
    def full_def(self):
        """Full definition as string.

        Leading "!" markers are removed from the heading; an empty heading
        is written as 0.
        """
        return (f"{self.name}, {self.lat}, {self.lon}, "
                f"{self.heading.lstrip('!') or 0}, {self.pronunciation}")


@dataclass
class Airline:
    """Simple class to represent an airline declaration."""
    callsign: str
    frequency: int
    types: str
    pronunciation: str
    directions: str

    # Class-wide configuration, set by `process()`.
    callsigns = None        # lookup of callsign key -> pronunciation
    use_callsigns = True    # take the pronunciation from `callsigns`
    test_callsigns = False  # print every pronunciation that was looked up

    def __init__(self, callsign, frequency, types, *data, gateways=None):
        """Create an airline from an entry in the airlines= list.

        `data` should be `(pronunciation, directions)`.
        If `Airline.callsigns` is defined and `Airline.use_callsigns` is
        true, `pronunciation` should be omitted from `data`.
        If `gateways` is provided, `directions` becomes
        `arrival_gateways`, `departure_gateways` (each a "/"-separated list),
        which are translated to directions through `gateways`.

        Raises:
            ValueError: if the entry cannot be interpreted.
        """
        self.frequency = int(frequency.strip())
        self.types = types.strip()
        # Use the stripped callsign for lookups too, so stray whitespace
        # around the first field does not break the pronunciation lookup.
        name = callsign.strip()
        self.callsign = name
        try:
            if Airline.callsigns is not None and (Airline.use_callsigns or Airline.test_callsigns):
                if '-' not in name:
                    self.pronunciation = Airline.callsigns[name]
                else:
                    key = name[:name.index('-')]
                    self.callsign = name.strip('_')
                    if len(key) > 3:
                        # if length of key is more than 3, assume it is not a
                        # registration; without a match we strip '_' to allow
                        # for mil callsigns with length <= 3
                        self.pronunciation = Airline.callsigns.get(key, key.strip('_'))
                    else:
                        self.pronunciation = Airline.callsigns.get(key, '0')
                if Airline.test_callsigns:
                    print(f'{self.callsign}: {self.pronunciation}')
            if Airline.callsigns is None or not Airline.use_callsigns:
                self.pronunciation = data[0].strip()
                data = data[1:]
            if gateways is not None:
                arrival_gateways = {gateway.strip() for gateway in data[0].split("/")}
                departure_gateways = {gateway.strip() for gateway in data[1].split("/")}
                self.directions = "".join(sorted(
                    {gateways[gateway] for gateway in arrival_gateways | departure_gateways}))
            else:
                self.directions = data[0].strip()
        except Exception as e:
            raise ValueError(f"Could not create airline from ({callsign}, {frequency}, {types}, {str(data)})"
                             + f"\nCallsign pronunciation lookup = {Airline.use_callsigns}") from e


def process_fix_line(line, fixes):
    """Expands special commands for a fix definition in short format and
    returns the expanded definition as the result.

    Substitute "!<fix>[, <data>]" in `line` with "<lat>, <lon>[, <data>]"
    based on `fixes`. Any other line is returned unchanged.

    Args:
        `line` (str): A fix definition.
        `fixes` (dict): A lookup of `Fix`es.
    """
    if not line.startswith('!'):
        return line
    def_fix, def_sep, def_data = line.lstrip('!').partition(',')
    return fixes[def_fix.strip()].short_def + def_sep + def_data


def process_fix_list(fix_list, fixes):
    """Expands special commands in a list of fixes in short format and
    produces an iterable of definitions as the result.

    Substitute any "!<fix>[, <data>]" in `fix_list` with
    "<lat>, <lon>[, <data>]" based on `fixes`.

    Args:
        `fix_list` (list): A list of fix definitions.
        `fixes` (dict): A lookup of `Fix`es.
    """
    for line in fix_list:
        yield process_fix_line(line, fixes)


def _process_approach_fix_list(fix_list, fixes, tagged_routes, generated_approaches,
                               generate_approaches=True):
    """Inner worker function for `process_approach_fix_list()`.

    Works the same as `process_fix_list`, but in addition, any fix definitions
    processed will be added to any generated approaches in
    `generated_approaches`.

    Also, any fixes processed will be checked for approach generator
    parameters (a last comma-separated item containing "!", in the form
    "!<heading>[@<tag>]"); if found, a new generated approach is added to
    `generated_approaches`. The route of such generated approach will also be
    added to `tagged_routes` if tagged in the fix definition.

    Args:
        `fix_list` (list): A list of fix definitions.
        `fixes` (dict): A lookup of `Fix`es.
        `tagged_routes` (dict): A lookup of approach routes that were tagged
            for lookup.
        `generated_approaches`: A list of dicts of parameters to be used to
            generate derived approaches in post-processing.
        `generate_approaches`: When false, approach generator parameters are
            still removed from the definition but no new generated approach
            (or tag) is registered for them.
    """
    for line in fix_list:
        def_data, _, approach_generator_params = line.rpartition(',')
        if def_data and '!' in approach_generator_params:
            if generate_approaches:
                fix_name = def_data.partition(',')[0].lstrip('!')
                heading, _, tag = approach_generator_params.lstrip(' !').partition('@')
                route = []
                generated_approaches.append(
                    {"heading": heading, "beacon": fix_name, "route": route})
                if tag:
                    tagged_routes[tag] = route
        else:
            # no generator parameters: the whole line is the definition
            def_data = line
        result = process_fix_line(def_data, fixes)
        # every generated approach started so far (including one started on
        # this very line) continues along the rest of the route
        for generated_approach in generated_approaches:
            generated_approach['route'].append(result)
        yield result


def process_approach_fix_list(fix_list, fixes, tagged_routes, generated_approaches, generate_approaches=True):
    """Processes special commands in a list of approach fixes in short format
    and produces an iterable of definitions as the result.

    Substitute any "!<fix>[, <data>]" in `fix_list` with
    "<lat>, <lon>[, <data>]" based on `fixes`.

    If the first item in `fix_list` is "@<name>", everything after the item
    that follows it is recorded in `tagged_routes` under <name>
    ("@!<name>" skips one further item), and the tag item itself is dropped.

    If the last item in `fix_list` is "@<name>", substitute in the contents
    of the approach route tagged <name>.

    Args:
        `fix_list` (list): A list of fix definitions.
        `fixes` (dict): A lookup of `Fix`es.
        `tagged_routes` (dict): A lookup of approach routes that were tagged
            for lookup.
        `generated_approaches`: A list of dicts of parameters to be used to
            generate derived approaches in post-processing.
        `generate_approaches`: Whether or not to process any approach
            generator commands. Chained (tagged) routes are always expanded
            with this turned off so their generator commands are not
            registered a second time.

    Raises:
        RuntimeError: if a tagged route tries to chain itself.
    """
    if not fix_list:
        return

    current_tag = None
    if fix_list[0].startswith('@'):
        tagged_route = fix_list[2:]
        if fix_list[0].startswith('@!'):
            tagged_route = tagged_route[1:]
        current_tag = fix_list[0].lstrip('@!')
        tagged_routes[current_tag] = tagged_route
        fix_list = fix_list[1:]

    # `fix_list` may be empty here if it consisted of the tag alone.
    if fix_list and fix_list[-1].startswith('@'):
        following_tag = fix_list[-1].lstrip('@')
        if current_tag is not None and current_tag == following_tag:
            raise RuntimeError(
                'Unable to build as approach tagged as @{tag} is trying to reference itself.\n'
                'The following is the route= contents after the @tag: \n{lines}'
                .format(tag=current_tag, lines="\n".join(fix_list)))
        yield from _process_approach_fix_list(
            fix_list[:-1], fixes, tagged_routes, generated_approaches, generate_approaches)
        # Iterate over a snapshot: the tagged route can be the live route list
        # of a generated approach that is still being appended to.
        yield from process_approach_fix_list(
            list(tagged_routes[following_tag]), fixes, tagged_routes, generated_approaches, False)
    else:
        yield from _process_approach_fix_list(
            fix_list, fixes, tagged_routes, generated_approaches, generate_approaches)


def process_departure_fix_list(fix_list, fixes, tagged_routes):
    """Processes special commands in a list of departure fixes in short format
    and produces an iterable of definitions as the result, or `None` if there
    is no result (`fix_list` was empty or was recorded in `tagged_routes`).

    Substitute any "!<fix>[, <data>]" in `fix_list` with
    "<lat>, <lon>[, <data>]" based on `fixes`.

    If the first item of `fix_list` is "@<name>", record the rest of the
    `fix_list` in `tagged_routes` under <name>.

    If the second (first if a tagged departure route) item in `fix_list` is
    "@<name>", substitute in the contents of the departure route tagged <name>.

    Args:
        `fix_list` (list): A list of fix definitions.
        `fixes` (dict): A lookup of `Fix`es.
        `tagged_routes` (dict): A lookup of departure routes.
    """
    if not fix_list:
        return None
    if fix_list[0].startswith('@'):
        tagged_routes[fix_list[0].lstrip('@')] = fix_list[1:]
        return None
    return _process_departure_fix_list(fix_list, fixes, tagged_routes)


def _process_departure_fix_list(fix_list, fixes, tagged_routes, top_level=True):
    """Inner worker generator for `process_departure_fix_list()`.

    At the top level the first item (the route name) is passed through
    untouched. A leading "@<name>" item is replaced by the expanded contents
    of the departure route tagged <name>.
    """
    if not fix_list:
        return
    if top_level:
        yield fix_list[0]
        fix_list = fix_list[1:]
    # `fix_list` is empty here for a route that only consists of its name.
    if fix_list and fix_list[0].startswith('@'):
        yield from _process_departure_fix_list(
            tagged_routes[fix_list[0].lstrip('@')], fixes, tagged_routes, top_level=False)
        yield from process_fix_list(fix_list[1:], fixes)
    else:
        yield from process_fix_list(fix_list, fixes)


def process_sids_fix_list(fix_list, fixes):
    """Processes special commands in a list of fixes in minor format and
    produces an iterable of definitions as the result.

    Substitute any "!<fix>[, <data>]" in `fix_list` with
    "<name>, <lat>, <lon>, <pronunciation>[, <data>]" based on `fixes`.

    Args:
        `fix_list` (list): A list of fix definitions.
        `fixes` (dict): A lookup of `Fix`es.
    """
    for line in fix_list:
        if line.startswith('!'):
            def_fix, def_sep, def_data = line.lstrip('!').partition(',')
            yield fixes[def_fix.strip()].minor_def + def_sep + def_data
        else:
            yield line


def process_repeatable_departure_fix_list(fix_list, fixes, departure_routes):
    """Processes special commands in a list of departure fixes in short format
    and produces an iterable of n iterables of definitions as the result.

    A leading empty item in `fix_list` is ignored. If the first item of
    `fix_list` is then *n, n iterables are produced. Otherwise, `n = 1`.
    An item produced may be `None` when the route has no output of its own
    (see `process_departure_fix_list`); a repeated route without output
    produces nothing.

    See `process_departure_fix_list` for further special commands.
    """
    if fix_list and not fix_list[0]:
        fix_list = fix_list[1:]
    if not fix_list:
        return
    if fix_list[0].startswith('*'):
        result = process_departure_fix_list(fix_list[1:], fixes, departure_routes)
        if result is None:
            # tagged or empty route: there is nothing to repeat
            return
        result = list(result)
        for _ in range(int(fix_list[0].removeprefix('*'))):
            yield result
    else:
        yield process_departure_fix_list(fix_list, fixes, departure_routes)


def process_beacons(fixes):
    """Returns a generator of [airspace] beacons= lines, while removing any
    beacons with ! as the holding heading."""
    for fix in fixes.values():
        if not fix.heading.startswith("!"):
            yield fix.full_def


def process_airlines_list(airline_list):
    """Returns generator of airline declaration strings based on the list of
    `Airline`s `airline_list`.

    If frequency is greater than 10, write floor(frequency / 10) declarations
    of frequency 10 and one declaration of (frequency mod 10), if non-zero.
    """
    for airline in airline_list:
        full_entries, remainder = divmod(airline.frequency, 10)
        for _ in range(full_entries):
            yield f"{airline.callsign}, 10, {airline.types}, {airline.pronunciation}, {airline.directions}"
        if remainder:
            yield f"{airline.callsign}, {remainder}, {airline.types}, {airline.pronunciation}, {airline.directions}"


def enumerate_routes(route_list, start=1):
    """built-in `enumerate()` but the enumeration is a string "route#"."""
    for route in route_list:
        yield f"route{start}", route
        start += 1


def _process_source(args, input_file, output_file, default_gateways, parse_only):
    """Expand a source file in the current (configparser based) format and
    write the result to `output_file` unless `parse_only` is set."""
    source = configparser.ConfigParser()
    source.read(input_file)

    # read optional header to be written in output
    header = None
    meta = source['meta'] if 'meta' in source else None
    if meta is not None and 'header' in meta:
        header = ["# " + line for line in meta['header'].splitlines()]
        header.extend([
            "",
            f"# This file is generated from the source file {os.path.relpath(input_file, os.path.dirname(output_file))} using expand.py.",
            "# All comments have been stripped, and edits are not made directly to this file.",
            "# If you would like to contribute, or see the author's comments, please refer to the source file.",
            "",
            ""])
        header = "\n".join(header)

    # Callsign lookup is only active when [meta] callsigns= says so. Setting
    # it on every call keeps one file's setting from leaking into the next.
    if meta is not None and 'callsigns' in meta:
        Airline.use_callsigns = meta.getboolean('callsigns')
    else:
        Airline.use_callsigns = False
    # `test_callsigns` is not defined by every caller's argument parser.
    if getattr(args, 'test_callsigns', False):
        Airline.test_callsigns = True

    # remove meta section so it won't be written in output
    if meta is not None:
        del source['meta']

    # build a fix database from [airspace] beacons=
    fixes = {fix.name: fix for fix in (
        Fix(*definition.split(","))
        for definition in source['airspace']['beacons'].strip().splitlines()
    )}

    source['airspace']['beacons'] = "\n".join(process_beacons(fixes))
    source['airspace']['boundary'] = "\n".join(
        process_fix_list(source['airspace']['boundary'].splitlines(), fixes))

    areas = {section: source[section] for section in source if section.startswith('area')}
    for area_data in areas.values():
        if 'points' in area_data:
            area_data['points'] = "\n".join(
                process_fix_list(area_data['points'].splitlines(), fixes))

    # process airport sections
    airports = {section: source[section] for section in source if section.startswith('airport')}
    for airport_data in airports.values():
        if 'gateways' in airport_data:
            # NOTE(review): on a name collision the entry from
            # `default_gateways` wins over the one defined in the airport
            # (keyword arguments override the positional pairs) -- confirm
            # that this precedence is intended.
            gateways = dict(
                (tuple(map(str.strip, gateway.split(",")))
                 for gateway in airport_data['gateways'].strip().splitlines()),
                **default_gateways)
        else:
            gateways = None
        if 'airlines' in airport_data:
            airlines = [Airline(*airline.split(","), gateways=gateways)
                        for airline in airport_data['airlines'].splitlines() if airline]
            airport_data['airlines'] = "\n".join(process_airlines_list(airlines))
        if 'sids' in airport_data:
            airport_data['sids'] = "\n".join(
                process_sids_fix_list(airport_data['sids'].splitlines(), fixes))

    # process approach/transition sections
    approaches = [source[section] for section in source
                  if section.startswith('approach') or section.startswith('transition')]
    tagged_approaches_by_runway = {}
    generated_approaches_by_runway = {}
    for approach_data in approaches:
        runway = approach_data['runway']
        tagged_approaches = tagged_approaches_by_runway.setdefault(runway, {})
        generated_approaches = []
        if 'beacon' in approach_data and ',' not in approach_data['beacon']:
            approach_data['beacon'] = approach_data['beacon'].removeprefix("!")
            beacon = approach_data['beacon']
            # beacons hidden from [airspace] beacons= need their full
            # definition spelled out here
            if beacon in fixes and fixes[beacon].heading.startswith('!'):
                approach_data['beacon'] = fixes[beacon].full_def
        for option in approach_data:
            if option.startswith('route'):
                approach_data[option] = "\n".join(process_approach_fix_list(
                    approach_data[option].splitlines(), fixes,
                    tagged_approaches, generated_approaches))
        generated_approaches_by_runway.setdefault(runway, []).extend(generated_approaches)

    # find highest approach index as we need to add more approaches
    highest_app_index = 0
    for section in source:
        if section.startswith('approach'):
            highest_app_index = max(highest_app_index, int(section[8:]))

    for runway, generated_approaches in generated_approaches_by_runway.items():
        for generated_approach in generated_approaches:
            # advance first: `highest_app_index` itself names a section that
            # already exists and must not be overwritten
            highest_app_index += 1
            section = f'approach{highest_app_index}'
            beacon = generated_approach['beacon']
            if fixes[beacon].heading.startswith('!'):
                beacon = fixes[beacon].full_def
            source[section] = {
                "runway": runway,
                "beacon": beacon,
                "route1": generated_approach['heading'] + "\n" + "\n".join(
                    generated_approach['route'])
            }

    # process departure sections
    departures = [source[section] for section in source if section.startswith('departure')]
    tagged_departures = {}
    for departure_data in departures:
        routes = {int(option.removeprefix('route')): departure_data[option]
                  for option in departure_data if option.startswith('route')}
        processed_routes = []
        for route_index in sorted(routes):
            processed_routes.extend(
                "\n".join(route) for route in process_repeatable_departure_fix_list(
                    routes[route_index].splitlines(), fixes, tagged_departures)
                if route)
            del departure_data[f'route{route_index}']
        # routes are renumbered from 1 as tagged routes drop out and
        # repeated routes are multiplied
        departure_data.update(enumerate_routes(processed_routes, start=1))

    # write output file
    if not parse_only:
        with open(output_file, 'w', newline='') as airport_file:
            if header:
                airport_file.write(header)
            source.write(airport_file)


def _process_legacy(input_file, output_file, parse_only):
    """Legacy processor in regex. Don't use for new projects."""
    pattern = re.compile(
        r"^(?P<airport_section>\[airport(?P<airport_id>\d*)\])"
        r"|(?P<airline_entry>#!\t(?P<airline_code>[-\w]*), (?P<airline_frequency>\d*), "
        r"(?P<airline_parameters>[\w/d]*, [-\w ]*, [nswe]*))"
        r"|(?P<result_marker>#!expansionoutput(?P<result_id>\d+))"
        r"|(?P<result_end_marker>#!expansionoutputend)"
        r"|(?P<sid_marker>#!sid(?P<sid_frequency>[\d]+)x)")
    # 'output' collects the lines to write; every other key is an airport id
    # mapping to the expanded airline lines of that airport
    result = {'output': []}
    airport = 0
    ignore_lines = False
    ignore_one_line = False
    sid_frequency = 0
    sid_lines = []
    with open(input_file, 'r', newline='') as airport_file:
        for line in airport_file:
            match = pattern.match(line)
            if match:
                if match['airport_section']:
                    airport = match['airport_id']
                    if airport not in result:
                        result[airport] = []
                elif match['airline_entry']:
                    total_frequency = int(match['airline_frequency'])
                    frequencies = []
                    while total_frequency > 10:
                        frequencies.append(10)
                        total_frequency -= 10
                    frequencies.append(total_frequency)
                    for frequency in frequencies:
                        result[airport].append(
                            f"\t{match['airline_code']}, {frequency}, {match['airline_parameters']}\n"
                        )
                elif match['result_marker']:
                    result['output'].append(line)
                    for result_line in result[match['result_id']]:
                        result['output'].append(result_line)
                    # drop the previous contents of the output block
                    ignore_lines = True
                elif match['result_end_marker']:
                    ignore_lines = False
                elif match['sid_marker']:
                    # the route itself is written once, so repeat n - 1 times
                    sid_frequency = int(match['sid_frequency']) - 1
                    ignore_one_line = True
            if sid_frequency:
                if line.isspace():
                    # a blank line ends the route being repeated
                    for _ in range(sid_frequency):
                        result['output'].extend(sid_lines)
                    sid_frequency = 0
                    sid_lines = []
                elif not sid_lines:
                    # the marker line itself is replaced by a blank line
                    sid_lines.append("\n")
                else:
                    sid_lines.append(line)
            if not ignore_lines and not ignore_one_line:
                result['output'].append(line)
            if ignore_one_line:
                ignore_one_line = False
    if not parse_only:
        with open(output_file, 'w', newline='') as airport_file:
            airport_file.writelines(result['output'])


def process(args, input_file=None):
    """Parses an Endless ATC custom airport "source" file and expands certain
    special commands, then rewrites as a minimized output suitable for use by
    the game.

    Refer to argparse help for detailed syntax of special commands.

    Args:
        `args`: An `argparse.Namespace`. The command line args from the
            invoking module. `legacy`, `parse_only` and `test_callsigns` are
            optional and default to false; `output_path` is required when
            `input_file` is given.
        `input_file`: The file to process. Defaults to `input_file` in `args`.

    Returns:
        The path of the output file, or `None` if `parse_only` is set.
    """
    # Not every invoking parser defines these options.
    parse_only = getattr(args, 'parse_only', False)
    legacy = getattr(args, 'legacy', False)

    # if input_file is None, we were invoked standalone and not from deploy.py
    if input_file is None:
        input_file = args.input_file
        output_file = input_file if args.output_file is None else args.output_file
    else:
        output_file = os.path.join(os.path.dirname(input_file),
                                   args.output_path, os.path.basename(input_file))
    print(f"Building {input_file} to {output_file}")

    # the common.ini next to the source file is read last so it takes
    # precedence
    config = configparser.ConfigParser()
    config.read("common.ini")
    config.read(os.path.join(os.path.dirname(input_file), "common.ini"))
    Airline.callsigns = config['expand.callsigns'] if 'expand.callsigns' in config else {}
    default_gateways = config['expand.gateways'] if 'expand.gateways' in config else {}

    if not legacy:
        _process_source(args, input_file, output_file, default_gateways, parse_only)
    else:
        _process_legacy(input_file, output_file, parse_only)

    return output_file if not parse_only else None


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='''Expands certain commands to allow for concise Endless ATC airport source files.
        \n\n
        Available functions:
        \n\n
        In [airspace] boundary=, [area] points=, or the route= of an [approach/departure/transition], specify "!" instead of lat, lon to substitute the lat, lon from the fix with the corresponding name in [airspace] beacons=.
        In [airport] sids=, "!" can also be specified, and will become name, lat, lon instead.
        In [approach/departure/transition], "!" can also be specified in beacon=, where it will become the full fix definition.
        \n\n
        In [airport] airlines=, definitions with frequency >10 with be broken down into multiple definitions of frequency 10 or less.
        \n\n
        In [airport], you can define gateways=, a list of , direction.
        If gateways= is defined, the ", "" in [airport] airlines= becomes ", , ", and the directions are used based on the defined gateways.
        Default gateways can be specified in a common.ini in the same directory as the source file, or in the folder where this tool is located; gateways= still needs to be defined in an [airport] to activate this feature for the [airport].
        \n\n
        In [airport] airlines=, the airline can be omitted if [meta] callsigns= is true.
        A lookup will be loaded from [expand.callsigns] defined in a common.ini in the same directory as the source file, or in the folder where this tool is located. The former takes precedence.
        [expand.callsigns] is a list of , .
        The first item in each line of airlines (stripped of a dash and anything after it, hereafter referred to as the "key") is used to lookup in [expand.callsigns] to obtain the pronunciation.
        If nothing is found, pronunication defaults to the key if it is longer than 3 characters (assuming it is a military callsign), otherwise it defaults to "0".
        \n\n
        In a [approach/transition] route=, specify "@" to "tag" the approach route.
        Any subsequent [approach] route= can then specify "@" as the last point to chain the approach route tagged as "name" to the end.
        \n\n
        In a [departure] route=, specify "@" to "tag" the route. This will remove the departure route from the resulting file (as it is an incomplete departure route).
        Any subsequent [departure] route= can then specify "@" as the *first* point (second line, after the route name) to chain the departure route tagged as "name" to the beginning.
        \n\n
        *n as the first line of a [departure] route= value will repeat that route n times. Obviously, a repeated route= cannot be tagged as it wouldn't make any sense.''')
    parser.add_argument('input_file')
    parser.add_argument('output_file', nargs='?')
    parser.add_argument('-l', '--legacy', action="store_true", help='''Use legacy processing method. Don't use for new projects.
        \n\n
        #!expansionoutput can be inserted on its own line in a source file terminated by #!expansionoutputend on a following line.
        This block, which should remain empty, will be used to write the result of expanding any airline definitions in a #! comment.
        Any #! definitions with frequency greater are split into entries with max 10 frequency each.
        \n\n
        #!sidx can be inserted before any "routex =" declaration in a [departure] section to repeat the route times.
        This can be used to adjust the distribution of traffic on each SID. Note the numbering of each "route" will not be adjusted. See renumber.py for such operation.''')
    process(parser.parse_args())