diff --git a/mdd/scripts/generate_inventory.py b/mdd/scripts/generate_inventory.py index 1bb67aa..a1e4b7f 100644 --- a/mdd/scripts/generate_inventory.py +++ b/mdd/scripts/generate_inventory.py @@ -1,88 +1,75 @@ import os import requests import yaml - -def fetch_netbox_devices(): - # Retrieve API key and URL from environment variables - netbox_api_url = os.environ.get('NETBOX_API') - netbox_api_token = os.environ.get('NETBOX_TOKEN') - - if not netbox_api_url or not netbox_api_token: - print("Error: NETBOX_API or NETBOX_TOKEN environment variables are not set.") - exit(1) - - # Construct the API request headers - headers = { - 'Authorization': f'Token {netbox_api_token}' +import load_env_vars + +ENV_VARS = { + "NAUTOBOT_URL" : None, + "NAUTOBOT_TOKEN" : None +} + +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + +headers = {'Authorization': f'Token {ENV_VARS['NAUTOBOT_TOKEN']}'} +verify_ssl = False +existing_inventory_path = '/path/to/existing/inventory.yml' # Replace with the actual path + +def get_inventory_structure(): #TODO: Compare to generate_tags + """Generated default inventory structure""" + return { + 'all': { + 'children': { + 'network': { + 'children': {} + } + } + } } - # Disable SSL certificate validation - verify_ssl = False +def fetch_netbox_devices(): + existing_inventory = {} # Make API requests to NetBox to retrieve device information response = requests.get( - f'{netbox_api_url}/api/dcim/devices/', + f'{ENV_VARS['NAUTOBOT_URL']}/api/dcim/devices/', headers=headers, verify=verify_ssl # Disable SSL certificate validation ) - if response.status_code == 200: - # Parse the JSON response into a Python dictionary - device_data = response.json() - - # Create an empty inventory dictionary - inventory_data = { - 'all': { - 'children': { - 'network': { - 'children': {} - } - } - } - } - - # Check for an existing inventory file in a different directory - existing_inventory_path = '/path/to/existing/inventory.yml' # Replace with the actual path - existing_inventory = {} - - if os.path.exists(existing_inventory_path): - with open(existing_inventory_path, 'r') as existing_inventory_file: - existing_inventory = yaml.safe_load(existing_inventory_file) - - # Populate the inventory with devices based on device role - for device in device_data['results']: - device_name = device['name'] - device_role = device['device_role']['slug'] - - # Check if the device is not in the existing inventory - if device_name not in existing_inventory.get('all', {}).get('children', {}).get('network', {}).get('children', {}).get(device_role, {}).get('hosts', {}): - # Check if primary_ip4 exists and is not None - primary_ip_data = device.get('primary_ip4') - if primary_ip_data and primary_ip_data.get('address'): - primary_ip = primary_ip_data['address'].split('/')[0] # Extract IP without the /32 - # Create groups for each device role under the "network" group if they don't exist - if device_role not in inventory_data['all']['children']['network']['children']: - inventory_data['all']['children']['network']['children'][device_role] = { - 'hosts': {} - } - - # Add the device to its respective role group and include the ansible_host - inventory_data['all']['children']['network']['children'][device_role]['hosts'][device_name] = { - 'ansible_host': primary_ip - } - else: - # If there is no IP address available, create groups without ansible_host - if device_role not in inventory_data['all']['children']['network']['children']: - inventory_data['all']['children']['network']['children'][device_role] = { - 'hosts': {} - } - inventory_data['all']['children']['network']['children'][device_role]['hosts'][device_name] = {} - - return inventory_data - else: - print(f"Error fetching data from NetBox: {response.status_code}") + if response.status_code != 200: + print(f"Error fetching devices from NetBox: {response.status_code}") exit(1) + # Create an empty inventory dictionary + inventory_data = get_inventory_structure() + + # Check for an existing inventory file in a different directory + if os.path.exists(existing_inventory_path): + with open(existing_inventory_path, 'r') as existing_inventory_file: + existing_inventory = yaml.safe_load(existing_inventory_file) + + # Populate the inventory with devices based on device role + for device in response.json()['results']: + device_name = device['name'] + device_role = device['device_role']['slug'] + + # Check if the device is not in the existing inventory + if device_name in existing_inventory.get('all', {}).get('children', {}).get('network', {}).get('children', {}).get(device_role, {}).get('hosts', {}): + continue + + # Check if primary_ip4 exists and is not None + primary_ip_data = device.get('primary_ip4') + if not primary_ip_data: # Still need to create the role so that it shows up in the inventory file - because line 69 won't hit these cases + inventory_data['all']['children']['network']['children'].setdefault(device_role, {}).setdefault('hosts', {}).setdefault(device_name, {}) + continue + + primary_ip_address = primary_ip_data.get('address') #TODO: will this ever not be there + primary_ip = primary_ip_address.split('/')[0] # Extract IP without the /32 + + inventory_data['all']['children']['network']['children'].setdefault(device_role, {}).setdefault('hosts', {}).setdefault(device_name, {})['ansible_host'] = primary_ip + + return inventory_data + def write_inventory_file(inventory_data): # Define the inventory file path as "inventory/network.yml" inventory_file_path = os.path.join(os.path.dirname(__file__), '..', 'inventory', 'network.yml') diff --git a/mdd/scripts/generate_tags.py b/mdd/scripts/generate_tags.py index 1792745..647bb1e 100644 --- a/mdd/scripts/generate_tags.py +++ b/mdd/scripts/generate_tags.py @@ -1,21 +1,22 @@ import os import requests import yaml -import json +import load_env_vars -def fetch_netbox_devices(): - netbox_api_url = os.environ.get('NETBOX_API') - netbox_api_token = os.environ.get('NETBOX_TOKEN') +ENV_VARS = { + "NAUTOBOT_URL" : None, + "NAUTOBOT_TOKEN" : None +} - if not netbox_api_url or not netbox_api_token: - print("NETBOX_API or NETBOX_TOKEN environment variables are not set.") - exit(1) +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) - headers = {'Authorization': f'Token {netbox_api_token}', 'Accept': 'application/json'} - verify_ssl = False +headers = {'Authorization': f'Token {ENV_VARS['NAUTOBOT_TOKEN']}', 'Accept': 'application/json'} +verify_ssl = False - # Initialize the inventory structure with vars for sites - inventory_data = { +def get_inventory_structure(): + """Returns the default inventory structure""" + + return { 'all': { 'vars': { 'sites': [] @@ -28,36 +29,40 @@ def fetch_netbox_devices(): } } +def fetch_netbox_devices(): + # Initialize the inventory structure with vars for sites + inventory_data = get_inventory_structure() + + # Fetch sites from NetBox + response = requests.get(f'{ENV_VARS['NAUTOBOT_URL']}/api/dcim/sites/?limit=1000', headers=headers, verify=verify_ssl) + + if response.status_code != 200: + print(f"Failed to fetch sites from NetBox: {response.status_code}") + exit(1) + + # Populate the sites list + for site in response.json()['results']: + inventory_data['all']['vars']['sites'].append(site['name']) + # Fetch devices from NetBox - response = requests.get(f'{netbox_api_url}/api/dcim/sites/?limit=1000', headers=headers, verify=verify_ssl) - if response.status_code == 200: - sites_data = response.json() - # Populate the sites list - for site in sites_data['results']: - inventory_data['all']['vars']['sites'].append(site['name']) - - response = requests.get(f'{netbox_api_url}/api/dcim/devices/?limit=1000', headers=headers, verify=verify_ssl) - if response.status_code == 200: - device_data = response.json() - - for device in device_data['results']: - device_name = device['name'] - site_name = device.get('site', {}).get('name', 'undefined') - device_tags = [tag['name'] for tag in device.get('tags', [])] - - if site_name not in inventory_data['all']['children']['org']['children']: - inventory_data['all']['children']['org']['children'][site_name] = {'hosts': {}} - - # Add the device under the site with its tags - inventory_data['all']['children']['org']['children'][site_name]['hosts'][device_name] = {'tags': device_tags} - else: - print(f"Failed to fetch data from NetBox: {response.status_code}") + response = requests.get(f'{ENV_VARS['NAUTOBOT_URL']}/api/dcim/devices/?limit=1000', headers=headers, verify=verify_ssl) + + if response.status_code != 200: + print(f"Failed to fetch devices from NetBox: {response.status_code}") exit(1) + for device in response.json()['results']: + device_name = device['name'] + site_name = device.get('site', {}).get('name', 'undefined') + device_tags = [tag['name'] for tag in device.get('tags', [])] + + # Add the device under the site with its tags + inventory_data['all']['children']['org']['children'].setdefault(site_name, {}).setdefault('hosts', {}).setdefault(device_name, {})['tags'] = device_tags + return inventory_data def write_inventory_file(inventory_data): - inventory_file_path = os.path.join(os.path.dirname(__file__),'..', 'inventory', 'tags.yml') + inventory_file_path = os.path.join(os.path.dirname(__file__), '..', 'inventory', 'tags.yml') with open(inventory_file_path, 'w') as f: yaml.dump(inventory_data, f, default_flow_style=False, sort_keys=False) diff --git a/mdd/scripts/load_env_vars.py b/mdd/scripts/load_env_vars.py new file mode 100644 index 0000000..ab16c79 --- /dev/null +++ b/mdd/scripts/load_env_vars.py @@ -0,0 +1,33 @@ +# CALL FILE TOOLS? +def load_env_vars(OS_ENV, ENV_VARS): + """ + Sees if any environment variables are not set - if they are not, exits the program + OS_ENV should be passed as os.environ + """ + missing_vars = [] + for var in ENV_VARS: + value = OS_ENV.get(var) + if not value: + missing_vars.append(var) + else: + # Create a variable with the name from the environment variable + ENV_VARS[var] = value + + # If any required environment variable is missing or empty, print an error message and exit + if missing_vars: + print(f"Error: The following environment variables are not set or empty: {', '.join(missing_vars)}") + exit(1) + + return ENV_VARS + +def get_sub_dict(dict, level): + """ + Returns reference to a sub dictionary based on first key + Used for inventory where you only have 1 key + """ + sub_dict = dict + for _ in range(level): + key = list(sub_dict.keys())[0] # get first key + sub_dict = sub_dict[key] + + return sub_dict # reference \ No newline at end of file diff --git a/mdd/scripts/nautobot_add_sites.py b/mdd/scripts/nautobot_add_sites.py index d18fc05..92085d7 100644 --- a/mdd/scripts/nautobot_add_sites.py +++ b/mdd/scripts/nautobot_add_sites.py @@ -1,86 +1,183 @@ import csv -import requests - -# Nautobot details -NAUTOBOT_URL = -NAUTOBOT_TOKEN = -HEADERS = { - 'Authorization': f'Token {NAUTOBOT_TOKEN}', - 'Content-Type': 'application/json', - 'Accept': 'application/json', -} +from nautobot import Nautobot #pip install nautobot-client +import load_env_vars as load_env_vars +import os # Disable SSL Warnings (for self-signed certificates) requests.packages.urllib3.disable_warnings() +ENV_VARS = { + "NAUTOBOT_URL" : None, + "NAUTOBOT_TOKEN" : None +} +NETBOX_DEVICES_PATH = './docs/netbox_devices.csv' +NETBOX_SITES_PATH = './docs/netbox_sites.csv' + +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + +# Initialize the Nautobot client +nautobot = Nautobot( + url=ENV_VARS['NAUTOBOT_URL'], + token=ENV_VARS['NAUTOBOT_TOKEN'] +) + def create_site(site): - url = f"{NAUTOBOT_URL}/dcim/sites/" - response = requests.post(url, headers=HEADERS, json=site, verify=False) + response = nautobot.dcim.sites.create(**site) + if response.status_code == 201: print(f"Site '{site['name']}' created successfully.") else: print(f"Failed to create site '{site['name']}': {response.text}") def create_device(device): - url = f"{NAUTOBOT_URL}/dcim/devices/" - response = requests.post(url, headers=HEADERS, json=device, verify=False) + response = nautobot.dcim.devices.create(**device) + if response.status_code == 201: print(f"Device '{device['name']}' created successfully.") - return response.json()['id'] # Return device ID for further use - else: - print(f"Failed to create device '{device['name']}': {response.text}") + return response.id # Return device ID for further use + + print(f"Failed to create device '{device['name']}': {response.text}") def create_interface(device_id, interface): - url = f"{NAUTOBOT_URL}/dcim/interfaces/" interface['device'] = device_id - response = requests.post(url, headers=HEADERS, json=interface, verify=False) + response = nautobot.dcim.interfaces.create(**interface) + if response.status_code == 201: print(f"Interface '{interface['name']}' created successfully.") - return response.json()['id'] - else: - print(f"Failed to create interface '{interface['name']}': {response.text}") + return response.id + + print(f"Failed to create interface '{interface['name']}': {response.text}") def assign_ip_to_interface(interface_id, ip): - url = f"{NAUTOBOT_URL}/ipam/ip-addresses/" ip_data = { "address": ip, "assigned_object_type": "dcim.interface", "assigned_object_id": interface_id } - response = requests.post(url, headers=HEADERS, json=ip_data, verify=False) + response = nautobot.ipam.ip_addresses.create(**ip_data) + if response.status_code == 201: print(f"IP Address '{ip}' assigned successfully.") else: print(f"Failed to assign IP Address '{ip}': {response.text}") -# Import Sites from CSV -with open('docs/netbox_sites.csv', mode='r') as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - site = { - "name": row['Name'], - "slug": row['Slug'], - "status": "active" if row['Active'] == "Active" else "inactive", - "region": None, # Adjust this if you are using regions - "description": row['Comments'], - # Include more fields as needed - } - create_site(site) - -# Import Devices from CSV -with open('docs/netbox_devices.csv', mode='r') as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - device = { - "name": row['Name'], - "device_type": None, # Specify device type ID - "device_role": None, # Specify device role ID - "site": row['Name'][:4].upper(), # Match site by first four letters of device name - "status": "active" if row['Status'] == "Active" else "planned", - # Include more fields as needed - } - device_id = create_device(device) - if device_id and row.get('IP Address'): - interface_id = create_interface(device_id, {"name": "Management", "type": "virtual"}) - if interface_id: - assign_ip_to_interface(interface_id, row['IP Address']) + +def main(): + # Import Sites from CSV + with open(NETBOX_SITES_PATH, mode='r') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + site = { + "name": row['Name'], + "slug": row['Slug'], + "status": "active" if row['Active'] == "Active" else "inactive", + "region": None, # Adjust this if you are using regions + "description": row['Comments'], + # Include more fields as needed + } + create_site(site) + + # Import Devices from CSV + with open(NETBOX_DEVICES_PATH, mode='r') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + device = { + "name": row['Name'], + "device_type": None, # Specify device type ID + "device_role": None, # Specify device role ID + "site": row['Name'][:4].upper(), # Match site by first four letters of device name + "status": "active" if row['Status'] == "Active" else "planned", + # Include more fields as needed + } + device_id = create_device(device) + if device_id and row.get('IP Address'): + interface_id = create_interface(device_id, {"name": "Management", "type": "virtual"}) + if interface_id: + assign_ip_to_interface(interface_id, row['IP Address']) + +if __name__ == "__main__": + main() + +# HEADERS = { +# 'Authorization': f'Token {ENV_VARS['NAUTOBOT_TOKEN']}', +# 'Content-Type': 'application/json', +# 'Accept': 'application/json', +# } + +# def create_site(site): +# url = f"{ENV_VARS['NAUTOBOT_URL']}/dcim/sites/" +# response = requests.post(url, headers=HEADERS, json=site, verify=False) + +# if response.status_code == 201: +# print(f"Site '{site['name']}' created successfully.") +# else: +# print(f"Failed to create site '{site['name']}': {response.text}") + +# def create_device(device): +# url = f"{ENV_VARS['NAUTOBOT_URL']}/dcim/devices/" +# response = requests.post(url, headers=HEADERS, json=device, verify=False) + +# if response.status_code == 201: +# print(f"Device '{device['name']}' created successfully.") +# return response.json()['id'] # Return device ID for further use + +# print(f"Failed to create device '{device['name']}': {response.text}") + +# def create_interface(device_id, interface): +# url = f"{ENV_VARS['NAUTOBOT_URL']}/dcim/interfaces/" +# interface['device'] = device_id + +# response = requests.post(url, headers=HEADERS, json=interface, verify=False) + +# if response.status_code == 201: +# print(f"Interface '{interface['name']}' created successfully.") +# return response.json()['id'] + +# print(f"Failed to create interface '{interface['name']}': {response.text}") + +# def assign_ip_to_interface(interface_id, ip): +# url = f"{ENV_VARS['NAUTOBOT_URL']}/ipam/ip-addresses/" +# ip_data = { +# "address": ip, +# "assigned_object_type": "dcim.interface", +# "assigned_object_id": interface_id +# } + +# response = requests.post(url, headers=HEADERS, json=ip_data, verify=False) + +# if response.status_code == 201: +# print(f"IP Address '{ip}' assigned successfully.") +# else: +# print(f"Failed to assign IP Address '{ip}': {response.text}") + +# # Import Sites from CSV +# with open('docs/netbox_sites.csv', mode='r') as csvfile: +# reader = csv.DictReader(csvfile) +# for row in reader: +# site = { +# "name": row['Name'], +# "slug": row['Slug'], +# "status": "active" if row['Active'] == "Active" else "inactive", +# "region": None, # Adjust this if you are using regions +# "description": row['Comments'], +# # Include more fields as needed +# } +# create_site(site) + +# # Import Devices from CSV +# with open('docs/netbox_devices.csv', mode='r') as csvfile: +# reader = csv.DictReader(csvfile) +# for row in reader: +# device = { +# "name": row['Name'], +# "device_type": None, # Specify device type ID +# "device_role": None, # Specify device role ID +# "site": row['Name'][:4].upper(), # Match site by first four letters of device name +# "status": "active" if row['Status'] == "Active" else "planned", +# # Include more fields as needed +# } +# device_id = create_device(device) +# if device_id and row.get('IP Address'): +# interface_id = create_interface(device_id, {"name": "Management", "type": "virtual"}) +# if interface_id: +# assign_ip_to_interface(interface_id, row['IP Address']) diff --git a/mdd/scripts/netbox_class.py b/mdd/scripts/netbox_class.py index 91ed00e..58d9dbe 100644 --- a/mdd/scripts/netbox_class.py +++ b/mdd/scripts/netbox_class.py @@ -1,17 +1,68 @@ import json import requests +import load_env_vars +import os + class netbox_helper(): - def __init__(self,NETBOX_URL, NETBOX_HEADERS): - self.netbox_url=NETBOX_URL - self.netbox_headers=NETBOX_HEADERS - - def fetch_netbox_devices(self): - """Fetch all devices from NetBox.""" - url = f"{self.netbox_url}/api/dcim/devices/" - response = requests.get(url, headers=self.netbox_headers, verify=False) - if response.status_code == 200: - return response.json().get('results', []) - else: - print(f"Failed to retrieve devices from NetBox: {response.status_code}, {response.text}") - return [] + def __init__(self, headers={}, verify_ssl=False): + ENV_VARS = { + "NETBOX_URL" : None, + "NETBOX_TOKEN" : None + } + self.ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + + self.headers = {'Authorization': f'Token {self.ENV_VARS['NETBOX_TOKEN']}', "Content-Type": "application/json", "Accept": "application/json"} + self.headers.update(headers) + + self.verify_ssl = verify_ssl + + def get_netbox_devices(self, query=""): + """Fetch devices from NetBox""" + + url = f"{self.ENV_VARS["NETBOX_URL"]}/api/dcim/devices/{query}" + response = requests.get(url, headers=self.headers, verify=self.verify_ssl) + + if response.status_code != 200: + print(f"Failed to retrieve devices from NetBox: {response.status_code}, {response.text}") + return [] + + return response.json().get('results', []) + + def get_netbox_sites(self, query=""): + """Fetch sites from NetBox""" + + url = f"{self.ENV_VARS["NETBOX_URL"]}/api/dcim/sites/{query}" + response = requests.get(url, headers=self.headers, verify=self.verify_ssl) + + if response.status_code != 200: + print(f"Failed to retrieve sites from NetBox: {response.status_code}, {response.text}") + return [] + + return response.json().get('results', []) + + def get_netbox_device_type(self, query=""): + """Gets device_types""" + + url = f"{self.ENV_VARS['NETBOX_URL']}/api/dcim/device-types/{query}" + response = requests.get(url, headers=self.headers, verify=self.verify_ssl) + + if response.status_code != 200: + print(f"Failed to retrieve sites from NetBox: {response.status_code}, {response.text}") + return [] + + return response.json().get('results', []) + + def get_netbox_device_type(self, query=""): + """Gets device_types""" + + url = f"{self.ENV_VARS['NETBOX_URL']}/api/dcim/device-types/{query}" + response = requests.get(url, headers=self.headers, verify=self.verify_ssl) + + if response.status_code != 200: + print(f"Failed to retrieve sites from NetBox: {response.status_code}, {response.text}") + return [] + + return response.json().get('results', []) + + response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/dcim/device-types/", headers=headers, data=json.dumps(payload), verify=False) diff --git a/mdd/scripts/netbox_device_add.py b/mdd/scripts/netbox_device_add.py index 1a04cb0..645ee8f 100644 --- a/mdd/scripts/netbox_device_add.py +++ b/mdd/scripts/netbox_device_add.py @@ -1,79 +1,155 @@ import requests import urllib3 import json +import os +import load_env_vars +from pynetbox import api + +ENV_VARS = { + "NETBOX_URL": None, + "NETBOX_TOKEN": None, + "NSO_URL": None, + "NSO_USERNAME": None, + "NSO_PASSWORD": None, + "DEVICE_NAME": None, + "PLATFORM_URL": None +} + +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + +HEADERS = {"Authorization": f"Token {ENV_VARS['NETBOX_TOKEN']}", "Content-Type": "application/json", "Accept": "application/json"} + +netbox = api( + url=ENV_VARS['NETBOX_URL'], + token=ENV_VARS['NETBOX_TOKEN'] + ) # Disable SSL warnings urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -def get_or_create_device_type(platform_data, headers, netbox_url): +def get_or_create_device_type(platform_data): """ Ensures the device type exists in NetBox based on the NSO platform data, and returns the device type ID. """ + + # model = platform_data["tailf-ncs:platform"]["model"] + # slug = model.replace("-", "_").lower() + + # # Check if the device type already exists + # search_response = requests.get(f"{ENV_VARS['NETBOX_URL']}/api/dcim/device-types/?slug={slug}", headers=HEADERS, verify=False) + + # if search_response.status_code != 200: + # return None + + + # if search_response.json()['count'] > 0: + # # Device type exists + # return search_response.json()["results"][0]["id"] + model = platform_data["tailf-ncs:platform"]["model"] slug = model.replace("-", "_").lower() - - # Check if the device type already exists - search_response = requests.get(f"{netbox_url}/api/dcim/device-types/?slug={slug}", headers=headers, verify=False) - if search_response.status_code == 200 and search_response.json()['count'] == 0: - # Device type does not exist, proceed with creation - payload = { - "manufacturer": "2", # Replace with actual manufacturer ID - "model": model, - "slug": slug, - "part_number": model, # Using model as part_number for simplicity - "name": model - } - response = requests.post(f"{netbox_url}/api/dcim/device-types/", headers=headers, data=json.dumps(payload), verify=False) - if response.status_code in [200, 201]: - print(f"Successfully added device type: {model}") - return response.json()["id"] - else: - print(f"Failed to add device type {model}: {response.status_code}, {response.text}") - return None - else: - # Device type exists - return search_response.json()["results"][0]["id"] - -def update_device(platform_data, device_type_id, headers, netbox_url): + + # Search for the device type in NetBox + device_types = netbox.dcim.device_types.filter(slug=slug) + + # Check if any device types were found + if device_types: + device_types[0].id + + ## MEANS NOT THERE - SO CREATE IT + + # Create the device type using pynetbox + device_type = netbox.dcim.device_types.create( + manufacturer="2", + model=model, + slug=slug, + part_number=model, # Using model as part_number for simplicity + name=model + ) + + # Check if the device type creation was successful + if device_type.id is None: + print(f"Failed to add device type {model}") + return None + + print(f"Successfully added device type: {model}") + return device_type.id + + # response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/dcim/device-types/", headers=headers, data=json.dumps(payload), verify=False) + + # if response.status_code not in [200, 201]: + # print(f"Failed to add device type {model}: {response.status_code}, {response.text}") + # return None + + # print(f"Successfully added device type: {model}") + # return response.json()["id"] + +def update_device(self, device_name, platform_data, device_type_id): # ASK WHY DEVICE_NAME = "" - get all? """ Updates an existing device in NetBox with the device type based on NSO platform data. """ - device_name = "" + serial_number = platform_data["tailf-ncs:platform"]["serial-number"] - # Attempt to find the device in NetBox - search_response = requests.get(f"{netbox_url}/api/dcim/devices/?name={device_name}", headers=headers, verify=False) - if search_response.status_code == 200 and search_response.json()['count'] > 0: - device_id = search_response.json()["results"][0]["id"] - payload = { - "device_type": device_type_id, - "serial": serial_number - } - update_response = requests.patch(f"{netbox_url}/api/dcim/devices/{device_id}/", headers=headers, data=json.dumps(payload), verify=False) - if update_response.status_code in [200, 201, 204]: - print(f"Successfully updated device: {device_name}") - else: - print(f"Failed to update device {device_name}: {update_response.status_code}, {update_response.text}") - else: + + # Search for the device in NetBox + devices = self.netbox.dcim.devices.filter(name=device_name) + + # Check if the device exists + if not devices: print(f"Device {device_name} does not exist. Skipping.") + return + + # Get the ID of the first matching device + device_id = devices[0].id + + # Update the device with the new device type and serial number + device = self.netbox.dcim.devices.get(device_id) + device.device_type = device_type_id + device.serial = serial_number + device.save() + + print(f"Successfully updated device: {device.name}") -# Setup for NetBox and NSO -netbox_url = -netbox_token = -headers = {"Authorization": f"Token {netbox_token}", "Content-Type": "application/json", "Accept": "application/json"} -nso_url = -nso_username = -nso_password = -device_name = -platform_url = +# def update_device(platform_data, device_type_id): +# """ +# Updates an existing device in NetBox with the device type based on NSO platform data. +# """ + +# device_name = "" +# serial_number = platform_data["tailf-ncs:platform"]["serial-number"] + +# # Attempt to find the device in NetBox +# search_response = requests.get(f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/?name={device_name}", headers=HEADERS, verify=False) + +# if search_response.status_code != 200: +# return + +# if search_response.json()['count'] == 0: +# print(f"Device {device_name} does not exist. Skipping.") +# return + +# device_id = search_response.json()["results"][0]["id"] +# payload = { +# "device_type": device_type_id, +# "serial": serial_number +# } + +# update_response = requests.patch(f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/{device_id}/", headers=HEADERS, data=json.dumps(payload), verify=False) + +# if update_response.status_code in [200, 201, 204]: +# print(f"Successfully updated device: {device_name}") +# else: +# print(f"Failed to update device {device_name}: {update_response.status_code}, {update_response.text}") # Make the GET request to retrieve the platform information -response = requests.get(platform_url, auth=(nso_username, nso_password), headers={"Accept": "application/yang-data+json"}, verify=False) -if response.status_code == 200: - print("Platform information retrieved successfully") - platform_data = response.json() - device_type_id = get_or_create_device_type(platform_data, headers, netbox_url) - if device_type_id: - update_device(platform_data, device_type_id, headers, netbox_url) -else: +response = requests.get(ENV_VARS['NSO_URL'], auth=(ENV_VARS['NSO_USERNAME'], ENV_VARS['NSO_PASSWORD']), headers={"Accept": "application/yang-data+json"}, verify=False) +if response.status_code != 200: print(f"Failed to retrieve platform information: {response.status_code}, {response.text}") + os._exit(1) + +print("Platform information retrieved successfully") +platform_data = response.json() +device_type_id = get_or_create_device_type(platform_data) +if device_type_id: + update_device("", platform_data, device_type_id) diff --git a/mdd/scripts/netbox_device_csv.py b/mdd/scripts/netbox_device_csv.py index 49957b4..e898a90 100644 --- a/mdd/scripts/netbox_device_csv.py +++ b/mdd/scripts/netbox_device_csv.py @@ -1,118 +1,198 @@ import csv import os -import requests -import urllib3 -import netbox_class +from pynetbox import api +import load_env_vars -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - -# Load environment variables -NETBOX_URL = os.getenv('NETBOX_URL') -NETBOX_TOKEN = os.getenv('NETBOX_TOKEN') +ENV_VARS = { + "NETBOX_URL": None, + "NETBOX_TOKEN": None +} DEVICES_CSV_FILE_PATH = './docs/netbox_devices.csv' DEVICE_TYPE_LOOKUP_CSV_PATH = './docs/device_type_lookup.csv' -headers = {"Authorization": f"Token {NETBOX_TOKEN}", "Content-Type": "application/json"} +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + +# Initialize NetBox API +netbox = api(url=ENV_VARS['NETBOX_URL'], token=ENV_VARS['NETBOX_TOKEN']) def fetch_id(endpoint, search_param, search_field='name'): - url = f"{NETBOX_URL}/api/{endpoint}/" - params = {search_field: search_param} - response = requests.get(url, headers=headers, params=params, verify=False) - if response.status_code == 200 and response.json()['count'] == 1: - return response.json()['results'][0]['id'] - elif response.status_code == 200 and response.json()['count'] > 1: - print(f"Multiple matches found for {search_param} from {endpoint}, using the first one.") - return response.json()['results'][0]['id'] - print(f"Error fetching ID for {search_param} from {endpoint}.") - return None - -def load_device_lookup(): - lookup = {} - with open(DEVICE_TYPE_LOOKUP_CSV_PATH, mode='r', encoding='utf-8') as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - lookup[row['Code']] = {'DeviceRole': row['DeviceRole'], 'DeviceType': row['DeviceType']} - return lookup + try: + objects = getattr(netbox.dcim, endpoint).filter(**{search_field: search_param}) + return objects[0].id if objects else None + except Exception as e: + print(f"Error fetching ID for {search_param} from {endpoint}: {e}") + return None def fetch_sites(): - site_mapping = {} - url = f"{NETBOX_URL}/api/dcim/sites/" - response = requests.get(url, headers=headers, verify=False) - for site in response.json().get('results', []): - site_mapping[site['slug'].upper()] = site['id'] - return site_mapping - -def add_management_interface_and_ip(device_id, ip_address): - print(f"Starting to add management interface to device ID {device_id}...") - - # Step 1: Create the management interface - url = f"{NETBOX_URL}/api/dcim/interfaces/" - data = {"device": device_id, "name": "Management", "type": "virtual", "enabled": True} - interface_response = requests.post(url, headers=headers, json=data, verify=False) - - if interface_response.status_code in [200, 201]: - interface_id = interface_response.json()['id'] - print(f"'Management' interface with ID {interface_id} created successfully.") - - # Step 2: Assign the IP address to the management interface - ip_url = f"{NETBOX_URL}/api/ipam/ip-addresses/" - ip_data = { - "address": ip_address, - "assigned_object_id": interface_id, - "assigned_object_type": "dcim.interface" - } - ip_response = requests.post(ip_url, headers=headers, json=ip_data, verify=False) - - if ip_response.status_code in [200, 201]: - print(f"IP address {ip_address} assigned successfully to 'Management' interface.") - ip_id = ip_response.json()['id'] - - # Step 3: Set the IP address as the primary IP for the device - device_update_url = f"{NETBOX_URL}/api/dcim/devices/{device_id}/" - device_update_data = { - "primary_ip4": ip_id - } - device_update_response = requests.patch(device_update_url, headers=headers, json=device_update_data, verify=False) - - if device_update_response.status_code in [200, 201, 204]: - print(f"IP address {ip_address} set as primary IP for device ID {device_id}.") - else: - print(f"Failed to set IP address as primary for device ID {device_id}: {device_update_response.text}") - else: - print(f"Failed to assign IP address {ip_address} to 'Management' interface: {ip_response.text}") - else: - print(f"Failed to create 'Management' interface for device ID {device_id}: {interface_response.text}") + return {site.slug.upper(): site.id for site in netbox.dcim.sites.all()} + +def add_management_interface_and_ip(device, ip_address): + try: + interface = device.interfaces.create(name="Management", type="virtual", enabled=True) + interface.ip_addresses.create(address=ip_address) + device.primary_ip4 = interface.primary_ip.id + device.save() + print(f"Management interface added to device {device.name} with IP {ip_address}.") + except Exception as e: + print(f"Error adding management interface to device {device.name}: {e}") def create_device(row, site_mapping, device_lookup): - site_prefix = row['Name'][:4].upper() - site_id = site_mapping.get(site_prefix, site_mapping.get('DEFAULT')) + site_id = site_mapping.get(row['Name'][:4].upper(), site_mapping.get('DEFAULT')) device_code = row['Name'][9:11] lookup = device_lookup.get(device_code, {}) - device_role_id = fetch_id('dcim/device-roles', lookup.get('DeviceRole'), 'name') - device_type_id = fetch_id('dcim/device-types', lookup.get('DeviceType'), 'model') - if not (site_id and device_role_id and device_type_id): + device_role_id = fetch_id('device_roles', lookup.get('DeviceRole')) + device_type_id = fetch_id('device_types', lookup.get('DeviceType'), 'model') + + if not all([site_id, device_role_id, device_type_id]): print(f"Skipping device creation for {row['Name']} due to missing required IDs.") return - data = {"name": row['Name'], "site": site_id, "device_role": device_role_id, "device_type": device_type_id, "serial": row.get('Serial number', '')} - device_response = requests.post(f"{NETBOX_URL}/api/dcim/devices/", headers=headers, json=data, verify=False) - if device_response.status_code in [200, 201]: + + try: + device = netbox.dcim.devices.create( + name=row['Name'], + site=site_id, + device_role=device_role_id, + device_type=device_type_id, + serial=row.get('Serial number', '') + ) print(f"Device '{row['Name']}' created successfully.") - device_id = device_response.json()['id'] - add_management_interface_and_ip(device_id, row.get('IP Address', '')) - else: - print(f"Failed to create device '{row['Name']}': {device_response.text}") + add_management_interface_and_ip(device, row.get('IP Address', '')) + except Exception as e: + print(f"Failed to create device '{row['Name']}': {e}") def main(): print("Script started.") site_mapping = fetch_sites() print(f"Loaded {len(site_mapping)} sites.") - device_lookup = load_device_lookup() + + device_lookup = {row['Code']: {'DeviceRole': row['DeviceRole'], 'DeviceType': row['DeviceType']} for row in csv.DictReader(open(DEVICE_TYPE_LOOKUP_CSV_PATH))} print(f"Loaded device lookup with codes: {list(device_lookup.keys())}") - with open(DEVICES_CSV_FILE_PATH, mode='r', encoding='utf-8') as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - print(f"Processing device {row['Name']}...") - create_device(row, site_mapping, device_lookup) + + for row in csv.DictReader(open(DEVICES_CSV_FILE_PATH)): + print(f"Processing device {row['Name']}...") + create_device(row, site_mapping, device_lookup) if __name__ == "__main__": - main() \ No newline at end of file + main() + + +# def fetch_id(endpoint, search_param, search_field='name'): +# url = f"{ENV_VARS['NETBOX_URL']}/api/{endpoint}/" +# params = {search_field: search_param} + +# response = requests.get(url, headers=headers, params=params, verify=False) + +# if response.status_code != 200: +# print(f"Error fetching ID for {search_param} from {endpoint}.") +# return None + +# if response.json()['count'] == 1: +# return response.json()['results'][0]['id'] + +# if response.json()['count'] > 1: +# print(f"Multiple matches found for {search_param} from {endpoint}, using the first one.") +# return response.json()['results'][0]['id'] + +# return None # count == 0 + +# def load_device_lookup(): +# lookup = {} +# with open(DEVICE_TYPE_LOOKUP_CSV_PATH, mode='r', encoding='utf-8') as csvfile: +# reader = csv.DictReader(csvfile) +# for row in reader: +# lookup[row['Code']] = {'DeviceRole': row['DeviceRole'], 'DeviceType': row['DeviceType']} +# return lookup + +# def fetch_sites(): +# site_mapping = {} +# url = f"{ENV_VARS['NETBOX_URL']}/api/dcim/sites/" +# response = requests.get(url, headers=headers, verify=False) + +# for site in response.json().get('results', []): +# site_mapping[site['slug'].upper()] = site['id'] + +# return site_mapping + +# # def add_management_interface_and_ip(device_id, ip_address): +# # print(f"Starting to add management interface to device ID {device_id}...") + +# # # Step 1: Create the management interface +# # url = f"{ENV_VARS['NETBOX_URL']}/api/dcim/interfaces/" +# # data = {"device": device_id, "name": "Management", "type": "virtual", "enabled": True} +# # interface_response = requests.post(url, headers=headers, json=data, verify=False) + +# # if interface_response.status_code not in [200, 201]: +# # print(f"Failed to create 'Management' interface for device ID {device_id}: {interface_response.text}") +# # return + +# # interface_id = interface_response.json()['id'] +# # print(f"'Management' interface with ID {interface_id} created successfully.") + +# # # Step 2: Assign the IP address to the management interface +# # ip_url = f"{ENV_VARS['NETBOX_URL']}/api/ipam/ip-addresses/" +# # ip_data = { +# # "address": ip_address, +# # "assigned_object_id": interface_id, +# # "assigned_object_type": "dcim.interface" +# # } +# # ip_response = requests.post(ip_url, headers=headers, json=ip_data, verify=False) + +# # if ip_response.status_code not in [200, 201]: +# # print(f"Failed to assign IP address {ip_address} to 'Management' interface: {ip_response.text}") +# # return + +# # print(f"IP address {ip_address} assigned successfully to 'Management' interface.") +# # ip_id = ip_response.json()['id'] + +# # # Step 3: Set the IP address as the primary IP for the device +# # device_update_url = f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/{device_id}/" +# # device_update_data = { +# # "primary_ip4": ip_id +# # } +# # device_update_response = requests.patch(device_update_url, headers=headers, json=device_update_data, verify=False) + +# # if device_update_response.status_code in [200, 201, 204]: +# # print(f"IP address {ip_address} set as primary IP for device ID {device_id}.") +# # else: +# # print(f"Failed to set IP address as primary for device ID {device_id}: {device_update_response.text}") + +# def create_device(row, site_mapping, device_lookup): +# site_prefix = row['Name'][:4].upper() +# site_id = site_mapping.get(site_prefix, site_mapping.get('DEFAULT')) +# device_code = row['Name'][9:11] +# lookup = device_lookup.get(device_code, {}) + +# device_role_id = fetch_id('dcim/device-roles', lookup.get('DeviceRole'), 'name') +# device_type_id = fetch_id('dcim/device-types', lookup.get('DeviceType'), 'model') + +# if not (site_id and device_role_id and device_type_id): +# print(f"Skipping device creation for {row['Name']} due to missing required IDs.") +# return + +# data = {"name": row['Name'], "site": site_id, "device_role": device_role_id, "device_type": device_type_id, "serial": row.get('Serial number', '')} +# device_response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/", headers=headers, json=data, verify=False) + +# if device_response.status_code in [200, 201]: +# print(f"Device '{row['Name']}' created successfully.") +# device_id = device_response.json()['id'] +# add_management_interface_and_ip(device_id, row.get('IP Address', '')) +# else: +# print(f"Failed to create device '{row['Name']}': {device_response.text}") + +# def main(): +# print("Script started.") + +# site_mapping = fetch_sites() +# print(f"Loaded {len(site_mapping)} sites.") + +# device_lookup = load_device_lookup() +# print(f"Loaded device lookup with codes: {list(device_lookup.keys())}") + +# with open(DEVICES_CSV_FILE_PATH, mode='r', encoding='utf-8') as csvfile: +# reader = csv.DictReader(csvfile) +# for row in reader: +# print(f"Processing device {row['Name']}...") +# create_device(row, site_mapping, device_lookup) + +# if __name__ == "__main__": +# main() \ No newline at end of file diff --git a/mdd/scripts/netbox_generate_sites.py b/mdd/scripts/netbox_generate_sites.py index 1667881..803781f 100644 --- a/mdd/scripts/netbox_generate_sites.py +++ b/mdd/scripts/netbox_generate_sites.py @@ -1,62 +1,126 @@ -import csv -import requests import os -import urllib3 +import csv from urllib.parse import urljoin -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +from pynetbox import api +import load_env_vars + +ENV_VARS = { + "NETBOX_URL": None, + "NETBOX_TOKEN": None +} +# Path to your CSV file +csv_file_path = './docs/netbox_sites.csv' + +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) -# Load environment variables -netbox_token = os.getenv('NETBOX_TOKEN') -netbox_url = os.getenv('NETBOX_URL') +# Initialize NetBox API +netbox = api(url=ENV_VARS['NETBOX_URL'], token=ENV_VARS['NETBOX_TOKEN']) def create_region(region_name): """Create a region in NetBox if it doesn't already exist.""" - if region_name == '': + if not region_name: return None - url = urljoin(netbox_url, 'api/dcim/regions/') - headers = {'Authorization': f'Token {netbox_token}', 'Content-Type': 'application/json'} - response = requests.get(url, headers=headers, params={'name': region_name}, verify=False) # Added verify=False here - if response.status_code == 200 and response.json()['count'] == 0: - data = {'name': region_name, 'slug': region_name.lower()} - create_response = requests.post(url, json=data, headers=headers, verify=False) # And here - if create_response.status_code in [200, 201]: + + try: + region = netbox.dcim.regions.get(name=region_name) + print(f"Region '{region_name}' already exists.") + return region.id + except netbox.core.query.RequestError: + try: + region = netbox.dcim.regions.create(name=region_name, slug=region_name.lower()) print(f"Region '{region_name}' created successfully.") - return create_response.json()['id'] - else: - print(f"Failed to create region '{region_name}'.") + return region.id + except Exception as e: + print(f"Failed to create region '{region_name}': {e}") return None - elif response.json()['count'] > 0: - print(f"Region '{region_name}' already exists.") - return response.json()['results'][0]['id'] - else: - print(f"Failed to check if region '{region_name}' exists.") - return None def create_site(row): - """Create a site in NetBox using row data from the CSV.""" + """Create a site in NetBox using row data from the CSV""" region_id = create_region(row['Region']) - url = urljoin(netbox_url, 'api/dcim/sites/') - headers = {'Authorization': f'Token {netbox_token}', 'Content-Type': 'application/json'} - data = { - 'name': row['Name'], - 'slug': row['Slug'], - 'status': 'active' if row['Active'] == 'Active' else 'inactive', - 'region': region_id, - 'latitude': row['Latitude'] if row['Latitude'] else None, - 'longitude': row['Longitude'] if row['Longitude'] else None, - 'description': row['Comments'], - } - response = requests.post(url, json=data, headers=headers, verify=False) # Added verify=False here - if response.status_code in [200, 201]: + + try: + site = netbox.dcim.sites.create( + name=row['Name'], + slug=row['Slug'], + status='active' if row['Active'] == 'Active' else 'inactive', + region=region_id, + latitude=row['Latitude'] if row['Latitude'] else None, + longitude=row['Longitude'] if row['Longitude'] else None, + description=row['Comments'] + ) print(f"Site '{row['Name']}' created successfully.") - else: - print(f"Failed to create site '{row['Name']}': {response.text}") + except Exception as e: + print(f"Failed to create site '{row['Name']}': {e}") -# Path to your CSV file -csv_file_path = './docs/netbox_sites.csv' +def main(): + # Read the CSV file and process each row + with open(csv_file_path, mode='r', encoding='utf-8') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + create_site(row) + +if __name__ == "__main__": + main() + + +# headers = {'Authorization': f'Token {ENV_VARS["NETBOX_TOKEN"]}', 'Content-Type': 'application/json'} + +# # Path to your CSV file +# csv_file_path = './docs/netbox_sites.csv' + +# def create_region(region_name): +# """Create a region in NetBox if it doesn't already exist.""" + +# if region_name == '': +# return None + +# url = urljoin(ENV_VARS['NETBOX_URL'], 'api/dcim/regions/') +# response = requests.get(url, headers=headers, params={'name': region_name}, verify=False) # Added verify=False here + +# if response.status_code != 200: +# print(f"Failed to check if region '{region_name}' exists.") +# return None + +# if response.json()['count'] > 0: +# print(f"Region '{region_name}' already exists.") +# return response.json()['results'][0]['id'] + +# # Region doesn't exist - create it +# data = {'name': region_name, 'slug': region_name.lower()} +# create_response = requests.post(url, json=data, headers=headers, verify=False) # And here + +# if create_response.status_code in [200, 201]: +# print(f"Region '{region_name}' created successfully.") +# return create_response.json()['id'] + +# print(f"Failed to create region '{region_name}'.") +# return None + +# def create_site(row): +# """Create a site in NetBox using row data from the CSV""" + +# region_id = create_region(row['Region']) +# url = urljoin(ENV_VARS['NETBOX_URL'], 'api/dcim/sites/') +# headers = {'Authorization': f'Token {NETBOX_TOKEN}', 'Content-Type': 'application/json'} +# data = { +# 'name': row['Name'], +# 'slug': row['Slug'], +# 'status': 'active' if row['Active'] == 'Active' else 'inactive', +# 'region': region_id, +# 'latitude': row['Latitude'] if row['Latitude'] else None, +# 'longitude': row['Longitude'] if row['Longitude'] else None, +# 'description': row['Comments'], +# } + +# response = requests.post(url, json=data, headers=headers, verify=False) # Added verify=False here + +# if response.status_code in [200, 201]: +# print(f"Site '{row['Name']}' created successfully.") +# else: +# print(f"Failed to create site '{row['Name']}': {response.text}") -# Read the CSV file and process each row -with open(csv_file_path, mode='r', encoding='utf-8') as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - create_site(row) +# # Read the CSV file and process each row +# with open(csv_file_path, mode='r', encoding='utf-8') as csvfile: +# reader = csv.DictReader(csvfile) +# for row in reader: +# create_site(row) diff --git a/mdd/scripts/nso_acl_ip_to_nautobot.py b/mdd/scripts/nso_acl_ip_to_nautobot.py index 58d8551..57e708c 100644 --- a/mdd/scripts/nso_acl_ip_to_nautobot.py +++ b/mdd/scripts/nso_acl_ip_to_nautobot.py @@ -2,48 +2,64 @@ import requests import base64 import re -import json import urllib3 - -# Environment variables -NSO_URL = os.getenv('NSO_URL') -NSO_USERNAME = os.getenv('NSO_USERNAME') -NSO_PASSWORD = os.getenv('NSO_PASSWORD') -NAUTOBOT_URL = os.getenv('NAUTOBOT_URL') -NAUTOBOT_TOKEN = os.getenv('NAUTOBOT_TOKEN') +import load_env_vars +from nautobot import Nautobot +from nautobot.extras.models import Status # Disable SSL warnings urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +ENV_VARS = { + "NSO_URL": None, + "NSO_USERNAME": None, + "NSO_PASSWORD": None, + "NAUTOBOT_URL": None, + "NAUTOBOT_TOKEN": None +} +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + +# Initialize the Nautobot client +nautobot = Nautobot( + url=ENV_VARS['NAUTOBOT_URL'], + token=ENV_VARS['NAUTOBOT_TOKEN'] +) + +ip_pattern = re.compile(r'permit\s+(\d+\.\d+\.\d+\.\d+)(?:\s+(\d+\.\d+\.\d+\.\d+))?') + # Headers setup nso_headers = { - 'Authorization': f'Basic {base64.b64encode(f"{NSO_USERNAME}:{NSO_PASSWORD}".encode()).decode()}', + 'Authorization': f'Basic {base64.b64encode(f"{ENV_VARS['NSO_USERNAME']}:{ENV_VARS["NSO_PASSWORD"]}".encode()).decode()}', 'Content-Type': 'application/yang-data+json', 'Accept': 'application/yang-data+json' } -nautobot_headers = { - 'Authorization': f'Token {NAUTOBOT_TOKEN}', - 'Content-Type': 'application/json', - 'Accept': 'application/json' -} +# nautobot_headers = { +# 'Authorization': f'Token {ENV_VARS["NAUTOBOT_TOKEN"]}', +# 'Content-Type': 'application/json', +# 'Accept': 'application/json' +# } def fetch_nso_devices(): """Fetches a list of devices from NSO.""" - url = f"{NSO_URL}/restconf/tailf/query" + + url = f"{ENV_VARS['NSO_URL']}/restconf/tailf/query" payload = { "tailf-rest-query:immediate-query": { "foreach": "/devices/device", "select": [{"label": "name", "expression": "name", "result-type": "string"}] } } + response = requests.post(url, headers=nso_headers, json=payload, verify=False) devices = response.json().get("tailf-rest-query:query-result", {}).get("result", []) return [device["select"][0]["value"] for device in devices] def get_acl_for_device(device_name): """Fetches ACLs for a specified device and processes them.""" - url = f"{NSO_URL}/restconf/data/tailf-ncs:devices/device={device_name}/config/tailf-ned-cisco-ios:access-list" + + url = f"{ENV_VARS['NSO_URL']}/restconf/data/tailf-ncs:devices/device={device_name}/config/tailf-ned-cisco-ios:access-list" response = requests.get(url, headers=nso_headers, verify=False) + if response.status_code == 200: acl_data = response.json().get("tailf-ned-cisco-ios:access-list", {}).get("access-list", []) for acl in acl_data: @@ -55,7 +71,7 @@ def get_acl_for_device(device_name): def process_acl_rule(rule_text): """Processes a single ACL rule, extracting IPs and pushing to Nautobot.""" - ip_pattern = re.compile(r'permit\s+(\d+\.\d+\.\d+\.\d+)(?:\s+(\d+\.\d+\.\d+\.\d+))?') + match = ip_pattern.search(rule_text) if match: ip_address = match.group(1) @@ -69,30 +85,58 @@ def process_acl_rule(rule_text): def convert_to_cidr(mask): """Converts a subnet mask to CIDR notation.""" + return sum([bin(int(x)).count('1') for x in mask.split('.')]) def push_ip_to_nautobot(ip_address): """Pushes an IP address to Nautobot.""" - url = f"{NAUTOBOT_URL}/ipam/ip-addresses/" - data = data = { - "address": ip_address, - "namespace": { - "id": "fa8052af-7f81-4818-8eab-d7f08ed192a3", # Example ID, replace with actual if different - "object_type": "app_label.modelname", # Adjust as necessary - "url": "fa8052af-7f81-4818-8eab-d7f08ed192a3" # This seems incorrect; adjust based on actual requirements - }, - "type": "host", # Assuming this is constant for your use case - "status": { - "id": "cd5378a5-ed32-4f75-9e9e-980f4280e7c1", # Example status ID, replace with actual if different - "object_type": "app_label.modelname", # Adjust as necessary - "url": "https://52.61.163.54/extras/statuses/cd5378a5-ed32-4f75-9e9e-980f4280e7c1" - } - } - response = requests.post(url, headers=nautobot_headers, json=data, verify=False) - if response.status_code in [200, 201]: + + namespace_id = "fa8052af-7f81-4818-8eab-d7f08ed192a3" # Example ID, replace with actual if different + status_id = "cd5378a5-ed32-4f75-9e9e-980f4280e7c1" # Example status ID, replace with actual if different + + try: + status = Status.objects.get(pk=status_id) + except Status.DoesNotExist: + print(f"Status with ID {status_id} does not exist.") + return + + try: + nautobot.ipam.ip_addresses.create( + address=ip_address, + status=status, + assigned_object_id=namespace_id, + assigned_object_type="extras.namespace", + type="host" + ) print(f"IP {ip_address} pushed to Nautobot successfully.") - else: - print(f"Failed to push IP {ip_address} to Nautobot: {response.text}") + except Exception as e: + print(f"Failed to push IP {ip_address} to Nautobot: {e}") + +# def push_ip_to_nautobot(ip_address): # TODO: CHANGE DATA VARIABLE +# """Pushes an IP address to Nautobot.""" + +# url = f"{ENV_VARS['NAUTOBOT_URL']}/ipam/ip-addresses/" +# data = { +# "address": ip_address, +# "namespace": { +# "id": "fa8052af-7f81-4818-8eab-d7f08ed192a3", # Example ID, replace with actual if different +# "object_type": "app_label.modelname", # Adjust as necessary +# "url": "fa8052af-7f81-4818-8eab-d7f08ed192a3" # This seems incorrect; adjust based on actual requirements +# }, +# "type": "host", # Assuming this is constant for your use case +# "status": { +# "id": "cd5378a5-ed32-4f75-9e9e-980f4280e7c1", # Example status ID, replace with actual if different +# "object_type": "app_label.modelname", # Adjust as necessary +# "url": "https://IP/extras/statuses/cd5378a5-ed32-4f75-9e9e-980f4280e7c1" +# } +# } + +# response = requests.post(url, headers=nautobot_headers, json=data, verify=False) + +# if response.status_code in [200, 201]: +# print(f"IP {ip_address} pushed to Nautobot successfully.") +# else: +# print(f"Failed to push IP {ip_address} to Nautobot: {response.text}") def main(): devices = fetch_nso_devices() diff --git a/mdd/scripts/nso_class.py b/mdd/scripts/nso_class.py new file mode 100644 index 0000000..975a721 --- /dev/null +++ b/mdd/scripts/nso_class.py @@ -0,0 +1,18 @@ +import json +import requests +import load_env_vars +import os + + +class NSO_Helper(): + def __init__(self, headers={}, verify_ssl=False): + ENV_VARS = { + "NSO_URL" : None, + "NSO_TOKEN" : None + } + self.ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + + self.headers = {'Authorization': f'Token {self.ENV_VARS['NSO_TOKEN']}', "Content-Type": "application/json", "Accept": "application/json"} + self.headers.update(headers) + + self.verify_ssl = verify_ssl \ No newline at end of file diff --git a/mdd/scripts/nso_to_netbox_device.py b/mdd/scripts/nso_to_netbox_device.py index 9100480..de77767 100644 --- a/mdd/scripts/nso_to_netbox_device.py +++ b/mdd/scripts/nso_to_netbox_device.py @@ -7,33 +7,70 @@ import urllib3 import json import base64 +import load_env_vars +from pynetbox import api # Setup and configurations -print("Loading environment variables...") urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -NSO_URL = os.getenv('NSO_URL') -NSO_USERNAME = os.getenv('NSO_USERNAME') -NSO_PASSWORD = os.getenv('NSO_PASSWORD') -NETBOX_URL = os.getenv('NETBOX_URL') -NETBOX_TOKEN = os.getenv('NETBOX_TOKEN') -DEVICE_TYPE_LOOKUP_CSV_PATH = './docs/device_type_lookup.csv' -headers_netbox = { - "Authorization": f"Token {NETBOX_TOKEN}", - "Content-Type": "application/json", - "Accept": "application/json" +ENV_VARS = { + "NSO_URL": None, + "NSO_USERNAME": None, + "NSO_PASSWORD": None, + "NETBOX_URL": None, + "NETBOX_TOKEN": None } +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) +DEVICE_TYPE_LOOKUP_CSV_PATH = './docs/device_type_lookup.csv' + +# Initialize NetBox API +netbox = api(url=ENV_VARS['NETBOX_URL'], token=ENV_VARS['NETBOX_TOKEN']) + +# headers_netbox = { +# "Authorization": f"Token {ENV_VARS['NETBOX_TOKEN']}", +# "Content-Type": "application/json", +# "Accept": "application/json" +# } + +# def create_device_in_netbox(device, site_id, device_lookup): +# print(f"Creating device in NetBox: {device['name']}") + +# device_code = device['name'].split('-')[2] # Extract device code +# if device_code in device_lookup: +# device_role = fetch_id('dcim/device-roles', device_lookup[device_code]['DeviceRole'], 'name') +# device_type = fetch_id('dcim/device-types', device_lookup[device_code]['DeviceType'], 'model') +# else: +# print(f"No device type/role found for code: {device_code}") +# return + +# device_payload = { +# "name": device['name'], +# "site": site_id, +# "device_role": device_role, +# "device_type": device_type, +# "serial": "", +# } +# device_response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/", headers=headers_netbox, json=device_payload, verify=False) + +# if device_response.status_code not in [200, 201]: +# print(f"Failed to create device {device['name']}: {device_response.text}") +# return + +# device_id = device_response.json()["id"] +# print(f"Device {device['name']} created successfully with ID {device_id}.") +# create_management_interface_and_set_primary_ip(device_id, device['address']) def create_device_in_netbox(device, site_id, device_lookup): print(f"Creating device in NetBox: {device['name']}") + device_code = device['name'].split('-')[2] # Extract device code - if device_code in device_lookup: - device_role = fetch_id('dcim/device-roles', device_lookup[device_code]['DeviceRole'], 'name') - device_type = fetch_id('dcim/device-types', device_lookup[device_code]['DeviceType'], 'model') - else: + if device_code not in device_lookup: print(f"No device type/role found for code: {device_code}") return + device_role = fetch_id(device_lookup[device_code]['DeviceRole']) + device_type = fetch_id(device_lookup[device_code]['DeviceType']) + device_payload = { "name": device['name'], "site": site_id, @@ -41,76 +78,137 @@ def create_device_in_netbox(device, site_id, device_lookup): "device_type": device_type, "serial": "", } - device_response = requests.post(f"{NETBOX_URL}/api/dcim/devices/", headers=headers_netbox, json=device_payload, verify=False) - if device_response.status_code in [200, 201]: - device_id = device_response.json()["id"] + + try: + device_id = netbox.dcim.devices.create(**device_payload)['id'] print(f"Device {device['name']} created successfully with ID {device_id}.") create_management_interface_and_set_primary_ip(device_id, device['address']) + except Exception as e: + print(f"Failed to create device {device['name']}: {e}") + +# def create_management_interface_and_set_primary_ip(device_id, ip_address): +# print(f"Creating 'Management' interface for device ID {device_id}...") + +# if not ip_address.endswith('/32'): +# ip_address += '/32' + +# # Create the Management interface +# interface_payload = {"device": device_id, "name": "Management", "type": "virtual"} +# interface_response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/dcim/interfaces/", headers=headers_netbox, json=interface_payload, verify=False) - else: - print(f"Failed to create device {device['name']}: {device_response.text}") +# if interface_response.status_code not in [200, 201]: +# print(f"Failed to create 'Management' interface: {interface_response.text}") +# return + +# interface_id = interface_response.json()["id"] +# print(f"'Management' interface with ID {interface_id} created successfully.") + +# # Create the IP address and assign it to the interface +# ip_payload = {"address": ip_address, "status": "active", "assigned_object_id": interface_id, "assigned_object_type": "dcim.interface"} +# ip_response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/ipam/ip-addresses/", headers=headers_netbox, json=ip_payload, verify=False) + +# if ip_response.status_code not in [200, 201]: +# print(f"Failed to create and assign IP address {ip_address}: {ip_response.text}") +# return + +# ip_id = ip_response.json()["id"] +# print(f"IP address {ip_address} created and assigned successfully.") + +# # Set the IP address as the primary IP for the device +# device_update_payload = {"primary_ip4": ip_id} # or "primary_ip6" for IPv6 +# device_update_response = requests.patch(f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/{device_id}/", headers=headers_netbox, json=device_update_payload, verify=False) + +# if device_update_response.status_code in [200, 201, 204]: +# print(f"Primary IP set successfully for device ID {device_id}.") +# else: +# print(f"Failed to set primary IP for device ID {device_id}: {device_update_response.text}") def create_management_interface_and_set_primary_ip(device_id, ip_address): print(f"Creating 'Management' interface for device ID {device_id}...") - + if not ip_address.endswith('/32'): ip_address += '/32' - + # Create the Management interface interface_payload = {"device": device_id, "name": "Management", "type": "virtual"} - interface_response = requests.post(f"{NETBOX_URL}/api/dcim/interfaces/", headers=headers_netbox, json=interface_payload, verify=False) - - if interface_response.status_code in [200, 201]: - interface_id = interface_response.json()["id"] + try: + interface_id = netbox.dcim.interfaces.create(**interface_payload)['id'] print(f"'Management' interface with ID {interface_id} created successfully.") - - # Create the IP address and assign it to the interface - ip_payload = {"address": ip_address, "status": "active", "assigned_object_id": interface_id, "assigned_object_type": "dcim.interface"} - ip_response = requests.post(f"{NETBOX_URL}/api/ipam/ip-addresses/", headers=headers_netbox, json=ip_payload, verify=False) - - if ip_response.status_code in [200, 201]: - ip_id = ip_response.json()["id"] - print(f"IP address {ip_address} created and assigned successfully.") - - # Set the IP address as the primary IP for the device - device_update_payload = {"primary_ip4": ip_id} # or "primary_ip6" for IPv6 - device_update_response = requests.patch(f"{NETBOX_URL}/api/dcim/devices/{device_id}/", headers=headers_netbox, json=device_update_payload, verify=False) - - if device_update_response.status_code in [200, 201, 204]: - print(f"Primary IP set successfully for device ID {device_id}.") - else: - print(f"Failed to set primary IP for device ID {device_id}: {device_update_response.text}") - else: - print(f"Failed to create and assign IP address {ip_address}: {ip_response.text}") - else: - print(f"Failed to create 'Management' interface: {interface_response.text}") + except Exception as e: + print(f"Failed to create 'Management' interface: {e}") + return + + # Create the IP address and assign it to the interface + ip_payload = {"address": ip_address, "status": "active", "assigned_object_id": interface_id, "assigned_object_type": "dcim.interface"} + try: + ip_id = netbox.ipam.ip_addresses.create(**ip_payload)['id'] + print(f"IP address {ip_address} created and assigned successfully.") + except Exception as e: + print(f"Failed to create and assign IP address {ip_address}: {e}") + return + + # Set the IP address as the primary IP for the device + try: + netbox.dcim.devices.update(device_id, primary_ip4=ip_id) + print(f"Primary IP set successfully for device ID {device_id}.") + except Exception as e: + print(f"Failed to set primary IP for device ID {device_id}: {e}") + +# def fetch_id(endpoint, search_param, search_field='name'): +# """Fetch a NetBox entity's ID based on a search field and parameter.""" + +# print(f"Fetching ID for '{search_param}' from {endpoint}...") +# url = f"{ENV_VARS['NETBOX_URL']}/api/{endpoint}/" +# params = {search_field: search_param} +# response = requests.get(url, headers=headers_netbox, params=params, verify=False) +# data = response.json() + +# if response.status_code != 200 or data['count'] != 1: +# print(f"Failed to fetch ID for {search_param} from {endpoint}.") +# return None + +# return data['results'][0]['id'] def fetch_id(endpoint, search_param, search_field='name'): """Fetch a NetBox entity's ID based on a search field and parameter.""" - print(f"Fetching ID for '{search_param}' from {endpoint}...") - url = f"{NETBOX_URL}/api/{endpoint}/" - params = {search_field: search_param} - response = requests.get(url, headers=headers_netbox, params=params, verify=False) - if response.status_code == 200 and response.json()['count'] == 1: - return response.json()['results'][0]['id'] - else: - print(f"Failed to fetch ID for {search_param} from {endpoint}.") + try: + objects = getattr(netbox, endpoint).filter(**{search_field: search_param}) + if len(objects) == 1: + return objects[0].id + else: + print(f"Failed to fetch ID for {search_param} from {endpoint}.") + return None + except Exception as e: + print(f"Failed to fetch ID for {search_param} from {endpoint}: {e}") return None +# def fetch_sites(): +# """Fetch all sites from NetBox and return a dictionary mapping site slugs to their IDs.""" + +# site_mapping = {} +# url = f"{ENV_VARS['NETBOX_URL']}/api/dcim/sites/" +# response = requests.get(url, headers=headers_netbox, verify=False) + +# if response.status_code != 200: +# print("Failed to fetch sites from NetBox.") +# return {} + +# for site in response.json()['results']: +# site_mapping[site['slug'].upper()] = site['id'] +# return site_mapping + def fetch_sites(): """Fetch all sites from NetBox and return a dictionary mapping site slugs to their IDs.""" - site_mapping = {} - url = f"{NETBOX_URL}/api/dcim/sites/" - response = requests.get(url, headers=headers_netbox, verify=False) - if response.status_code == 200: - for site in response.json()['results']: - site_mapping[site['slug'].upper()] = site['id'] - else: - print("Failed to fetch sites from NetBox.") - return site_mapping + try: + sites = netbox.dcim.sites.all() + return {site.slug.upper(): site.id for site in sites} + except Exception as e: + print(f"Failed to fetch sites from NetBox: {e}") + return {} def load_device_lookup(): """Load the device role and type lookup from a CSV file.""" + lookup = {} with open(DEVICE_TYPE_LOOKUP_CSV_PATH, mode='r', encoding='utf-8') as file: reader = csv.DictReader(file) @@ -118,22 +216,36 @@ def load_device_lookup(): lookup[row['Code']] = {'DeviceRole': row['DeviceRole'], 'DeviceType': row['DeviceType']} return lookup +# def fetch_netbox_devices(): +# """Fetch existing devices from NetBox.""" + +# netbox_devices = {} +# url = f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/" +# response = requests.get(url, headers=headers_netbox, verify=False) + +# if response.status_code == 200: +# for device in response.json()['results']: +# netbox_devices[device['name']] = device +# else: +# print("Failed to fetch devices from NetBox.") +# return netbox_devices + def fetch_netbox_devices(): """Fetch existing devices from NetBox.""" - netbox_devices = {} - url = f"{NETBOX_URL}/api/dcim/devices/" - response = requests.get(url, headers=headers_netbox, verify=False) - if response.status_code == 200: - for device in response.json()['results']: - netbox_devices[device['name']] = device - else: - print("Failed to fetch devices from NetBox.") - return netbox_devices + try: + devices = netbox.dcim.devices.all() + return {device.name: device for device in devices} + except Exception as e: + print(f"Failed to fetch devices from NetBox: {e}") + return {} def fetch_nso_devices(): """Fetch device information from NSO using a specific query.""" + print("Fetching devices from NSO...") - url = f"{NSO_URL}/restconf/tailf/query" + + devices = [] + url = f"{ENV_VARS['NSO_URL']}/restconf/tailf/query" payload = json.dumps({ "tailf-rest-query:immediate-query": { "foreach": "/devices/device", @@ -146,46 +258,67 @@ def fetch_nso_devices(): headers = { 'Content-Type': 'application/yang-data+json', 'Accept': 'application/yang-data+json', - 'Authorization': f'Basic {base64.b64encode(f"{NSO_USERNAME}:{NSO_PASSWORD}".encode()).decode("utf-8")}' + 'Authorization': f'Basic {base64.b64encode(f"{ENV_VARS['NSO_USERNAME']}:{ENV_VARS['NSO_PASSWORD']}".encode()).decode("utf-8")}' } - response = requests.post(url, headers=headers, data=payload, auth=(NSO_USERNAME, NSO_PASSWORD), verify=False) - if response.status_code == 200: - results = response.json().get("tailf-rest-query:query-result", {}).get("result", []) - devices = [] - for result in results: - device = {select["label"]: select["value"] for select in result["select"]} - devices.append(device) - print("NSO devices fetched successfully.") - return devices - else: + response = requests.post(url, headers=headers, data=payload, auth=(ENV_VARS["NSO_USERNAME"], ENV_VARS["NSO_PASSWORD"]), verify=False) + + if response.status_code != 200: print(f"Failed to fetch devices from NSO: {response.text}") return [] + results = response.json().get("tailf-rest-query:query-result", {}).get("result", []) -def assign_ip_to_interface(device_id, interface_id, ip_address): - # Create the IP address - ip_payload = { - "address": ip_address, - "status": "active" - } - ip_response = requests.post(f"{NETBOX_URL}/api/ipam/ip-addresses/", headers=headers_netbox, json=ip_payload, verify=False) - if ip_response.status_code in [200, 201]: - print(f"IP address {ip_address} created successfully.") - ip_id = ip_response.json()["id"] - - # Associate the IP address with the interface - association_payload = { - "assigned_object_type": "dcim.interface", - "assigned_object_id": interface_id + for result in results: + device = {select["label"]: select["value"] for select in result["select"]} + devices.append(device) + print("NSO devices fetched successfully.") + return devices + +# def assign_ip_to_interface(device_id, interface_id, ip_address): + +# # Create the IP address +# ip_payload = { +# "address": ip_address, +# "status": "active" +# } +# ip_response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/ipam/ip-addresses/", headers=headers_netbox, json=ip_payload, verify=False) + +# if ip_response.status_code not in [200, 201]: +# print(f"Failed to create IP address {ip_address}: {ip_response.text}") +# return + +# print(f"IP address {ip_address} created successfully.") +# ip_id = ip_response.json()["id"] + +# # Associate the IP address with the interface +# association_payload = { +# "assigned_object_type": "dcim.interface", +# "assigned_object_id": interface_id +# } +# association_response = requests.patch(f"{ENV_VARS['NETBOX_URL']}/api/ipam/ip-addresses/{ip_id}/", headers=headers_netbox, json=association_payload, verify=False) + +# if association_response.status_code in [200, 201, 204]: +# print(f"IP address {ip_address} assigned to interface ID {interface_id} successfully.") +# else: +# print(f"Failed to assign IP address {ip_address} to interface: {association_response.text}") + +def assign_ip_to_interface(device_id, interface_name, ip_address): + try: + device = netbox.dcim.devices.get(device_id) + interface = device.interfaces.get(name=interface_name) + interface_id = interface.id + + ip_data = { + "address": ip_address, + "status": "active", + "interface": interface_id } - association_response = requests.patch(f"{NETBOX_URL}/api/ipam/ip-addresses/{ip_id}/", headers=headers_netbox, json=association_payload, verify=False) - if association_response.status_code in [200, 201, 204]: - print(f"IP address {ip_address} assigned to interface ID {interface_id} successfully.") - else: - print(f"Failed to assign IP address {ip_address} to interface: {association_response.text}") - else: - print(f"Failed to create IP address {ip_address}: {ip_response.text}") + ip = netbox.ipam.ip_addresses.create(**ip_data) + + print(f"IP address {ip.address} assigned to interface {interface.name} successfully.") + except Exception as e: + print(f"Failed to assign IP address {ip_address} to interface {interface_name}: {e}") def main(): print("Starting script...") @@ -195,15 +328,17 @@ def main(): nso_devices = fetch_nso_devices() for device in nso_devices: - if device['name'] not in netbox_devices: - site_code = device['name'][:4].upper() - site_id = site_mapping.get(site_code) - if site_id: - create_device_in_netbox(device, site_id, device_lookup) - else: - print(f"Site code {site_code} not found in NetBox.") - else: + if device['name'] in netbox_devices: print(f"Device {device['name']} already exists in NetBox.") + continue + + site_code = device['name'][:4].upper() + site_id = site_mapping.get(site_code) + + if site_id: + create_device_in_netbox(device, site_id, device_lookup) + else: + print(f"Site code {site_code} not found in NetBox.") if __name__ == "__main__": main() \ No newline at end of file diff --git a/mdd/scripts/nso_to_netbox_interfaces.py b/mdd/scripts/nso_to_netbox_interfaces.py index d4047b6..96a20d8 100644 --- a/mdd/scripts/nso_to_netbox_interfaces.py +++ b/mdd/scripts/nso_to_netbox_interfaces.py @@ -2,60 +2,74 @@ import os import json import urllib3 +import load_env_vars -# Environment setup -NETBOX_URL = os.getenv('NETBOX_URL') -NETBOX_TOKEN = os.getenv('NETBOX_TOKEN') -NSO_URL = os.getenv('NSO_URL') -NSO_USERNAME = os.getenv('NSO_USERNAME') -NSO_PASSWORD = os.getenv('NSO_PASSWORD') +# Disable SSL warnings +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +ENV_VARS = { + "NSO_URL": None, + "NSO_USERNAME": None, + "NSO_PASSWORD": None, + "NETBOX_URL": None, + "NETBOX_TOKEN": None +} + +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) # NetBox headers for API requests netbox_headers = { - "Authorization": f"Token {NETBOX_TOKEN}", + "Authorization": f"Token {ENV_VARS["NETBOX_TOKEN"]}", "Content-Type": "application/json", "Accept": "application/json" } -# Disable SSL warnings -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - def get_netbox_devices(): """Fetch all devices from NetBox.""" + print("Fetching devices from NetBox...") - url = f"{NETBOX_URL}/api/dcim/devices/" + + url = f"{ENV_VARS["NETBOX_URL"]}/api/dcim/devices/" response = requests.get(url, headers=netbox_headers, verify=False) - if response.status_code == 200: - print("Devices successfully fetched from NetBox.") - return response.json()['results'] - else: + + if response.status_code != 200: print("Failed to fetch devices from NetBox.") return [] + print("Devices successfully fetched from NetBox.") + return response.json()['results'] + def get_nso_interfaces(device_name): """Fetch interfaces for a specific device from NSO.""" + print(f"Fetching interfaces for {device_name} from NSO...") - url = f"{NSO_URL}/restconf/data/tailf-ncs:devices/device={device_name}/live-status/tailf-ned-cisco-ios-stats:interfaces" - response = requests.get(url, auth=(NSO_USERNAME, NSO_PASSWORD), headers={"Accept": "application/yang-data+json"}, verify=False) - if response.status_code == 200: - print(f"Interfaces successfully fetched for {device_name} from NSO.") - return response.json().get('tailf-ned-cisco-ios-stats:interfaces', []) - else: + + url = f"{ENV_VARS['NSO_URL']}/restconf/data/tailf-ncs:devices/device={device_name}/live-status/tailf-ned-cisco-ios-stats:interfaces" + response = requests.get(url, auth=(ENV_VARS['NSO_USERNAME'], ENV_VARS['NSO_PASSWORD']), headers={"Accept": "application/yang-data+json"}, verify=False) + + if response.status_code != 200: print(f"Failed to fetch interfaces for {device_name} from NSO.") return [] + print(f"Interfaces successfully fetched for {device_name} from NSO.") + return response.json().get('tailf-ned-cisco-ios-stats:interfaces', []) + + def type_translation(nso_type): """Translate NSO interface type to NetBox interface type.""" - translation = { + + translation_table = { "GigabitEthernet": "1000Base-T", "TenGigabitEthernet": "10GBase-T", "FastEthernet": "100Base-T" } - return translation.get(nso_type, "Other") # Fallback to 'Other' if type not found + return translation_table.get(nso_type, "Other") # Fallback to 'Other' if type not found def add_interface_to_device(device_id, interface): """Add an interface to a device in NetBox.""" + print(f"Adding interface {interface['name']} to device ID {device_id} in NetBox...") + data = { "device": device_id, "name": f"{interface['type']} {interface['name']}", @@ -64,15 +78,16 @@ def add_interface_to_device(device_id, interface): "mac_address": interface.get('mac-address', ''), # Additional fields as necessary } - url = f"{NETBOX_URL}/api/dcim/interfaces/" + url = f"{ENV_VARS["NETBOX_URL"]}/api/dcim/interfaces/" response = requests.post(url, headers=netbox_headers, json=data, verify=False) - if response.status_code in [200, 201]: - print(f"Interface {interface['name']} added to device ID {device_id} in NetBox.") - return response.json()['id'] # Return the new interface's ID for further use - else: + + if response.status_code not in [200, 201]: print(f"Failed to add interface {interface['name']} to device ID {device_id} in NetBox: {response.text}") return None + print(f"Interface {interface['name']} added to device ID {device_id} in NetBox.") + return response.json()['id'] # Return the new interface's ID for further use + def main(): devices = get_netbox_devices() for device in devices: diff --git a/mdd/scripts/nso_to_netbox_inventory.py b/mdd/scripts/nso_to_netbox_inventory.py index 334b439..cc75ce7 100644 --- a/mdd/scripts/nso_to_netbox_inventory.py +++ b/mdd/scripts/nso_to_netbox_inventory.py @@ -2,50 +2,60 @@ import os import json import urllib3 +import load_env_vars urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -# Environment setup -NETBOX_URL = os.getenv('NETBOX_URL') -NETBOX_TOKEN = os.getenv('NETBOX_TOKEN') -NSO_URL = os.getenv('NSO_URL') -NSO_USERNAME = os.getenv('NSO_USERNAME') -NSO_PASSWORD = os.getenv('NSO_PASSWORD') +ENV_VARS = { + "NSO_URL": None, + "NSO_USERNAME": None, + "NSO_PASSWORD": None, + "NETBOX_URL": None, + "NETBOX_TOKEN": None +} + +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + CISCO_MANUFACTURER_ID = '2' netbox_headers = { - "Authorization": f"Token {NETBOX_TOKEN}", + "Authorization": f"Token {ENV_VARS["NETBOX_TOKEN"]}", "Content-Type": "application/json", "Accept": "application/json" } def get_netbox_devices(): - """Fetch all devices from NetBox.""" - url = f"{NETBOX_URL}/api/dcim/devices/" + """Fetch all devices from NetBox""" + + url = f"{ENV_VARS["NETBOX_URL"]}/api/dcim/devices/" response = requests.get(url, headers=netbox_headers, verify=False) + return response.json()['results'] if response.status_code == 200 else [] def get_nso_inventory(device_name): - """Fetch inventory for a specific device from NSO.""" - url = f"{NSO_URL}/restconf/data/tailf-ncs:devices/device={device_name}/live-status/tailf-ned-cisco-ios-stats:inventory" - response = requests.get(url, auth=(NSO_USERNAME, NSO_PASSWORD), headers={"Accept": "application/yang-data+json"}, verify=False) + """Fetch inventory for a specific device from NSO""" + + url = f"{ENV_VARS['NSO_URL']}/restconf/data/tailf-ncs:devices/device={device_name}/live-status/tailf-ned-cisco-ios-stats:inventory" + response = requests.get(url, auth=(ENV_VARS['NSO_USERNAME'], ENV_VARS['NSO_PASWORD']), headers={"Accept": "application/yang-data+json"}, verify=False) + return response.json().get('tailf-ned-cisco-ios-stats:inventory', []) if response.status_code == 200 else [] def fetch_existing_serial_numbers(device_id): - """Fetch existing serial numbers of inventory items for a given device.""" - url = f"{NETBOX_URL}/api/dcim/inventory-items/?device_id={device_id}" + """Fetch existing serial numbers of inventory items for a given device""" + + url = f"{ENV_VARS["NETBOX_URL"]}/api/dcim/inventory-items/?device_id={device_id}" response = requests.get(url, headers=netbox_headers, verify=False) - if response.status_code == 200: - return [item['serial'] for item in response.json()['results'] if item['serial']] - return [] + + return [item['serial'] for item in response.json()['results'] if item['serial']] if response.status_code == 200 else [] def add_netbox_inventory_item(device_id, device_name, inventory_item, existing_serials): - """Add an inventory item to a NetBox device, skipping duplicates and items without 'pid'.""" + """Add an inventory item to a NetBox device, skipping duplicates and items without 'pid'""" + pid = inventory_item.get('pid') sn = inventory_item.get('sn') - if not pid or sn in existing_serials or pid == device_name: + if (not pid) or (sn in existing_serials) or (pid == device_name): return # Skip if pid is missing, serial is duplicate, or pid matches device name data = { @@ -57,7 +67,8 @@ def add_netbox_inventory_item(device_id, device_name, inventory_item, existing_s "part_id": pid, "serial": sn, } - response = requests.post(f"{NETBOX_URL}/api/dcim/inventory-items/", headers=netbox_headers, json=data, verify=False) + response = requests.post(f"{ENV_VARS["NETBOX_URL"]}/api/dcim/inventory-items/", headers=netbox_headers, json=data, verify=False) + if response.status_code in [200, 201]: print(f"Inventory item {pid} added to device {device_id}.") else: diff --git a/mdd/scripts/nso_to_netbox_platform.py b/mdd/scripts/nso_to_netbox_platform.py index e64ed40..a180a41 100644 --- a/mdd/scripts/nso_to_netbox_platform.py +++ b/mdd/scripts/nso_to_netbox_platform.py @@ -6,17 +6,21 @@ import json import urllib3 from netbox_class import netbox_helper +import load_env_vars -# Environment Variables -NETBOX_URL = os.getenv('NETBOX_URL') -NETBOX_TOKEN = os.getenv('NETBOX_TOKEN') -NSO_URL = os.getenv('NSO_URL') -NSO_USERNAME = os.getenv('NSO_USERNAME') -NSO_PASSWORD = os.getenv('NSO_PASSWORD') +ENV_VARS = { + "NSO_URL": None, + "NSO_USERNAME": None, + "NSO_PASSWORD": None, + "NETBOX_URL": None, + "NETBOX_TOKEN": None +} + +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) # Headers for NetBox netbox_headers = { - "Authorization": f"Token {NETBOX_TOKEN}", + "Authorization": f"Token {ENV_VARS["NETBOX_TOKEN"]}", "Content-Type": "application/json", "Accept": "application/json" } @@ -37,85 +41,100 @@ def fetch_nso_platform_info(device_name): response = requests.get(url, headers=headers, verify=False) - if response.status_code == 200: - platform_info = response.json().get("tailf-ncs:platform") - if platform_info: - print(json.dumps(platform_info, indent=4)) # Debug print - return platform_info - else: - print(f"Platform information is missing for {device_name}.") - return None - else: + if response.status_code != 200: print(f"Failed to retrieve platform information for {device_name} from NSO: {response.status_code}, {response.text}") return None + platform_info = response.json().get("tailf-ncs:platform") + if not platform_info: + print(f"Platform information is missing for {device_name}.") + return None + + print(json.dumps(platform_info, indent=4)) # Debug print + return platform_info + def get_or_create_platform(name, version): """Ensure the platform exists in NetBox, based on the combined name and version, and return its ID.""" platform_identifier = f"{name} {version}" - url = f"{NETBOX_URL}/api/dcim/platforms/?name={platform_identifier}" + url = f"{ENV_VARS["NETBOX_URL"]}/api/dcim/platforms/?name={platform_identifier}" response = requests.get(url, headers=netbox_headers, verify=False) - if response.status_code == 200 and response.json()['count'] == 0: - # Platform does not exist, create it - data = { - "name": platform_identifier, - "slug": platform_identifier.lower().replace(" ", "-").replace("(", "").replace(")", "").replace(".", "-") - } - create_response = requests.post(f"{NETBOX_URL}/api/dcim/platforms/", headers=netbox_headers, data=json.dumps(data), verify=False) - if create_response.status_code in [200, 201]: - print(f"Successfully added platform: {platform_identifier}") - return create_response.json()['id'] - else: - print(f"Failed to create platform {platform_identifier}: {create_response.text}") - return None - else: - # Platform exists - return response.json()['results'][0]['id'] + + if response.status_code != 200: + return None # Server error? + + response = response.json() + + if response['count'] != 0: # Platform exists + return response['results'][0]['id'] + + # Platform doesn't exist - so create one + data = { + "name": platform_identifier, + "slug": platform_identifier.lower().replace(" ", "-").replace("(", "").replace(")", "").replace(".", "-") + } + create_response = requests.post(f"{ENV_VARS["NETBOX_URL"]}/api/dcim/platforms/", headers=netbox_headers, data=json.dumps(data), verify=False) + + if create_response.status_code not in [200, 201]: + print(f"Failed to create platform {platform_identifier}: {create_response.text}") + return None + + print(f"Successfully added platform: {platform_identifier}") + return create_response.json()['id'] def get_or_create_device_type(model): """Ensure the device type exists in NetBox, based on the model from NSO, and return its ID.""" - url = f"{NETBOX_URL}/api/dcim/device-types/?model={model}" + + url = f"{ENV_VARS["NETBOX_URL"]}/api/dcim/device-types/?model={model}" response = requests.get(url, headers=netbox_headers, verify=False) - if response.status_code == 200 and response.json()['count'] == 0: - # Device type does not exist, create it - data = { - "model": model, - "slug": model.lower().replace(" ", "-").replace("_", "-"), - "manufacturer": 1 # Example manufacturer ID, adjust accordingly - } - create_response = requests.post(f"{NETBOX_URL}/api/dcim/device-types/", headers=netbox_headers, data=json.dumps(data), verify=False) - if create_response.status_code in [200, 201]: - return create_response.json()['id'] - else: - print(f"Failed to create device type {model}: {create_response.text}") - return None - else: + + if response.status_code != 200 or response.json()['count'] > 0: + # Device exists return response.json()['results'][0]['id'] + # Device type does not exist, create it + data = { + "model": model, + "slug": model.lower().replace(" ", "-").replace("_", "-"), + "manufacturer": 1 # Example manufacturer ID, adjust accordingly + } + create_response = requests.post(f"{ENV_VARS["NETBOX_URL"]}/api/dcim/device-types/", headers=netbox_headers, data=json.dumps(data), verify=False) + + if create_response.status_code not in [200, 201]: + print(f"Failed to create device type {model}: {create_response.text}") + return None + + return create_response.json()['id'] + def update_netbox_device(device_id, platform_info): """Update a device in NetBox with the correct serial number, device type, and platform.""" + model = platform_info.get("model") serial_number = platform_info.get("serial-number") name = platform_info.get("name") version = platform_info.get("version") - + device_type_id = get_or_create_device_type(model) platform_id = get_or_create_platform(name, version) if name and version else None - - if device_type_id and platform_id: - data = { - "serial": serial_number, - "device_type": device_type_id, - "platform": platform_id - } - url = f"{NETBOX_URL}/api/dcim/devices/{device_id}/" - response = requests.patch(url, headers=netbox_headers, data=json.dumps(data), verify=False) - if response.status_code in [200, 201, 204]: - print(f"Device {device_id} updated successfully with new type, serial, and platform.") - else: - print(f"Failed to update device {device_id}: {response.status_code}, {response.text}") + + if not device_type_id or not platform_id: + return + + data = { + "serial": serial_number, + "device_type": device_type_id, + "platform": platform_id + } + url = f"{ENV_VARS["NETBOX_URL"]}/api/dcim/devices/{device_id}/" + response = requests.patch(url, headers=netbox_headers, data=json.dumps(data), verify=False) + + if response.status_code in [200, 201, 204]: + print(f"Device {device_id} updated successfully with new type, serial, and platform.") + else: + print(f"Failed to update device {device_id}: {response.status_code}, {response.text}") def main(): - netbox=netbox_helper(NETBOX_URL,netbox_headers) + netbox=netbox_helper(ENV_VARS["NETBOX_URL"], + netbox_headers) for device in netbox.fetch_netbox_devices(): device_name = device.get('name') platform_info = fetch_nso_platform_info(device_name) diff --git a/mdd/scripts/test.py b/mdd/scripts/test.py new file mode 100644 index 0000000..c1fdab9 --- /dev/null +++ b/mdd/scripts/test.py @@ -0,0 +1,15 @@ +def get_sub_dict(dict, level): + """ + Returns reference to a sub dictionary based on first key + Used for inventory where you only have 1 key + """ + sub_dict = dict + for _ in range(level): + key = list(sub_dict.keys())[0] # get first key + sub_dict = sub_dict[key] + + return sub_dict # reference + +s = {"all": {"child": {"my": {}}}} +get_sub_dict(s, 3)['hi'] = 2 +print(s) \ No newline at end of file