python_price_for_terraform_.../main.py

298 lines
13 KiB
Python
Raw Permalink Normal View History

2022-10-26 17:55:09 +00:00
#!/usr/bin/python3
import datetime
import json
import os
import pdb
import subprocess

import jmespath
import requests
from currency_converter import CurrencyConverter
# Terraform working directory that this script plans, prices and applies.
# NOTE(review): hard-coded, machine-specific absolute path.
terrafrom_directory = '/home/tseed/WORK/OCF_GIT/hetzner-terraform'
# Name of the main JSON config file. It is a relative path, so it is resolved
# against the process working directory at the moment it is opened.
config = 'config.json'
class Terraform:
    """Run Terraform in a directory and expose the resulting plan.

    Constructing an instance changes the process working directory to
    ``directory``, runs ``terraform init`` and ``terraform plan -out
    ./planfile`` there, and loads the output of ``terraform show -json
    planfile``.

    Attributes:
        directory: the Terraform working directory given to the constructor.
        plan_result: JSON text printed by ``terraform show -json planfile``.
        plan_result_dict: ``plan_result`` parsed with ``json.loads``.
        provider: value returned by ``get_provider_name()`` after planning.

    Raises:
        subprocess.CalledProcessError: a terraform command exited non-zero.
        OSError: the directory cannot be entered or terraform cannot be
            started.
    """

    def __init__(self, directory):
        self.directory = directory
        self.plan_result = None
        self.plan_result_dict = None
        self.__plan(self.directory)
        self.provider = self.get_provider_name()

    def __terraform(self, *arguments, capture=False):
        """Run ``terraform <arguments>`` in the current working directory.

        A non-zero exit status raises ``subprocess.CalledProcessError``
        instead of being ignored. With ``capture=True`` the command's
        standard output is returned as text; otherwise the output goes to
        the terminal and ``None`` is returned.
        """
        command = ['terraform'] + list(arguments)
        if capture:
            finished = subprocess.run(
                command,
                check=True,
                stdout=subprocess.PIPE,
                universal_newlines=True,
            )
            return finished.stdout
        subprocess.run(command, check=True)
        return None

    def __plan(self, directory):
        """Initialise the directory, write ``./planfile`` and load it as JSON."""
        # os.chdir is process-wide: relative paths opened later in this
        # process are resolved against this directory as well.
        os.chdir(directory)
        self.__terraform('init')
        self.__terraform('plan', '-out', './planfile')
        self.plan_result = self.__terraform('show', '-json', 'planfile', capture=True)
        self.plan_result_dict = json.loads(self.plan_result)

    def get_provider_name(self):
        """Return the name of the first provider in the plan configuration.

        Returns None when the plan has no ``configuration.provider_config``
        entries.
        """
        jmes_query = "configuration.provider_config.*.name|[0]"
        return jmespath.search(jmes_query, self.plan_result_dict)

    def get_plan_results(self):
        """Return the parsed JSON plan."""
        return self.plan_result_dict

    def apply_plan(self):
        """Run ``terraform apply -auto-approve`` in the working directory.

        Raises:
            subprocess.CalledProcessError: terraform exited non-zero.
        """
        os.chdir(self.directory)
        # NOTE(review): the saved ./planfile is not passed to apply, so
        # terraform presumably plans again here - confirm that is intended.
        self.__terraform('apply', '-auto-approve')
