289 lines
13 KiB
Python
289 lines
13 KiB
Python
#!/usr/bin/env python
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
import os
|
|
import yaml
|
|
import jinja2
|
|
|
|
class FileChecks:
    """Validate the inventory, context files and templates before rendering.

    Instantiating the class runs every check in order:

    * the inventory file is parsed; the process exits with status 1 when the
      file cannot be read, is not valid YAML or is not a mapping,
    * the config_contexts directory is scanned recursively for ``*.yml``
      files; the process exits with status 1 when two files share a name,
    * a warning is printed for every platform in the inventory that has no
      ``<platform>.j2`` file in the templates directory.
    """

    def __init__(self, config_contexts_dir, templates_dir, inventory_file):
        self.config_contexts_dir = config_contexts_dir
        self.templates_dir = templates_dir
        self.inventory_file = inventory_file
        self.inventory_dict = self.__read_inventory_file(self.inventory_file)
        self.__check_context_files(self.config_contexts_dir)
        self.__check_platform_templates(self.inventory_dict, self.templates_dir)

    def __read_inventory_file(self, inventory_file):
        """Return the inventory mapping parsed from ``inventory_file``.

        Prints an error and exits with status 1 when the file cannot be
        opened, cannot be parsed as YAML, or does not contain a mapping.
        """
        try:
            with open(inventory_file, "r") as file:
                inventory = yaml.safe_load(file)
        except OSError as ex:
            # covers FileNotFoundError, PermissionError, IsADirectoryError, ...
            print(f'Error {inventory_file}: {ex.strerror or ex}')
            sys.exit(1)
        except (UnicodeDecodeError, yaml.YAMLError):
            # every YAML failure (scanner, parser, constructor) is reported,
            # not only ScannerError
            print(f'Error {inventory_file}: is not valid YAML')
            sys.exit(1)
        if not isinstance(inventory, dict):
            # an empty file loads as None, which would otherwise fail later
            # with an AttributeError on .keys()
            print(f'Error {inventory_file}: inventory must be a YAML mapping of devices')
            sys.exit(1)
        return inventory

    def __check_context_files(self, config_contexts_dir):
        """Exit with status 1 when two ``*.yml`` context files share a name.

        Contexts are keyed by file name elsewhere in this file, so the same
        name in two sub-directories would be ambiguous.
        """
        paths_by_name = {}
        for path in Path(config_contexts_dir).rglob('*.yml'):
            paths_by_name.setdefault(path.name, []).append(str(path))
        duplicates = {
            name: paths for name, paths in paths_by_name.items() if len(paths) > 1
        }
        if not duplicates:
            return
        print('==== to maintain readability there no logic to handle duplicate file names under the config_contexts directory')
        # sorted so the report is stable regardless of directory walk order
        for name in sorted(duplicates):
            for path in sorted(duplicates[name]):
                print(f"duplicate file name {name} @ {path}")
        sys.exit(1)

    def __check_platform_templates(self, inventory_dict, templates_dir):
        """Print a warning for each platform lacking a ``<platform>.j2`` file."""
        devices_by_platform = {}
        for device, attributes in inventory_dict.items():
            devices_by_platform.setdefault(attributes['platform'], []).append(device)
        for platform, devices in devices_by_platform.items():
            template = Path(f'{templates_dir}/{platform}.j2')
            if template.is_file():
                continue
            target_devices = '\n'.join(devices)
            print(f'==== missing template {template}, nothing will be rendered for devices:\n{target_devices}')
