Simple flake8 pass

This commit is contained in:
Calvin Ng
2020-12-29 17:15:23 -08:00
parent 20647aa35a
commit ab111ae950
4 changed files with 261 additions and 201 deletions
+4
View File
@@ -0,0 +1,4 @@
[flake8]
ignore =
E501,
E128
+68 -41
View File
@@ -6,52 +6,79 @@ import shutil
config = configparser.ConfigParser()
config.read('eatcdev.ini')
default_destination = config['deploy'].get("path", "C:\\Program Files (x86)\\Steam\\steamapps\\common\\Endless ATC\\locations\\") if 'deploy' in config else "C:\\Program Files (x86)\\Steam\\steamapps\\common\\Endless ATC\\locations\\"
if 'deploy' not in config:
config.add_section('deploy')
default_destination = config['deploy'].get(
"path",
"C:\\Program Files (x86)\\Steam\\steamapps\\common\\Endless ATC\\locations\\"
)
def main(args):
if args.build:
import expand
import renumber
if args.build:
import expand
import renumber
destination = args.destination_path or default_destination
if args.deploy:
print(f"Deploying to {args.destination_path or destination}.")
destination = args.destination_path or default_destination
if args.deploy:
print(f"Deploying to {args.destination_path or destination}.")
if not args.codes:
import distutils.util
if not distutils.util.strtobool(input("Confirm you wish to process all airport files? (y/n) ")):
print("Aborting.")
return
args.codes.append("")
if not args.codes:
from distutils.util import strtobool
if not strtobool(input("Confirm you wish to process all airport files? (y/n) ")):
print("Aborting.")
return
args.codes.append("")
for code in args.codes:
path = os.path.join(
args.input_dir,
"**",
args.source_dir,
args.pattern.format(code=code))
for file in glob.glob(path, recursive=True):
print(f'Found {file}')
if args.build:
if not args.legacy:
renumber.main(args, file)
file = expand.main(args, file)
if args.legacy:
renumber.main(args, file)
if args.deploy:
result = shutil.copy(file, destination)
print(f"Copied {file} to {result}")
for code in args.codes:
path = os.path.join(args.input_dir, "**", args.source_dir, args.pattern.format(code = code))
for file in glob.glob(path, recursive=True):
print(f'Found {file}')
if args.build:
if not args.legacy:
renumber.main(args, file)
file = expand.main(args, file)
if args.legacy:
renumber.main(args, file)
if args.deploy:
result = shutil.copy(file, destination)
print(f"Copied {file} to {result}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Build specified Endless ATC airports and copy to the Endless ATC directory.", epilog="https://github.com/AdamJCavanaugh/EndlessATCAirports")
parser.add_argument('codes', nargs='*', help="Airport codes to build and deploy. Prefixes can be used. If no code specified, you will be prompted if all airport files are to be processed.", metavar="code")
parser.add_argument('-w', '--input-dir', default=os.path.join(os.pardir, 'final'), help='''The directory containing the airport files.
Subdirectories that are not source directories will also be searched. Defaults to '../final'.''')
parser.add_argument('-s', '--source-dir', default='source', help="The name of the folders that will contain source files. Defaults to 'source'.")
parser.add_argument('-p', '--pattern', default='{code}*.txt', help="The glob pattern for the file names to build based on the input codes. Defaults to '{code}*.txt'.")
parser.add_argument('-o', '--output-path', default=os.pardir, help='''The path to the directory to store the output of the build process relative to the source file.
Defaults to the parent directory relative to the source file.''')
parser.add_argument('-d', '--destination-path', help='''The directory to copy the output of the build process to (e.g. Endless ATC locations folder).
Defaults to "C:\\Program Files (x86)\\Steam\\steamapps\\common\\Endless ATC\\locations\\".
This default can be overridden by a 'path = ' entry under a [deploy] section in an eatcdev.ini.''')
parser.add_argument('-n', '--no-build', action='store_false', dest='build', help='Specify to skip build, and just copy sources to output folder.')
parser.add_argument('-b', '--build-only', action='store_false', dest='deploy', help="Specify this option to skip copying output of build processes to destination folder.")
parser.add_argument('-l', '--legacy', action="store_true", help="Use legacy processing method.")
parser = argparse.ArgumentParser(
description="Build specified Endless ATC airports and copy to the Endless ATC directory.",
epilog="https://github.com/AdamJCavanaugh/EndlessATCAirports")
parser.add_argument('codes', nargs='*', metavar="code",
help='''Airport codes to build and deploy. Prefixes can be used.
If no code specified, you will be prompted if all airport files are to be processed.''')
parser.add_argument('-w', '--input-dir',
default=os.path.join(os.pardir, 'final'),
help='''The directory containing the airport files.
Subdirectories that are not source directories will also be searched.
Defaults to '../final'.''')
parser.add_argument('-s', '--source-dir', default='source',
help='''The name of the folders that will contain source files.
Defaults to 'source'.''')
parser.add_argument('-p', '--pattern', default='{code}*.txt',
help="The glob pattern for the file names to build based on the input codes. Defaults to '{code}*.txt'.")
parser.add_argument('-o', '--output-path', default=os.pardir,
help='''The path to the directory to store the output of the build process relative to the source file.
Defaults to the parent directory relative to the source file.''')
parser.add_argument('-d', '--destination-path',
help='''The directory to copy the output of the build process to (e.g. Endless ATC locations folder).
Defaults to "C:\\Program Files (x86)\\Steam\\steamapps\\common\\Endless ATC\\locations\\".
This default can be overridden by a 'path = ' entry under a [deploy] section in an eatcdev.ini.''')
parser.add_argument('-n', '--no-build', action='store_false', dest='build',
help='Specify to skip build, and just copy sources to output folder.')
parser.add_argument('-b', '--build-only',
action='store_false', dest='deploy',
help="Specify this option to skip copying output of build processes to destination folder.")
parser.add_argument('-l', '--legacy',
action="store_true", help="Use legacy processing method.")
main(parser.parse_args())
main(parser.parse_args())
+168 -141
View File
@@ -7,171 +7,198 @@ import re
Fix = collections.namedtuple("Point", ['name', 'lat', 'lon', 'heading', 'pronunciation'])
Airline = collections.namedtuple("Airline", ['callsign', 'frequency', 'types', 'pronunciation', 'directions'])
def process_fix_list(l, fixes):
for line in l:
if line.startswith('!'):
def_fix, def_sep, def_data = line.lstrip('!').partition(',')
yield f"{fixes[def_fix.strip()].lat}, {fixes[def_fix.strip()].lon}" + def_sep + def_data
else:
yield line
def process_repeatable_fix_list(l, fixes):
if l[0].startswith('*'):
result = list(process_fix_list(l[1:], fixes))
for i in range(int(l[0].removeprefix('*'))):
yield result
else:
yield process_fix_list(l, fixes)
def process_fix_list(fix_list, fixes):
for line in fix_list:
if line.startswith('!'):
def_fix, def_sep, def_data = line.lstrip('!').partition(',')
yield f"{fixes[def_fix.strip()].lat}, {fixes[def_fix.strip()].lon}" + def_sep + def_data
else:
yield line
def process_airlines_list(l):
for airline in l:
n, r = divmod(int(airline.frequency), 10)
for i in range(n):
yield f"{airline.callsign}, 10, {airline.types}, {airline.pronunciation}, {airline.directions}"
if r:
yield f"{airline.callsign}, {r}, {airline.types}, {airline.pronunciation}, {airline.directions}"
def enumerate_routes(l, start=1):
for route in l:
yield f"route{start}", route
start += 1
def process_repeatable_fix_list(fix_list, fixes):
if fix_list[0].startswith('*'):
result = list(process_fix_list(fix_list[1:], fixes))
for i in range(int(fix_list[0].removeprefix('*'))):
yield result
else:
yield process_fix_list(fix_list, fixes)
def process_airlines_list(airline_list):
for airline in airline_list:
n, r = divmod(int(airline.frequency), 10)
for i in range(n):
yield f"{airline.callsign}, 10, {airline.types}, {airline.pronunciation}, {airline.directions}"
if r:
yield f"{airline.callsign}, {r}, {airline.types}, {airline.pronunciation}, {airline.directions}"
def enumerate_routes(route_list, start=1):
for route in route_list:
yield f"route{start}", route
start += 1
def main(args, input_file=None):
if input_file is None:
input_file = args.input_file
output_file = input_file if args.output_file is None else args.output_file
else:
output_file = os.path.join(os.path.dirname(input_file), args.output_path, os.path.basename(input_file))
print("Building {0} to {1}".format(input_file, output_file))
if not 'legacy' in args or not args.legacy:
config = configparser.ConfigParser()
config.read(input_file)
# if input_file is None, we were invoked standalone and not from deploy.py
if input_file is None:
input_file = args.input_file
output_file = input_file if args.output_file is None else args.output_file
else:
output_file = os.path.join(os.path.dirname(input_file), args.output_path, os.path.basename(input_file))
print(f"Building {input_file} to {output_file}")
header = None
if 'meta' in config and 'header' in config['meta']:
header = ["# " + line for line in config['meta']['header'].splitlines()]
header.extend([
"",
f"# This file is generated from the source file {os.path.relpath(input_file, os.path.dirname(output_file))} using expand.py.",
"# All comments have been stripped, and edits are not made directly to this file.",
"# If you would like to contribute, or see the author's comments, please refer to the source file.",
"",
""])
header = "\n".join(header)
del config['meta']
if 'legacy' not in args or not args.legacy:
config = configparser.ConfigParser()
config.read(input_file)
fixes = {fix.name: fix for fix in (Fix(*map(str.strip, definition.split(","))) for definition in config['airspace']['beacons'].strip().splitlines())}
# read optional header to be written in output
header = None
if 'meta' in config and 'header' in config['meta']:
header = ["# " + line for line in config['meta']['header'].splitlines()]
header.extend([
"",
f"# This file is generated from the source file {os.path.relpath(input_file, os.path.dirname(output_file))} using expand.py.",
"# All comments have been stripped, and edits are not made directly to this file.",
"# If you would like to contribute, or see the author's comments, please refer to the source file.",
"",
""])
header = "\n".join(header)
# remove meta section so it won't be written in output
del config['meta']
config['airspace']['boundary'] = "\n".join(process_fix_list(config['airspace']['boundary'].splitlines(), fixes))
# build a fix database from [airspace] beacons=
fixes = {fix.name: fix for fix in (
Fix(*map(str.strip, definition.split(","))) for definition in
config['airspace']['beacons'].strip().splitlines()
)}
airports = {section: config[section] for section in config if section.startswith('airport')}
config['airspace']['boundary'] = "\n".join(
process_fix_list(config['airspace']['boundary'].splitlines(), fixes))
for airport_data in airports.values():
if 'airlines' in airport_data:
airlines = [Airline(*(value.strip() for value in airline.split(","))) for airline in airport_data['airlines'].splitlines() if airline]
airport_data['airlines'] = "\n".join(process_airlines_list(airlines))
# process airport sections
airports = {section: config[section] for section in config if section.startswith('airport')}
approaches = {section: config[section] for section in config if section.startswith('approach') or section.startswith('transition')}
for approach in approaches.values():
for option in approach:
if option.startswith('route'):
approach[option] = "\n".join(process_fix_list(approach[option].splitlines(), fixes))
for airport_data in airports.values():
if 'airlines' in airport_data:
airlines = [Airline(*(value.strip() for value in airline.split(",")))
for airline in airport_data['airlines'].splitlines() if airline]
airport_data['airlines'] = "\n".join(process_airlines_list(airlines))
departures = {section: config[section] for section in config if section.startswith('departure')}
for departure_data in departures.values():
routes = {option.removeprefix('route'): departure_data[option] for option in departure_data if option.startswith('route')}
processed_routes = []
for route_index in sorted(routes):
processed_routes.extend("\n".join(route) for route in process_repeatable_fix_list(routes[route_index].splitlines(), fixes))
departure_data.update(enumerate_routes(processed_routes, start=1))
# process approach/transition sections
approaches = {section: config[section] for section in config
if section.startswith('approach') or section.startswith('transition')}
with open(output_file, 'w', newline='') as airport_file:
airport_file.write(header)
config.write(airport_file)
for approach in approaches.values():
for option in approach:
if option.startswith('route'):
approach[option] = "\n".join(process_fix_list(approach[option].splitlines(), fixes))
else:
pattern = re.compile(r"^(?P<airport_section>\[airport(?P<airport_id>\d*)\])|(?P<airline_entry>#!\t(?P<airline_code>[-\w]*), (?P<airline_frequency>\d*), (?P<airline_parameters>[\w/d]*, [-\w ]*, [nswe]*))|(?P<result_marker>#!expansionoutput(?P<result_id>\d+))|(?P<result_end_marker>#!expansionoutputend)|(?P<sid_marker>#!sid(?P<sid_frequency>[\d]+)x)")
result = {'output': []}
airport = 0
ignore_lines = False
ignore_one_line = False
sid_frequency = 0
sid_lines = []
# process departure sections
departures = {section: config[section] for section in config if section.startswith('departure')}
with open(input_file, 'r', newline='') as airport_file:
for line in airport_file:
match = pattern.match(line)
if match:
if match['airport_section']:
airport = match['airport_id']
if not airport in result:
result[airport] = []
for departure_data in departures.values():
routes = {option.removeprefix('route'): departure_data[option] for option in departure_data if option.startswith('route')}
processed_routes = []
for route_index in sorted(routes):
processed_routes.extend("\n".join(route) for route in process_repeatable_fix_list(routes[route_index].splitlines(), fixes))
departure_data.update(enumerate_routes(processed_routes, start=1))
elif match['airline_entry']:
total_frequency = int(match['airline_frequency'])
frequencies = []
while total_frequency > 10:
frequencies.append(10)
total_frequency -= 10
frequencies.append(total_frequency)
for frequency in frequencies:
result[airport].append("\t{match[airline_code]}, {frequency}, {match[airline_parameters]}\n".format(match=match, frequency=frequency))
# write output file
with open(output_file, 'w', newline='') as airport_file:
airport_file.write(header)
config.write(airport_file)
elif match['result_marker']:
result['output'].append(line)
for result_line in result[match['result_id']]:
result['output'].append(result_line)
ignore_lines = True
# legacy processor in regex. Don't use for new projects.
else:
pattern = re.compile(r"^(?P<airport_section>\[airport(?P<airport_id>\d*)\])|(?P<airline_entry>#!\t(?P<airline_code>[-\w]*), (?P<airline_frequency>\d*), (?P<airline_parameters>[\w/d]*, [-\w ]*, [nswe]*))|(?P<result_marker>#!expansionoutput(?P<result_id>\d+))|(?P<result_end_marker>#!expansionoutputend)|(?P<sid_marker>#!sid(?P<sid_frequency>[\d]+)x)")
elif match['result_end_marker']:
ignore_lines = False
result = {'output': []}
airport = 0
ignore_lines = False
ignore_one_line = False
sid_frequency = 0
sid_lines = []
elif match['sid_marker']:
sid_frequency = int(match['sid_frequency']) - 1
ignore_one_line = True
with open(input_file, 'r', newline='') as airport_file:
for line in airport_file:
match = pattern.match(line)
if match:
if match['airport_section']:
airport = match['airport_id']
if airport not in result:
result[airport] = []
if sid_frequency:
if line.isspace():
for _ in range(sid_frequency):
result['output'].extend(sid_lines)
sid_frequency = 0
sid_lines = []
elif not len(sid_lines):
sid_lines.append("\n")
else:
sid_lines.append(line)
elif match['airline_entry']:
total_frequency = int(match['airline_frequency'])
frequencies = []
while total_frequency > 10:
frequencies.append(10)
total_frequency -= 10
frequencies.append(total_frequency)
for frequency in frequencies:
result[airport].append(
f"\t{match['airline_code']}, {frequency}, {match['airline_parameters']}\n"
)
if not ignore_lines and not ignore_one_line:
result['output'].append(line)
elif match['result_marker']:
result['output'].append(line)
for result_line in result[match['result_id']]:
result['output'].append(result_line)
ignore_lines = True
if ignore_one_line:
ignore_one_line = False
elif match['result_end_marker']:
ignore_lines = False
with open(output_file, 'w', newline='') as airport_file:
airport_file.writelines(result['output'])
return output_file
elif match['sid_marker']:
sid_frequency = int(match['sid_frequency']) - 1
ignore_one_line = True
if sid_frequency:
if line.isspace():
for _ in range(sid_frequency):
result['output'].extend(sid_lines)
sid_frequency = 0
sid_lines = []
elif not len(sid_lines):
sid_lines.append("\n")
else:
sid_lines.append(line)
if not ignore_lines and not ignore_one_line:
result['output'].append(line)
if ignore_one_line:
ignore_one_line = False
with open(output_file, 'w', newline='') as airport_file:
airport_file.writelines(result['output'])
return output_file
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='''Expands certain commands to allow for concise Endless ATC airport source files.
\n\n
in [airspace] boundary=, or the route= of an [approach/departure/transition], specify !<name> instead of lat, lon
to substitute the lat, lon from the fix with the corresponding name in [airspace] beacons=.
\n\n
in [airport] airlines=, definitions with frequency >10 with be broken down into multiple definitions of frequency 10 or less.
\n\n
*n as the first line of a [departure] route= value will repeat that route n times.''')
parser.add_argument('input_file')
parser.add_argument('output_file', nargs='?')
parser.add_argument('-l', '--legacy', action="store_true", help='''Use legacy processing method. #!expansionoutput<airport_id> can be inserted on its own line in a source file terminated by
#!expansionoutputend on a following line. This block, which should remain empty, will be used
to write the result of expanding any airline definitions in a #! comment. Any #! definitions
with frequency greater are split into entries with max 10 frequency each.
parser = argparse.ArgumentParser(description='''Expands certain commands to allow for concise Endless ATC airport source files.
\n\n
in [airspace] boundary=, or the route= of an [approach/departure/transition], specify !<name> instead of lat, lon
to substitute the lat, lon from the fix with the corresponding name in [airspace] beacons=.
\n\n
in [airport] airlines=, definitions with frequency >10 with be broken down into multiple definitions of frequency 10 or less.
\n\n
*n as the first line of a [departure] route= value will repeat that route n times.''')
parser.add_argument('input_file')
parser.add_argument('output_file', nargs='?')
parser.add_argument('-l', '--legacy', action="store_true",
help='''Use legacy processing method. Don't use for new projects.
\n\n
#!expansionoutput<airport_id> can be inserted on its own line in a source file terminated by
#!expansionoutputend on a following line. This block, which should remain empty, will be used
to write the result of expanding any airline definitions in a #! comment. Any #! definitions
with frequency greater are split into entries with max 10 frequency each.
\n\n
#!sid<n>x can be inserted before any "routex =" declaration in a [departure] section to repeat the
route <n> times. This can be used to adjust the distribution of traffic on each SID. Note the
numbering of each "route" will not be adjusted. See renumber.py for such operation.''')
#!sid<n>x can be inserted before any "routex =" declaration in a [departure] section to repeat the
route <n> times. This can be used to adjust the distribution of traffic on each SID. Note the
numbering of each "route" will not be adjusted. See renumber.py for such operation.''')
main(parser.parse_args())
main(parser.parse_args())
+21 -19
View File
@@ -3,31 +3,33 @@ import re
header_re = re.compile(r"^(?:\[(?P<header>(?:approach)|(?:transition)|(?:departure)|(?:area))\d*\])|^(?:route\d+ *= *)")
def main(args, file=None):
def number_approach(match, indexes={'approach': 0, 'transition': 0, 'departure': 0, 'area': 0, 'route': 0}):
header = match.group("header")
if header:
indexes['route'] = 0
else:
header = 'route'
indexes[header] += 1
return (header == 'route' and "{header}{index} = " or "[{header}{index}]").format(header = header, index = indexes[header])
def number_approach(match, indexes={'approach': 0, 'transition': 0, 'departure': 0, 'area': 0, 'route': 0}):
header = match.group("header")
if header:
indexes['route'] = 0
else:
header = 'route'
indexes[header] += 1
return header == 'route' and f"{header}{indexes[header]} = " or f"[{header}{indexes[header]}]"
if file is None:
file = args.airport_file
if file is None:
file = args.airport_file
result = []
result = []
with open(file, 'r', newline='') as airport_file:
for line in airport_file:
result.append(header_re.sub(number_approach, line))
with open(file, 'r', newline='') as airport_file:
for line in airport_file:
result.append(header_re.sub(number_approach, line))
with open(file, 'w', newline='') as airport_file:
airport_file.writelines(result)
with open(file, 'w', newline='') as airport_file:
airport_file.writelines(result)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Re-number [approach] sections for Endless ATC airport files.')
parser.add_argument('airport_file')
parser = argparse.ArgumentParser(description='Re-number [approach] sections for Endless ATC airport files.')
parser.add_argument('airport_file')
main(parser.parse_args())
main(parser.parse_args())