# Constructing Terraform runs init/plan in terrafrom_directory and loads the
# plan as JSON (all of that happens inside the constructor).
lucky = Terraform(terrafrom_directory)
#print(lucky.get_provider_name())
#print(lucky.get_plan_results())
# Parsed plan; used further down to find the billable resources.
plan_results = lucky.get_plan_results()
#print(plan_results)
# Provider name taken from the plan; used to select provider-specific settings.
provider_name = lucky.get_provider_name()
class ConfItems:
    """Read the main config file and the provider config file it points to.

    The main config is a JSON object. Provider-specific settings live under
    its ``provider_config`` key and are named ``<provider_name>_config``,
    ``<provider_name>_token``, ``<provider_name>_discount_percentage`` and
    ``<provider_name>_pricelist_cache_timeout``. The ``_config`` entry is the
    path of a second JSON file, the provider config.

    Settings are read with plain dictionary lookups, so provider names that
    are not bare identifiers (for example names containing ``-``) work too.

    Args:
        config: path of the main JSON config file.
        provider_name: provider whose settings are selected.

    Raises:
        ValueError: the main config has no ``<provider_name>_config`` entry.
        OSError: a config file cannot be opened.
    """

    def __init__(self, config, provider_name):
        self.config = config
        self.provider_name = provider_name
        self.config_dict = self.__read_config(self.config)
        self.provider_config = self.__read_provider_config(self.config_dict, self.provider_name)

    @staticmethod
    def __lookup(mapping, key):
        """Return ``mapping[key]``, or None if absent or not a dict."""
        if isinstance(mapping, dict):
            return mapping.get(key)
        return None

    def __provider_setting(self, config_dict, provider_name, suffix):
        """Return ``provider_config.<provider_name><suffix>`` or None."""
        section = self.__lookup(config_dict, "provider_config")
        return self.__lookup(section, "{0}{1}".format(provider_name, suffix))

    def __read_config(self, config):
        """Load the main config file as JSON."""
        with open(config, "r") as json_file:
            return json.load(json_file)

    def __read_provider_config(self, config_dict, provider_name):
        """Load the provider config file named by ``<provider_name>_config``."""
        provider_config = self.__provider_setting(config_dict, provider_name, "_config")
        if provider_config is None:
            # Without this check the failure would be an opaque open(None).
            raise ValueError(
                "no '{0}_config' entry under 'provider_config' in {1}".format(
                    provider_name, self.config
                )
            )
        with open(provider_config, "r") as json_file:
            return json.load(json_file)

    def get_currency(self):
        """Return the top-level ``currency`` value of the main config, or None."""
        return self.__lookup(self.config_dict, "currency")

    def get_api_token(self):
        """Return the ``<provider_name>_token`` setting, or None."""
        return self.__provider_setting(self.config_dict, self.provider_name, "_token")

    def get_discount_percentage(self):
        """Return the ``<provider_name>_discount_percentage`` setting, or None."""
        return self.__provider_setting(self.config_dict, self.provider_name, "_discount_percentage")

    def get_pricelist_cache_timeout(self):
        """Return the ``<provider_name>_pricelist_cache_timeout`` setting, or None."""
        return self.__provider_setting(self.config_dict, self.provider_name, "_pricelist_cache_timeout")

    def get_provider_config(self):
        """Return the parsed provider config file."""
        return self.provider_config

    def get_billable_items(self):
        """Return the ``billable_items`` value of the provider config, or None."""
        return self.__lookup(self.provider_config, "billable_items")