|
|
|
|
class LoadInventory(FileChecks):
    """Load the inventory and every config context file it references.

    After construction ``all_config_contexts`` maps a context name (the file
    name without extension) to ``{'weight': ..., 'content': ...}`` and
    ``devices`` lists the inventory's device names in file order.
    """

    def __init__(self, config_contexts_dir, templates_dir, inventory_file):
        FileChecks.__init__(self, config_contexts_dir, templates_dir, inventory_file)
        self.all_config_contexts, self.devices = self.__load_contexts(self.inventory_dict)

    def __remove_meta(self, data):
        """Drop the ``_metadata`` key from ``data`` in place and return it."""
        data.pop('_metadata', None)
        return data

    def __read_context_file(self, path):
        """Return the mapping parsed from ``path``, or None when unusable.

        A missing file is skipped silently because context files are loaded
        on a best-effort basis; unreadable or malformed files are skipped
        with a printed notice instead of being swallowed.
        """
        try:
            with open(path, 'r') as file:
                data = yaml.safe_load(file)
        except FileNotFoundError:
            return None
        except (OSError, UnicodeDecodeError, yaml.YAMLError) as ex:
            print(f'==== skipping context file {path}: {ex}')
            return None
        if not isinstance(data, dict):
            print(f'==== skipping context file {path}: content is not a YAML mapping')
            return None
        return data

    def __load_contexts(self, inventory_dict):
        """Return ``(all_config_contexts, devices)`` for ``inventory_dict``.

        Three kinds of file are read below ``self.config_contexts_dir``:

        * ``<group>/<name>.yml`` for every entry of a device's
          ``config_contexts`` mapping; kept only when its ``_metadata`` has a
          truthy ``weight`` and a truthy ``is_active``,
        * ``device/<device>.yml`` for every device; stored with weight 0,
        * ``all.yml``; stored under ``'all'`` with a weight one above the
          highest loaded weight, when its ``_metadata.is_active`` is truthy.
        """
        devices = list(inventory_dict)
        device_paths = set()
        context_paths = set()
        for device, attributes in inventory_dict.items():
            device_paths.add(f'{self.config_contexts_dir}/device/{device}.yml')
            for group, names in attributes['config_contexts'].items():
                # a group may reference one context or a list of contexts
                if not isinstance(names, list):
                    names = [names]
                for name in names:
                    context_paths.add(f'{self.config_contexts_dir}/{group}/{name}.yml')

        all_config_contexts = {}

        # weighted contexts referenced by the inventory
        for path in sorted(context_paths):
            data = self.__read_context_file(path)
            if data is None:
                continue
            metadata = data.get('_metadata')
            if not isinstance(metadata, dict):
                print(f'==== skipping context file {path}: missing _metadata mapping')
                continue
            weight = metadata.get('weight')
            if weight and metadata.get('is_active'):
                # the context name is the file name without its extension
                context = os.path.splitext(os.path.basename(path))[0]
                all_config_contexts[context] = {
                    'weight': weight,
                    'content': self.__remove_meta(data),
                }

        # per-device contexts always use weight 0
        for path in sorted(device_paths):
            data = self.__read_context_file(path)
            if data is None:
                continue
            context = os.path.splitext(os.path.basename(path))[0]
            all_config_contexts[context] = {
                'weight': 0,
                'content': self.__remove_meta(data),
            }

        # the 'all' context is weighted above everything loaded so far;
        # default=0 keeps this working when nothing could be loaded
        max_weight = max(
            (context['weight'] for context in all_config_contexts.values()),
            default=0,
        ) + 1
        data = self.__read_context_file(f'{self.config_contexts_dir}/all.yml')
        if data is not None:
            metadata = data.get('_metadata')
            if isinstance(metadata, dict) and metadata.get('is_active'):
                all_config_contexts['all'] = {
                    'weight': max_weight,
                    'content': self.__remove_meta(data),
                }

        return all_config_contexts, devices

    def get_contexts(self):
        """Return every loaded context keyed by context name."""
        return self.all_config_contexts

    def get_devices(self):
        """Return the list of device names from the inventory."""
        return self.devices

    def get_device_platform(self, device):
        """Return the ``platform`` value of ``device`` from the inventory."""
        return self.inventory_dict[device]['platform']

    def get_device_template(self, device):
        """Return the path ``<templates_dir>/<platform>.j2`` for ``device``."""
        platform = self.get_device_platform(device)
        return f'{self.templates_dir}/{platform}.j2'

    def get_device_contexts(self, device):
        """Return the loaded contexts that apply to ``device``.

        The result contains, when loaded, the ``'all'`` context, the context
        named after the device, and each context referenced by the device's
        ``config_contexts`` mapping.
        """
        device_contexts = {}
        for name in ('all', device):
            if name in self.all_config_contexts:
                device_contexts[name] = self.all_config_contexts[name]
        for names in self.inventory_dict[device]['config_contexts'].values():
            if not isinstance(names, list):
                names = [names]
            for name in names:
                if name in self.all_config_contexts:
                    device_contexts[name] = self.all_config_contexts[name]
        return device_contexts
|
|
|
|
class BuildDeviceContext:
    """Merge the config contexts of one device into a single dict.

    ``LoadInventory_instance`` must provide ``get_device_contexts(device)``
    and ``get_device_platform(device)``. The merged result is available from
    ``get_config_context()``; it is ``{}`` when equally weighted contexts
    define the same top level key.
    """

    def __init__(self, LoadInventory_instance, device):
        self.device = device
        self.contexts = LoadInventory_instance.get_device_contexts(self.device)
        self.platform = LoadInventory_instance.get_device_platform(self.device)
        self.config_context = self.__config_context(self.device, self.contexts)

    def __find_key_clashes(self, contexts):
        """Return one message per top level key shared by same-weight contexts.

        Weights are visited from highest to lowest and keys in the order they
        are first seen, so the report is identical from run to run.
        """
        names_by_weight = {}
        for name, context in contexts.items():
            names_by_weight.setdefault(context['weight'], []).append(name)

        clashes = []
        for weight in sorted(names_by_weight, reverse=True):
            names = names_by_weight[weight]
            if len(names) < 2:
                # a weight used by a single context cannot clash
                continue
            owners_by_key = {}
            for name in names:
                for key in contexts[name]['content']:
                    owners_by_key.setdefault(key, []).append(name)
            for key, owners in owners_by_key.items():
                if len(owners) > 1:
                    clashes.append(f'clashing key found in equally weighted contexts: \nkey: {key} \ncontexts: {owners}')
        return clashes

    def __config_context(self, device, contexts):
        """Return the merged, key-sorted config_context for ``device``.

        Contexts are applied from the highest weight to the lowest, each one
        overwriting top level keys set by the previous ones, so the lowest
        weight has the highest precedence. When equally weighted contexts
        share a key the clashes are printed and ``{}`` is returned.
        """
        clashes = self.__find_key_clashes(contexts)
        if clashes:
            error_log = f'==== device config_context render issue: {device}\n'
            error_log += ''.join(f'\n{clash}\n' for clash in clashes)
            print(error_log)
            return {}

        by_descending_weight = sorted(
            contexts.items(), key=lambda item: item[1]['weight'], reverse=True
        )
        config_context = {}
        for _, context in by_descending_weight:
            config_context.update(context['content'])
        return dict(sorted(config_context.items()))

    def get_config_context(self):
        """Return the merged config_context (``{}`` when a clash was found)."""
        return self.config_context

    def get_device_name(self):
        """Return the device name this context was built for."""
        return self.device
