import csv
import json
import re
import os
from collections import defaultdict
from pathlib import Path
import configparser

script_dir = os.path.dirname(__file__)

# this script was written by Captain Tux - https://github.com/CaptainTux
# the latest version will be available at https://github.com/AdamJCavanaugh/EndlessATCAirports
# this is a parser for the CIFP, published by the FAA
# it provides SIDS and STARS for airports in the USA for EndlessATC
# it also generates routes to final, i.e., procedures where pilots navigate to final from an IAF
# the download of the current version is available here:
# https://www.faa.gov/air_traffic/flight_info/aeronav/digital_products/cifp/download/

# don't touch this
fieldnames = ['apt', 'id', 'transition', 'aob', 'fix', 'type', 'f_type', 'fix_type', 'index', 'name',
              'lat', 'lon', 'speed', 'alt_top', 'alt_min', 'alt_2', 'hdg']


def parse_cifp(filename):
    """Convert the raw CIFP file into ``cifp_out/cifp_parsed.csv``.

    Only lines starting with ``SUSAP``, ``SUSAD`` or ``SUSAEAENRT`` are kept.
    Each kept line is cut into fixed-column fields (see ``fieldnames``) and
    written as one CSV row.

    :param filename: path of the CIFP text file to read.
    :raises IndexError: if a kept line is shorter than 83 characters
        (column 82 is read without slicing).
    """
    with open(filename, 'r') as f:
        lines = [line for line in f
                 if line[:5] in ('SUSAP', 'SUSAD') or line[:10] == 'SUSAEAENRT']

    rows = []
    for line in lines:
        # an empty airport column is stored as 'VOR', en-route records as 'RNAV_FIX'
        apt = line[6:10].strip() or 'VOR'
        if apt == 'ENRT':
            apt = 'RNAV_FIX'
        rows.append({
            'apt': apt,
            'id': line[13:19].strip(),
            'transition': line[19:25].strip(),
            'aob': line[82],
            'fix': line[29:34].strip(),
            'type': line[12],
            'f_type': line[47:49],
            'fix_type': line[26],
            'index': line[26:29],
            'name': line[93:123].strip(),
            'lat': f'{line[32:35]}.{line[35:37]}.{line[37:39]}.{line[39:41]}',
            'lon': f'{line[41:45]}.{line[45:47]}.{line[47:49]}.{line[49:51]}',
            'speed': line[99:102].strip(),
            'alt_top': line[84:89].strip(),
            'alt_min': line[89:94].strip(),
            'alt_2': line[94:99].strip(),
            'hdg': line[70:73],  # heading to fly when STAR terminates
        })

    with open('cifp_out/cifp_parsed.csv', 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


def _register_local_fixes(lines, beacons):
    """Add the airport's own type 'A' and type 'C' rows to ``beacons`` (in place)."""
    for line in lines:
        if line['type'] == 'A':
            beacons[line['apt']] = line
        if line['type'] == 'C':
            beacons[line['id']] = line


def _restricted_altitude(alt, alt_min, alt_top):
    """Return ``alt`` lowered to the row's limit (``alt_min`` first, else ``alt_top``)."""
    limit = alt_min or alt_top
    if not limit:
        return alt
    if 'FL' in limit:
        # flight levels are given as 'FLnnn' -> hundreds of feet
        limit = int(limit[2:]) * 100
    return min(alt, int(limit))


def _write_beacon_section(f, beacons, used_fixes):
    """Write the ``beacons =`` section listing every used fix once, sorted by name."""
    f.write(f'\n\n{"#" * 50}\n'
            f'# BEACONS\n{"#" * 50}\n\n'
            f'beacons = \n')
    for name in sorted(set(used_fixes)):
        fix = beacons[name]
        f.write(f' {fix["id"]}, {fix["lat"]}, {fix["lon"]}, {fix["name"].lower()}\n')


def write_sids(lines, beacons, apt, config, runway_prefix='', start_index=0):
    """Write ``cifp_out/<apt>/<apt>_sid_output.txt`` with one departure section per runway.

    :param lines: parsed CSV rows (dicts) for the airport plus the global fixes.
    :param beacons: fix id -> row lookup; the airport's own 'A'/'C' rows are added in place.
    :param apt: airport identifier used for the output folder and file name.
    :param config: mapping with a ``replace_runway_names`` section
        (a ``ConfigParser`` or a plain dict of dicts both work).
    :param runway_prefix: text put in front of the runway id before the
        replacement lookup.
    :param start_index: last departure number already used.
    :return: the last departure number used.
    :raises KeyError: if a route references a fix that is not in ``beacons``.
    """
    # upper-case the keys like the other writers do, so a ConfigParser section
    # (lower-cased keys) and a plain dict behave the same
    replace_runway_names = {k.upper(): v for k, v in config['replace_runway_names'].items()}
    runway_sids = defaultdict(lambda: defaultdict(list))
    _register_local_fixes(lines, beacons)

    all_runways = [line['id'] for line in lines if line['type'] == 'G']
    sid_list = defaultdict(list)
    for line in lines:
        if line['type'] == 'D':
            sid_list[line['id']].append(line)

    for sid_id, s_lines in sid_list.items():
        transitions = defaultdict(list)
        for line in s_lines:
            transitions[line['transition']].append(line)
        matches = [re.match(r'.?RW[0-9]{2}.?|.?ALL', t) for t in transitions.keys()]
        # drop the leading character of the transition column to get the runway id
        runways = [m.group(0)[1:] for m in matches if m]
        init_sid = []
        for transition, tlines in transitions.items():
            target = transition[1:]
            if target in runways:
                # extend routes of this SID that already exist for the runway ...
                for name, trans in runway_sids[target].items():
                    if sid_id in name:
                        trans += tlines
                # ... and register the runway segment as a route of its own
                runway_sids[target][f'{sid_id}.{tlines[-1]["fix"]}'] += tlines
                init_sid = tlines
            elif target in beacons.keys():
                for runway in runways:
                    runway_sids[runway][f'{sid_id}.{target or tlines[-1]["fix"]}'] += init_sid + tlines

    # for line in sid_lines:
    #     print(line)
    #     sid_id = line['id']
    #     transition = line['transition'][1:]
    #     if transition in all_runways:
    #         runway_sids[transition][sid_id].append(line)

    with open(f'cifp_out/{apt.lower()}/{apt}_sid_output.txt', 'w', newline='', encoding='utf8') as f:
        f.write(f'\n\n{"#" * 50}\n'
                f'# {apt.upper()} SIDS\n{"#" * 50}\n\n')
        i = start_index
        used_fixes = []
        for s_runway, sids in runway_sids.items():
            i += 1
            runway_list = all_runways if s_runway == 'ALL' else [s_runway]
            for runway in runway_list:
                runway_name = f'{runway_prefix}{runway}'
                runway_name = replace_runway_names.get(runway_name.upper(), runway_name)
                f.write(f'\n\n{"#" * 50}\n'
                        f'[departure{i}]\n'
                        f'runway = {runway_name}\n')
                # j is the route counter
                for j, (sid, route) in enumerate(sids.items(), start=1):
                    f.write(f'\nroute{j} = \n'
                            f' {sid}, {sid.split(".")[0]}\n')
                    fix_name = ''
                    for e in route:
                        # skip rows without a fix and consecutive repeats of the same fix
                        if fix_name != e['fix'] and e['fix']:
                            fix_name = e['fix']
                            fix = beacons[fix_name]
                            used_fixes.append(fix_name)
                            f.write(f' {fix["lat"]}, {fix["lon"]}\n'
                                    f'# {fix_name}\n')
        _write_beacon_section(f, beacons, used_fixes)
    return i


def write_stars(lines, beacons, apt, config, runway_prefix='', start_index=0):
    """Write ``cifp_out/<apt>/<apt>_star_output.txt`` with one approach section per STAR entry.

    A route is only written from the first fix that is listed in the
    ``entrypoints`` config section; each (entry, STAR, runway) combination is
    written once.

    :param lines: parsed CSV rows (dicts) for the airport plus the global fixes.
    :param beacons: fix id -> row lookup; the airport's own 'A'/'C' rows are added in place.
    :param apt: airport identifier used for the output folder and file name.
    :param config: mapping with the sections ``entrypoints`` (JSON values with
        ``alt``, ``speed`` and ``bearing``), ``replace_runway_names``,
        ``defaults`` and ``alt_min_speeds``.
    :param runway_prefix: text put in front of the runway id before the
        replacement lookup.
    :param start_index: last approach number already used.
    :return: the last approach number used.
    :raises KeyError: if a route references a fix that is not in ``beacons``.
    """
    entrypoints = {k.upper(): json.loads(v) for k, v in config['entrypoints'].items()}
    replace_runway_names = {k.upper(): v for k, v in config['replace_runway_names'].items()}
    defaults = config['defaults']
    alt_min_speeds = config['alt_min_speeds']
    runway_stars = defaultdict(lambda: defaultdict(list))
    _register_local_fixes(lines, beacons)

    all_runways = [line['id'] for line in lines if line['type'] == 'G']
    star_list = defaultdict(list)
    for line in lines:
        if line['type'] == 'E':
            star_list[line['id']].append(line)

    for star_id, s_lines in star_list.items():
        transitions = defaultdict(list)
        for line in s_lines:
            transitions[line['transition']].append(line)
        matches = [re.match(r'.?RW[0-9]{2}.?|.?ALL', t) for t in transitions.keys()]
        runways = [m.group(0)[1:] for m in matches if m]
        empty_trans_exists = False
        for transition, tlines in transitions.items():
            target = transition[1:]
            if target in beacons.keys() or not target:
                if not target:
                    empty_trans_exists = True
                for runway in runways:
                    runway_stars[runway][f'{target or tlines[0]["fix"]}.{star_id}'] += tlines
            elif target in runways:
                # append the runway segment to every route of this STAR ...
                for name, trans in runway_stars[target].items():
                    if star_id in name:
                        trans += tlines
                # ... and keep it as its own route when there is no unnamed transition
                if not empty_trans_exists:
                    runway_stars[target][f'{tlines[0]["fix"]}.{star_id}'] += tlines

    with open(f'cifp_out/{apt.lower()}/{apt}_star_output.txt', 'w', newline='') as f:
        f.write(f'\n\n{"#" * 50}\n'
                f'# {apt.upper()} STARS\n{"#" * 50}\n\n')
        i = start_index
        used_fixes = []
        # get transition and first fix on route to calculate bearing for entrypoint in the end
        transitions = []
        entries = []
        for s_runway, stars in runway_stars.items():
            runway_list = all_runways if s_runway == 'ALL' else [s_runway]
            for runway in runway_list:
                for star, route in stars.items():
                    fix_name = ''
                    entry = None
                    for e in route:
                        if e['fix'] in entrypoints.keys() and not entry:
                            entry = e['fix']
                            entryname = f'{entry}.{star.split(".")[1]}'
                            if (entryname, runway) in transitions:
                                # this entry/STAR/runway combination was already written
                                entry = None
                                break
                            transitions.append((entryname, runway))
                            entries.append(entry)
                            alt = int(entrypoints[entry]['alt']) or int(defaults['star_top_alt'])
                            speed = int(entrypoints[entry]['speed']) or int(defaults['star_top_speed'])
                            i += 1
                            runway_name = f'{runway_prefix}{runway}'
                            if runway_name in replace_runway_names.keys():
                                runway_name = replace_runway_names[runway_name]
                            f.write(f'\n\n{"#" * 50}\n'
                                    f'[approach{i}]\n'
                                    f'{"#" * 50}\n'
                                    f'# {entryname}\n'
                                    f'runway = {runway_name}\n'
                                    f'beacon = {entry}\n\n'
                                    f'route1 = \n'
                                    f' {str(entrypoints[entry]["bearing"]).zfill(3)}\n')
                        if e['fix'] != fix_name and entry:
                            fix_name = e['fix']
                            fix = beacons[fix_name]
                            used_fixes.append(fix_name)
                            if e['speed']:
                                speed = e['speed']
                            alt = _restricted_altitude(alt, e['alt_min'], e['alt_top'])
                            speed_alts = [a for a in alt_min_speeds.keys() if alt <= int(a)]
                            if speed_alts:
                                # config values and CSV speeds are strings while the
                                # entry-point speed is an int: compare as integers
                                speed = min(int(speed), int(alt_min_speeds[max(speed_alts)]))
                            f.write(f' {fix["lat"]}, {fix["lon"]}, {alt}, {speed} ; {e["fix"]}\n')
                    if entry:
                        # finish with the heading of the last row, 090 when it has none
                        f.write(f' end, {e["hdg"].strip() or "090"}')
                        # f.write(f' 99.9, {alt}, {speed} ; end\n\n')
        _write_beacon_section(f, beacons, used_fixes)
        f.write(f'\n\n{"#" * 50}\n'
                f'# ENTRY POINTS\n{"#" * 50}\n\n'
                f'entrypoints = \n')
        # sorted so that repeated runs produce the same file
        for entrypoint in sorted(set(entries)):
            bearing = entrypoints[entrypoint]['bearing']
            f.write(f' {str(bearing).zfill(3)}, {entrypoint}\n')
    return i


def write_approaches(lines, beacons, apt, config, runway_prefix='', start_index=0):
    """Write ``cifp_out/<apt>/<apt>_approach_output.txt`` with routes that lead to final.

    Only procedure segments containing a fix from the ``final_app_fixes``
    config section are used; each written route is cut after the last such
    fix and closed with that fix's ``final_length``, ``alt`` and ``speed``.

    :param lines: parsed CSV rows (dicts) for the airport plus the global fixes.
    :param beacons: fix id -> row lookup; the airport's own 'A'/'C' rows are added in place.
    :param apt: airport identifier used for the output folder and file name.
    :param config: mapping with the sections ``final_app_fixes`` (JSON values),
        ``replace_runway_names``, ``defaults`` and ``alt_min_speeds``.
    :param runway_prefix: text put in front of ``RW<runway>`` before the
        replacement lookup.
    :param start_index: last approach number already used.
    :return: the last approach number used.
    """
    final_app_fixes = {k.upper(): json.loads(v) for k, v in config['final_app_fixes'].items()}
    replace_runway_names = {k.upper(): v for k, v in config['replace_runway_names'].items()}
    defaults = config['defaults']
    alt_min_speeds = config['alt_min_speeds']
    runway_approaches = defaultdict(lambda: defaultdict(list))
    _register_local_fixes(lines, beacons)

    approach_list = defaultdict(list)
    for line in lines:
        if line['type'] == 'F':
            approach_list[line['id']].append(line)

    for approach_id, a_lines in approach_list.items():
        transitions = defaultdict(list)
        for line in a_lines:
            transitions[line['transition']].append(line)
        # the runway is the approach id without its first character
        runway = f'{approach_id[1:]}'
        for transition, tlines in transitions.items():
            contains_valid_faf = [e['fix'] for e in tlines if e['fix'] in final_app_fixes.keys()]
            if contains_valid_faf:
                target = transition[1:]
                if target in beacons.keys():
                    runway_approaches[runway][f'{target or tlines[0]["fix"]}.{approach_id}'] += tlines
                    # for name, trans in runway_approaches[runway].items():
                    #     trans += tlines
                if not target:  # final approach starts here
                    # extend every route collected so far for this runway ...
                    for t in runway_approaches[runway].keys():
                        runway_approaches[runway][t] += tlines
                    # ... and add the final segment as a route of its own
                    runway_approaches[runway][f'{tlines[0]["fix"]}.{approach_id}'] += tlines

    with open(f'cifp_out/{apt.lower()}/{apt}_approach_output.txt', 'w', newline='') as f:
        f.write(f'\n\n{"#" * 50}\n'
                f'# {apt.upper()} APPROACHES\n{"#" * 50}\n\n')
        i = start_index
        used_fixes = []
        used_approaches = []
        for runway, approaches in runway_approaches.items():
            for approach, route in approaches.items():
                apch_string = ''
                iaf = approach.split('.')[0]
                # if not any(re.match(rf'{iaf}\..{runway}.?', a) for a in used_approaches):
                contains_valid_faf = [e['fix'] for e in route if e['fix'] in final_app_fixes.keys()]
                if any(contains_valid_faf):
                    used_approaches.append(approach)
                    i += 1
                    runway_name = f'{runway_prefix}RW{runway}'
                    replace_candidate = [v for r, v in replace_runway_names.items() if r in runway_name]
                    if replace_candidate:
                        runway_name = replace_candidate[0]
                    used_fixes.append(iaf)
                    apch_string += f'\n\n{"#" * 50}\n' \
                                   f'[approach{i}]\n' \
                                   f'{"#" * 50}\n' \
                                   f'# {approach}\n' \
                                   f'runway = {runway_name}\n' \
                                   f'beacon = {iaf}\n\n' \
                                   f'route1 = \n' \
                                   f' 360\n'
                    fix_name = ''
                    speed = 250
                    alt = int(defaults['apch_top_alt'])
                    entry = ''
                    faf = ''
                    # points are buffered here and only kept once a final approach
                    # fix is reached, so everything after the last one is dropped
                    app_string = ''
                    for e in route:
                        entry = e['fix']
                        if entry != fix_name and entry:
                            fix_name = entry
                            try:
                                fix = beacons[fix_name]
                            except KeyError:
                                # unknown fix: stop following this route
                                break
                            # used_fixes.append(fix_name)
                            if not e['transition'][1:]:
                                # speed may be a CSV string at this point, compare as int
                                speed = min(int(speed), int(defaults['max_approach_speed']))
                                alt = min(alt, int(defaults['max_approach_alt']))
                            if e['speed']:
                                speed = e['speed']
                            alt = _restricted_altitude(alt, e['alt_min'], e['alt_top'])
                            speed_alts = [a for a in alt_min_speeds.keys() if int(alt) <= int(a)]
                            if speed_alts:
                                speed = min(int(speed), int(alt_min_speeds[max(speed_alts)]))
                            app_string += f' {fix["lat"]}, {fix["lon"]}, {alt}, {speed} ; {e["fix"]}\n'
                            if entry in final_app_fixes.keys():
                                faf = entry
                                apch_string += app_string
                                app_string = ''
                    if faf:
                        apch_string += f' {final_app_fixes[faf]["final_length"]}, {final_app_fixes[faf]["alt"]}, ' \
                                       f'{final_app_fixes[faf]["speed"]} ; end\n'
                        f.write(apch_string)
        _write_beacon_section(f, beacons, used_fixes)
    return i


def write_apt_data(apt):
    """Write ``cifp_out/apt_data/cifp_<apt>.csv``.

    The file holds the rows of ``cifp_out/cifp_parsed.csv`` whose ``apt``
    column equals ``apt``, followed by all 'RNAV_FIX' and 'VOR' rows.
    """
    with open('cifp_out/cifp_parsed.csv', 'r', newline='') as f:
        lines = list(csv.DictReader(f))
    wlines = [line for line in lines if line['apt'] == apt]
    wlines += [line for line in lines if line['apt'] in ['RNAV_FIX', 'VOR']]
    with open(f'cifp_out/apt_data/cifp_{apt.lower()}.csv', 'w', newline='') as fo:
        writer = csv.DictWriter(fo, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(wlines)


def write_fix_coords():
    """Write the 'RNAV_FIX' and 'VOR' rows of the parsed CSV to ``cifp_out/cifp_coords.csv``."""
    with open('cifp_out/cifp_parsed.csv', 'r', newline='') as f:
        lines = list(csv.DictReader(f))
    wlines = [line for line in lines if line['apt'] in ['RNAV_FIX', 'VOR']]
    with open('cifp_out/cifp_coords.csv', 'w', newline='') as fo:
        writer = csv.DictWriter(fo, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(wlines)


def _load_airport_data(apt):
    """Refresh the airport CSV and return ``(lines, beacons)`` for ``apt``.

    ``lines`` are the rows of the per-airport CSV, ``beacons`` maps the id of
    every 'RNAV_FIX'/'VOR' row in ``cifp_out/cifp_coords.csv`` to its row.
    """
    write_apt_data(apt)
    with open(f'cifp_out/apt_data/cifp_{apt.lower()}.csv', 'r', newline='') as f:
        lines = list(csv.DictReader(f))
    with open('cifp_out/cifp_coords.csv', 'r', newline='') as f:
        beacons = {line['id']: line for line in csv.DictReader(f)
                   if line['apt'] in ('RNAV_FIX', 'VOR')}
    return lines, beacons


def parse_approaches(apt, config, start_index=0):
    """Write the STAR and approach files for ``apt``.

    :param apt: airport identifier as stored in the ``apt`` CSV column.
    :param config: configuration mapping passed on to the writers.
    :param start_index: last approach number already used.
    :return: the last approach number used (STARs and approaches share the counter).
    """
    lines, beacons = _load_airport_data(apt)
    runway_prefix = f'{apt}_'
    i = write_stars(lines, beacons, apt.lower(), config, runway_prefix=runway_prefix, start_index=start_index)
    i = write_approaches(lines, beacons, apt.lower(), config, runway_prefix=runway_prefix, start_index=i)
    return i


def parse_sids(apt, config, start_index=0):
    """Write the SID file for ``apt``.

    :param apt: airport identifier as stored in the ``apt`` CSV column.
    :param config: configuration mapping passed on to ``write_sids``.
    :param start_index: last departure number already used.
    :return: the last departure number used.
    """
    lines, beacons = _load_airport_data(apt)
    runway_prefix = f'{apt}_'
    return write_sids(lines, beacons, apt.lower(), config, runway_prefix=runway_prefix, start_index=start_index)


# you only need to run the parse_cifp() function once, it parses the CIFP data to a better format, used in this script
# same thing for write_fix_coords() - note: has to run AFTER the parse_cifp() function
if __name__ == '__main__':
    Path('EndlessATCAirports/tools/cifp_out/apt_data').mkdir(parents=True, exist_ok=True)
    # every file in this script is opened relative to the working directory,
    # so the folder has to exist there as well
    Path('cifp_out/apt_data').mkdir(parents=True, exist_ok=True)

    config = configparser.ConfigParser(allow_no_value=True)
    config.read('cifp_config.ini')

    # parse_cifp()
    # write_fix_coords()

    airports = [a.upper() for a in config._sections['airports']]
    app_start_index = 0  # for approach numbering
    sid_start_index = 0  # for sid numbering
    for airport in airports:
        # make sure the output folder for each airport exists
        Path(f'cifp_out/{airport.lower()}').mkdir(parents=True, exist_ok=True)
        app_start_index = parse_approaches(airport, config._sections, app_start_index)
        sid_start_index = parse_sids(airport, config, sid_start_index)