# python_vpn_audit/vpn_audit/vpn_spreadsheet.py

import pandas
import xlsxwriter
from xlsxwriter.utility import xl_rowcol_to_cell # used to build A1-style cell references for the 'VPN Hubs' hyperlinks
import pymongo
from bson.json_util import dumps, loads
import logging
import re
import json
def build_spreadsheet(collection, devices_dict, outfile):
def workbook_sheets_order(writer, collection, devices_dict, static_sheets):
## pre-create workbook sheets in the desired order; reordering sheets after the fact can break formula/hyperlink index relationships
devices = list(devices_dict.keys())
# find populated collections, write populated collection sheets in alphabetical order
devices_with_collections = [n for n in devices if collection[n].find_one({}, {'_id': 1}) is not None] # projection keys cannot start with '$'; only existence matters here, so project '_id' alone
sheets = static_sheets + sorted(devices_with_collections)
dummy_df = pandas.DataFrame()
for s in sheets:
dummy_df.to_excel(writer, index=False, sheet_name=s)
del dummy_df
return [s for s in sheets if s not in static_sheets]
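## nb: pandas' ExcelWriter keeps a name -> worksheet map, so a later to_excel() call with an
## already-created sheet_name writes onto the existing tab instead of adding a new one; writing
## an empty DataFrame per sheet here pins the tab order once, up front.
## a minimal sketch of the same trick (file and sheet names are illustrative):
# with pandas.ExcelWriter('example.xlsx', engine='xlsxwriter') as w:
#     for s in ['Summary', 'dev-a', 'dev-b']:
#         pandas.DataFrame().to_excel(w, index=False, sheet_name=s) # reserve tab order
#     pandas.DataFrame({'x': [1]}).to_excel(w, index=False, sheet_name='dev-b') # reuses the existing tab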
def populate_device_sheets(collection, writer, sheets):
# init
logger = logging.getLogger('main')
devices_dataframes = {}
spokes_df = pandas.DataFrame() # 'VPN Spokes'
#
## PROBLEM - the phase2 profile (dmvpn, not rri) is not present in the device collections; it is collected in 'cisco_transform_set' but never written - rather than a full rescrape, merge incrementally with ignore_src_schema_keys on 'p2_profile'; transform sets are already captured, so this may not be necessary
#
## define the columns to present in the sheet and their order; drop the mongodb document '_id' since not all db keys are useful
all_column_order = ["last_modified", "crypto_map", "crypto_map_template", "crypto_map_interface", "crypto_session_interface", "p1_profile", "RRI_enabled", "p1_status", "p2_status", "session_status", "local_ip", "local_port", "peer_ip", "peer_port", "peer_vpn_id", "p1_auth_type", "p1_dh_group", "p1_encr_algo", "p1_hash_algo", "p2_encr_algo", "p2_hash_algo", "pfs", "transform_sets", "ordered_transform_set", "p2_interface", "ipsec_flow", "p1_ivrf", "p2_fvrf", "protected_vrf", "p2_default_3des", "spoke_p2_default_3des", "spoke_p2_algo_preference", "DeviceName", "DeviceRecNum", "nhrp_nexthop", "Manufacturer", "Model"]
filtered_column_order = ["last_modified", "DeviceName", "Manufacturer", "Model", "DeviceRecNum", "crypto_map", "crypto_map_template", "crypto_session_interface", "p1_profile", "RRI_enabled", "session_status", "local_ip", "local_port", "peer_ip", "peer_port", "peer_vpn_id", "nhrp_nexthop", "p1_auth_type", "p1_dh_group", "p1_encr_algo", "p1_hash_algo", "p2_encr_algo", "p2_hash_algo", "pfs", "protected_vrf", "ordered_transform_set", "ipsec_flow", "p2_default_3des", "spoke_p2_default_3des", "spoke_p2_algo_preference"]
projection = {'_id': 0}
for c in filtered_column_order:
projection.update({c: 1})
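# the projection built above ends up roughly as {'_id': 0, 'last_modified': 1, 'DeviceName': 1, ...}
# i.e. suppress '_id' and whitelist the filtered columns; find() then yields plain dicts that
# pandas can consume directly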
## populate device sheets, update devices sheet
for sheet in sheets:
# print(f'building excel sheet {sheet}')
logger.info(f'building excel sheet {sheet}')
## load new device dataframe
device_table_df = pandas.DataFrame.from_dict(collection[sheet].find({}, projection)) # would like to order by vpn_id to make spotting config changes simple
## copy device dataframe spoke records to 'VPN Spokes' dataframe
cols_to_copy = ['last_modified', 'DeviceName', 'Manufacturer', 'Model', 'DeviceRecNum', 'peer_ip', 'peer_vpn_id', 'nhrp_nexthop']
cols_to_copy_exist = [col for col in cols_to_copy if col in device_table_df.columns]
temp_df = pandas.DataFrame(device_table_df[cols_to_copy_exist])
temp_df.insert(0, 'Hub', sheet)
spokes_df = pandas.concat([spokes_df, temp_df])
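# nb: pandas.concat() in a loop re-copies the accumulated frame on every device; if the device
# count grows, collecting the per-device frames in a list and concatenating once after the loop
# scales better, e.g. (sketch): spoke_frames.append(temp_df) per device, then
# pandas.concat(spoke_frames) once at the end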
## create missing columns containing NaN values - gotcha: when string-searching a column that was missing and is now all NaN, the column has no string dtype, so use .astype(str) first (see below)
column_order = filtered_column_order
missing_columns = [c for c in column_order if c not in list(device_table_df.columns)]
if missing_columns:
# print(missing_columns)
device_table_df = device_table_df.reindex(columns = device_table_df.columns.tolist() + missing_columns)
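# reindex() with the extended column list appends the missing columns filled with NaN, e.g.
# pandas.DataFrame({'a': [1]}).reindex(columns=['a', 'b']) gives a one-row frame where 'b' is NaN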
## reorder columns in device sheet
column_count = 0
for c in column_order:
if c in list(device_table_df.columns):
device_table_df.insert(column_count, c, device_table_df.pop(c))
column_count += 1
# print(device_table_df.columns.values.tolist())
# ## debug - check columns / NaN values exist
# if sheet == 'lon-vpn03':
# pandas.set_option('display.max_rows', None)
# pandas.set_option('display.max_columns', None)
# pandas.set_option('display.width', None)
# logger.error(missing_columns)
# logger.error(device_table_df)
# logger.error(device_table_df.columns.values.tolist())
# pandas.reset_option('all')
## check for p1/p2 3des
# p1_3des = True
device_table_df.loc[
~(device_table_df['p1_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].isna()) &
(device_table_df['p1_encr_algo'].astype(str).str.contains("3des", na=False, case=False))
, 'p1_3des'] = True
# p1_3des = False
device_table_df.loc[
~(device_table_df['p1_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].isna()) &
~(device_table_df['p1_encr_algo'].astype(str).str.contains("3des", na=False, case=False))
, 'p1_3des'] = False
# p2_3des = True
# .astype(str) is required when all records are UP-IDLE and the p2_encr_algo column was filled with NaN values; an all-NaN column has no string dtype (a single string entry would give it one)
device_table_df.loc[
~(device_table_df['p1_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].isna()) &
(device_table_df['p2_encr_algo'].astype(str).str.contains("3des", na=False, case=False))
, 'p2_3des'] = True
# p2_3des = False
device_table_df.loc[
~(device_table_df['p1_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].astype(str).str.contains("3des", na=False, case=False))
, 'p2_3des'] = False
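# illustration of the .astype(str) gotcha above: an all-NaN column is float dtype, so the .str
# accessor is unavailable; casting first makes contains() safe (NaN becomes the literal string
# 'nan', which never matches '3des'):
# pandas.Series([float('nan')]).str.contains('3des') # AttributeError, no .str on float dtype
# pandas.Series([float('nan')]).astype(str).str.contains('3des', case=False) # [False]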
## write new sheet to workbook
device_table_df.to_excel(writer, index=False, sheet_name=sheet)
## add device dataframe to dict, 'VPN Hubs' will analyse each dataframe for stats
devices_dataframes.update({sheet: device_table_df})
## copy each spoke to 'VPN Spokes'
spokes_df.to_excel(writer, index=False, sheet_name='VPN Spokes')
## return dict of device/sheets dataframes for stats collection
return devices_dataframes, spokes_df
def transform_devices_sheet(collection, writer, devices_df):
logger = logging.getLogger('main')
logger.info('building excel sheet VPN Hubs')
## vars
manufacturer_models = {}
devices_3des_stats = {'p1_3des_p2_3des_count': [], 'p1_3des_p2_ok_count': [], 'p1_ok_p2_3des_count': [], 'p1_ok_p2_ok_count': []}
## load 'VPN Hubs' collection into devices dataframe in alphabetical order
projection = {'_id': 0, 'scrapli_platform': 0}
vpn_devices_table_df = pandas.DataFrame.from_dict(collection.find({}, projection).sort('DeviceName', 1))
## use xlsxwriter directly where pandas xlsxwriter wrapper is limited
workbook = writer.book
worksheet = workbook.get_worksheet_by_name('VPN Hubs')
link_format = workbook.add_format({'bold': False, 'font_color': 'blue', 'underline': True})
## create missing columns containing NaN values (may occur with 'ssh failed auth' / 'ssh failed connection' devices)
# column_order = ["DeviceName", "FQDN", "IPAddress", "session_protocol", "DeviceRecNum", "DeviceType", "DeviceDescription", "DeviceStatus", "Site", "Region", "Country", "Division", "vendor", "os_flavour", "chassis", "serial", "os_version", "image", "p1_ok_p2_3des", "p1_3des_p2_ok", "p1_3des_p2_3des", "p1_ok_p2_ok", "tunnel_count", "transform_default_3des", "transform_default_3des_name", "spoke_aes_known_support", "spoke_default_p2_3des", "spoke_default_p2_not_3des", "spoke_default_p2_algo_unknown", "isakmp_policy_default_p1_3des", "isakmp_policy", "compliant"]
column_order = ["DeviceName", "FQDN", "IPAddress", "session_protocol", "DeviceRecNum", "DeviceType", "DeviceDescription", "DeviceStatus", "Site", "Region", "Country", "Division", "vendor", "os_flavour", "chassis", "serial", "os_version", "image", "p1_ok_p2_3des", "p1_3des_p2_ok", "p1_3des_p2_3des", "p1_ok_p2_ok", "tunnel_count", "transform_default_3des", "transform_default_3des_name", "spoke_aes_known_support", "spoke_default_p2_3des", "spoke_default_p2_not_3des", "spoke_default_p2_algo_unknown", "isakmp_policy_default_p1_3des", "isakmp_policy"]
missing_columns = [c for c in column_order if c not in list(vpn_devices_table_df.columns)]
if missing_columns:
vpn_devices_table_df = vpn_devices_table_df.reindex(columns = vpn_devices_table_df.columns.tolist() + missing_columns)
## reorder columns in 'VPN Hubs' (_default) dataframe
column_count = 0
for c in column_order:
if c in list(vpn_devices_table_df.columns):
vpn_devices_table_df.insert(column_count, c, vpn_devices_table_df.pop(c))
column_count += 1
## loop each 'device dataframe', update 'VPN Hubs' with stats
for k, v in devices_df.items():
sheet = k
device_table_df = v
## update vpn_devices_table_df dataframe with device_table_df tunnel compliance state
vpn_devices_table_df.loc[vpn_devices_table_df['DeviceName'] == sheet, 'p1_ok_p2_3des'] = len(device_table_df[
~(device_table_df['p1_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].isna()) &
(device_table_df['p1_3des'].dropna() == False) &
(device_table_df['p2_3des'].dropna() == True)
])
vpn_devices_table_df.loc[vpn_devices_table_df['DeviceName'] == sheet, 'p1_3des_p2_ok'] = len(device_table_df[
~(device_table_df['p1_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].isna()) &
(device_table_df['p1_3des'].dropna() == True) &
(device_table_df['p2_3des'].dropna() == False)
])
vpn_devices_table_df.loc[vpn_devices_table_df['DeviceName'] == sheet, 'p1_3des_p2_3des'] = len(device_table_df[
~(device_table_df['p1_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].isna()) &
(device_table_df['p1_3des'].dropna() == True) &
(device_table_df['p2_3des'].dropna() == True)
])
vpn_devices_table_df.loc[vpn_devices_table_df['DeviceName'] == sheet, 'p1_ok_p2_ok'] = len(device_table_df[
~(device_table_df['p1_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].isna()) &
(device_table_df['p1_3des'].dropna() == False) &
(device_table_df['p2_3des'].dropna() == False)
])
vpn_devices_table_df.loc[vpn_devices_table_df['DeviceName'] == sheet, 'tunnel_count'] = len(device_table_df[
~(device_table_df['p1_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].isna())
])
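# nb: the masks above lean on pandas index alignment - .dropna() inside a condition shortens that
# series, and when it is &'d with the full-length conditions the dropped rows come back as missing
# and are treated as False, so they simply fall out of the counts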
# not evaluating transform sets by the dataframe, this information is now in the device collection
# tfsa = device_table_df.loc[~(device_table_df['transform_sets'].isna())]['transform_sets'].values
# for i in tfsa:
# print('##############')
# print(i)
# print(i[0])
# print(i[0]['DeviceName'])
# print('##############')
## indicate if device has transform set(s) where default policy is 3des
# default_p2_3des_count = len(device_table_df[(device_table_df['tunnel_complete'].dropna() == True) & (device_table_df['p2_default_3des'].dropna() == True)])
default_p2_3des_count = len(device_table_df[
~(device_table_df['p1_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].isna()) &
(device_table_df['p2_default_3des'].dropna() == True)
])
# print(default_p2_3des_count)
if default_p2_3des_count > 0:
vpn_devices_table_df.loc[vpn_devices_table_df['DeviceName'] == sheet, 'transform_default_3des'] = default_p2_3des_count
## find transform sets that list 3des as the first entry
# tfs = device_table_df.loc[device_table_df['p2_default_3des'] == True]['transform_sets'].values
tfs = device_table_df.loc[device_table_df['p2_default_3des'] == True]['ordered_transform_set'].values
tfs_l = []
for i in tfs:
tfs_l.append(i[0]['name']) # list inception, pandas returns list of fields that contain list of dicts, take 'name' of first (ordered) transform set entry
# print(list(set(tfs_l)))
vpn_devices_table_df.loc[vpn_devices_table_df['DeviceName'] == sheet, 'transform_default_3des_name'] = ', '.join(sorted(set(tfs_l))) # store as one string; assigning a multi-element list to a single-row .loc selection raises a length-mismatch ValueError
## indicate whether we can determine that the spoke is configured only for 3des, using the logic from the device collection keys
# this is known where the transform set lists aes as the primary option yet the spoke negotiates 3des (or vice versa)
# the flag is 'unknown' when the transform set is only/primarily 3des and the spoke uses 3des (or the transform set is only aes and the spoke uses aes - the spoke may still prefer 3des)
spoke_default_p2_3des_count = len(device_table_df[
~(device_table_df['p1_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].isna()) &
~(device_table_df['spoke_p2_default_3des'].isna()) &
(device_table_df['spoke_p2_default_3des'].dropna() == True)
])
spoke_default_p2_ok_count = len(device_table_df[
~(device_table_df['p1_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].isna()) &
~(device_table_df['spoke_p2_default_3des'].isna()) &
(device_table_df['spoke_p2_default_3des'].dropna() == False)
])
spoke_unknown_p2_preference_count = len(device_table_df[
~(device_table_df['p1_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].isna()) &
~(device_table_df['spoke_p2_algo_preference'].isna()) &
(device_table_df['spoke_p2_algo_preference'].dropna() == 'unknown')
])
if spoke_default_p2_3des_count > 0:
vpn_devices_table_df.loc[vpn_devices_table_df['DeviceName'] == sheet, 'spoke_default_p2_3des'] = spoke_default_p2_3des_count
if spoke_default_p2_ok_count > 0:
vpn_devices_table_df.loc[vpn_devices_table_df['DeviceName'] == sheet, 'spoke_default_p2_not_3des'] = spoke_default_p2_ok_count
if spoke_unknown_p2_preference_count > 0:
vpn_devices_table_df.loc[vpn_devices_table_df['DeviceName'] == sheet, 'spoke_default_p2_algo_unknown'] = spoke_unknown_p2_preference_count
## spokes capable of aes
spoke_aes_supported_count = len(device_table_df[
~(device_table_df['p1_encr_algo'].isna()) &
~(device_table_df['p2_encr_algo'].isna()) &
((device_table_df['p1_encr_algo'].astype(str).str.contains("aes", na=False, case=False)) |
(device_table_df['p2_encr_algo'].astype(str).str.contains("aes", na=False, case=False)))
])
if spoke_aes_supported_count > 0:
vpn_devices_table_df.loc[vpn_devices_table_df['DeviceName'] == sheet, 'spoke_aes_known_support'] = spoke_aes_supported_count
## query mongo to get contextual spoke stats for summary
# (this is an easy place to grab stats in the vpn_spreadsheet module, mongo queries are more flexible than pandas queries for this task, but this is a much more expensive operation)
# find unique combos of manufacturer/model
spoke_models = collection[sheet].aggregate([
{ "$group": {
"_id" : { "manufacturer": "$Manufacturer", "model": "$Model" },
"count": {"$sum": 1}
}
}
])
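# each aggregate result document is shaped roughly like
# {'_id': {'manufacturer': 'cisco', 'model': 'C1111-4P'}, 'count': 12}
# (values illustrative); an '_id' sub-key is absent when the grouped field is missing
# on the source documents, hence the .get() defaults below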
spoke_manufacturer_models = []
for unique_device in spoke_models:
manufacturer = unique_device['_id'].get('manufacturer', 'unknown')
model = unique_device['_id'].get('model', 'unknown')
count = unique_device.get('count', 0)
entry = {'manufacturer': manufacturer, 'model': model, 'count': count}
# entry = {'manufacturer': manufacturer, 'model': model}
spoke_manufacturer_models.append(entry)
# add to top level manufacturer/model dict
if manufacturer in manufacturer_models:
if model in manufacturer_models[manufacturer]:
rolling_count = manufacturer_models[manufacturer][model]['count'] + count
manufacturer_models[manufacturer][model].update({'count': rolling_count})
else:
manufacturer_models[manufacturer].update({model: {'count': count}})
else:
manufacturer_models.update({manufacturer: {model: {'count': count}}})
# add spoke hardware info to device sheet
vpn_devices_table_df.loc[vpn_devices_table_df['DeviceName'] == sheet, 'spoke_hardware'] = str(spoke_manufacturer_models)
# TODO - could this be done faster in pandas? come back to this; there are 4 queries per host and the info is already in the dataframes, but the mongo syntax is more flexible
# find 3des stats by manufacturer/model
for unique_device in spoke_manufacturer_models:
manufacturer = unique_device['manufacturer']
model = unique_device['model']
triple_des_regex = re.compile('(?i).*3DES.*')
not_triple_des_regex = {'$exists': True, '$not': re.compile('(?i).*3DES.*')} # '$not' alone also matches documents missing the field, so keep an '$exists' guard
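# '(?i)' is the inline form of re.IGNORECASE, so this behaves like a case-insensitive
# /.*3DES.*/ match; pymongo passes compiled patterns to the server as $regex conditions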
# ensure stats for devices with no manufacturer/model are matched
if manufacturer == 'unknown':
manufacturer_match = {'$exists': False}
else:
manufacturer_match = manufacturer
if model == 'unknown':
model_match = {'$exists': False}
else:
model_match = model
# a positive regex match implies the field exists; the negated match carries its own '$exists' guard (see not_triple_des_regex above) - nb: duplicate dict keys would silently overwrite each other, so '$exists' cannot be listed as a separate key here
p1_3des_p2_3des_query = {'Manufacturer': manufacturer_match, 'Model': model_match, 'p1_encr_algo': triple_des_regex, 'p2_encr_algo': triple_des_regex} # if current_config implemented it needs to be in these queries
p1_3des_p2_ok_query = {'Manufacturer': manufacturer_match, 'Model': model_match, 'p1_encr_algo': triple_des_regex, 'p2_encr_algo': not_triple_des_regex}
p1_ok_p2_3des_query = {'Manufacturer': manufacturer_match, 'Model': model_match, 'p1_encr_algo': not_triple_des_regex, 'p2_encr_algo': triple_des_regex}
p1_ok_p2_ok_query = {'Manufacturer': manufacturer_match, 'Model': model_match, 'p1_encr_algo': not_triple_des_regex, 'p2_encr_algo': not_triple_des_regex}
p1_3des_p2_3des_count = collection[sheet].count_documents(p1_3des_p2_3des_query) # count_documents(); Collection.count() was removed in pymongo 4
p1_3des_p2_ok_count = collection[sheet].count_documents(p1_3des_p2_ok_query)
p1_ok_p2_3des_count = collection[sheet].count_documents(p1_ok_p2_3des_query)
p1_ok_p2_ok_count = collection[sheet].count_documents(p1_ok_p2_ok_query)
# add 3des stats to top level manufacturer/model dict
count_3des_conditions = {'p1_3des_p2_3des_count': p1_3des_p2_3des_count, 'p1_3des_p2_ok_count': p1_3des_p2_ok_count, 'p1_ok_p2_3des_count': p1_ok_p2_3des_count, 'p1_ok_p2_ok_count': p1_ok_p2_ok_count}
for k,v in count_3des_conditions.items():
if k in manufacturer_models[manufacturer][model]:
rolling_count = manufacturer_models[manufacturer][model][k] + v
manufacturer_models[manufacturer][model].update({k: rolling_count})
else:
manufacturer_models[manufacturer][model].update({k: v})
# add device name to the per-3des-status lists (could be done in pandas but the info is at hand here)
if v > 0:
if sheet not in devices_3des_stats[k]:
devices_3des_stats[k].append(sheet)
## add hyperlinks in 'VPN Hubs' to each device sheet; these are written into the excel sheets directly
name_col_idx = vpn_devices_table_df.columns.get_loc("DeviceName")
device_row_idx = vpn_devices_table_df.loc[vpn_devices_table_df['DeviceName'] == sheet].index.to_list()[0] + 1
cell = xl_rowcol_to_cell(device_row_idx, name_col_idx)
worksheet.write_url(cell, f"internal:'{sheet}'!A1", link_format, string=sheet, tip="Device page")
worksheet.conditional_format(cell, {'type': 'no_errors','format': link_format}) # cell format doesn't apply over an existing field with the write_url method; reapply link formatting
## add hyperlink from device sheet back to 'VPN Hubs'
devicesheet = workbook.get_worksheet_by_name(sheet)
devicesheet.write_url('A1', f"internal:'VPN Hubs'!{cell}", link_format, tip="Back")
devicesheet.conditional_format('A1', {'type': 'no_errors','format': link_format})
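# xl_rowcol_to_cell() converts zero-based (row, col) to A1 notation, e.g. (0, 0) -> 'A1' and
# (2, 1) -> 'B3'; the +1 on device_row_idx above accounts for the header row in the sheet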
## debug manufacturer/model and per device 3des stats
# logger.info(json.dumps(manufacturer_models, indent=4))
# logger.info(json.dumps(devices_3des_stats, indent=4))
## create temporary devices stats data object
devices_with_tunnels = sorted(list(set(devices_3des_stats['p1_3des_p2_3des_count'] + devices_3des_stats['p1_3des_p2_ok_count'] + devices_3des_stats['p1_ok_p2_3des_count'] + devices_3des_stats['p1_ok_p2_ok_count'])))
devices_with_3des_tunnels = sorted(list(set(devices_3des_stats['p1_3des_p2_3des_count'] + devices_3des_stats['p1_3des_p2_ok_count'] + devices_3des_stats['p1_ok_p2_3des_count'])))
devices_with_no_3des_tunnels = sorted([device for device in devices_3des_stats['p1_ok_p2_ok_count'] if device not in list(set(devices_3des_stats['p1_3des_p2_3des_count'] + devices_3des_stats['p1_3des_p2_ok_count'] + devices_3des_stats['p1_ok_p2_3des_count']))])
devices_with_only_p1_3des_p2_3des_tunnels = sorted([device for device in devices_3des_stats['p1_3des_p2_3des_count'] if device not in list(set(devices_3des_stats['p1_3des_p2_ok_count'] + devices_3des_stats['p1_ok_p2_3des_count'] + devices_3des_stats['p1_ok_p2_ok_count']))])
devices_with_only_p1_3des_p2_ok = sorted([device for device in devices_3des_stats['p1_3des_p2_ok_count'] if device not in list(set(devices_3des_stats['p1_3des_p2_3des_count'] + devices_3des_stats['p1_ok_p2_3des_count'] + devices_3des_stats['p1_ok_p2_ok_count']))])
devices_with_only_p1_ok_p2_3des = sorted([device for device in devices_3des_stats['p1_ok_p2_3des_count'] if device not in list(set(devices_3des_stats['p1_3des_p2_3des_count'] + devices_3des_stats['p1_3des_p2_ok_count'] + devices_3des_stats['p1_ok_p2_ok_count']))])
devices_with_p1_3des_p2_3des = sorted(devices_3des_stats['p1_3des_p2_3des_count'])
devices_with_p1_3des_p2_ok = sorted(devices_3des_stats['p1_3des_p2_ok_count'])
devices_with_p1_ok_p2_3des = sorted(devices_3des_stats['p1_ok_p2_3des_count'])
devices_with_p1_ok_p2_ok = sorted(devices_3des_stats['p1_ok_p2_ok_count'])
hub_3des_stats = {'devices_with_tunnels': devices_with_tunnels,
'devices_with_no_3des_tunnels': devices_with_no_3des_tunnels,
'devices_with_3des_tunnels': devices_with_3des_tunnels,
'devices_with_p1_3des_p2_3des': devices_with_p1_3des_p2_3des,
'devices_with_p1_3des_p2_ok': devices_with_p1_3des_p2_ok,
'devices_with_p1_ok_p2_3des': devices_with_p1_ok_p2_3des,
'devices_with_p1_ok_p2_ok': devices_with_p1_ok_p2_ok,
'devices_with_only_p1_3des_p2_3des_tunnels': devices_with_only_p1_3des_p2_3des_tunnels,
'devices_with_only_p1_3des_p2_ok': devices_with_only_p1_3des_p2_ok,
'devices_with_only_p1_ok_p2_3des': devices_with_only_p1_ok_p2_3des
}
device_3des_stats_dict = {'hub': hub_3des_stats, 'spoke': manufacturer_models}
# TODO - can do this same logic with 'manufacturer_models' - find spoke device types that only have p1_ok_p2_ok_count or p1_3des_p2_3des_count - might be easier shoved into a dataframe, sorted and then 'deduced'
## tidy up 'VPN Hubs' dataframe with contextual information
# devices not in DNS or SSH failed auth/connection or junos
vpn_devices_table_df.loc[
(vpn_devices_table_df['FQDN'] == 'unknown') &
(vpn_devices_table_df['IPAddress'] == 'unknown'),
['os_flavour', 'os_version', 'image', 'vendor', 'chassis', 'serial']
] = 'unknown'
vpn_devices_table_df.loc[
(vpn_devices_table_df['session_protocol'] == 'ssh failed auth') |
(vpn_devices_table_df['session_protocol'] == 'ssh failed connection'),
['os_flavour', 'os_version', 'image', 'vendor', 'chassis', 'serial']
] = 'unknown'
vpn_devices_table_df.loc[
(vpn_devices_table_df['vendor'] == 'junos'),
['os_flavour', 'os_version', 'image', 'chassis', 'serial']
] = 'unknown'
# populate 0 counts for empty stats where the 'cisco' device is contactable (readability)
possible_empty = ['tunnel_count', 'p1_3des_p2_ok', 'p1_ok_p2_3des', 'p1_3des_p2_3des', 'p1_ok_p2_ok', 'transform_default_3des', 'spoke_default_p2_3des', 'spoke_default_p2_not_3des', 'spoke_default_p2_algo_unknown', 'spoke_aes_known_support']
for f in possible_empty:
vpn_devices_table_df.loc[
(vpn_devices_table_df['session_protocol'] == 'ssh') &
(vpn_devices_table_df['vendor'] == 'cisco') &
(vpn_devices_table_df[f].isna()),
[f]
] = 0
# ## compliance bool tally
# # the device may be compliant where all spokes are !3des but there is still an unused 3des transform set - technically uncompliant but functionally not
# # the device may be compliant where the CPE is not TNS owned but there are 3des tunnels and transform set - technically uncompliant but outside of scope / remediation
# vpn_devices_table_df.loc[~(vpn_devices_table_df['tunnel_count'].isna()), 'compliant'] = ~(vpn_devices_table_df.fillna(0)['p1_3des_p2_ok'] + vpn_devices_table_df.fillna(0)['p1_ok_p2_3des'] + vpn_devices_table_df.fillna(0)['p1_3des_p2_3des']).astype(bool)
## write sheet to workbook
vpn_devices_table_df.to_excel(writer, index=False, sheet_name='VPN Hubs')
return vpn_devices_table_df, device_3des_stats_dict
def generate_summary_sheet(collection, writer, vpn_devices_table_df, device_3des_stats_dict):
# print('building excel sheet VPN Hub Summary')
logger = logging.getLogger('main')
logger.info('building excel sheet VPN Hub Summary')
### collect stats from 'VPN Hubs' sheet, populate a hub device dict of stats
# vars
hub_summary_dict = {}
# device list sources
hub_summary_dict.update({"Hub Devices": ""})
hub_summary_dict.update({"break1": "break"})
hub_summary_dict.update({"DMVPN device list": "http://ipdesk.corp.tnsi.com/newipdesk/dmvpnhubscan.php"})
hub_summary_dict.update({"IP-P2PAGG device list": "http://ipdesk.corp.tnsi.com/newipdesk/report_p2p_aggs.php"})
hub_summary_dict.update({"break2": "break"})
## tunnel stats
# total tunnels
tunnel_count = vpn_devices_table_df['tunnel_count'].sum()
hub_summary_dict.update({"Total tunnel count": tunnel_count})
# compliant p1_ok_p2_ok
compliant_tunnel_count = vpn_devices_table_df['p1_ok_p2_ok'].sum()
hub_summary_dict.update({"Total compliant tunnel count": compliant_tunnel_count})
# uncompliant tunnel count
uncompliant_tunnel_count = vpn_devices_table_df['p1_3des_p2_ok'].sum() + vpn_devices_table_df['p1_ok_p2_3des'].sum() + vpn_devices_table_df['p1_3des_p2_3des'].sum()
hub_summary_dict.update({"Total uncompliant tunnel count": uncompliant_tunnel_count})
# uncompliant p1_3des_p2_ok
uncompliant_p1_3des_p2_ok = vpn_devices_table_df['p1_3des_p2_ok'].sum()
hub_summary_dict.update({"Total uncompliant p1_3des_p2_ok": uncompliant_p1_3des_p2_ok})
# uncompliant p1_ok_p2_3des
uncompliant_p1_ok_p2_3des = vpn_devices_table_df['p1_ok_p2_3des'].sum()
hub_summary_dict.update({"Total uncompliant p1_ok_p2_3des": uncompliant_p1_ok_p2_3des})
# uncompliant p1_3des_p2_3des
uncompliant_p1_3des_p2_3des = vpn_devices_table_df['p1_3des_p2_3des'].sum()
hub_summary_dict.update({"Total uncompliant p1_3des_p2_3des": uncompliant_p1_3des_p2_3des})
hub_summary_dict.update({"break3": "break"})
## hub stats
# total devices
total_devices = len(vpn_devices_table_df)
hub_summary_dict.update({"Total devices": total_devices})
# total contactable devices
total_contactable_devices = len(vpn_devices_table_df[~vpn_devices_table_df['session_protocol'].isin(['unknown', 'ssh failed auth', 'ssh failed connection'])])
hub_summary_dict.update({"Total contactable devices": total_contactable_devices})
# total uncontactable devices
total_uncontactable_devices = len(vpn_devices_table_df[vpn_devices_table_df['session_protocol'].isin(['unknown', 'ssh failed connection'])])
hub_summary_dict.update({"Total uncontactable devices": total_uncontactable_devices})
# total auth fail
total_auth_fail = len(vpn_devices_table_df[vpn_devices_table_df['session_protocol'] == 'ssh failed auth'])
hub_summary_dict.update({"Total auth fail": total_auth_fail})
hub_summary_dict.update({"break4": "break"})
# devices with tunnels
devices_with_tunnel = len(vpn_devices_table_df[vpn_devices_table_df['tunnel_count'] > 0]) # the comparison already maps NaN to False, no dropna() needed
# hub_summary_dict.update({"Total devices with tunnels": devices_with_tunnel})
hub_summary_dict.update({"Total devices with tunnels": len(device_3des_stats_dict['hub']['devices_with_tunnels']) }) # TODO - the two counts differ by 1, locate the source of the discrepancy
# devices without tunnels
devices_without_tunnel = len(vpn_devices_table_df[vpn_devices_table_df['tunnel_count'] == 0])
hub_summary_dict.update({"Total devices without tunnels": devices_without_tunnel})
# devices where 3DES not seen in p1 or p2 of any tunnel
hub_summary_dict.update({"Total devices where ALL tunnels matching: 3DES not seen in p1 or p2": len(device_3des_stats_dict['hub']['devices_with_no_3des_tunnels'])}) # 'ALL tunnels' keeps this key distinct from the per-tunnel p1_ok_p2_ok count below
# hub_summary_dict.update({"Device list where ALL tunnels matching: 3DES not seen in p1 or p2": device_3des_stats_dict['hub']['devices_with_no_3des_tunnels']})
# devices where 3DES seen in p1 or p2
hub_summary_dict.update({"Total devices with tunnels matching: 3DES seen in p1 or p2": len(device_3des_stats_dict['hub']['devices_with_3des_tunnels'])})
# devices where p1 = 3DES, p2 = 3DES
hub_summary_dict.update({"Total devices with tunnels matching: 3DES seen in p1 and p2": len(device_3des_stats_dict['hub']['devices_with_p1_3des_p2_3des'])})
# devices where p1 = 3DES, p2 = OK
hub_summary_dict.update({"Total devices with tunnels matching: 3DES seen in p1 and not p2": len(device_3des_stats_dict['hub']['devices_with_p1_3des_p2_ok'])})
# devices where p1 = OK, p2 = 3DES
hub_summary_dict.update({"Total devices with tunnels matching: 3DES not seen in p1 and in p2": len(device_3des_stats_dict['hub']['devices_with_p1_ok_p2_3des'])})
# devices where p1 = OK, p2 = OK
hub_summary_dict.update({"Total devices with tunnels matching: 3DES not seen in p1 or p2": len(device_3des_stats_dict['hub']['devices_with_p1_ok_p2_ok'])})
# devices where only p1 = 3DES, p2 = 3DES
hub_summary_dict.update({"Total devices where ALL tunnels matching: 3DES seen in p1 and p2": len(device_3des_stats_dict['hub']['devices_with_only_p1_3des_p2_3des_tunnels'])})
hub_summary_dict.update({"Device list where ALL tunnels matching: 3DES seen in p1 and p2": device_3des_stats_dict['hub']['devices_with_only_p1_3des_p2_3des_tunnels']})
# devices where only p1 = 3DES, p2 = ok
hub_summary_dict.update({"Total devices where ALL tunnels matching: 3DES seen in p1 and not p2": len(device_3des_stats_dict['hub']['devices_with_only_p1_3des_p2_ok'])})
hub_summary_dict.update({"Device list where ALL tunnels matching: 3DES seen in p1 and not p2": device_3des_stats_dict['hub']['devices_with_only_p1_3des_p2_ok']})
# devices where only p1 = ok, p2 = 3DES
hub_summary_dict.update({"Total devices where ALL tunnels matching: 3DES not seen in p1 and in p2": len(device_3des_stats_dict['hub']['devices_with_only_p1_ok_p2_3des'])})
hub_summary_dict.update({"Device list where ALL tunnels matching: 3DES not seen in p1 and in p2": device_3des_stats_dict['hub']['devices_with_only_p1_ok_p2_3des']})
hub_summary_dict.update({"break5": "break"})
# devices where a transform set's primary definition is 3des
devices_default_tf_triple_des = len(vpn_devices_table_df[vpn_devices_table_df['transform_default_3des'] > 0])
hub_summary_dict.update({"Total devices with 3DES in primary transform set": devices_default_tf_triple_des})
# devices where the isakmp policy's primary definition is 3des
devices_default_isakmp_triple_des = len(vpn_devices_table_df[vpn_devices_table_df['isakmp_policy_default_p1_3des'] == True])
hub_summary_dict.update({"Total devices with 3DES in primary ISAKMP policy": devices_default_isakmp_triple_des})
# ## devices compliant - disabled as compliance check is fuzzy
# devices_compliant = len(vpn_devices_table_df[(vpn_devices_table_df['compliant'] == True).dropna()])
# hub_summary_dict.update({"Total devices compliant": devices_compliant})
# # devices uncompliant
# devices_uncompliant = len(vpn_devices_table_df[(vpn_devices_table_df['compliant'] == False).dropna()])
# hub_summary_dict.update({"Total devices uncompliant": devices_uncompliant})
# hub_summary_dict.update({"break7": "break"})
## device attributes, a bit of a cheat: use mongo to sidestep sorting lists from the dataframe
device_attributes = {
'DeviceType': 'Total devices type',
'vendor': 'Total devices vendor',
'os_flavour': 'Total devices OS',
'chassis': 'Total devices model',
'image': 'Total devices firmware',
'Region': 'Total devices region',
'Site': 'Total devices site',
'Division': 'Total devices division'
}
for attribute,message in device_attributes.items():
uniq_attribute = collection.distinct(attribute)
for a in uniq_attribute:
if a not in ['unknown', '']:
attribute_count = collection.count_documents({attribute: a})
uniq_message = f'{message} {a}'
hub_summary_dict.update({uniq_message: attribute_count})
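# nb: this is one distinct() plus one count_documents() per attribute value - an N+1 query
# pattern; a single aggregate such as (sketch)
# collection.aggregate([{'$group': {'_id': f'${attribute}', 'count': {'$sum': 1}}}])
# would return the same counts in one round trip, at the cost of a little post-processing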
### collect stats, populate a spoke device dict of stats
# vars
spoke_summary_dict = {}
# spoke column
spoke_summary_dict.update({"Spoke Devices": ""})
spoke_summary_dict.update({"break1": "break"})
# spokes that support aes, seen in p1 or p2
spoke_aes_supported = vpn_devices_table_df['spoke_aes_known_support'].sum()
spoke_summary_dict.update({"Total spokes that support AES (AES seen in either p1/p2)": spoke_aes_supported})
# spokes where transform set has primary definition with 3des
spokes_default_tf_triple_des = vpn_devices_table_df['spoke_default_p2_3des'].sum()
spoke_summary_dict.update({"Total spokes with 3DES in phase2 config (offered AES chose 3DES)": spokes_default_tf_triple_des})
# spokes where the transform set's primary definition cannot be 3des
spokes_default_tf_not_triple_des = vpn_devices_table_df['spoke_default_p2_not_3des'].sum()
spoke_summary_dict.update({"Total spokes with 3DES not in phase2 config (offered 3DES chose AES)": spokes_default_tf_not_triple_des})
# spokes where transform set is unknown, they negotiate with the device preference
spokes_unknown_tf_triple_des = vpn_devices_table_df['spoke_default_p2_algo_unknown'].sum()
spoke_summary_dict.update({"Total spokes where phase2 algo preference unknown (negotiate same as device transform)": spokes_unknown_tf_triple_des})
spoke_summary_dict.update({"break2": "break"})
# ## debug
# logger.info(spoke_summary_dict)
### write hub_summary_dict directly to the summary sheet (no pandas)
workbook = writer.book
colA_format_summary = workbook.add_format({'bold': True, 'text_wrap': False, 'align': 'left', 'valign': 'top', 'fg_color': '#77B0D1', 'border': 1})
colB_format_summary = workbook.add_format({'bold': False, 'text_wrap': False, 'align': 'right', 'valign': 'top', 'border': 1})
colB_hyperlink_format_summary = workbook.add_format({'bold': False, 'text_wrap': False, 'align': 'left', 'valign': 'top', 'border': 1, 'font_color': 'blue', 'underline': True})
colD_format_summary = workbook.add_format({'bold': True, 'text_wrap': False, 'align': 'left', 'valign': 'top', 'fg_color': '#79C9A5', 'border': 1})
# worksheet = writer.sheets['VPN Hub Summary']
worksheet = writer.sheets['VPN Device Summary']
## write hub dict to sheet
row = 0
for k, v in hub_summary_dict.items():
stat = k
if isinstance(v, list):
count = str(v)
else:
count = v
row += 1
if 'DMVPN device list' in stat or 'IP-P2PAGG device list' in stat:
worksheet.write(f'A{row}', stat, colA_format_summary)
worksheet.write_url(f'B{row}', count, colB_hyperlink_format_summary)
elif count == 'break':
worksheet.write(f'A{row}', '', colA_format_summary)
worksheet.write(f'B{row}', '', colB_format_summary)
else:
worksheet.write(f'A{row}', stat, colA_format_summary)
worksheet.write(f'B{row}', count, colB_format_summary)
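# nb: xlsxwriter's write()/write_url() accept A1-style cell strings as well as (row, col)
# pairs; the counter is incremented before each write, so the first entry lands in row 1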
## write spoke dict to sheet
row = 0
for k,v in spoke_summary_dict.items():
stat = k
count = v
row += 1
if count == 'break':
worksheet.write(f'D{row}', '', colD_format_summary)
worksheet.write(f'E{row}', '', colB_format_summary)
else:
worksheet.write(f'D{row}', stat, colD_format_summary)
worksheet.write(f'E{row}', count, colB_format_summary)
## write spoke 3des summary stats to sheet
row += 1
spoke_filter_start = f'D{row}'
worksheet.write(f'D{row}', 'Model', colD_format_summary)
worksheet.write(f'E{row}', 'Count', colD_format_summary)
worksheet.write(f'F{row}', 'p1_ok_p2_ok_count', colD_format_summary)
worksheet.write(f'G{row}', 'p1_3des_p2_3des_count', colD_format_summary)
worksheet.write(f'H{row}', 'p1_3des_p2_ok_count', colD_format_summary)
worksheet.write(f'I{row}', 'p1_ok_p2_3des_count', colD_format_summary)
for m in device_3des_stats_dict['spoke'].keys():
manufacturer = m
for k,v in device_3des_stats_dict['spoke'][m].items():
row += 1
model = f'{manufacturer} {k}'
count = v['count']
worksheet.write(f'D{row}', model, colD_format_summary)
worksheet.write(f'E{row}', count, colB_format_summary)
worksheet.write(f'F{row}', v['p1_ok_p2_ok_count'], colB_format_summary)
worksheet.write(f'G{row}', v['p1_3des_p2_3des_count'], colB_format_summary)
worksheet.write(f'H{row}', v['p1_3des_p2_ok_count'], colB_format_summary)
worksheet.write(f'I{row}', v['p1_ok_p2_3des_count'], colB_format_summary)
spoke_filter_end = f'I{row}'
## autofit, set column widths, autofilter
worksheet.autofit()
worksheet.set_column('B:B', 55)
worksheet.set_column('C:C', 5)
worksheet.set_column('E:E', 8)
worksheet.set_column('F:I', 24)
worksheet.autofilter(f'{spoke_filter_start}:{spoke_filter_end}')
def pretty_sheet(device_df, writer, header_format, sheet):
worksheet = writer.sheets[sheet]
# add sheet header from dataframe column names
for col_num, value in enumerate(device_df.columns.values):
worksheet.write(0, col_num, value, header_format)
# set scope of autofilter
(max_row, max_col) = device_df.shape
worksheet.autofilter(0, 0, max_row, max_col - 1)
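# autofilter() takes zero-indexed (first_row, first_col, last_row, last_col): row 0 is the
# header written above and rows 1..max_row are the data beneath it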
# autofit columns
worksheet.autofit()
# set column width for specific columns (device + devices sheets), overwrite autofit for specific fields for readability
column_width = {'FQDN': 10,
'image': 10,
'chassis': 12,
'DeviceDescription': 17,
'ipsec_flow': 30,
'ordered_transform_set': 50,
'crypto_map_interface': 25,
'last_modified': 20,
'session_protocol': 15,
"p1_ok_p2_3des": 16,
"p1_3des_p2_ok": 16,
"p1_3des_p2_3des": 16,
"p1_ok_p2_ok": 16,
"tunnel_count": 12,
"transform_default_3des": 16,
"transform_default_3des_name": 16,
"spoke_aes_known_support": 16,
"spoke_default_p2_3des": 16,
"spoke_default_p2_not_3des": 16,
"spoke_default_p2_algo_unknown": 16,
'isakmp_policy': 35,
"isakmp_policy_default_p1_3des": 16
}
for col_num, value in enumerate(device_df.columns.values):
if value in list(column_width.keys()):
width = column_width[value]
worksheet.set_column(col_num, col_num, width)
# scrolling header
worksheet.freeze_panes(1, 0)
# header_format_devices uses 'text_wrap': True, set row to double depth for long field names
if sheet == 'VPN Hubs':
worksheet.set_row(0, 45)
# init excel workbook
with pandas.ExcelWriter(outfile, engine='xlsxwriter') as writer:
workbook = writer.book
# define header formats
header_format_devices = workbook.add_format({'bold': True, 'text_wrap': True, 'valign': 'top', 'fg_color': '#77B0D1', 'border': 1})
header_format_device = workbook.add_format({'bold': True, 'text_wrap': False, 'valign': 'top', 'fg_color': '#79C9A5', 'border': 1})
# create sheets in workbook order
static_sheets = ['VPN Device Summary', 'VPN Spokes', 'VPN Hubs']
sheets = workbook_sheets_order(writer, collection, devices_dict, static_sheets)
# populate devices sheets and 'VPN Spokes' sheet
devices_df_dict, spokes_df = populate_device_sheets(collection, writer, sheets)
# transform and populate 'VPN Hubs' sheet
vpn_devices_table_df, device_3des_stats_dict = transform_devices_sheet(collection, writer, devices_df_dict)
# generate 'VPN Device Summary' sheet
generate_summary_sheet(collection, writer, vpn_devices_table_df, device_3des_stats_dict)
# pretty device sheets
for k, v in devices_df_dict.items():
sheet = k
device_df = v
pretty_sheet(device_df, writer, header_format_device, sheet)
pretty_sheet(vpn_devices_table_df, writer, header_format_devices, 'VPN Hubs')
pretty_sheet(spokes_df, writer, header_format_device, 'VPN Spokes')