|
|
|
|
class RenderDeviceConfig(BuildDeviceContext):
    """Render the platform template of a device with its config_context.

    ``rendered_template`` holds the rendered text, or ``None`` when the
    config_context is empty or the template file does not exist.
    """

    def __init__(self, LoadInventory_instance, device):
        BuildDeviceContext.__init__(self, LoadInventory_instance, device)
        self.device_template = LoadInventory_instance.get_device_template(self.device)
        self.rendered_template = self.__rendered_template(self.device, self.config_context, self.device_template)

    def __rendered_template(self, device, config_context, device_template):
        """Return the rendered template text, or None when nothing is rendered.

        The template receives no render arguments; it reads its data from the
        ``config_context`` global of the Jinja2 environment.
        """
        template_directory = os.path.dirname(device_template)
        template_file = os.path.basename(device_template)

        if not config_context:
            print(f'==== device template render issue: {device}\n\n{template_directory}/{template_file}\nempty config_context {config_context} (clashing top level keys in config_contexts)\n')
            return None

        environment = jinja2.Environment(
            loader=jinja2.FileSystemLoader(template_directory),
            trim_blocks=True,
            lstrip_blocks=True,
        )
        # templates access their variables through this global
        environment.globals['config_context'] = config_context
        # illustration only: a YAML string, so its keys cannot be accessed
        environment.globals['pretty_config_context'] = yaml.dump(config_context, default_flow_style=False)

        try:
            template = environment.get_template(template_file)
        except jinja2.TemplateNotFound:
            # a missing template skips this device instead of aborting the
            # run for every remaining device
            print(f'==== device template render issue: {device}\n\n{template_directory}/{template_file}\ntemplate not found, nothing rendered\n')
            return None
        return template.render()

    def print_rendered_template(self):
        """Print the rendered text; do nothing when nothing was rendered."""
        if self.rendered_template is not None:
            print(f'==== {self.device} file content:\n\n{self.rendered_template}\n')

    def write_rendered_template(self, dest_dir=None):
        """Write the rendered text to ``<device>.txt``.

        ``dest_dir`` is an optional output directory, created when missing;
        by default the file is written to the current working directory.
        Nothing is written when nothing was rendered.
        """
        if self.rendered_template is None:
            return
        rendered_template_file = f'{self.device}.txt'
        if dest_dir is not None:
            os.makedirs(dest_dir, exist_ok=True)
            rendered_template_file = os.path.join(dest_dir, rendered_template_file)
        with open(rendered_template_file, mode="w", encoding="utf-8") as message:
            message.write(self.rendered_template)
        print(f'==== {self.device} file written: {rendered_template_file}')
|
|
|
|
def main():
    """Render, print and write the configuration of every inventory device.

    Paths are relative to the current working directory. Devices whose
    config_context is empty are skipped.
    """
    inventory = LoadInventory('./config_contexts', './templates', "./inventory.yml")

    # build every device first so all build-time messages precede the output
    built_devices = [
        RenderDeviceConfig(inventory, device_name)
        for device_name in inventory.get_devices()
    ]

    for device_config in built_devices:
        if not device_config.get_config_context():
            continue
        device_config.print_rendered_template()
        device_config.write_rendered_template()


if __name__ == "__main__":
    main()
|
|
|
|
# turn this into a tool?
|
|
# add a comment block to each rendered config, such as the render date (maybe list the templates in use?)
|
|
# needs a config file to point to contexts/folders, use TOML to allow for comments
|
|
# needs a rendered artefact directory parameter somewhere, maybe this should be a write_rendered_template parameter to allow it to be placed in another git repo
|
|
# needs proper logging
|
|
# needs a mode to create example files/dirs
|