# Load the main config plus the provider config selected by provider_name.
lucky1 = ConfItems(config, provider_name)
#print(json.dumps(lucky1.get_currency()))
#print(json.dumps(lucky1.get_discount_percentage()))
#print(json.dumps(lucky1.get_pricelist_cache_timeout()))
#print(json.dumps(lucky1.get_provider_config(), indent=4, sort_keys=True))
#print(json.dumps(lucky1.get_billable_items(), indent=4, sort_keys=True))
# Pull the individual settings out into module-level names; the price-list and
# pricing steps below take them as arguments.
currency = lucky1.get_currency()
discount_percentage = lucky1.get_discount_percentage()
billable_items = lucky1.get_billable_items()
provider_config = lucky1.get_provider_config()
pricelist_cache_timeout = lucky1.get_pricelist_cache_timeout()
api_token = lucky1.get_api_token()
class GetPriceList:
    """Fetch the provider price list over HTTP and cache it on disk.

    The price list URL is ``base_url + price_endpoint`` from the provider
    config. The response is cached as JSON in
    ``/tmp/<provider_name>_pricing.json``; the constructor refreshes the
    cache when the file is missing or older than ``pricelist_cache_timeout``
    seconds.

    Args:
        provider_name: used to build the cache file name.
        provider_config: parsed provider config; ``base_url``,
            ``price_endpoint`` and ``currency`` are read from it.
        provider_api_token: sent as a bearer token with the request.
        pricelist_cache_timeout: maximum cache age in seconds. None means
            the cache is never considered current.

    Raises:
        ValueError: ``base_url`` or ``price_endpoint`` is missing.
        RuntimeError: the request did not return HTTP 200 and there is no
            cached price list to fall back on.
    """

    # Seconds to wait for the price API before giving up; without a timeout
    # requests.get can block indefinitely.
    request_timeout = 30

    def __init__(self, provider_name, provider_config, provider_api_token, pricelist_cache_timeout):
        self.provider_name = provider_name
        self.provider_config = provider_config
        self.provider_api_token = provider_api_token
        self.pricelist_cache_timeout = pricelist_cache_timeout
        self.base_url = self.__get_base_url(self.provider_config)
        self.price_endpoint = self.__get_price_endpoint(self.provider_config)
        if self.base_url is None or self.price_endpoint is None:
            raise ValueError("provider config must define 'base_url' and 'price_endpoint'")
        self.price_url = self.base_url + self.price_endpoint
        self.currency_path = self.__get_currency_path(self.provider_config)
        self.provider_currency = None
        # NOTE(review): fixed, predictable file name in a shared directory.
        self.price_cache = "/tmp/" + self.provider_name + "_pricing.json"
        if self.__check_price_cache(self.price_cache, self.pricelist_cache_timeout):
            self.refresh_price_list()

    def __get_base_url(self, provider_config):
        """Return the ``base_url`` value of the provider config, or None."""
        return jmespath.search("base_url", provider_config)

    def __get_price_endpoint(self, provider_config):
        """Return the ``price_endpoint`` value of the provider config, or None."""
        return jmespath.search("price_endpoint", provider_config)

    def __get_currency_path(self, provider_config):
        """Return the ``currency`` value of the provider config.

        The value is later used as a JMESPath expression against the price
        list to find the provider's currency.
        """
        return jmespath.search("currency", provider_config)

    def __check_price_cache(self, price_cache, pricelist_cache_timeout):
        """Return True when the cache must be refreshed, False when current.

        The cache needs refreshing when the file does not exist, when no
        timeout is configured, or when the file's modification time is more
        than ``pricelist_cache_timeout`` seconds in the past.
        """
        if not os.path.isfile(price_cache):
            return True
        if pricelist_cache_timeout is None:
            # Previously this only worked while no cache file existed and
            # raised TypeError on every later run.
            return True
        oldest_current = datetime.datetime.now().timestamp() - float(pricelist_cache_timeout)
        return float(os.path.getmtime(price_cache)) < oldest_current

    def __fetch_price_list(self, price_url, price_cache, currency_path, provider_api_token):
        """Download the price list, write the cache and record the currency."""
        headers = {'Content-Type': 'application/json', 'Authorization': 'Bearer {0}'.format(provider_api_token)}
        response = requests.get(price_url, headers=headers, timeout=self.request_timeout)
        if response.status_code != 200:
            if os.path.isfile(price_cache):
                # Keep using the existing (stale) cache, as before.
                return
            raise RuntimeError(
                "price list request to {0} returned HTTP {1} and no cached price list exists".format(
                    price_url, response.status_code
                )
            )
        pricing = json.loads(response.content.decode('utf-8'))
        with open(price_cache, "w") as json_file:
            json_file.write(json.dumps(pricing, indent=2, sort_keys=True))
        self.provider_currency = jmespath.search(currency_path, pricing)

    def refresh_price_list(self):
        """Fetch the price list now, regardless of the cache age."""
        self.__fetch_price_list(self.price_url, self.price_cache, self.currency_path, self.provider_api_token)

    def get_price_list(self):
        """Return the cached price list parsed from the cache file."""
        with open(self.price_cache, "r") as json_file:
            return json.load(json_file)

    def get_provider_currency(self):
        """Return the provider currency, reading it from the cache if needed."""
        if self.provider_currency is None:
            pricing = self.get_price_list()
            self.provider_currency = jmespath.search(self.currency_path, pricing)
        return self.provider_currency
# Constructing GetPriceList refreshes the on-disk price cache when it is
# missing or older than pricelist_cache_timeout.
lucky2 = GetPriceList(provider_name, provider_config, api_token, pricelist_cache_timeout)
#lucky2.refresh_price_list()
#print(lucky2.get_price_list())
# Price list as read back from the cache file.
price_list = lucky2.get_price_list()
# Currency the provider's prices are quoted in, looked up in the price list.
provider_currency = lucky2.get_provider_currency()
class GetPrices:
    """Price the billable resources of a Terraform plan.

    Resources are read from ``planned_values.root_module.resources`` of the
    plan. A resource is billable when its type is a key of
    ``billable_items``; the value for that key is a JMESPath expression that
    finds the price in ``price_list``. Each price is reported in the
    provider's currency and converted to ``currency``.

    Args:
        plan_results: parsed ``terraform show -json`` output.
        billable_items: dict mapping resource type to a JMESPath price query.
        price_list: parsed provider price list.
        provider_currency: currency code the price list is quoted in.
        currency: currency code to convert prices to.
        discount_percentage: stored on the instance; not applied to any
            price by this class.

    Raises:
        ValueError: a billable resource has no price in the price list, or
            an ``hcloud_server`` lacks the server type or location needed to
            build its price query.
    """

    def __init__(self, plan_results, billable_items, price_list, provider_currency, currency, discount_percentage):
        self.plan_results = plan_results
        self.plan_resources_path = 'planned_values.root_module.resources'
        self.billable_items = billable_items
        self.price_list = price_list
        self.provider_currency = provider_currency
        self.currency = currency
        self.discount_percentage = discount_percentage
        self.plan_billable_items = self.__build_plan_billable_items(self.plan_results, self.plan_resources_path, self.price_list)
        self.plan_prices = self.__currency_plan_billablle_items(self.plan_billable_items, self.provider_currency, self.currency)

    def __build_plan_billable_items(self, plan_results, plan_resources_path, price_list):
        """Return ``{'resources': [...]}`` with one priced entry per billable resource.

        Every entry has ``resource_name``, ``resource_type`` and ``price``
        (a float in the provider currency); hcloud resources may also carry
        ``resource_location`` and ``resource_size``.
        """
        bill = {'resources': []}
        # The query yields None when the plan has no root-module resources.
        resources = jmespath.search(plan_resources_path, plan_results) or []
        for resource in resources:
            resource_type = resource['type']
            # Use the instance's table, not a module-level global of the same name.
            if resource_type not in self.billable_items:
                continue
            entry = {
                'resource_name': resource['name'],
                'resource_type': resource_type,
            }
            price_query = self.billable_items[resource_type]

            location = None
            if resource_type.startswith('hcloud_'):
                location = jmespath.search('values.location', resource)
                if location is not None:
                    entry['resource_location'] = location
            if resource_type == 'hcloud_floating_ip':
                # Floating IPs carry their location under home_location.
                entry['resource_location'] = jmespath.search('values.home_location', resource)
            if resource_type == 'hcloud_server':
                server_type = jmespath.search('values.server_type', resource)
                entry['resource_size'] = server_type
                if server_type is None or location is None:
                    raise ValueError(
                        "cannot price {0} '{1}': server_type and location are both required".format(
                            resource_type, entry['resource_name']
                        )
                    )
                # The server price query is a template; fill in its placeholders.
                price_query = price_query.replace("REPLACE_SERVER_TYPE", server_type)
                price_query = price_query.replace("REPLACE_LOCATION", location)

            price = jmespath.search(price_query, price_list)
            if price is None:
                raise ValueError(
                    "no price found for {0} '{1}' using query {2!r}".format(
                        resource_type, entry['resource_name'], price_query
                    )
                )
            price = float(price)

            if resource_type == 'hcloud_volume':
                # The volume price is multiplied by the volume size.
                size = jmespath.search("values.size", resource)
                price = float(size) * price
                entry['resource_size'] = str(size) + "GB"

            entry['price'] = price
            bill['resources'].append(entry)
        return bill

    def __currency_plan_billablle_items(self, plan_billable_items, provider_currency, currency):
        """Return the bill with per-resource and total prices in both currencies.

        Each resource's ``price`` becomes a dict with the raw price plus the
        price rounded to two decimals under ``price_<provider_currency>`` and
        ``price_<currency>``. ``total_price`` holds the sums under ``price``,
        ``<provider_currency>`` and ``<currency>``. The input bill is not
        modified.
        """
        # No converter is needed when both currencies are the same.
        converter = None if provider_currency == currency else CurrencyConverter()
        provider_key = "price_" + provider_currency
        currency_key = "price_" + currency
        plan_prices = {'resources': []}
        provider_total = 0
        currency_total = 0
        for item in plan_billable_items['resources']:
            raw_price = item['price']
            provider_value = round(float(raw_price), 2)
            if converter is None:
                converted = float(raw_price)
            else:
                converted = float(converter.convert(raw_price, provider_currency, currency))
            currency_value = round(converted, 2)
            priced = dict(item)
            priced['price'] = {
                'price': raw_price,
                provider_key: provider_value,
                currency_key: currency_value,
            }
            plan_prices['resources'].append(priced)
            provider_total += provider_value
            currency_total += currency_value
        # Round the sums so float accumulation error does not leak into totals.
        provider_total = round(provider_total, 2)
        currency_total = round(currency_total, 2)
        plan_prices['total_price'] = {
            'price': provider_total,
            provider_currency: provider_total,
            currency: currency_total,
        }
        return plan_prices

    def get_plan_cost(self):
        """Return the full priced bill (resources and totals)."""
        return self.plan_prices

    def get_provider_currency(self):
        """Return the currency code the provider prices are quoted in."""
        return self.provider_currency

    def get_currency(self):
        """Return the currency code prices are converted to."""
        return self.currency

    def get_plan_total(self, *args):
        """Return the plan total.

        The converted total is returned when the configured currency code is
        among ``args``; in every other case (including no arguments) the
        total in the provider currency is returned.
        """
        total_key = self.currency if self.currency in args else 'price'
        return self.plan_prices['total_price'][total_key]
# Price the billable resources of the plan (the work happens in the constructor).
lucky3 = GetPrices(plan_results, billable_items, price_list, provider_currency, currency, discount_percentage)
#print(json.dumps((lucky3.get_plan_cost()), indent=2, sort_keys=True))
#print(lucky3.get_currency())
#print(lucky3.get_provider_currency())
#print(lucky3.get_plan_total(currency))
#print(lucky3.get_plan_total())
# Report the configured currency code and the plan total in that currency.
print(lucky3.get_currency())
print(lucky3.get_plan_total(lucky3.get_currency()))
# Apply the Terraform configuration; this runs unconditionally after the
# total has been printed.
lucky.apply_plan()
# TODO: the terraform module needs a new function to apply terraform.
# NOTE(review): Terraform.apply_plan() is called just above, so this TODO may be stale.
# TODO: apply discount_percentage to the prices; it is read from the config and
# passed to GetPrices but not used in any calculation yet.