From 4a3f5d8dda4b87d9692f36ac88009323cce9b6a6 Mon Sep 17 00:00:00 2001 From: KJStick Date: Tue, 9 Apr 2024 02:11:56 -0400 Subject: [PATCH 1/3] spacing and no nested ifs --- mdd/scripts/generate_inventory.py | 126 ++++++++---------- mdd/scripts/generate_tags.py | 73 ++++++----- mdd/scripts/nautobot_add_sites.py | 19 ++- mdd/scripts/netbox_class.py | 26 ++-- mdd/scripts/netbox_device_add.py | 115 ++++++++++------- mdd/scripts/netbox_device_csv.py | 92 +++++++------ mdd/scripts/netbox_generate_sites.py | 56 ++++---- mdd/scripts/nso_acl_ip_to_nautobot.py | 15 ++- mdd/scripts/nso_to_netbox_device.py | 164 ++++++++++++++---------- mdd/scripts/nso_to_netbox_interfaces.py | 35 +++-- mdd/scripts/nso_to_netbox_inventory.py | 22 ++-- mdd/scripts/nso_to_netbox_platform.py | 120 +++++++++-------- 12 files changed, 487 insertions(+), 376 deletions(-) diff --git a/mdd/scripts/generate_inventory.py b/mdd/scripts/generate_inventory.py index 1bb67aa..23ac611 100644 --- a/mdd/scripts/generate_inventory.py +++ b/mdd/scripts/generate_inventory.py @@ -2,87 +2,73 @@ import requests import yaml -def fetch_netbox_devices(): - # Retrieve API key and URL from environment variables - netbox_api_url = os.environ.get('NETBOX_API') - netbox_api_token = os.environ.get('NETBOX_TOKEN') - - if not netbox_api_url or not netbox_api_token: - print("Error: NETBOX_API or NETBOX_TOKEN environment variables are not set.") - exit(1) - - # Construct the API request headers - headers = { - 'Authorization': f'Token {netbox_api_token}' +NETBOX_URL = os.environ.get('NETBOX_URL') # Was NETBOX_API +NETBOX_TOKEN = os.environ.get('NETBOX_TOKEN') + +if not NETBOX_URL or not NETBOX_TOKEN: + print("Error: NETBOX_API or NETBOX_TOKEN environment variables are not set.") + exit(1) + +headers = {'Authorization': f'Token {NETBOX_TOKEN}'} +verify_ssl = False +existing_inventory_path = '/path/to/existing/inventory.yml' # Replace with the actual path + +def get_inventory_structure(): #TODO: Compare to generate_tags + """Generated default inventory structure""" + return { + 'all': { + 'children': { + 'network': { + 'children': {} + } + } + } } - # Disable SSL certificate validation - verify_ssl = False +def fetch_netbox_devices(): + existing_inventory = {} # Make API requests to NetBox to retrieve device information response = requests.get( - f'{netbox_api_url}/api/dcim/devices/', + f'{NETBOX_URL}/api/dcim/devices/', headers=headers, verify=verify_ssl # Disable SSL certificate validation ) - if response.status_code == 200: - # Parse the JSON response into a Python dictionary - device_data = response.json() - - # Create an empty inventory dictionary - inventory_data = { - 'all': { - 'children': { - 'network': { - 'children': {} - } - } - } - } - - # Check for an existing inventory file in a different directory - existing_inventory_path = '/path/to/existing/inventory.yml' # Replace with the actual path - existing_inventory = {} - - if os.path.exists(existing_inventory_path): - with open(existing_inventory_path, 'r') as existing_inventory_file: - existing_inventory = yaml.safe_load(existing_inventory_file) - - # Populate the inventory with devices based on device role - for device in device_data['results']: - device_name = device['name'] - device_role = device['device_role']['slug'] - - # Check if the device is not in the existing inventory - if device_name not in existing_inventory.get('all', {}).get('children', {}).get('network', {}).get('children', {}).get(device_role, {}).get('hosts', {}): - # Check if primary_ip4 exists and is not None - primary_ip_data = device.get('primary_ip4') - if primary_ip_data and primary_ip_data.get('address'): - primary_ip = primary_ip_data['address'].split('/')[0] # Extract IP without the /32 - # Create groups for each device role under the "network" group if they don't exist - if device_role not in inventory_data['all']['children']['network']['children']: - inventory_data['all']['children']['network']['children'][device_role] = { - 'hosts': {} - } - - # Add the device to its respective role group and include the ansible_host - inventory_data['all']['children']['network']['children'][device_role]['hosts'][device_name] = { - 'ansible_host': primary_ip - } - else: - # If there is no IP address available, create groups without ansible_host - if device_role not in inventory_data['all']['children']['network']['children']: - inventory_data['all']['children']['network']['children'][device_role] = { - 'hosts': {} - } - inventory_data['all']['children']['network']['children'][device_role]['hosts'][device_name] = {} - - return inventory_data - else: - print(f"Error fetching data from NetBox: {response.status_code}") + if response.status_code != 200: + print(f"Error fetching devices from NetBox: {response.status_code}") exit(1) + # Create an empty inventory dictionary + inventory_data = get_inventory_structure() + + # Check for an existing inventory file in a different directory + if os.path.exists(existing_inventory_path): + with open(existing_inventory_path, 'r') as existing_inventory_file: + existing_inventory = yaml.safe_load(existing_inventory_file) + + # Populate the inventory with devices based on device role + for device in response.json()['results']: + device_name = device['name'] + device_role = device['device_role']['slug'] + + # Check if the device is not in the existing inventory + if device_name in existing_inventory.get('all', {}).get('children', {}).get('network', {}).get('children', {}).get(device_role, {}).get('hosts', {}): + continue + + # Check if primary_ip4 exists and is not None + primary_ip_data = device.get('primary_ip4') + if not primary_ip_data: + inventory_data['all']['children']['network']['children'].setdefault(device_role, {}).setdefault('hosts', {}).setdefault(device_name, {}) + continue + + primary_ip_address = primary_ip_data.get('address') #TODO: will this ever not be there + primary_ip = primary_ip_address.split('/')[0] # Extract IP without the /32 + + inventory_data['all']['children']['network']['children'].setdefault(device_role, {}).setdefault('hosts', {}).setdefault(device_name, {})['ansible_host'] = primary_ip + + return inventory_data + def write_inventory_file(inventory_data): # Define the inventory file path as "inventory/network.yml" inventory_file_path = os.path.join(os.path.dirname(__file__), '..', 'inventory', 'network.yml') diff --git a/mdd/scripts/generate_tags.py b/mdd/scripts/generate_tags.py index 1792745..68cccc8 100644 --- a/mdd/scripts/generate_tags.py +++ b/mdd/scripts/generate_tags.py @@ -3,19 +3,20 @@ import yaml import json -def fetch_netbox_devices(): - netbox_api_url = os.environ.get('NETBOX_API') - netbox_api_token = os.environ.get('NETBOX_TOKEN') +NETBOX_URL = os.environ.get("NETBOX_URL") # was NETBOX_URL +NETBOX_TOKEN = os.environ.get("NETBOX_TOKEN") - if not netbox_api_url or not netbox_api_token: - print("NETBOX_API or NETBOX_TOKEN environment variables are not set.") - exit(1) +if not NETBOX_URL or not NETBOX_TOKEN: + print("NETBOX_API or NETBOX_TOKEN environment variables are not set.") + exit(1) - headers = {'Authorization': f'Token {netbox_api_token}', 'Accept': 'application/json'} - verify_ssl = False +headers = {'Authorization': f'Token {NETBOX_TOKEN}', 'Accept': 'application/json'} +verify_ssl = False - # Initialize the inventory structure with vars for sites - inventory_data = { +def get_inventory_structure(): + """Returns the default inventory structure""" + + return { 'all': { 'vars': { 'sites': [] @@ -28,36 +29,40 @@ def fetch_netbox_devices(): } } +def fetch_netbox_devices(): + # Initialize the inventory structure with vars for sites + inventory_data = get_inventory_structure() + + # Fetch sites from NetBox + response = requests.get(f'{NETBOX_URL}/api/dcim/sites/?limit=1000', headers=headers, verify=verify_ssl) + + if response.status_code != 200: + print(f"Failed to fetch sites from NetBox: {response.status_code}") + exit(1) + + # Populate the sites list + for site in response.json()['results']: + inventory_data['all']['vars']['sites'].append(site['name']) + # Fetch devices from NetBox - response = requests.get(f'{netbox_api_url}/api/dcim/sites/?limit=1000', headers=headers, verify=verify_ssl) - if response.status_code == 200: - sites_data = response.json() - # Populate the sites list - for site in sites_data['results']: - inventory_data['all']['vars']['sites'].append(site['name']) - - response = requests.get(f'{netbox_api_url}/api/dcim/devices/?limit=1000', headers=headers, verify=verify_ssl) - if response.status_code == 200: - device_data = response.json() - - for device in device_data['results']: - device_name = device['name'] - site_name = device.get('site', {}).get('name', 'undefined') - device_tags = [tag['name'] for tag in device.get('tags', [])] - - if site_name not in inventory_data['all']['children']['org']['children']: - inventory_data['all']['children']['org']['children'][site_name] = {'hosts': {}} - - # Add the device under the site with its tags - inventory_data['all']['children']['org']['children'][site_name]['hosts'][device_name] = {'tags': device_tags} - else: - print(f"Failed to fetch data from NetBox: {response.status_code}") + response = requests.get(f'{NETBOX_URL}/api/dcim/devices/?limit=1000', headers=headers, verify=verify_ssl) + + if response.status_code != 200: + print(f"Failed to fetch devices from NetBox: {response.status_code}") exit(1) + for device in response.json()['results']: + device_name = device['name'] + site_name = device.get('site', {}).get('name', 'undefined') + device_tags = [tag['name'] for tag in device.get('tags', [])] + + # Add the device under the site with its tags + inventory_data['all']['children']['org']['children'].setdefault(site_name, {}).setdefault('hosts', {}).setdefault(device_name, {})['tags'] = device_tags + return inventory_data def write_inventory_file(inventory_data): - inventory_file_path = os.path.join(os.path.dirname(__file__),'..', 'inventory', 'tags.yml') + inventory_file_path = os.path.join(os.path.dirname(__file__), '..', 'inventory', 'tags.yml') with open(inventory_file_path, 'w') as f: yaml.dump(inventory_data, f, default_flow_style=False, sort_keys=False) diff --git a/mdd/scripts/nautobot_add_sites.py b/mdd/scripts/nautobot_add_sites.py index d18fc05..6359153 100644 --- a/mdd/scripts/nautobot_add_sites.py +++ b/mdd/scripts/nautobot_add_sites.py @@ -1,9 +1,10 @@ import csv import requests +import os # Nautobot details -NAUTOBOT_URL = -NAUTOBOT_TOKEN = +NAUTOBOT_URL = os.environ.get("NAUTOBOT_URL") +NAUTOBOT_TOKEN = os.environ.get("NAUTOBOT_TOKEN") HEADERS = { 'Authorization': f'Token {NAUTOBOT_TOKEN}', 'Content-Type': 'application/json', @@ -16,6 +17,7 @@ def create_site(site): url = f"{NAUTOBOT_URL}/dcim/sites/" response = requests.post(url, headers=HEADERS, json=site, verify=False) + if response.status_code == 201: print(f"Site '{site['name']}' created successfully.") else: @@ -24,21 +26,24 @@ def create_site(site): def create_device(device): url = f"{NAUTOBOT_URL}/dcim/devices/" response = requests.post(url, headers=HEADERS, json=device, verify=False) + if response.status_code == 201: print(f"Device '{device['name']}' created successfully.") return response.json()['id'] # Return device ID for further use - else: - print(f"Failed to create device '{device['name']}': {response.text}") + + print(f"Failed to create device '{device['name']}': {response.text}") def create_interface(device_id, interface): url = f"{NAUTOBOT_URL}/dcim/interfaces/" interface['device'] = device_id + response = requests.post(url, headers=HEADERS, json=interface, verify=False) + if response.status_code == 201: print(f"Interface '{interface['name']}' created successfully.") return response.json()['id'] - else: - print(f"Failed to create interface '{interface['name']}': {response.text}") + + print(f"Failed to create interface '{interface['name']}': {response.text}") def assign_ip_to_interface(interface_id, ip): url = f"{NAUTOBOT_URL}/ipam/ip-addresses/" @@ -47,7 +52,9 @@ def assign_ip_to_interface(interface_id, ip): "assigned_object_type": "dcim.interface", "assigned_object_id": interface_id } + response = requests.post(url, headers=HEADERS, json=ip_data, verify=False) + if response.status_code == 201: print(f"IP Address '{ip}' assigned successfully.") else: diff --git a/mdd/scripts/netbox_class.py b/mdd/scripts/netbox_class.py index 91ed00e..d6fc566 100644 --- a/mdd/scripts/netbox_class.py +++ b/mdd/scripts/netbox_class.py @@ -2,16 +2,18 @@ import requests class netbox_helper(): - def __init__(self,NETBOX_URL, NETBOX_HEADERS): - self.netbox_url=NETBOX_URL - self.netbox_headers=NETBOX_HEADERS + def __init__(self, NETBOX_URL, NETBOX_HEADERS): + self.netbox_url=NETBOX_URL + self.netbox_headers=NETBOX_HEADERS - def fetch_netbox_devices(self): - """Fetch all devices from NetBox.""" - url = f"{self.netbox_url}/api/dcim/devices/" - response = requests.get(url, headers=self.netbox_headers, verify=False) - if response.status_code == 200: - return response.json().get('results', []) - else: - print(f"Failed to retrieve devices from NetBox: {response.status_code}, {response.text}") - return [] + def fetch_netbox_devices(self): + """Fetch all devices from NetBox.""" + + url = f"{self.netbox_url}/api/dcim/devices/" + response = requests.get(url, headers=self.netbox_headers, verify=False) + + if response.status_code != 200: + print(f"Failed to retrieve devices from NetBox: {response.status_code}, {response.text}") + return [] + + return response.json().get('results', []) diff --git a/mdd/scripts/netbox_device_add.py b/mdd/scripts/netbox_device_add.py index 1a04cb0..742e070 100644 --- a/mdd/scripts/netbox_device_add.py +++ b/mdd/scripts/netbox_device_add.py @@ -1,6 +1,19 @@ import requests import urllib3 import json +import os + +# Setup for NetBox and NSO +NETBOX_URL = os.environ.get("NETBOX_URL") +NETBOX_TOKEN = os.environ.get("NETBOX_TOKEN") +HEADERS = {"Authorization": f"Token {NETBOX_TOKEN}", "Content-Type": "application/json", "Accept": "application/json"} + +NSO_URL = os.environ.get("NSO_URL") +NSO_USERNAME = os.environ.get("NSO_USERNAME") +NSO_PASSWORD = os.environ.get("NSO_PASSWORD") + +DEVICE_NAME = os.environ.get("DEVICE_NAME") +PLATFORM_URL = os.environ.get("PLATFORM_URL") # Disable SSL warnings urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -10,70 +23,78 @@ def get_or_create_device_type(platform_data, headers, netbox_url): Ensures the device type exists in NetBox based on the NSO platform data, and returns the device type ID. """ + model = platform_data["tailf-ncs:platform"]["model"] slug = model.replace("-", "_").lower() - + # Check if the device type already exists search_response = requests.get(f"{netbox_url}/api/dcim/device-types/?slug={slug}", headers=headers, verify=False) - if search_response.status_code == 200 and search_response.json()['count'] == 0: - # Device type does not exist, proceed with creation - payload = { - "manufacturer": "2", # Replace with actual manufacturer ID - "model": model, - "slug": slug, - "part_number": model, # Using model as part_number for simplicity - "name": model - } - response = requests.post(f"{netbox_url}/api/dcim/device-types/", headers=headers, data=json.dumps(payload), verify=False) - if response.status_code in [200, 201]: - print(f"Successfully added device type: {model}") - return response.json()["id"] - else: - print(f"Failed to add device type {model}: {response.status_code}, {response.text}") - return None - else: + + if search_response.status_code != 200: + return None + + + if search_response.json()['count'] > 0: # Device type exists return search_response.json()["results"][0]["id"] + # Device type does not exist, proceed with creation + payload = { + "manufacturer": "2", # Replace with actual manufacturer ID + "model": model, + "slug": slug, + "part_number": model, # Using model as part_number for simplicity + "name": model + } + + response = requests.post(f"{netbox_url}/api/dcim/device-types/", headers=headers, data=json.dumps(payload), verify=False) + + if response.status_code not in [200, 201]: + print(f"Failed to add device type {model}: {response.status_code}, {response.text}") + return None + + print(f"Successfully added device type: {model}") + return response.json()["id"] + def update_device(platform_data, device_type_id, headers, netbox_url): """ Updates an existing device in NetBox with the device type based on NSO platform data. """ + device_name = "" serial_number = platform_data["tailf-ncs:platform"]["serial-number"] + # Attempt to find the device in NetBox search_response = requests.get(f"{netbox_url}/api/dcim/devices/?name={device_name}", headers=headers, verify=False) - if search_response.status_code == 200 and search_response.json()['count'] > 0: - device_id = search_response.json()["results"][0]["id"] - payload = { - "device_type": device_type_id, - "serial": serial_number - } - update_response = requests.patch(f"{netbox_url}/api/dcim/devices/{device_id}/", headers=headers, data=json.dumps(payload), verify=False) - if update_response.status_code in [200, 201, 204]: - print(f"Successfully updated device: {device_name}") - else: - print(f"Failed to update device {device_name}: {update_response.status_code}, {update_response.text}") - else: + + if search_response.status_code != 200: + return + + if search_response.json()['count'] == 0: print(f"Device {device_name} does not exist. Skipping.") + return -# Setup for NetBox and NSO -netbox_url = -netbox_token = -headers = {"Authorization": f"Token {netbox_token}", "Content-Type": "application/json", "Accept": "application/json"} -nso_url = -nso_username = -nso_password = -device_name = -platform_url = + device_id = search_response.json()["results"][0]["id"] + payload = { + "device_type": device_type_id, + "serial": serial_number + } + + update_response = requests.patch(f"{netbox_url}/api/dcim/devices/{device_id}/", headers=headers, data=json.dumps(payload), verify=False) + + if update_response.status_code in [200, 201, 204]: + print(f"Successfully updated device: {device_name}") + else: + print(f"Failed to update device {device_name}: {update_response.status_code}, {update_response.text}") # Make the GET request to retrieve the platform information -response = requests.get(platform_url, auth=(nso_username, nso_password), headers={"Accept": "application/yang-data+json"}, verify=False) -if response.status_code == 200: - print("Platform information retrieved successfully") - platform_data = response.json() - device_type_id = get_or_create_device_type(platform_data, headers, netbox_url) - if device_type_id: - update_device(platform_data, device_type_id, headers, netbox_url) -else: +response = requests.get(PLATFORM_URL, auth=(NSO_USERNAME, NSO_PASSWORD), headers={"Accept": "application/yang-data+json"}, verify=False) +if response.status_code != 200: print(f"Failed to retrieve platform information: {response.status_code}, {response.text}") + os._exit(1) + +print("Platform information retrieved successfully") +platform_data = response.json() +device_type_id = get_or_create_device_type(platform_data, HEADERS, NETBOX_URL) +if device_type_id: + update_device(platform_data, device_type_id, HEADERS, NETBOX_URL) diff --git a/mdd/scripts/netbox_device_csv.py b/mdd/scripts/netbox_device_csv.py index 49957b4..ed155f1 100644 --- a/mdd/scripts/netbox_device_csv.py +++ b/mdd/scripts/netbox_device_csv.py @@ -17,14 +17,21 @@ def fetch_id(endpoint, search_param, search_field='name'): url = f"{NETBOX_URL}/api/{endpoint}/" params = {search_field: search_param} + response = requests.get(url, headers=headers, params=params, verify=False) - if response.status_code == 200 and response.json()['count'] == 1: + + if response.status_code != 200: + print(f"Error fetching ID for {search_param} from {endpoint}.") + return None + + if response.json()['count'] == 1: return response.json()['results'][0]['id'] - elif response.status_code == 200 and response.json()['count'] > 1: + + if response.json()['count'] > 1: print(f"Multiple matches found for {search_param} from {endpoint}, using the first one.") return response.json()['results'][0]['id'] - print(f"Error fetching ID for {search_param} from {endpoint}.") - return None + + return None # count == 0 def load_device_lookup(): lookup = {} @@ -38,63 +45,71 @@ def fetch_sites(): site_mapping = {} url = f"{NETBOX_URL}/api/dcim/sites/" response = requests.get(url, headers=headers, verify=False) + for site in response.json().get('results', []): site_mapping[site['slug'].upper()] = site['id'] + return site_mapping def add_management_interface_and_ip(device_id, ip_address): print(f"Starting to add management interface to device ID {device_id}...") - + # Step 1: Create the management interface url = f"{NETBOX_URL}/api/dcim/interfaces/" data = {"device": device_id, "name": "Management", "type": "virtual", "enabled": True} interface_response = requests.post(url, headers=headers, json=data, verify=False) - - if interface_response.status_code in [200, 201]: - interface_id = interface_response.json()['id'] - print(f"'Management' interface with ID {interface_id} created successfully.") - - # Step 2: Assign the IP address to the management interface - ip_url = f"{NETBOX_URL}/api/ipam/ip-addresses/" - ip_data = { - "address": ip_address, - "assigned_object_id": interface_id, - "assigned_object_type": "dcim.interface" - } - ip_response = requests.post(ip_url, headers=headers, json=ip_data, verify=False) - - if ip_response.status_code in [200, 201]: - print(f"IP address {ip_address} assigned successfully to 'Management' interface.") - ip_id = ip_response.json()['id'] - - # Step 3: Set the IP address as the primary IP for the device - device_update_url = f"{NETBOX_URL}/api/dcim/devices/{device_id}/" - device_update_data = { - "primary_ip4": ip_id - } - device_update_response = requests.patch(device_update_url, headers=headers, json=device_update_data, verify=False) - - if device_update_response.status_code in [200, 201, 204]: - print(f"IP address {ip_address} set as primary IP for device ID {device_id}.") - else: - print(f"Failed to set IP address as primary for device ID {device_id}: {device_update_response.text}") - else: - print(f"Failed to assign IP address {ip_address} to 'Management' interface: {ip_response.text}") - else: + + if interface_response.status_code not in [200, 201]: print(f"Failed to create 'Management' interface for device ID {device_id}: {interface_response.text}") + return + + interface_id = interface_response.json()['id'] + print(f"'Management' interface with ID {interface_id} created successfully.") + + # Step 2: Assign the IP address to the management interface + ip_url = f"{NETBOX_URL}/api/ipam/ip-addresses/" + ip_data = { + "address": ip_address, + "assigned_object_id": interface_id, + "assigned_object_type": "dcim.interface" + } + ip_response = requests.post(ip_url, headers=headers, json=ip_data, verify=False) + + if ip_response.status_code not in [200, 201]: + print(f"Failed to assign IP address {ip_address} to 'Management' interface: {ip_response.text}") + return + + print(f"IP address {ip_address} assigned successfully to 'Management' interface.") + ip_id = ip_response.json()['id'] + + # Step 3: Set the IP address as the primary IP for the device + device_update_url = f"{NETBOX_URL}/api/dcim/devices/{device_id}/" + device_update_data = { + "primary_ip4": ip_id + } + device_update_response = requests.patch(device_update_url, headers=headers, json=device_update_data, verify=False) + + if device_update_response.status_code in [200, 201, 204]: + print(f"IP address {ip_address} set as primary IP for device ID {device_id}.") + else: + print(f"Failed to set IP address as primary for device ID {device_id}: {device_update_response.text}") def create_device(row, site_mapping, device_lookup): site_prefix = row['Name'][:4].upper() site_id = site_mapping.get(site_prefix, site_mapping.get('DEFAULT')) device_code = row['Name'][9:11] lookup = device_lookup.get(device_code, {}) + device_role_id = fetch_id('dcim/device-roles', lookup.get('DeviceRole'), 'name') device_type_id = fetch_id('dcim/device-types', lookup.get('DeviceType'), 'model') + if not (site_id and device_role_id and device_type_id): print(f"Skipping device creation for {row['Name']} due to missing required IDs.") return + data = {"name": row['Name'], "site": site_id, "device_role": device_role_id, "device_type": device_type_id, "serial": row.get('Serial number', '')} device_response = requests.post(f"{NETBOX_URL}/api/dcim/devices/", headers=headers, json=data, verify=False) + if device_response.status_code in [200, 201]: print(f"Device '{row['Name']}' created successfully.") device_id = device_response.json()['id'] @@ -104,10 +119,13 @@ def create_device(row, site_mapping, device_lookup): def main(): print("Script started.") + site_mapping = fetch_sites() print(f"Loaded {len(site_mapping)} sites.") + device_lookup = load_device_lookup() print(f"Loaded device lookup with codes: {list(device_lookup.keys())}") + with open(DEVICES_CSV_FILE_PATH, mode='r', encoding='utf-8') as csvfile: reader = csv.DictReader(csvfile) for row in reader: diff --git a/mdd/scripts/netbox_generate_sites.py b/mdd/scripts/netbox_generate_sites.py index 1667881..bee1311 100644 --- a/mdd/scripts/netbox_generate_sites.py +++ b/mdd/scripts/netbox_generate_sites.py @@ -6,37 +6,48 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Load environment variables -netbox_token = os.getenv('NETBOX_TOKEN') -netbox_url = os.getenv('NETBOX_URL') +NETBOX_URL = os.getenv('NETBOX_URL') +NETBOX_TOKEN = os.getenv('NETBOX_TOKEN') + +headers = {'Authorization': f'Token {NETBOX_TOKEN}', 'Content-Type': 'application/json'} + +# Path to your CSV file +csv_file_path = './docs/netbox_sites.csv' def create_region(region_name): """Create a region in NetBox if it doesn't already exist.""" + if region_name == '': return None - url = urljoin(netbox_url, 'api/dcim/regions/') - headers = {'Authorization': f'Token {netbox_token}', 'Content-Type': 'application/json'} + + url = urljoin(NETBOX_URL, 'api/dcim/regions/') response = requests.get(url, headers=headers, params={'name': region_name}, verify=False) # Added verify=False here - if response.status_code == 200 and response.json()['count'] == 0: - data = {'name': region_name, 'slug': region_name.lower()} - create_response = requests.post(url, json=data, headers=headers, verify=False) # And here - if create_response.status_code in [200, 201]: - print(f"Region '{region_name}' created successfully.") - return create_response.json()['id'] - else: - print(f"Failed to create region '{region_name}'.") - return None - elif response.json()['count'] > 0: - print(f"Region '{region_name}' already exists.") - return response.json()['results'][0]['id'] - else: + + if response.status_code != 200: print(f"Failed to check if region '{region_name}' exists.") return None + if response.json()['count'] > 0: + print(f"Region '{region_name}' already exists.") + return response.json()['results'][0]['id'] + + # Region doesn't exist - create it + data = {'name': region_name, 'slug': region_name.lower()} + create_response = requests.post(url, json=data, headers=headers, verify=False) # And here + + if create_response.status_code in [200, 201]: + print(f"Region '{region_name}' created successfully.") + return create_response.json()['id'] + + print(f"Failed to create region '{region_name}'.") + return None + def create_site(row): - """Create a site in NetBox using row data from the CSV.""" + """Create a site in NetBox using row data from the CSV""" + region_id = create_region(row['Region']) - url = urljoin(netbox_url, 'api/dcim/sites/') - headers = {'Authorization': f'Token {netbox_token}', 'Content-Type': 'application/json'} + url = urljoin(NETBOX_URL, 'api/dcim/sites/') + headers = {'Authorization': f'Token {NETBOX_TOKEN}', 'Content-Type': 'application/json'} data = { 'name': row['Name'], 'slug': row['Slug'], @@ -46,15 +57,14 @@ def create_site(row): 'longitude': row['Longitude'] if row['Longitude'] else None, 'description': row['Comments'], } + response = requests.post(url, json=data, headers=headers, verify=False) # Added verify=False here + if response.status_code in [200, 201]: print(f"Site '{row['Name']}' created successfully.") else: print(f"Failed to create site '{row['Name']}': {response.text}") -# Path to your CSV file -csv_file_path = './docs/netbox_sites.csv' - # Read the CSV file and process each row with open(csv_file_path, mode='r', encoding='utf-8') as csvfile: reader = csv.DictReader(csvfile) diff --git a/mdd/scripts/nso_acl_ip_to_nautobot.py b/mdd/scripts/nso_acl_ip_to_nautobot.py index 58d8551..34aab12 100644 --- a/mdd/scripts/nso_acl_ip_to_nautobot.py +++ b/mdd/scripts/nso_acl_ip_to_nautobot.py @@ -29,6 +29,7 @@ def fetch_nso_devices(): """Fetches a list of devices from NSO.""" + url = f"{NSO_URL}/restconf/tailf/query" payload = { "tailf-rest-query:immediate-query": { @@ -36,14 +37,17 @@ def fetch_nso_devices(): "select": [{"label": "name", "expression": "name", "result-type": "string"}] } } + response = requests.post(url, headers=nso_headers, json=payload, verify=False) devices = response.json().get("tailf-rest-query:query-result", {}).get("result", []) return [device["select"][0]["value"] for device in devices] def get_acl_for_device(device_name): """Fetches ACLs for a specified device and processes them.""" + url = f"{NSO_URL}/restconf/data/tailf-ncs:devices/device={device_name}/config/tailf-ned-cisco-ios:access-list" response = requests.get(url, headers=nso_headers, verify=False) + if response.status_code == 200: acl_data = response.json().get("tailf-ned-cisco-ios:access-list", {}).get("access-list", []) for acl in acl_data: @@ -55,6 +59,7 @@ def get_acl_for_device(device_name): def process_acl_rule(rule_text): """Processes a single ACL rule, extracting IPs and pushing to Nautobot.""" + ip_pattern = re.compile(r'permit\s+(\d+\.\d+\.\d+\.\d+)(?:\s+(\d+\.\d+\.\d+\.\d+))?') match = ip_pattern.search(rule_text) if match: @@ -69,12 +74,14 @@ def process_acl_rule(rule_text): def convert_to_cidr(mask): """Converts a subnet mask to CIDR notation.""" + return sum([bin(int(x)).count('1') for x in mask.split('.')]) -def push_ip_to_nautobot(ip_address): +def push_ip_to_nautobot(ip_address): # TODO: CHANGE DATA VARIABLE """Pushes an IP address to Nautobot.""" + url = f"{NAUTOBOT_URL}/ipam/ip-addresses/" - data = data = { + data = { "address": ip_address, "namespace": { "id": "fa8052af-7f81-4818-8eab-d7f08ed192a3", # Example ID, replace with actual if different @@ -85,10 +92,12 @@ def push_ip_to_nautobot(ip_address): "status": { "id": "cd5378a5-ed32-4f75-9e9e-980f4280e7c1", # Example status ID, replace with actual if different "object_type": "app_label.modelname", # Adjust as necessary - "url": "https://52.61.163.54/extras/statuses/cd5378a5-ed32-4f75-9e9e-980f4280e7c1" + "url": "https://IP/extras/statuses/cd5378a5-ed32-4f75-9e9e-980f4280e7c1" } } + response = requests.post(url, headers=nautobot_headers, json=data, verify=False) + if response.status_code in [200, 201]: print(f"IP {ip_address} pushed to Nautobot successfully.") else: diff --git a/mdd/scripts/nso_to_netbox_device.py b/mdd/scripts/nso_to_netbox_device.py index 9100480..cb86bd7 100644 --- a/mdd/scripts/nso_to_netbox_device.py +++ b/mdd/scripts/nso_to_netbox_device.py @@ -9,8 +9,8 @@ import base64 # Setup and configurations -print("Loading environment variables...") urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + NSO_URL = os.getenv('NSO_URL') NSO_USERNAME = os.getenv('NSO_USERNAME') NSO_PASSWORD = os.getenv('NSO_PASSWORD') @@ -26,6 +26,7 @@ def create_device_in_netbox(device, site_id, device_lookup): print(f"Creating device in NetBox: {device['name']}") + device_code = device['name'].split('-')[2] # Extract device code if device_code in device_lookup: device_role = fetch_id('dcim/device-roles', device_lookup[device_code]['DeviceRole'], 'name') @@ -42,75 +43,86 @@ def create_device_in_netbox(device, site_id, device_lookup): "serial": "", } device_response = requests.post(f"{NETBOX_URL}/api/dcim/devices/", headers=headers_netbox, json=device_payload, verify=False) - if device_response.status_code in [200, 201]: - device_id = device_response.json()["id"] - print(f"Device {device['name']} created successfully with ID {device_id}.") - create_management_interface_and_set_primary_ip(device_id, device['address']) - else: + if device_response.status_code not in [200, 201]: print(f"Failed to create device {device['name']}: {device_response.text}") + return + + device_id = device_response.json()["id"] + print(f"Device {device['name']} created successfully with ID {device_id}.") + create_management_interface_and_set_primary_ip(device_id, device['address']) def create_management_interface_and_set_primary_ip(device_id, ip_address): print(f"Creating 'Management' interface for device ID {device_id}...") - + if not ip_address.endswith('/32'): ip_address += '/32' - + # Create the Management interface interface_payload = {"device": device_id, "name": "Management", "type": "virtual"} interface_response = requests.post(f"{NETBOX_URL}/api/dcim/interfaces/", headers=headers_netbox, json=interface_payload, verify=False) - - if interface_response.status_code in [200, 201]: - interface_id = interface_response.json()["id"] - print(f"'Management' interface with ID {interface_id} created successfully.") - - # Create the IP address and assign it to the interface - ip_payload = {"address": ip_address, "status": "active", "assigned_object_id": interface_id, "assigned_object_type": "dcim.interface"} - ip_response = requests.post(f"{NETBOX_URL}/api/ipam/ip-addresses/", headers=headers_netbox, json=ip_payload, verify=False) - - if ip_response.status_code in [200, 201]: - ip_id = ip_response.json()["id"] - print(f"IP address {ip_address} created and assigned successfully.") - - # Set the IP address as the primary IP for the device - device_update_payload = {"primary_ip4": ip_id} # or "primary_ip6" for IPv6 - device_update_response = requests.patch(f"{NETBOX_URL}/api/dcim/devices/{device_id}/", headers=headers_netbox, json=device_update_payload, verify=False) - - if device_update_response.status_code in [200, 201, 204]: - print(f"Primary IP set successfully for device ID {device_id}.") - else: - print(f"Failed to set primary IP for device ID {device_id}: {device_update_response.text}") - else: - print(f"Failed to create and assign IP address {ip_address}: {ip_response.text}") - else: + + if interface_response.status_code not in [200, 201]: print(f"Failed to create 'Management' interface: {interface_response.text}") + return + + interface_id = interface_response.json()["id"] + print(f"'Management' interface with ID {interface_id} created successfully.") + + # Create the IP address and assign it to the interface + ip_payload = {"address": ip_address, "status": "active", "assigned_object_id": interface_id, "assigned_object_type": "dcim.interface"} + ip_response = requests.post(f"{NETBOX_URL}/api/ipam/ip-addresses/", headers=headers_netbox, json=ip_payload, verify=False) + + if ip_response.status_code not in [200, 201]: + print(f"Failed to create and assign IP address {ip_address}: {ip_response.text}") + return + + ip_id = ip_response.json()["id"] + print(f"IP address {ip_address} created and assigned successfully.") + + # Set the IP address as the primary IP for the device + device_update_payload = {"primary_ip4": ip_id} # or "primary_ip6" for IPv6 + device_update_response = requests.patch(f"{NETBOX_URL}/api/dcim/devices/{device_id}/", headers=headers_netbox, json=device_update_payload, verify=False) + + if device_update_response.status_code in [200, 201, 204]: + print(f"Primary IP set successfully for device ID {device_id}.") + else: + print(f"Failed to set primary IP for device ID {device_id}: {device_update_response.text}") def fetch_id(endpoint, search_param, search_field='name'): """Fetch a NetBox entity's ID based on a search field and parameter.""" + print(f"Fetching ID for '{search_param}' from {endpoint}...") url = f"{NETBOX_URL}/api/{endpoint}/" params = {search_field: search_param} response = requests.get(url, headers=headers_netbox, params=params, verify=False) - if response.status_code == 200 and response.json()['count'] == 1: - return response.json()['results'][0]['id'] - else: + + data = response.json() + + if response.status_code != 200 or data['count'] != 1: print(f"Failed to fetch ID for {search_param} from {endpoint}.") return None + return data['results'][0]['id'] + def fetch_sites(): """Fetch all sites from NetBox and return a dictionary mapping site slugs to their IDs.""" + site_mapping = {} url = f"{NETBOX_URL}/api/dcim/sites/" response = requests.get(url, headers=headers_netbox, verify=False) - if response.status_code == 200: - for site in response.json()['results']: - site_mapping[site['slug'].upper()] = site['id'] - else: + + if response.status_code != 200: print("Failed to fetch sites from NetBox.") + return {} + + for site in response.json()['results']: + site_mapping[site['slug'].upper()] = site['id'] return site_mapping def load_device_lookup(): """Load the device role and type lookup from a CSV file.""" + lookup = {} with open(DEVICE_TYPE_LOOKUP_CSV_PATH, mode='r', encoding='utf-8') as file: reader = csv.DictReader(file) @@ -120,6 +132,7 @@ def load_device_lookup(): def fetch_netbox_devices(): """Fetch existing devices from NetBox.""" + netbox_devices = {} url = f"{NETBOX_URL}/api/dcim/devices/" response = requests.get(url, headers=headers_netbox, verify=False) @@ -132,7 +145,10 @@ def fetch_netbox_devices(): def fetch_nso_devices(): """Fetch device information from NSO using a specific query.""" + print("Fetching devices from NSO...") + + devices = [] url = f"{NSO_URL}/restconf/tailf/query" payload = json.dumps({ "tailf-rest-query:immediate-query": { @@ -150,42 +166,46 @@ def fetch_nso_devices(): } response = requests.post(url, headers=headers, data=payload, auth=(NSO_USERNAME, NSO_PASSWORD), verify=False) - if response.status_code == 200: - results = response.json().get("tailf-rest-query:query-result", {}).get("result", []) - devices = [] - for result in results: - device = {select["label"]: select["value"] for select in result["select"]} - devices.append(device) - print("NSO devices fetched successfully.") - return devices - else: + + if response.status_code != 200: print(f"Failed to fetch devices from NSO: {response.text}") return [] + results = response.json().get("tailf-rest-query:query-result", {}).get("result", []) + + for result in results: + device = {select["label"]: select["value"] for select in result["select"]} + devices.append(device) + print("NSO devices fetched successfully.") + return devices def assign_ip_to_interface(device_id, interface_id, ip_address): + # Create the IP address ip_payload = { "address": ip_address, "status": "active" } ip_response = requests.post(f"{NETBOX_URL}/api/ipam/ip-addresses/", headers=headers_netbox, json=ip_payload, verify=False) - if ip_response.status_code in [200, 201]: - print(f"IP address {ip_address} created successfully.") - ip_id = ip_response.json()["id"] - - # Associate the IP address with the interface - association_payload = { - "assigned_object_type": "dcim.interface", - "assigned_object_id": interface_id - } - association_response = requests.patch(f"{NETBOX_URL}/api/ipam/ip-addresses/{ip_id}/", headers=headers_netbox, json=association_payload, verify=False) - if association_response.status_code in [200, 201, 204]: - print(f"IP address {ip_address} assigned to interface ID {interface_id} successfully.") - else: - print(f"Failed to assign IP address {ip_address} to interface: {association_response.text}") - else: + + if ip_response.status_code not in [200, 201]: print(f"Failed to create IP address {ip_address}: {ip_response.text}") + return + + print(f"IP address {ip_address} created successfully.") + ip_id = ip_response.json()["id"] + + # Associate the IP address with the interface + association_payload = { + "assigned_object_type": "dcim.interface", + "assigned_object_id": interface_id + } + association_response = requests.patch(f"{NETBOX_URL}/api/ipam/ip-addresses/{ip_id}/", headers=headers_netbox, json=association_payload, verify=False) + + if association_response.status_code in [200, 201, 204]: + print(f"IP address {ip_address} assigned to interface ID {interface_id} successfully.") + else: + print(f"Failed to assign IP address {ip_address} to interface: {association_response.text}") def main(): print("Starting script...") @@ -195,15 +215,17 @@ def main(): nso_devices = fetch_nso_devices() for device in nso_devices: - if device['name'] not in netbox_devices: - site_code = device['name'][:4].upper() - site_id = site_mapping.get(site_code) - if site_id: - create_device_in_netbox(device, site_id, device_lookup) - else: - print(f"Site code {site_code} not found in NetBox.") - else: + if device['name'] in netbox_devices: print(f"Device {device['name']} already exists in NetBox.") + continue + + site_code = device['name'][:4].upper() + site_id = site_mapping.get(site_code) + + if site_id: + create_device_in_netbox(device, site_id, device_lookup) + else: + print(f"Site code {site_code} not found in NetBox.") if __name__ == "__main__": main() \ No newline at end of file diff --git a/mdd/scripts/nso_to_netbox_interfaces.py b/mdd/scripts/nso_to_netbox_interfaces.py index d4047b6..3bf6d2f 100644 --- a/mdd/scripts/nso_to_netbox_interfaces.py +++ b/mdd/scripts/nso_to_netbox_interfaces.py @@ -22,30 +22,38 @@ def get_netbox_devices(): """Fetch all devices from NetBox.""" + print("Fetching devices from NetBox...") + url = f"{NETBOX_URL}/api/dcim/devices/" response = requests.get(url, headers=netbox_headers, verify=False) - if response.status_code == 200: - print("Devices successfully fetched from NetBox.") - return response.json()['results'] - else: + + if response.status_code != 200: print("Failed to fetch devices from NetBox.") return [] + print("Devices successfully fetched from NetBox.") + return response.json()['results'] + def get_nso_interfaces(device_name): """Fetch interfaces for a specific device from NSO.""" + print(f"Fetching interfaces for {device_name} from NSO...") + url = f"{NSO_URL}/restconf/data/tailf-ncs:devices/device={device_name}/live-status/tailf-ned-cisco-ios-stats:interfaces" response = requests.get(url, auth=(NSO_USERNAME, NSO_PASSWORD), headers={"Accept": "application/yang-data+json"}, verify=False) - if response.status_code == 200: - print(f"Interfaces successfully fetched for {device_name} from NSO.") - return response.json().get('tailf-ned-cisco-ios-stats:interfaces', []) - else: + + if response.status_code != 200: print(f"Failed to fetch interfaces for {device_name} from NSO.") return [] + print(f"Interfaces successfully fetched for {device_name} from NSO.") + return response.json().get('tailf-ned-cisco-ios-stats:interfaces', []) + + def type_translation(nso_type): """Translate NSO interface type to NetBox interface type.""" + translation = { "GigabitEthernet": "1000Base-T", "TenGigabitEthernet": "10GBase-T", @@ -55,7 +63,9 @@ def type_translation(nso_type): def add_interface_to_device(device_id, interface): """Add an interface to a device in NetBox.""" + print(f"Adding interface {interface['name']} to device ID {device_id} in NetBox...") + data = { "device": device_id, "name": f"{interface['type']} {interface['name']}", @@ -66,13 +76,14 @@ def add_interface_to_device(device_id, interface): } url = f"{NETBOX_URL}/api/dcim/interfaces/" response = requests.post(url, headers=netbox_headers, json=data, verify=False) - if response.status_code in [200, 201]: - print(f"Interface {interface['name']} added to device ID {device_id} in NetBox.") - return response.json()['id'] # Return the new interface's ID for further use - else: + + if response.status_code not in [200, 201]: print(f"Failed to add interface {interface['name']} to device ID {device_id} in NetBox: {response.text}") return None + print(f"Interface {interface['name']} added to device ID {device_id} in NetBox.") + return response.json()['id'] # Return the new interface's ID for further use + def main(): devices = get_netbox_devices() for device in devices: diff --git a/mdd/scripts/nso_to_netbox_inventory.py b/mdd/scripts/nso_to_netbox_inventory.py index 334b439..d145396 100644 --- a/mdd/scripts/nso_to_netbox_inventory.py +++ b/mdd/scripts/nso_to_netbox_inventory.py @@ -21,31 +21,36 @@ } def get_netbox_devices(): - """Fetch all devices from NetBox.""" + """Fetch all devices from NetBox""" + url = f"{NETBOX_URL}/api/dcim/devices/" response = requests.get(url, headers=netbox_headers, verify=False) + return response.json()['results'] if response.status_code == 200 else [] def get_nso_inventory(device_name): - """Fetch inventory for a specific device from NSO.""" + """Fetch inventory for a specific device from NSO""" + url = f"{NSO_URL}/restconf/data/tailf-ncs:devices/device={device_name}/live-status/tailf-ned-cisco-ios-stats:inventory" response = requests.get(url, auth=(NSO_USERNAME, NSO_PASSWORD), headers={"Accept": "application/yang-data+json"}, verify=False) + return response.json().get('tailf-ned-cisco-ios-stats:inventory', []) if response.status_code == 200 else [] def fetch_existing_serial_numbers(device_id): - """Fetch existing serial numbers of inventory items for a given device.""" + """Fetch existing serial numbers of inventory items for a given device""" + url = f"{NETBOX_URL}/api/dcim/inventory-items/?device_id={device_id}" response = requests.get(url, headers=netbox_headers, verify=False) - if response.status_code == 200: - return [item['serial'] for item in response.json()['results'] if item['serial']] - return [] + + return [item['serial'] for item in response.json()['results'] if item['serial']] if response.status_code == 200 else [] def add_netbox_inventory_item(device_id, device_name, inventory_item, existing_serials): - """Add an inventory item to a NetBox device, skipping duplicates and items without 'pid'.""" + """Add an inventory item to a NetBox device, skipping duplicates and items without 'pid'""" + pid = inventory_item.get('pid') sn = inventory_item.get('sn') - if not pid or sn in existing_serials or pid == device_name: + if (not pid) or (sn in existing_serials) or (pid == device_name): return # Skip if pid is missing, serial is duplicate, or pid matches device name data = { @@ -58,6 +63,7 @@ def add_netbox_inventory_item(device_id, device_name, inventory_item, existing_s "serial": sn, } response = requests.post(f"{NETBOX_URL}/api/dcim/inventory-items/", headers=netbox_headers, json=data, verify=False) + if response.status_code in [200, 201]: print(f"Inventory item {pid} added to device {device_id}.") else: diff --git a/mdd/scripts/nso_to_netbox_platform.py b/mdd/scripts/nso_to_netbox_platform.py index e64ed40..456382e 100644 --- a/mdd/scripts/nso_to_netbox_platform.py +++ b/mdd/scripts/nso_to_netbox_platform.py @@ -37,82 +37,96 @@ def fetch_nso_platform_info(device_name): response = requests.get(url, headers=headers, verify=False) - if response.status_code == 200: - platform_info = response.json().get("tailf-ncs:platform") - if platform_info: - print(json.dumps(platform_info, indent=4)) # Debug print - return platform_info - else: - print(f"Platform information is missing for {device_name}.") - return None - else: + if response.status_code != 200: print(f"Failed to retrieve platform information for {device_name} from NSO: {response.status_code}, {response.text}") return None + platform_info = response.json().get("tailf-ncs:platform") + if not platform_info: + print(f"Platform information is missing for {device_name}.") + return None + + print(json.dumps(platform_info, indent=4)) # Debug print + return platform_info + def get_or_create_platform(name, version): """Ensure the platform exists in NetBox, based on the combined name and version, and return its ID.""" platform_identifier = f"{name} {version}" url = f"{NETBOX_URL}/api/dcim/platforms/?name={platform_identifier}" response = requests.get(url, headers=netbox_headers, verify=False) - if response.status_code == 200 and response.json()['count'] == 0: - # Platform does not exist, create it - data = { - "name": platform_identifier, - "slug": platform_identifier.lower().replace(" ", "-").replace("(", "").replace(")", "").replace(".", "-") - } - create_response = requests.post(f"{NETBOX_URL}/api/dcim/platforms/", headers=netbox_headers, data=json.dumps(data), verify=False) - if create_response.status_code in [200, 201]: - print(f"Successfully added platform: {platform_identifier}") - return create_response.json()['id'] - else: - print(f"Failed to create platform {platform_identifier}: {create_response.text}") - return None - else: - # Platform exists - return response.json()['results'][0]['id'] + + if response.status_code != 200: + return None # Server error? + + response = response.json() + + if response['count'] != 0: # Platform exists + return response['results'][0]['id'] + + # Platform doesn't exist - so create one + data = { + "name": platform_identifier, + "slug": platform_identifier.lower().replace(" ", "-").replace("(", "").replace(")", "").replace(".", "-") + } + create_response = requests.post(f"{NETBOX_URL}/api/dcim/platforms/", headers=netbox_headers, data=json.dumps(data), verify=False) + + if create_response.status_code not in [200, 201]: + print(f"Failed to create platform {platform_identifier}: {create_response.text}") + return None + + print(f"Successfully added platform: {platform_identifier}") + return create_response.json()['id'] def get_or_create_device_type(model): """Ensure the device type exists in NetBox, based on the model from NSO, and return its ID.""" + url = f"{NETBOX_URL}/api/dcim/device-types/?model={model}" response = requests.get(url, headers=netbox_headers, verify=False) - if response.status_code == 200 and response.json()['count'] == 0: - # Device type does not exist, create it - data = { - "model": model, - "slug": model.lower().replace(" ", "-").replace("_", "-"), - "manufacturer": 1 # Example manufacturer ID, adjust accordingly - } - create_response = requests.post(f"{NETBOX_URL}/api/dcim/device-types/", headers=netbox_headers, data=json.dumps(data), verify=False) - if create_response.status_code in [200, 201]: - return create_response.json()['id'] - else: - print(f"Failed to create device type {model}: {create_response.text}") - return None - else: + + if response.status_code != 200 or response.json()['count'] > 0: + # Device exists return response.json()['results'][0]['id'] + # Device type does not exist, create it + data = { + "model": model, + "slug": model.lower().replace(" ", "-").replace("_", "-"), + "manufacturer": 1 # Example manufacturer ID, adjust accordingly + } + create_response = requests.post(f"{NETBOX_URL}/api/dcim/device-types/", headers=netbox_headers, data=json.dumps(data), verify=False) + + if create_response.status_code not in [200, 201]: + print(f"Failed to create device type {model}: {create_response.text}") + return None + + return create_response.json()['id'] + def update_netbox_device(device_id, platform_info): """Update a device in NetBox with the correct serial number, device type, and platform.""" + model = platform_info.get("model") serial_number = platform_info.get("serial-number") name = platform_info.get("name") version = platform_info.get("version") - + device_type_id = get_or_create_device_type(model) platform_id = get_or_create_platform(name, version) if name and version else None - - if device_type_id and platform_id: - data = { - "serial": serial_number, - "device_type": device_type_id, - "platform": platform_id - } - url = f"{NETBOX_URL}/api/dcim/devices/{device_id}/" - response = requests.patch(url, headers=netbox_headers, data=json.dumps(data), verify=False) - if response.status_code in [200, 201, 204]: - print(f"Device {device_id} updated successfully with new type, serial, and platform.") - else: - print(f"Failed to update device {device_id}: {response.status_code}, {response.text}") + + if not device_type_id or not platform_id: + return + + data = { + "serial": serial_number, + "device_type": device_type_id, + "platform": platform_id + } + url = f"{NETBOX_URL}/api/dcim/devices/{device_id}/" + response = requests.patch(url, headers=netbox_headers, data=json.dumps(data), verify=False) + + if response.status_code in [200, 201, 204]: + print(f"Device {device_id} updated successfully with new type, serial, and platform.") + else: + print(f"Failed to update device {device_id}: {response.status_code}, {response.text}") def main(): netbox=netbox_helper(NETBOX_URL,netbox_headers) From de3de08baecf53dd1087be3bcf39b0af3a3ed91c Mon Sep 17 00:00:00 2001 From: KJStick Date: Wed, 10 Apr 2024 22:58:01 -0400 Subject: [PATCH 2/3] added env vars --- mdd/scripts/generate_inventory.py | 17 ++++++------ mdd/scripts/generate_tags.py | 18 ++++++------- mdd/scripts/load_env_vars.py | 20 ++++++++++++++ mdd/scripts/nautobot_add_sites.py | 27 +++++++++++-------- mdd/scripts/netbox_device_add.py | 39 ++++++++++++++------------- mdd/scripts/nso_to_netbox_platform.py | 3 ++- 6 files changed, 77 insertions(+), 47 deletions(-) create mode 100644 mdd/scripts/load_env_vars.py diff --git a/mdd/scripts/generate_inventory.py b/mdd/scripts/generate_inventory.py index 23ac611..a1e4b7f 100644 --- a/mdd/scripts/generate_inventory.py +++ b/mdd/scripts/generate_inventory.py @@ -1,15 +1,16 @@ import os import requests import yaml +import load_env_vars -NETBOX_URL = os.environ.get('NETBOX_URL') # Was NETBOX_API -NETBOX_TOKEN = os.environ.get('NETBOX_TOKEN') +ENV_VARS = { + "NAUTOBOT_URL" : None, + "NAUTOBOT_TOKEN" : None +} -if not NETBOX_URL or not NETBOX_TOKEN: - print("Error: NETBOX_API or NETBOX_TOKEN environment variables are not set.") - exit(1) +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) -headers = {'Authorization': f'Token {NETBOX_TOKEN}'} +headers = {'Authorization': f'Token {ENV_VARS['NAUTOBOT_TOKEN']}'} verify_ssl = False existing_inventory_path = '/path/to/existing/inventory.yml' # Replace with the actual path @@ -30,7 +31,7 @@ def fetch_netbox_devices(): # Make API requests to NetBox to retrieve device information response = requests.get( - f'{NETBOX_URL}/api/dcim/devices/', + f'{ENV_VARS['NAUTOBOT_URL']}/api/dcim/devices/', headers=headers, verify=verify_ssl # Disable SSL certificate validation ) @@ -58,7 +59,7 @@ def fetch_netbox_devices(): # Check if primary_ip4 exists and is not None primary_ip_data = device.get('primary_ip4') - if not primary_ip_data: + if not primary_ip_data: # Still need to create the role so that it shows up in the inventory file - because line 69 won't hit these cases inventory_data['all']['children']['network']['children'].setdefault(device_role, {}).setdefault('hosts', {}).setdefault(device_name, {}) continue diff --git a/mdd/scripts/generate_tags.py b/mdd/scripts/generate_tags.py index 68cccc8..647bb1e 100644 --- a/mdd/scripts/generate_tags.py +++ b/mdd/scripts/generate_tags.py @@ -1,16 +1,16 @@ import os import requests import yaml -import json +import load_env_vars -NETBOX_URL = os.environ.get("NETBOX_URL") # was NETBOX_URL -NETBOX_TOKEN = os.environ.get("NETBOX_TOKEN") +ENV_VARS = { + "NAUTOBOT_URL" : None, + "NAUTOBOT_TOKEN" : None +} -if not NETBOX_URL or not NETBOX_TOKEN: - print("NETBOX_API or NETBOX_TOKEN environment variables are not set.") - exit(1) +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) -headers = {'Authorization': f'Token {NETBOX_TOKEN}', 'Accept': 'application/json'} +headers = {'Authorization': f'Token {ENV_VARS['NAUTOBOT_TOKEN']}', 'Accept': 'application/json'} verify_ssl = False def get_inventory_structure(): @@ -34,7 +34,7 @@ def fetch_netbox_devices(): inventory_data = get_inventory_structure() # Fetch sites from NetBox - response = requests.get(f'{NETBOX_URL}/api/dcim/sites/?limit=1000', headers=headers, verify=verify_ssl) + response = requests.get(f'{ENV_VARS['NAUTOBOT_URL']}/api/dcim/sites/?limit=1000', headers=headers, verify=verify_ssl) if response.status_code != 200: print(f"Failed to fetch sites from NetBox: {response.status_code}") @@ -45,7 +45,7 @@ def fetch_netbox_devices(): inventory_data['all']['vars']['sites'].append(site['name']) # Fetch devices from NetBox - response = requests.get(f'{NETBOX_URL}/api/dcim/devices/?limit=1000', headers=headers, verify=verify_ssl) + response = requests.get(f'{ENV_VARS['NAUTOBOT_URL']}/api/dcim/devices/?limit=1000', headers=headers, verify=verify_ssl) if response.status_code != 200: print(f"Failed to fetch devices from NetBox: {response.status_code}") diff --git a/mdd/scripts/load_env_vars.py b/mdd/scripts/load_env_vars.py new file mode 100644 index 0000000..61b0323 --- /dev/null +++ b/mdd/scripts/load_env_vars.py @@ -0,0 +1,20 @@ +def load_env_vars(OS_ENV, ENV_VARS): + """ + Sees if any environment variables are not set - if they are not, exits the program + OS_ENV should be passed as os.environ + """ + missing_vars = [] + for var in ENV_VARS: + value = OS_ENV.get(var) + if not value: + missing_vars.append(var) + else: + # Create a variable with the name from the environment variable + ENV_VARS[var] = value + + # If any required environment variable is missing or empty, print an error message and exit + if missing_vars: + print(f"Error: The following environment variables are not set or empty: {', '.join(missing_vars)}") + exit(1) + + return ENV_VARS \ No newline at end of file diff --git a/mdd/scripts/nautobot_add_sites.py b/mdd/scripts/nautobot_add_sites.py index 6359153..c7bed0a 100644 --- a/mdd/scripts/nautobot_add_sites.py +++ b/mdd/scripts/nautobot_add_sites.py @@ -1,21 +1,26 @@ import csv import requests +import load_env_vars as load_env_vars import os -# Nautobot details -NAUTOBOT_URL = os.environ.get("NAUTOBOT_URL") -NAUTOBOT_TOKEN = os.environ.get("NAUTOBOT_TOKEN") +# Disable SSL Warnings (for self-signed certificates) +requests.packages.urllib3.disable_warnings() + +ENV_VARS = { + "NAUTOBOT_URL" : None, + "NAUTOBOT_TOKEN" : None +} + +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + HEADERS = { - 'Authorization': f'Token {NAUTOBOT_TOKEN}', + 'Authorization': f'Token {ENV_VARS['NAUTOBOT_TOKEN']}', 'Content-Type': 'application/json', 'Accept': 'application/json', } -# Disable SSL Warnings (for self-signed certificates) -requests.packages.urllib3.disable_warnings() - def create_site(site): - url = f"{NAUTOBOT_URL}/dcim/sites/" + url = f"{ENV_VARS['NAUTOBOT_URL']}/dcim/sites/" response = requests.post(url, headers=HEADERS, json=site, verify=False) if response.status_code == 201: @@ -24,7 +29,7 @@ def create_site(site): print(f"Failed to create site '{site['name']}': {response.text}") def create_device(device): - url = f"{NAUTOBOT_URL}/dcim/devices/" + url = f"{ENV_VARS['NAUTOBOT_URL']}/dcim/devices/" response = requests.post(url, headers=HEADERS, json=device, verify=False) if response.status_code == 201: @@ -34,7 +39,7 @@ def create_device(device): print(f"Failed to create device '{device['name']}': {response.text}") def create_interface(device_id, interface): - url = f"{NAUTOBOT_URL}/dcim/interfaces/" + url = f"{ENV_VARS['NAUTOBOT_URL']}/dcim/interfaces/" interface['device'] = device_id response = requests.post(url, headers=HEADERS, json=interface, verify=False) @@ -46,7 +51,7 @@ def create_interface(device_id, interface): print(f"Failed to create interface '{interface['name']}': {response.text}") def assign_ip_to_interface(interface_id, ip): - url = f"{NAUTOBOT_URL}/ipam/ip-addresses/" + url = f"{ENV_VARS['NAUTOBOT_URL']}/ipam/ip-addresses/" ip_data = { "address": ip, "assigned_object_type": "dcim.interface", diff --git a/mdd/scripts/netbox_device_add.py b/mdd/scripts/netbox_device_add.py index 742e070..f304538 100644 --- a/mdd/scripts/netbox_device_add.py +++ b/mdd/scripts/netbox_device_add.py @@ -2,23 +2,26 @@ import urllib3 import json import os +import load_env_vars -# Setup for NetBox and NSO -NETBOX_URL = os.environ.get("NETBOX_URL") -NETBOX_TOKEN = os.environ.get("NETBOX_TOKEN") -HEADERS = {"Authorization": f"Token {NETBOX_TOKEN}", "Content-Type": "application/json", "Accept": "application/json"} +ENV_VARS = { + "ENV_VARS['NETBOX_URL']": None, + "NETBOX_TOKEN": None, + "NSO_URL": None, + "NSO_USERNAME": None, + "NSO_PASSWORD": None, + "DEVICE_NAME": None, + "PLATFORM_URL": None +} -NSO_URL = os.environ.get("NSO_URL") -NSO_USERNAME = os.environ.get("NSO_USERNAME") -NSO_PASSWORD = os.environ.get("NSO_PASSWORD") +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) -DEVICE_NAME = os.environ.get("DEVICE_NAME") -PLATFORM_URL = os.environ.get("PLATFORM_URL") +HEADERS = {"Authorization": f"Token {ENV_VARS['NETBOX_TOKEN']}", "Content-Type": "application/json", "Accept": "application/json"} # Disable SSL warnings urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -def get_or_create_device_type(platform_data, headers, netbox_url): +def get_or_create_device_type(platform_data, headers, ENV_VARS['NETBOX_URL']): """ Ensures the device type exists in NetBox based on the NSO platform data, and returns the device type ID. @@ -28,7 +31,7 @@ def get_or_create_device_type(platform_data, headers, netbox_url): slug = model.replace("-", "_").lower() # Check if the device type already exists - search_response = requests.get(f"{netbox_url}/api/dcim/device-types/?slug={slug}", headers=headers, verify=False) + search_response = requests.get(f"{ENV_VARS['NETBOX_URL']}/api/dcim/device-types/?slug={slug}", headers=headers, verify=False) if search_response.status_code != 200: return None @@ -47,7 +50,7 @@ def get_or_create_device_type(platform_data, headers, netbox_url): "name": model } - response = requests.post(f"{netbox_url}/api/dcim/device-types/", headers=headers, data=json.dumps(payload), verify=False) + response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/dcim/device-types/", headers=headers, data=json.dumps(payload), verify=False) if response.status_code not in [200, 201]: print(f"Failed to add device type {model}: {response.status_code}, {response.text}") @@ -56,7 +59,7 @@ def get_or_create_device_type(platform_data, headers, netbox_url): print(f"Successfully added device type: {model}") return response.json()["id"] -def update_device(platform_data, device_type_id, headers, netbox_url): +def update_device(platform_data, device_type_id, headers, ENV_VARS['NETBOX_URL']): """ Updates an existing device in NetBox with the device type based on NSO platform data. """ @@ -65,7 +68,7 @@ def update_device(platform_data, device_type_id, headers, netbox_url): serial_number = platform_data["tailf-ncs:platform"]["serial-number"] # Attempt to find the device in NetBox - search_response = requests.get(f"{netbox_url}/api/dcim/devices/?name={device_name}", headers=headers, verify=False) + search_response = requests.get(f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/?name={device_name}", headers=headers, verify=False) if search_response.status_code != 200: return @@ -80,7 +83,7 @@ def update_device(platform_data, device_type_id, headers, netbox_url): "serial": serial_number } - update_response = requests.patch(f"{netbox_url}/api/dcim/devices/{device_id}/", headers=headers, data=json.dumps(payload), verify=False) + update_response = requests.patch(f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/{device_id}/", headers=headers, data=json.dumps(payload), verify=False) if update_response.status_code in [200, 201, 204]: print(f"Successfully updated device: {device_name}") @@ -88,13 +91,13 @@ def update_device(platform_data, device_type_id, headers, netbox_url): print(f"Failed to update device {device_name}: {update_response.status_code}, {update_response.text}") # Make the GET request to retrieve the platform information -response = requests.get(PLATFORM_URL, auth=(NSO_USERNAME, NSO_PASSWORD), headers={"Accept": "application/yang-data+json"}, verify=False) +response = requests.get(ENV_VARS['PLATFORM_URL'], auth=(ENV_VARS['NSO_USERNAME'], ENV_VARS['NSO_PASSWORD']), headers={"Accept": "application/yang-data+json"}, verify=False) if response.status_code != 200: print(f"Failed to retrieve platform information: {response.status_code}, {response.text}") os._exit(1) print("Platform information retrieved successfully") platform_data = response.json() -device_type_id = get_or_create_device_type(platform_data, HEADERS, NETBOX_URL) +device_type_id = get_or_create_device_type(platform_data, HEADERS, ENV_VARS['NETBOX_URL']) if device_type_id: - update_device(platform_data, device_type_id, HEADERS, NETBOX_URL) + update_device(platform_data, device_type_id, HEADERS, ENV_VARS['NETBOX_URL']) diff --git a/mdd/scripts/nso_to_netbox_platform.py b/mdd/scripts/nso_to_netbox_platform.py index 456382e..278d1a5 100644 --- a/mdd/scripts/nso_to_netbox_platform.py +++ b/mdd/scripts/nso_to_netbox_platform.py @@ -129,7 +129,8 @@ def update_netbox_device(device_id, platform_info): print(f"Failed to update device {device_id}: {response.status_code}, {response.text}") def main(): - netbox=netbox_helper(NETBOX_URL,netbox_headers) + netbox=netbox_helper(NETBOX_URL, + netbox_headers) for device in netbox.fetch_netbox_devices(): device_name = device.get('name') platform_info = fetch_nso_platform_info(device_name) From 0076ec9887afcbdf00817a5dbd0125ca11af4453 Mon Sep 17 00:00:00 2001 From: KJStick Date: Wed, 24 Apr 2024 13:03:47 -0400 Subject: [PATCH 3/3] changed to pynetbox --- mdd/scripts/load_env_vars.py | 15 +- mdd/scripts/nautobot_add_sites.py | 183 ++++++++++---- mdd/scripts/netbox_class.py | 63 ++++- mdd/scripts/netbox_device_add.py | 140 +++++++---- mdd/scripts/netbox_device_csv.py | 262 ++++++++++++-------- mdd/scripts/netbox_generate_sites.py | 164 ++++++++----- mdd/scripts/nso_acl_ip_to_nautobot.py | 113 ++++++--- mdd/scripts/nso_class.py | 18 ++ mdd/scripts/nso_to_netbox_device.py | 313 ++++++++++++++++-------- mdd/scripts/nso_to_netbox_interfaces.py | 36 +-- mdd/scripts/nso_to_netbox_inventory.py | 29 ++- mdd/scripts/nso_to_netbox_platform.py | 30 ++- mdd/scripts/test.py | 15 ++ 13 files changed, 945 insertions(+), 436 deletions(-) create mode 100644 mdd/scripts/nso_class.py create mode 100644 mdd/scripts/test.py diff --git a/mdd/scripts/load_env_vars.py b/mdd/scripts/load_env_vars.py index 61b0323..ab16c79 100644 --- a/mdd/scripts/load_env_vars.py +++ b/mdd/scripts/load_env_vars.py @@ -1,3 +1,4 @@ +# CALL FILE TOOLS? def load_env_vars(OS_ENV, ENV_VARS): """ Sees if any environment variables are not set - if they are not, exits the program @@ -17,4 +18,16 @@ def load_env_vars(OS_ENV, ENV_VARS): print(f"Error: The following environment variables are not set or empty: {', '.join(missing_vars)}") exit(1) - return ENV_VARS \ No newline at end of file + return ENV_VARS + +def get_sub_dict(dict, level): + """ + Returns reference to a sub dictionary based on first key + Used for inventory where you only have 1 key + """ + sub_dict = dict + for _ in range(level): + key = list(sub_dict.keys())[0] # get first key + sub_dict = sub_dict[key] + + return sub_dict # reference \ No newline at end of file diff --git a/mdd/scripts/nautobot_add_sites.py b/mdd/scripts/nautobot_add_sites.py index c7bed0a..92085d7 100644 --- a/mdd/scripts/nautobot_add_sites.py +++ b/mdd/scripts/nautobot_add_sites.py @@ -1,5 +1,5 @@ import csv -import requests +from nautobot import Nautobot #pip install nautobot-client import load_env_vars as load_env_vars import os @@ -10,18 +10,19 @@ "NAUTOBOT_URL" : None, "NAUTOBOT_TOKEN" : None } +NETBOX_DEVICES_PATH = './docs/netbox_devices.csv' +NETBOX_SITES_PATH = './docs/netbox_sites.csv' ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) -HEADERS = { - 'Authorization': f'Token {ENV_VARS['NAUTOBOT_TOKEN']}', - 'Content-Type': 'application/json', - 'Accept': 'application/json', -} +# Initialize the Nautobot client +nautobot = Nautobot( + url=ENV_VARS['NAUTOBOT_URL'], + token=ENV_VARS['NAUTOBOT_TOKEN'] +) def create_site(site): - url = f"{ENV_VARS['NAUTOBOT_URL']}/dcim/sites/" - response = requests.post(url, headers=HEADERS, json=site, verify=False) + response = nautobot.dcim.sites.create(**site) if response.status_code == 201: print(f"Site '{site['name']}' created successfully.") @@ -29,70 +30,154 @@ def create_site(site): print(f"Failed to create site '{site['name']}': {response.text}") def create_device(device): - url = f"{ENV_VARS['NAUTOBOT_URL']}/dcim/devices/" - response = requests.post(url, headers=HEADERS, json=device, verify=False) + response = nautobot.dcim.devices.create(**device) if response.status_code == 201: print(f"Device '{device['name']}' created successfully.") - return response.json()['id'] # Return device ID for further use + return response.id # Return device ID for further use print(f"Failed to create device '{device['name']}': {response.text}") def create_interface(device_id, interface): - url = f"{ENV_VARS['NAUTOBOT_URL']}/dcim/interfaces/" interface['device'] = device_id - - response = requests.post(url, headers=HEADERS, json=interface, verify=False) + response = nautobot.dcim.interfaces.create(**interface) if response.status_code == 201: print(f"Interface '{interface['name']}' created successfully.") - return response.json()['id'] + return response.id print(f"Failed to create interface '{interface['name']}': {response.text}") def assign_ip_to_interface(interface_id, ip): - url = f"{ENV_VARS['NAUTOBOT_URL']}/ipam/ip-addresses/" ip_data = { "address": ip, "assigned_object_type": "dcim.interface", "assigned_object_id": interface_id } - - response = requests.post(url, headers=HEADERS, json=ip_data, verify=False) + response = nautobot.ipam.ip_addresses.create(**ip_data) if response.status_code == 201: print(f"IP Address '{ip}' assigned successfully.") else: print(f"Failed to assign IP Address '{ip}': {response.text}") -# Import Sites from CSV -with open('docs/netbox_sites.csv', mode='r') as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - site = { - "name": row['Name'], - "slug": row['Slug'], - "status": "active" if row['Active'] == "Active" else "inactive", - "region": None, # Adjust this if you are using regions - "description": row['Comments'], - # Include more fields as needed - } - create_site(site) - -# Import Devices from CSV -with open('docs/netbox_devices.csv', mode='r') as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - device = { - "name": row['Name'], - "device_type": None, # Specify device type ID - "device_role": None, # Specify device role ID - "site": row['Name'][:4].upper(), # Match site by first four letters of device name - "status": "active" if row['Status'] == "Active" else "planned", - # Include more fields as needed - } - device_id = create_device(device) - if device_id and row.get('IP Address'): - interface_id = create_interface(device_id, {"name": "Management", "type": "virtual"}) - if interface_id: - assign_ip_to_interface(interface_id, row['IP Address']) + +def main(): + # Import Sites from CSV + with open(NETBOX_SITES_PATH, mode='r') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + site = { + "name": row['Name'], + "slug": row['Slug'], + "status": "active" if row['Active'] == "Active" else "inactive", + "region": None, # Adjust this if you are using regions + "description": row['Comments'], + # Include more fields as needed + } + create_site(site) + + # Import Devices from CSV + with open(NETBOX_DEVICES_PATH, mode='r') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + device = { + "name": row['Name'], + "device_type": None, # Specify device type ID + "device_role": None, # Specify device role ID + "site": row['Name'][:4].upper(), # Match site by first four letters of device name + "status": "active" if row['Status'] == "Active" else "planned", + # Include more fields as needed + } + device_id = create_device(device) + if device_id and row.get('IP Address'): + interface_id = create_interface(device_id, {"name": "Management", "type": "virtual"}) + if interface_id: + assign_ip_to_interface(interface_id, row['IP Address']) + +if __name__ == "__main__": + main() + +# HEADERS = { +# 'Authorization': f'Token {ENV_VARS['NAUTOBOT_TOKEN']}', +# 'Content-Type': 'application/json', +# 'Accept': 'application/json', +# } + +# def create_site(site): +# url = f"{ENV_VARS['NAUTOBOT_URL']}/dcim/sites/" +# response = requests.post(url, headers=HEADERS, json=site, verify=False) + +# if response.status_code == 201: +# print(f"Site '{site['name']}' created successfully.") +# else: +# print(f"Failed to create site '{site['name']}': {response.text}") + +# def create_device(device): +# url = f"{ENV_VARS['NAUTOBOT_URL']}/dcim/devices/" +# response = requests.post(url, headers=HEADERS, json=device, verify=False) + +# if response.status_code == 201: +# print(f"Device '{device['name']}' created successfully.") +# return response.json()['id'] # Return device ID for further use + +# print(f"Failed to create device '{device['name']}': {response.text}") + +# def create_interface(device_id, interface): +# url = f"{ENV_VARS['NAUTOBOT_URL']}/dcim/interfaces/" +# interface['device'] = device_id + +# response = requests.post(url, headers=HEADERS, json=interface, verify=False) + +# if response.status_code == 201: +# print(f"Interface '{interface['name']}' created successfully.") +# return response.json()['id'] + +# print(f"Failed to create interface '{interface['name']}': {response.text}") + +# def assign_ip_to_interface(interface_id, ip): +# url = f"{ENV_VARS['NAUTOBOT_URL']}/ipam/ip-addresses/" +# ip_data = { +# "address": ip, +# "assigned_object_type": "dcim.interface", +# "assigned_object_id": interface_id +# } + +# response = requests.post(url, headers=HEADERS, json=ip_data, verify=False) + +# if response.status_code == 201: +# print(f"IP Address '{ip}' assigned successfully.") +# else: +# print(f"Failed to assign IP Address '{ip}': {response.text}") + +# # Import Sites from CSV +# with open('docs/netbox_sites.csv', mode='r') as csvfile: +# reader = csv.DictReader(csvfile) +# for row in reader: +# site = { +# "name": row['Name'], +# "slug": row['Slug'], +# "status": "active" if row['Active'] == "Active" else "inactive", +# "region": None, # Adjust this if you are using regions +# "description": row['Comments'], +# # Include more fields as needed +# } +# create_site(site) + +# # Import Devices from CSV +# with open('docs/netbox_devices.csv', mode='r') as csvfile: +# reader = csv.DictReader(csvfile) +# for row in reader: +# device = { +# "name": row['Name'], +# "device_type": None, # Specify device type ID +# "device_role": None, # Specify device role ID +# "site": row['Name'][:4].upper(), # Match site by first four letters of device name +# "status": "active" if row['Status'] == "Active" else "planned", +# # Include more fields as needed +# } +# device_id = create_device(device) +# if device_id and row.get('IP Address'): +# interface_id = create_interface(device_id, {"name": "Management", "type": "virtual"}) +# if interface_id: +# assign_ip_to_interface(interface_id, row['IP Address']) diff --git a/mdd/scripts/netbox_class.py b/mdd/scripts/netbox_class.py index d6fc566..58d9dbe 100644 --- a/mdd/scripts/netbox_class.py +++ b/mdd/scripts/netbox_class.py @@ -1,19 +1,68 @@ import json import requests +import load_env_vars +import os + class netbox_helper(): - def __init__(self, NETBOX_URL, NETBOX_HEADERS): - self.netbox_url=NETBOX_URL - self.netbox_headers=NETBOX_HEADERS + def __init__(self, headers={}, verify_ssl=False): + ENV_VARS = { + "NETBOX_URL" : None, + "NETBOX_TOKEN" : None + } + self.ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + + self.headers = {'Authorization': f'Token {self.ENV_VARS['NETBOX_TOKEN']}', "Content-Type": "application/json", "Accept": "application/json"} + self.headers.update(headers) - def fetch_netbox_devices(self): - """Fetch all devices from NetBox.""" + self.verify_ssl = verify_ssl - url = f"{self.netbox_url}/api/dcim/devices/" - response = requests.get(url, headers=self.netbox_headers, verify=False) + def get_netbox_devices(self, query=""): + """Fetch devices from NetBox""" + + url = f"{self.ENV_VARS["NETBOX_URL"]}/api/dcim/devices/{query}" + response = requests.get(url, headers=self.headers, verify=self.verify_ssl) if response.status_code != 200: print(f"Failed to retrieve devices from NetBox: {response.status_code}, {response.text}") return [] return response.json().get('results', []) + + def get_netbox_sites(self, query=""): + """Fetch sites from NetBox""" + + url = f"{self.ENV_VARS["NETBOX_URL"]}/api/dcim/sites/{query}" + response = requests.get(url, headers=self.headers, verify=self.verify_ssl) + + if response.status_code != 200: + print(f"Failed to retrieve sites from NetBox: {response.status_code}, {response.text}") + return [] + + return response.json().get('results', []) + + def get_netbox_device_type(self, query=""): + """Gets device_types""" + + url = f"{self.ENV_VARS['NETBOX_URL']}/api/dcim/device-types/{query}" + response = requests.get(url, headers=self.headers, verify=self.verify_ssl) + + if response.status_code != 200: + print(f"Failed to retrieve sites from NetBox: {response.status_code}, {response.text}") + return [] + + return response.json().get('results', []) + + def get_netbox_device_type(self, query=""): + """Gets device_types""" + + url = f"{self.ENV_VARS['NETBOX_URL']}/api/dcim/device-types/{query}" + response = requests.get(url, headers=self.headers, verify=self.verify_ssl) + + if response.status_code != 200: + print(f"Failed to retrieve sites from NetBox: {response.status_code}, {response.text}") + return [] + + return response.json().get('results', []) + + response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/dcim/device-types/", headers=headers, data=json.dumps(payload), verify=False) diff --git a/mdd/scripts/netbox_device_add.py b/mdd/scripts/netbox_device_add.py index f304538..645ee8f 100644 --- a/mdd/scripts/netbox_device_add.py +++ b/mdd/scripts/netbox_device_add.py @@ -3,9 +3,10 @@ import json import os import load_env_vars +from pynetbox import api ENV_VARS = { - "ENV_VARS['NETBOX_URL']": None, + "NETBOX_URL": None, "NETBOX_TOKEN": None, "NSO_URL": None, "NSO_USERNAME": None, @@ -18,86 +19,137 @@ HEADERS = {"Authorization": f"Token {ENV_VARS['NETBOX_TOKEN']}", "Content-Type": "application/json", "Accept": "application/json"} +netbox = api( + url=ENV_VARS['NETBOX_URL'], + token=ENV_VARS['NETBOX_TOKEN'] + ) + # Disable SSL warnings urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -def get_or_create_device_type(platform_data, headers, ENV_VARS['NETBOX_URL']): +def get_or_create_device_type(platform_data): """ Ensures the device type exists in NetBox based on the NSO platform data, and returns the device type ID. """ - model = platform_data["tailf-ncs:platform"]["model"] - slug = model.replace("-", "_").lower() + # model = platform_data["tailf-ncs:platform"]["model"] + # slug = model.replace("-", "_").lower() - # Check if the device type already exists - search_response = requests.get(f"{ENV_VARS['NETBOX_URL']}/api/dcim/device-types/?slug={slug}", headers=headers, verify=False) + # # Check if the device type already exists + # search_response = requests.get(f"{ENV_VARS['NETBOX_URL']}/api/dcim/device-types/?slug={slug}", headers=HEADERS, verify=False) - if search_response.status_code != 200: - return None + # if search_response.status_code != 200: + # return None - if search_response.json()['count'] > 0: - # Device type exists - return search_response.json()["results"][0]["id"] + # if search_response.json()['count'] > 0: + # # Device type exists + # return search_response.json()["results"][0]["id"] - # Device type does not exist, proceed with creation - payload = { - "manufacturer": "2", # Replace with actual manufacturer ID - "model": model, - "slug": slug, - "part_number": model, # Using model as part_number for simplicity - "name": model - } + model = platform_data["tailf-ncs:platform"]["model"] + slug = model.replace("-", "_").lower() - response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/dcim/device-types/", headers=headers, data=json.dumps(payload), verify=False) + # Search for the device type in NetBox + device_types = netbox.dcim.device_types.filter(slug=slug) - if response.status_code not in [200, 201]: - print(f"Failed to add device type {model}: {response.status_code}, {response.text}") + # Check if any device types were found + if device_types: + device_types[0].id + + ## MEANS NOT THERE - SO CREATE IT + + # Create the device type using pynetbox + device_type = netbox.dcim.device_types.create( + manufacturer="2", + model=model, + slug=slug, + part_number=model, # Using model as part_number for simplicity + name=model + ) + + # Check if the device type creation was successful + if device_type.id is None: + print(f"Failed to add device type {model}") return None print(f"Successfully added device type: {model}") - return response.json()["id"] + return device_type.id + + # response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/dcim/device-types/", headers=headers, data=json.dumps(payload), verify=False) + + # if response.status_code not in [200, 201]: + # print(f"Failed to add device type {model}: {response.status_code}, {response.text}") + # return None -def update_device(platform_data, device_type_id, headers, ENV_VARS['NETBOX_URL']): + # print(f"Successfully added device type: {model}") + # return response.json()["id"] + +def update_device(self, device_name, platform_data, device_type_id): # ASK WHY DEVICE_NAME = "" - get all? """ Updates an existing device in NetBox with the device type based on NSO platform data. """ - device_name = "" serial_number = platform_data["tailf-ncs:platform"]["serial-number"] - # Attempt to find the device in NetBox - search_response = requests.get(f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/?name={device_name}", headers=headers, verify=False) - - if search_response.status_code != 200: - return + # Search for the device in NetBox + devices = self.netbox.dcim.devices.filter(name=device_name) - if search_response.json()['count'] == 0: + # Check if the device exists + if not devices: print(f"Device {device_name} does not exist. Skipping.") return - device_id = search_response.json()["results"][0]["id"] - payload = { - "device_type": device_type_id, - "serial": serial_number - } + # Get the ID of the first matching device + device_id = devices[0].id + + # Update the device with the new device type and serial number + device = self.netbox.dcim.devices.get(device_id) + device.device_type = device_type_id + device.serial = serial_number + device.save() + + print(f"Successfully updated device: {device.name}") + +# def update_device(platform_data, device_type_id): +# """ +# Updates an existing device in NetBox with the device type based on NSO platform data. +# """ + +# device_name = "" +# serial_number = platform_data["tailf-ncs:platform"]["serial-number"] + +# # Attempt to find the device in NetBox +# search_response = requests.get(f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/?name={device_name}", headers=HEADERS, verify=False) + +# if search_response.status_code != 200: +# return + +# if search_response.json()['count'] == 0: +# print(f"Device {device_name} does not exist. Skipping.") +# return + +# device_id = search_response.json()["results"][0]["id"] +# payload = { +# "device_type": device_type_id, +# "serial": serial_number +# } - update_response = requests.patch(f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/{device_id}/", headers=headers, data=json.dumps(payload), verify=False) +# update_response = requests.patch(f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/{device_id}/", headers=HEADERS, data=json.dumps(payload), verify=False) - if update_response.status_code in [200, 201, 204]: - print(f"Successfully updated device: {device_name}") - else: - print(f"Failed to update device {device_name}: {update_response.status_code}, {update_response.text}") +# if update_response.status_code in [200, 201, 204]: +# print(f"Successfully updated device: {device_name}") +# else: +# print(f"Failed to update device {device_name}: {update_response.status_code}, {update_response.text}") # Make the GET request to retrieve the platform information -response = requests.get(ENV_VARS['PLATFORM_URL'], auth=(ENV_VARS['NSO_USERNAME'], ENV_VARS['NSO_PASSWORD']), headers={"Accept": "application/yang-data+json"}, verify=False) +response = requests.get(ENV_VARS['NSO_URL'], auth=(ENV_VARS['NSO_USERNAME'], ENV_VARS['NSO_PASSWORD']), headers={"Accept": "application/yang-data+json"}, verify=False) if response.status_code != 200: print(f"Failed to retrieve platform information: {response.status_code}, {response.text}") os._exit(1) print("Platform information retrieved successfully") platform_data = response.json() -device_type_id = get_or_create_device_type(platform_data, HEADERS, ENV_VARS['NETBOX_URL']) +device_type_id = get_or_create_device_type(platform_data) if device_type_id: - update_device(platform_data, device_type_id, HEADERS, ENV_VARS['NETBOX_URL']) + update_device("", platform_data, device_type_id) diff --git a/mdd/scripts/netbox_device_csv.py b/mdd/scripts/netbox_device_csv.py index ed155f1..e898a90 100644 --- a/mdd/scripts/netbox_device_csv.py +++ b/mdd/scripts/netbox_device_csv.py @@ -1,136 +1,198 @@ import csv import os -import requests -import urllib3 -import netbox_class +from pynetbox import api +import load_env_vars -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - -# Load environment variables -NETBOX_URL = os.getenv('NETBOX_URL') -NETBOX_TOKEN = os.getenv('NETBOX_TOKEN') +ENV_VARS = { + "NETBOX_URL": None, + "NETBOX_TOKEN": None +} DEVICES_CSV_FILE_PATH = './docs/netbox_devices.csv' DEVICE_TYPE_LOOKUP_CSV_PATH = './docs/device_type_lookup.csv' -headers = {"Authorization": f"Token {NETBOX_TOKEN}", "Content-Type": "application/json"} +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + +# Initialize NetBox API +netbox = api(url=ENV_VARS['NETBOX_URL'], token=ENV_VARS['NETBOX_TOKEN']) def fetch_id(endpoint, search_param, search_field='name'): - url = f"{NETBOX_URL}/api/{endpoint}/" - params = {search_field: search_param} + try: + objects = getattr(netbox.dcim, endpoint).filter(**{search_field: search_param}) + return objects[0].id if objects else None + except Exception as e: + print(f"Error fetching ID for {search_param} from {endpoint}: {e}") + return None - response = requests.get(url, headers=headers, params=params, verify=False) +def fetch_sites(): + return {site.slug.upper(): site.id for site in netbox.dcim.sites.all()} + +def add_management_interface_and_ip(device, ip_address): + try: + interface = device.interfaces.create(name="Management", type="virtual", enabled=True) + interface.ip_addresses.create(address=ip_address) + device.primary_ip4 = interface.primary_ip.id + device.save() + print(f"Management interface added to device {device.name} with IP {ip_address}.") + except Exception as e: + print(f"Error adding management interface to device {device.name}: {e}") - if response.status_code != 200: - print(f"Error fetching ID for {search_param} from {endpoint}.") - return None +def create_device(row, site_mapping, device_lookup): + site_id = site_mapping.get(row['Name'][:4].upper(), site_mapping.get('DEFAULT')) + device_code = row['Name'][9:11] + lookup = device_lookup.get(device_code, {}) + device_role_id = fetch_id('device_roles', lookup.get('DeviceRole')) + device_type_id = fetch_id('device_types', lookup.get('DeviceType'), 'model') + + if not all([site_id, device_role_id, device_type_id]): + print(f"Skipping device creation for {row['Name']} due to missing required IDs.") + return - if response.json()['count'] == 1: - return response.json()['results'][0]['id'] + try: + device = netbox.dcim.devices.create( + name=row['Name'], + site=site_id, + device_role=device_role_id, + device_type=device_type_id, + serial=row.get('Serial number', '') + ) + print(f"Device '{row['Name']}' created successfully.") + add_management_interface_and_ip(device, row.get('IP Address', '')) + except Exception as e: + print(f"Failed to create device '{row['Name']}': {e}") - if response.json()['count'] > 1: - print(f"Multiple matches found for {search_param} from {endpoint}, using the first one.") - return response.json()['results'][0]['id'] +def main(): + print("Script started.") + site_mapping = fetch_sites() + print(f"Loaded {len(site_mapping)} sites.") - return None # count == 0 + device_lookup = {row['Code']: {'DeviceRole': row['DeviceRole'], 'DeviceType': row['DeviceType']} for row in csv.DictReader(open(DEVICE_TYPE_LOOKUP_CSV_PATH))} + print(f"Loaded device lookup with codes: {list(device_lookup.keys())}") -def load_device_lookup(): - lookup = {} - with open(DEVICE_TYPE_LOOKUP_CSV_PATH, mode='r', encoding='utf-8') as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - lookup[row['Code']] = {'DeviceRole': row['DeviceRole'], 'DeviceType': row['DeviceType']} - return lookup + for row in csv.DictReader(open(DEVICES_CSV_FILE_PATH)): + print(f"Processing device {row['Name']}...") + create_device(row, site_mapping, device_lookup) -def fetch_sites(): - site_mapping = {} - url = f"{NETBOX_URL}/api/dcim/sites/" - response = requests.get(url, headers=headers, verify=False) +if __name__ == "__main__": + main() - for site in response.json().get('results', []): - site_mapping[site['slug'].upper()] = site['id'] - return site_mapping +# def fetch_id(endpoint, search_param, search_field='name'): +# url = f"{ENV_VARS['NETBOX_URL']}/api/{endpoint}/" +# params = {search_field: search_param} -def add_management_interface_and_ip(device_id, ip_address): - print(f"Starting to add management interface to device ID {device_id}...") +# response = requests.get(url, headers=headers, params=params, verify=False) - # Step 1: Create the management interface - url = f"{NETBOX_URL}/api/dcim/interfaces/" - data = {"device": device_id, "name": "Management", "type": "virtual", "enabled": True} - interface_response = requests.post(url, headers=headers, json=data, verify=False) +# if response.status_code != 200: +# print(f"Error fetching ID for {search_param} from {endpoint}.") +# return None - if interface_response.status_code not in [200, 201]: - print(f"Failed to create 'Management' interface for device ID {device_id}: {interface_response.text}") - return +# if response.json()['count'] == 1: +# return response.json()['results'][0]['id'] - interface_id = interface_response.json()['id'] - print(f"'Management' interface with ID {interface_id} created successfully.") +# if response.json()['count'] > 1: +# print(f"Multiple matches found for {search_param} from {endpoint}, using the first one.") +# return response.json()['results'][0]['id'] + +# return None # count == 0 - # Step 2: Assign the IP address to the management interface - ip_url = f"{NETBOX_URL}/api/ipam/ip-addresses/" - ip_data = { - "address": ip_address, - "assigned_object_id": interface_id, - "assigned_object_type": "dcim.interface" - } - ip_response = requests.post(ip_url, headers=headers, json=ip_data, verify=False) +# def load_device_lookup(): +# lookup = {} +# with open(DEVICE_TYPE_LOOKUP_CSV_PATH, mode='r', encoding='utf-8') as csvfile: +# reader = csv.DictReader(csvfile) +# for row in reader: +# lookup[row['Code']] = {'DeviceRole': row['DeviceRole'], 'DeviceType': row['DeviceType']} +# return lookup - if ip_response.status_code not in [200, 201]: - print(f"Failed to assign IP address {ip_address} to 'Management' interface: {ip_response.text}") - return +# def fetch_sites(): +# site_mapping = {} +# url = f"{ENV_VARS['NETBOX_URL']}/api/dcim/sites/" +# response = requests.get(url, headers=headers, verify=False) - print(f"IP address {ip_address} assigned successfully to 'Management' interface.") - ip_id = ip_response.json()['id'] +# for site in response.json().get('results', []): +# site_mapping[site['slug'].upper()] = site['id'] - # Step 3: Set the IP address as the primary IP for the device - device_update_url = f"{NETBOX_URL}/api/dcim/devices/{device_id}/" - device_update_data = { - "primary_ip4": ip_id - } - device_update_response = requests.patch(device_update_url, headers=headers, json=device_update_data, verify=False) +# return site_mapping - if device_update_response.status_code in [200, 201, 204]: - print(f"IP address {ip_address} set as primary IP for device ID {device_id}.") - else: - print(f"Failed to set IP address as primary for device ID {device_id}: {device_update_response.text}") +# # def add_management_interface_and_ip(device_id, ip_address): +# # print(f"Starting to add management interface to device ID {device_id}...") -def create_device(row, site_mapping, device_lookup): - site_prefix = row['Name'][:4].upper() - site_id = site_mapping.get(site_prefix, site_mapping.get('DEFAULT')) - device_code = row['Name'][9:11] - lookup = device_lookup.get(device_code, {}) +# # # Step 1: Create the management interface +# # url = f"{ENV_VARS['NETBOX_URL']}/api/dcim/interfaces/" +# # data = {"device": device_id, "name": "Management", "type": "virtual", "enabled": True} +# # interface_response = requests.post(url, headers=headers, json=data, verify=False) - device_role_id = fetch_id('dcim/device-roles', lookup.get('DeviceRole'), 'name') - device_type_id = fetch_id('dcim/device-types', lookup.get('DeviceType'), 'model') +# # if interface_response.status_code not in [200, 201]: +# # print(f"Failed to create 'Management' interface for device ID {device_id}: {interface_response.text}") +# # return - if not (site_id and device_role_id and device_type_id): - print(f"Skipping device creation for {row['Name']} due to missing required IDs.") - return +# # interface_id = interface_response.json()['id'] +# # print(f"'Management' interface with ID {interface_id} created successfully.") - data = {"name": row['Name'], "site": site_id, "device_role": device_role_id, "device_type": device_type_id, "serial": row.get('Serial number', '')} - device_response = requests.post(f"{NETBOX_URL}/api/dcim/devices/", headers=headers, json=data, verify=False) +# # # Step 2: Assign the IP address to the management interface +# # ip_url = f"{ENV_VARS['NETBOX_URL']}/api/ipam/ip-addresses/" +# # ip_data = { +# # "address": ip_address, +# # "assigned_object_id": interface_id, +# # "assigned_object_type": "dcim.interface" +# # } +# # ip_response = requests.post(ip_url, headers=headers, json=ip_data, verify=False) - if device_response.status_code in [200, 201]: - print(f"Device '{row['Name']}' created successfully.") - device_id = device_response.json()['id'] - add_management_interface_and_ip(device_id, row.get('IP Address', '')) - else: - print(f"Failed to create device '{row['Name']}': {device_response.text}") +# # if ip_response.status_code not in [200, 201]: +# # print(f"Failed to assign IP address {ip_address} to 'Management' interface: {ip_response.text}") +# # return -def main(): - print("Script started.") +# # print(f"IP address {ip_address} assigned successfully to 'Management' interface.") +# # ip_id = ip_response.json()['id'] - site_mapping = fetch_sites() - print(f"Loaded {len(site_mapping)} sites.") +# # # Step 3: Set the IP address as the primary IP for the device +# # device_update_url = f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/{device_id}/" +# # device_update_data = { +# # "primary_ip4": ip_id +# # } +# # device_update_response = requests.patch(device_update_url, headers=headers, json=device_update_data, verify=False) - device_lookup = load_device_lookup() - print(f"Loaded device lookup with codes: {list(device_lookup.keys())}") +# # if device_update_response.status_code in [200, 201, 204]: +# # print(f"IP address {ip_address} set as primary IP for device ID {device_id}.") +# # else: +# # print(f"Failed to set IP address as primary for device ID {device_id}: {device_update_response.text}") - with open(DEVICES_CSV_FILE_PATH, mode='r', encoding='utf-8') as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - print(f"Processing device {row['Name']}...") - create_device(row, site_mapping, device_lookup) +# def create_device(row, site_mapping, device_lookup): +# site_prefix = row['Name'][:4].upper() +# site_id = site_mapping.get(site_prefix, site_mapping.get('DEFAULT')) +# device_code = row['Name'][9:11] +# lookup = device_lookup.get(device_code, {}) -if __name__ == "__main__": - main() \ No newline at end of file +# device_role_id = fetch_id('dcim/device-roles', lookup.get('DeviceRole'), 'name') +# device_type_id = fetch_id('dcim/device-types', lookup.get('DeviceType'), 'model') + +# if not (site_id and device_role_id and device_type_id): +# print(f"Skipping device creation for {row['Name']} due to missing required IDs.") +# return + +# data = {"name": row['Name'], "site": site_id, "device_role": device_role_id, "device_type": device_type_id, "serial": row.get('Serial number', '')} +# device_response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/", headers=headers, json=data, verify=False) + +# if device_response.status_code in [200, 201]: +# print(f"Device '{row['Name']}' created successfully.") +# device_id = device_response.json()['id'] +# add_management_interface_and_ip(device_id, row.get('IP Address', '')) +# else: +# print(f"Failed to create device '{row['Name']}': {device_response.text}") + +# def main(): +# print("Script started.") + +# site_mapping = fetch_sites() +# print(f"Loaded {len(site_mapping)} sites.") + +# device_lookup = load_device_lookup() +# print(f"Loaded device lookup with codes: {list(device_lookup.keys())}") + +# with open(DEVICES_CSV_FILE_PATH, mode='r', encoding='utf-8') as csvfile: +# reader = csv.DictReader(csvfile) +# for row in reader: +# print(f"Processing device {row['Name']}...") +# create_device(row, site_mapping, device_lookup) + +# if __name__ == "__main__": +# main() \ No newline at end of file diff --git a/mdd/scripts/netbox_generate_sites.py b/mdd/scripts/netbox_generate_sites.py index bee1311..803781f 100644 --- a/mdd/scripts/netbox_generate_sites.py +++ b/mdd/scripts/netbox_generate_sites.py @@ -1,72 +1,126 @@ -import csv -import requests import os -import urllib3 +import csv from urllib.parse import urljoin -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - -# Load environment variables -NETBOX_URL = os.getenv('NETBOX_URL') -NETBOX_TOKEN = os.getenv('NETBOX_TOKEN') - -headers = {'Authorization': f'Token {NETBOX_TOKEN}', 'Content-Type': 'application/json'} +from pynetbox import api +import load_env_vars +ENV_VARS = { + "NETBOX_URL": None, + "NETBOX_TOKEN": None +} # Path to your CSV file csv_file_path = './docs/netbox_sites.csv' +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + +# Initialize NetBox API +netbox = api(url=ENV_VARS['NETBOX_URL'], token=ENV_VARS['NETBOX_TOKEN']) + def create_region(region_name): """Create a region in NetBox if it doesn't already exist.""" - - if region_name == '': + if not region_name: return None - url = urljoin(NETBOX_URL, 'api/dcim/regions/') - response = requests.get(url, headers=headers, params={'name': region_name}, verify=False) # Added verify=False here + try: + region = netbox.dcim.regions.get(name=region_name) + print(f"Region '{region_name}' already exists.") + return region.id + except netbox.core.query.RequestError: + try: + region = netbox.dcim.regions.create(name=region_name, slug=region_name.lower()) + print(f"Region '{region_name}' created successfully.") + return region.id + except Exception as e: + print(f"Failed to create region '{region_name}': {e}") + return None - if response.status_code != 200: - print(f"Failed to check if region '{region_name}' exists.") - return None +def create_site(row): + """Create a site in NetBox using row data from the CSV""" + region_id = create_region(row['Region']) - if response.json()['count'] > 0: - print(f"Region '{region_name}' already exists.") - return response.json()['results'][0]['id'] + try: + site = netbox.dcim.sites.create( + name=row['Name'], + slug=row['Slug'], + status='active' if row['Active'] == 'Active' else 'inactive', + region=region_id, + latitude=row['Latitude'] if row['Latitude'] else None, + longitude=row['Longitude'] if row['Longitude'] else None, + description=row['Comments'] + ) + print(f"Site '{row['Name']}' created successfully.") + except Exception as e: + print(f"Failed to create site '{row['Name']}': {e}") - # Region doesn't exist - create it - data = {'name': region_name, 'slug': region_name.lower()} - create_response = requests.post(url, json=data, headers=headers, verify=False) # And here +def main(): + # Read the CSV file and process each row + with open(csv_file_path, mode='r', encoding='utf-8') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + create_site(row) - if create_response.status_code in [200, 201]: - print(f"Region '{region_name}' created successfully.") - return create_response.json()['id'] +if __name__ == "__main__": + main() - print(f"Failed to create region '{region_name}'.") - return None -def create_site(row): - """Create a site in NetBox using row data from the CSV""" +# headers = {'Authorization': f'Token {ENV_VARS["NETBOX_TOKEN"]}', 'Content-Type': 'application/json'} - region_id = create_region(row['Region']) - url = urljoin(NETBOX_URL, 'api/dcim/sites/') - headers = {'Authorization': f'Token {NETBOX_TOKEN}', 'Content-Type': 'application/json'} - data = { - 'name': row['Name'], - 'slug': row['Slug'], - 'status': 'active' if row['Active'] == 'Active' else 'inactive', - 'region': region_id, - 'latitude': row['Latitude'] if row['Latitude'] else None, - 'longitude': row['Longitude'] if row['Longitude'] else None, - 'description': row['Comments'], - } - - response = requests.post(url, json=data, headers=headers, verify=False) # Added verify=False here - - if response.status_code in [200, 201]: - print(f"Site '{row['Name']}' created successfully.") - else: - print(f"Failed to create site '{row['Name']}': {response.text}") - -# Read the CSV file and process each row -with open(csv_file_path, mode='r', encoding='utf-8') as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - create_site(row) +# # Path to your CSV file +# csv_file_path = './docs/netbox_sites.csv' + +# def create_region(region_name): +# """Create a region in NetBox if it doesn't already exist.""" + +# if region_name == '': +# return None + +# url = urljoin(ENV_VARS['NETBOX_URL'], 'api/dcim/regions/') +# response = requests.get(url, headers=headers, params={'name': region_name}, verify=False) # Added verify=False here + +# if response.status_code != 200: +# print(f"Failed to check if region '{region_name}' exists.") +# return None + +# if response.json()['count'] > 0: +# print(f"Region '{region_name}' already exists.") +# return response.json()['results'][0]['id'] + +# # Region doesn't exist - create it +# data = {'name': region_name, 'slug': region_name.lower()} +# create_response = requests.post(url, json=data, headers=headers, verify=False) # And here + +# if create_response.status_code in [200, 201]: +# print(f"Region '{region_name}' created successfully.") +# return create_response.json()['id'] + +# print(f"Failed to create region '{region_name}'.") +# return None + +# def create_site(row): +# """Create a site in NetBox using row data from the CSV""" + +# region_id = create_region(row['Region']) +# url = urljoin(ENV_VARS['NETBOX_URL'], 'api/dcim/sites/') +# headers = {'Authorization': f'Token {NETBOX_TOKEN}', 'Content-Type': 'application/json'} +# data = { +# 'name': row['Name'], +# 'slug': row['Slug'], +# 'status': 'active' if row['Active'] == 'Active' else 'inactive', +# 'region': region_id, +# 'latitude': row['Latitude'] if row['Latitude'] else None, +# 'longitude': row['Longitude'] if row['Longitude'] else None, +# 'description': row['Comments'], +# } + +# response = requests.post(url, json=data, headers=headers, verify=False) # Added verify=False here + +# if response.status_code in [200, 201]: +# print(f"Site '{row['Name']}' created successfully.") +# else: +# print(f"Failed to create site '{row['Name']}': {response.text}") + +# # Read the CSV file and process each row +# with open(csv_file_path, mode='r', encoding='utf-8') as csvfile: +# reader = csv.DictReader(csvfile) +# for row in reader: +# create_site(row) diff --git a/mdd/scripts/nso_acl_ip_to_nautobot.py b/mdd/scripts/nso_acl_ip_to_nautobot.py index 34aab12..57e708c 100644 --- a/mdd/scripts/nso_acl_ip_to_nautobot.py +++ b/mdd/scripts/nso_acl_ip_to_nautobot.py @@ -2,35 +2,47 @@ import requests import base64 import re -import json import urllib3 - -# Environment variables -NSO_URL = os.getenv('NSO_URL') -NSO_USERNAME = os.getenv('NSO_USERNAME') -NSO_PASSWORD = os.getenv('NSO_PASSWORD') -NAUTOBOT_URL = os.getenv('NAUTOBOT_URL') -NAUTOBOT_TOKEN = os.getenv('NAUTOBOT_TOKEN') +import load_env_vars +from nautobot import Nautobot +from nautobot.extras.models import Status # Disable SSL warnings urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +ENV_VARS = { + "NSO_URL": None, + "NSO_USERNAME": None, + "NSO_PASSWORD": None, + "NAUTOBOT_URL": None, + "NAUTOBOT_TOKEN": None +} +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + +# Initialize the Nautobot client +nautobot = Nautobot( + url=ENV_VARS['NAUTOBOT_URL'], + token=ENV_VARS['NAUTOBOT_TOKEN'] +) + +ip_pattern = re.compile(r'permit\s+(\d+\.\d+\.\d+\.\d+)(?:\s+(\d+\.\d+\.\d+\.\d+))?') + # Headers setup nso_headers = { - 'Authorization': f'Basic {base64.b64encode(f"{NSO_USERNAME}:{NSO_PASSWORD}".encode()).decode()}', + 'Authorization': f'Basic {base64.b64encode(f"{ENV_VARS['NSO_USERNAME']}:{ENV_VARS["NSO_PASSWORD"]}".encode()).decode()}', 'Content-Type': 'application/yang-data+json', 'Accept': 'application/yang-data+json' } -nautobot_headers = { - 'Authorization': f'Token {NAUTOBOT_TOKEN}', - 'Content-Type': 'application/json', - 'Accept': 'application/json' -} +# nautobot_headers = { +# 'Authorization': f'Token {ENV_VARS["NAUTOBOT_TOKEN"]}', +# 'Content-Type': 'application/json', +# 'Accept': 'application/json' +# } def fetch_nso_devices(): """Fetches a list of devices from NSO.""" - url = f"{NSO_URL}/restconf/tailf/query" + url = f"{ENV_VARS['NSO_URL']}/restconf/tailf/query" payload = { "tailf-rest-query:immediate-query": { "foreach": "/devices/device", @@ -45,7 +57,7 @@ def fetch_nso_devices(): def get_acl_for_device(device_name): """Fetches ACLs for a specified device and processes them.""" - url = f"{NSO_URL}/restconf/data/tailf-ncs:devices/device={device_name}/config/tailf-ned-cisco-ios:access-list" + url = f"{ENV_VARS['NSO_URL']}/restconf/data/tailf-ncs:devices/device={device_name}/config/tailf-ned-cisco-ios:access-list" response = requests.get(url, headers=nso_headers, verify=False) if response.status_code == 200: @@ -60,7 +72,6 @@ def get_acl_for_device(device_name): def process_acl_rule(rule_text): """Processes a single ACL rule, extracting IPs and pushing to Nautobot.""" - ip_pattern = re.compile(r'permit\s+(\d+\.\d+\.\d+\.\d+)(?:\s+(\d+\.\d+\.\d+\.\d+))?') match = ip_pattern.search(rule_text) if match: ip_address = match.group(1) @@ -77,31 +88,55 @@ def convert_to_cidr(mask): return sum([bin(int(x)).count('1') for x in mask.split('.')]) -def push_ip_to_nautobot(ip_address): # TODO: CHANGE DATA VARIABLE +def push_ip_to_nautobot(ip_address): """Pushes an IP address to Nautobot.""" - url = f"{NAUTOBOT_URL}/ipam/ip-addresses/" - data = { - "address": ip_address, - "namespace": { - "id": "fa8052af-7f81-4818-8eab-d7f08ed192a3", # Example ID, replace with actual if different - "object_type": "app_label.modelname", # Adjust as necessary - "url": "fa8052af-7f81-4818-8eab-d7f08ed192a3" # This seems incorrect; adjust based on actual requirements - }, - "type": "host", # Assuming this is constant for your use case - "status": { - "id": "cd5378a5-ed32-4f75-9e9e-980f4280e7c1", # Example status ID, replace with actual if different - "object_type": "app_label.modelname", # Adjust as necessary - "url": "https://IP/extras/statuses/cd5378a5-ed32-4f75-9e9e-980f4280e7c1" - } - } - - response = requests.post(url, headers=nautobot_headers, json=data, verify=False) - - if response.status_code in [200, 201]: + namespace_id = "fa8052af-7f81-4818-8eab-d7f08ed192a3" # Example ID, replace with actual if different + status_id = "cd5378a5-ed32-4f75-9e9e-980f4280e7c1" # Example status ID, replace with actual if different + + try: + status = Status.objects.get(pk=status_id) + except Status.DoesNotExist: + print(f"Status with ID {status_id} does not exist.") + return + + try: + nautobot.ipam.ip_addresses.create( + address=ip_address, + status=status, + assigned_object_id=namespace_id, + assigned_object_type="extras.namespace", + type="host" + ) print(f"IP {ip_address} pushed to Nautobot successfully.") - else: - print(f"Failed to push IP {ip_address} to Nautobot: {response.text}") + except Exception as e: + print(f"Failed to push IP {ip_address} to Nautobot: {e}") + +# def push_ip_to_nautobot(ip_address): # TODO: CHANGE DATA VARIABLE +# """Pushes an IP address to Nautobot.""" + +# url = f"{ENV_VARS['NAUTOBOT_URL']}/ipam/ip-addresses/" +# data = { +# "address": ip_address, +# "namespace": { +# "id": "fa8052af-7f81-4818-8eab-d7f08ed192a3", # Example ID, replace with actual if different +# "object_type": "app_label.modelname", # Adjust as necessary +# "url": "fa8052af-7f81-4818-8eab-d7f08ed192a3" # This seems incorrect; adjust based on actual requirements +# }, +# "type": "host", # Assuming this is constant for your use case +# "status": { +# "id": "cd5378a5-ed32-4f75-9e9e-980f4280e7c1", # Example status ID, replace with actual if different +# "object_type": "app_label.modelname", # Adjust as necessary +# "url": "https://IP/extras/statuses/cd5378a5-ed32-4f75-9e9e-980f4280e7c1" +# } +# } + +# response = requests.post(url, headers=nautobot_headers, json=data, verify=False) + +# if response.status_code in [200, 201]: +# print(f"IP {ip_address} pushed to Nautobot successfully.") +# else: +# print(f"Failed to push IP {ip_address} to Nautobot: {response.text}") def main(): devices = fetch_nso_devices() diff --git a/mdd/scripts/nso_class.py b/mdd/scripts/nso_class.py new file mode 100644 index 0000000..975a721 --- /dev/null +++ b/mdd/scripts/nso_class.py @@ -0,0 +1,18 @@ +import json +import requests +import load_env_vars +import os + + +class NSO_Helper(): + def __init__(self, headers={}, verify_ssl=False): + ENV_VARS = { + "NSO_URL" : None, + "NSO_TOKEN" : None + } + self.ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + + self.headers = {'Authorization': f'Token {self.ENV_VARS['NSO_TOKEN']}', "Content-Type": "application/json", "Accept": "application/json"} + self.headers.update(headers) + + self.verify_ssl = verify_ssl \ No newline at end of file diff --git a/mdd/scripts/nso_to_netbox_device.py b/mdd/scripts/nso_to_netbox_device.py index cb86bd7..de77767 100644 --- a/mdd/scripts/nso_to_netbox_device.py +++ b/mdd/scripts/nso_to_netbox_device.py @@ -7,34 +7,70 @@ import urllib3 import json import base64 +import load_env_vars +from pynetbox import api # Setup and configurations urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -NSO_URL = os.getenv('NSO_URL') -NSO_USERNAME = os.getenv('NSO_USERNAME') -NSO_PASSWORD = os.getenv('NSO_PASSWORD') -NETBOX_URL = os.getenv('NETBOX_URL') -NETBOX_TOKEN = os.getenv('NETBOX_TOKEN') +ENV_VARS = { + "NSO_URL": None, + "NSO_USERNAME": None, + "NSO_PASSWORD": None, + "NETBOX_URL": None, + "NETBOX_TOKEN": None +} +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) DEVICE_TYPE_LOOKUP_CSV_PATH = './docs/device_type_lookup.csv' -headers_netbox = { - "Authorization": f"Token {NETBOX_TOKEN}", - "Content-Type": "application/json", - "Accept": "application/json" -} +# Initialize NetBox API +netbox = api(url=ENV_VARS['NETBOX_URL'], token=ENV_VARS['NETBOX_TOKEN']) + +# headers_netbox = { +# "Authorization": f"Token {ENV_VARS['NETBOX_TOKEN']}", +# "Content-Type": "application/json", +# "Accept": "application/json" +# } + +# def create_device_in_netbox(device, site_id, device_lookup): +# print(f"Creating device in NetBox: {device['name']}") + +# device_code = device['name'].split('-')[2] # Extract device code +# if device_code in device_lookup: +# device_role = fetch_id('dcim/device-roles', device_lookup[device_code]['DeviceRole'], 'name') +# device_type = fetch_id('dcim/device-types', device_lookup[device_code]['DeviceType'], 'model') +# else: +# print(f"No device type/role found for code: {device_code}") +# return + +# device_payload = { +# "name": device['name'], +# "site": site_id, +# "device_role": device_role, +# "device_type": device_type, +# "serial": "", +# } +# device_response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/", headers=headers_netbox, json=device_payload, verify=False) + +# if device_response.status_code not in [200, 201]: +# print(f"Failed to create device {device['name']}: {device_response.text}") +# return + +# device_id = device_response.json()["id"] +# print(f"Device {device['name']} created successfully with ID {device_id}.") +# create_management_interface_and_set_primary_ip(device_id, device['address']) def create_device_in_netbox(device, site_id, device_lookup): print(f"Creating device in NetBox: {device['name']}") device_code = device['name'].split('-')[2] # Extract device code - if device_code in device_lookup: - device_role = fetch_id('dcim/device-roles', device_lookup[device_code]['DeviceRole'], 'name') - device_type = fetch_id('dcim/device-types', device_lookup[device_code]['DeviceType'], 'model') - else: + if device_code not in device_lookup: print(f"No device type/role found for code: {device_code}") return + device_role = fetch_id(device_lookup[device_code]['DeviceRole']) + device_type = fetch_id(device_lookup[device_code]['DeviceType']) + device_payload = { "name": device['name'], "site": site_id, @@ -42,15 +78,50 @@ def create_device_in_netbox(device, site_id, device_lookup): "device_type": device_type, "serial": "", } - device_response = requests.post(f"{NETBOX_URL}/api/dcim/devices/", headers=headers_netbox, json=device_payload, verify=False) - if device_response.status_code not in [200, 201]: - print(f"Failed to create device {device['name']}: {device_response.text}") - return + try: + device_id = netbox.dcim.devices.create(**device_payload)['id'] + print(f"Device {device['name']} created successfully with ID {device_id}.") + create_management_interface_and_set_primary_ip(device_id, device['address']) + except Exception as e: + print(f"Failed to create device {device['name']}: {e}") + +# def create_management_interface_and_set_primary_ip(device_id, ip_address): +# print(f"Creating 'Management' interface for device ID {device_id}...") + +# if not ip_address.endswith('/32'): +# ip_address += '/32' + +# # Create the Management interface +# interface_payload = {"device": device_id, "name": "Management", "type": "virtual"} +# interface_response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/dcim/interfaces/", headers=headers_netbox, json=interface_payload, verify=False) + +# if interface_response.status_code not in [200, 201]: +# print(f"Failed to create 'Management' interface: {interface_response.text}") +# return + +# interface_id = interface_response.json()["id"] +# print(f"'Management' interface with ID {interface_id} created successfully.") + +# # Create the IP address and assign it to the interface +# ip_payload = {"address": ip_address, "status": "active", "assigned_object_id": interface_id, "assigned_object_type": "dcim.interface"} +# ip_response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/ipam/ip-addresses/", headers=headers_netbox, json=ip_payload, verify=False) + +# if ip_response.status_code not in [200, 201]: +# print(f"Failed to create and assign IP address {ip_address}: {ip_response.text}") +# return - device_id = device_response.json()["id"] - print(f"Device {device['name']} created successfully with ID {device_id}.") - create_management_interface_and_set_primary_ip(device_id, device['address']) +# ip_id = ip_response.json()["id"] +# print(f"IP address {ip_address} created and assigned successfully.") + +# # Set the IP address as the primary IP for the device +# device_update_payload = {"primary_ip4": ip_id} # or "primary_ip6" for IPv6 +# device_update_response = requests.patch(f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/{device_id}/", headers=headers_netbox, json=device_update_payload, verify=False) + +# if device_update_response.status_code in [200, 201, 204]: +# print(f"Primary IP set successfully for device ID {device_id}.") +# else: +# print(f"Failed to set primary IP for device ID {device_id}: {device_update_response.text}") def create_management_interface_and_set_primary_ip(device_id, ip_address): print(f"Creating 'Management' interface for device ID {device_id}...") @@ -60,66 +131,81 @@ def create_management_interface_and_set_primary_ip(device_id, ip_address): # Create the Management interface interface_payload = {"device": device_id, "name": "Management", "type": "virtual"} - interface_response = requests.post(f"{NETBOX_URL}/api/dcim/interfaces/", headers=headers_netbox, json=interface_payload, verify=False) - - if interface_response.status_code not in [200, 201]: - print(f"Failed to create 'Management' interface: {interface_response.text}") + try: + interface_id = netbox.dcim.interfaces.create(**interface_payload)['id'] + print(f"'Management' interface with ID {interface_id} created successfully.") + except Exception as e: + print(f"Failed to create 'Management' interface: {e}") return - interface_id = interface_response.json()["id"] - print(f"'Management' interface with ID {interface_id} created successfully.") - # Create the IP address and assign it to the interface ip_payload = {"address": ip_address, "status": "active", "assigned_object_id": interface_id, "assigned_object_type": "dcim.interface"} - ip_response = requests.post(f"{NETBOX_URL}/api/ipam/ip-addresses/", headers=headers_netbox, json=ip_payload, verify=False) - - if ip_response.status_code not in [200, 201]: - print(f"Failed to create and assign IP address {ip_address}: {ip_response.text}") + try: + ip_id = netbox.ipam.ip_addresses.create(**ip_payload)['id'] + print(f"IP address {ip_address} created and assigned successfully.") + except Exception as e: + print(f"Failed to create and assign IP address {ip_address}: {e}") return - ip_id = ip_response.json()["id"] - print(f"IP address {ip_address} created and assigned successfully.") - # Set the IP address as the primary IP for the device - device_update_payload = {"primary_ip4": ip_id} # or "primary_ip6" for IPv6 - device_update_response = requests.patch(f"{NETBOX_URL}/api/dcim/devices/{device_id}/", headers=headers_netbox, json=device_update_payload, verify=False) - - if device_update_response.status_code in [200, 201, 204]: + try: + netbox.dcim.devices.update(device_id, primary_ip4=ip_id) print(f"Primary IP set successfully for device ID {device_id}.") - else: - print(f"Failed to set primary IP for device ID {device_id}: {device_update_response.text}") + except Exception as e: + print(f"Failed to set primary IP for device ID {device_id}: {e}") + +# def fetch_id(endpoint, search_param, search_field='name'): +# """Fetch a NetBox entity's ID based on a search field and parameter.""" + +# print(f"Fetching ID for '{search_param}' from {endpoint}...") +# url = f"{ENV_VARS['NETBOX_URL']}/api/{endpoint}/" +# params = {search_field: search_param} +# response = requests.get(url, headers=headers_netbox, params=params, verify=False) + +# data = response.json() +# if response.status_code != 200 or data['count'] != 1: +# print(f"Failed to fetch ID for {search_param} from {endpoint}.") +# return None + +# return data['results'][0]['id'] def fetch_id(endpoint, search_param, search_field='name'): """Fetch a NetBox entity's ID based on a search field and parameter.""" + try: + objects = getattr(netbox, endpoint).filter(**{search_field: search_param}) + if len(objects) == 1: + return objects[0].id + else: + print(f"Failed to fetch ID for {search_param} from {endpoint}.") + return None + except Exception as e: + print(f"Failed to fetch ID for {search_param} from {endpoint}: {e}") + return None - print(f"Fetching ID for '{search_param}' from {endpoint}...") - url = f"{NETBOX_URL}/api/{endpoint}/" - params = {search_field: search_param} - response = requests.get(url, headers=headers_netbox, params=params, verify=False) +# def fetch_sites(): +# """Fetch all sites from NetBox and return a dictionary mapping site slugs to their IDs.""" - data = response.json() +# site_mapping = {} +# url = f"{ENV_VARS['NETBOX_URL']}/api/dcim/sites/" +# response = requests.get(url, headers=headers_netbox, verify=False) - if response.status_code != 200 or data['count'] != 1: - print(f"Failed to fetch ID for {search_param} from {endpoint}.") - return None +# if response.status_code != 200: +# print("Failed to fetch sites from NetBox.") +# return {} - return data['results'][0]['id'] +# for site in response.json()['results']: +# site_mapping[site['slug'].upper()] = site['id'] +# return site_mapping def fetch_sites(): """Fetch all sites from NetBox and return a dictionary mapping site slugs to their IDs.""" - - site_mapping = {} - url = f"{NETBOX_URL}/api/dcim/sites/" - response = requests.get(url, headers=headers_netbox, verify=False) - - if response.status_code != 200: - print("Failed to fetch sites from NetBox.") + try: + sites = netbox.dcim.sites.all() + return {site.slug.upper(): site.id for site in sites} + except Exception as e: + print(f"Failed to fetch sites from NetBox: {e}") return {} - for site in response.json()['results']: - site_mapping[site['slug'].upper()] = site['id'] - return site_mapping - def load_device_lookup(): """Load the device role and type lookup from a CSV file.""" @@ -130,18 +216,28 @@ def load_device_lookup(): lookup[row['Code']] = {'DeviceRole': row['DeviceRole'], 'DeviceType': row['DeviceType']} return lookup +# def fetch_netbox_devices(): +# """Fetch existing devices from NetBox.""" + +# netbox_devices = {} +# url = f"{ENV_VARS['NETBOX_URL']}/api/dcim/devices/" +# response = requests.get(url, headers=headers_netbox, verify=False) + +# if response.status_code == 200: +# for device in response.json()['results']: +# netbox_devices[device['name']] = device +# else: +# print("Failed to fetch devices from NetBox.") +# return netbox_devices + def fetch_netbox_devices(): """Fetch existing devices from NetBox.""" - - netbox_devices = {} - url = f"{NETBOX_URL}/api/dcim/devices/" - response = requests.get(url, headers=headers_netbox, verify=False) - if response.status_code == 200: - for device in response.json()['results']: - netbox_devices[device['name']] = device - else: - print("Failed to fetch devices from NetBox.") - return netbox_devices + try: + devices = netbox.dcim.devices.all() + return {device.name: device for device in devices} + except Exception as e: + print(f"Failed to fetch devices from NetBox: {e}") + return {} def fetch_nso_devices(): """Fetch device information from NSO using a specific query.""" @@ -149,7 +245,7 @@ def fetch_nso_devices(): print("Fetching devices from NSO...") devices = [] - url = f"{NSO_URL}/restconf/tailf/query" + url = f"{ENV_VARS['NSO_URL']}/restconf/tailf/query" payload = json.dumps({ "tailf-rest-query:immediate-query": { "foreach": "/devices/device", @@ -162,10 +258,10 @@ def fetch_nso_devices(): headers = { 'Content-Type': 'application/yang-data+json', 'Accept': 'application/yang-data+json', - 'Authorization': f'Basic {base64.b64encode(f"{NSO_USERNAME}:{NSO_PASSWORD}".encode()).decode("utf-8")}' + 'Authorization': f'Basic {base64.b64encode(f"{ENV_VARS['NSO_USERNAME']}:{ENV_VARS['NSO_PASSWORD']}".encode()).decode("utf-8")}' } - response = requests.post(url, headers=headers, data=payload, auth=(NSO_USERNAME, NSO_PASSWORD), verify=False) + response = requests.post(url, headers=headers, data=payload, auth=(ENV_VARS["NSO_USERNAME"], ENV_VARS["NSO_PASSWORD"]), verify=False) if response.status_code != 200: print(f"Failed to fetch devices from NSO: {response.text}") @@ -179,33 +275,50 @@ def fetch_nso_devices(): print("NSO devices fetched successfully.") return devices -def assign_ip_to_interface(device_id, interface_id, ip_address): - - # Create the IP address - ip_payload = { - "address": ip_address, - "status": "active" - } - ip_response = requests.post(f"{NETBOX_URL}/api/ipam/ip-addresses/", headers=headers_netbox, json=ip_payload, verify=False) - - if ip_response.status_code not in [200, 201]: - print(f"Failed to create IP address {ip_address}: {ip_response.text}") - return - - print(f"IP address {ip_address} created successfully.") - ip_id = ip_response.json()["id"] - - # Associate the IP address with the interface - association_payload = { - "assigned_object_type": "dcim.interface", - "assigned_object_id": interface_id - } - association_response = requests.patch(f"{NETBOX_URL}/api/ipam/ip-addresses/{ip_id}/", headers=headers_netbox, json=association_payload, verify=False) +# def assign_ip_to_interface(device_id, interface_id, ip_address): + +# # Create the IP address +# ip_payload = { +# "address": ip_address, +# "status": "active" +# } +# ip_response = requests.post(f"{ENV_VARS['NETBOX_URL']}/api/ipam/ip-addresses/", headers=headers_netbox, json=ip_payload, verify=False) + +# if ip_response.status_code not in [200, 201]: +# print(f"Failed to create IP address {ip_address}: {ip_response.text}") +# return + +# print(f"IP address {ip_address} created successfully.") +# ip_id = ip_response.json()["id"] + +# # Associate the IP address with the interface +# association_payload = { +# "assigned_object_type": "dcim.interface", +# "assigned_object_id": interface_id +# } +# association_response = requests.patch(f"{ENV_VARS['NETBOX_URL']}/api/ipam/ip-addresses/{ip_id}/", headers=headers_netbox, json=association_payload, verify=False) + +# if association_response.status_code in [200, 201, 204]: +# print(f"IP address {ip_address} assigned to interface ID {interface_id} successfully.") +# else: +# print(f"Failed to assign IP address {ip_address} to interface: {association_response.text}") + +def assign_ip_to_interface(device_id, interface_name, ip_address): + try: + device = netbox.dcim.devices.get(device_id) + interface = device.interfaces.get(name=interface_name) + interface_id = interface.id + + ip_data = { + "address": ip_address, + "status": "active", + "interface": interface_id + } + ip = netbox.ipam.ip_addresses.create(**ip_data) - if association_response.status_code in [200, 201, 204]: - print(f"IP address {ip_address} assigned to interface ID {interface_id} successfully.") - else: - print(f"Failed to assign IP address {ip_address} to interface: {association_response.text}") + print(f"IP address {ip.address} assigned to interface {interface.name} successfully.") + except Exception as e: + print(f"Failed to assign IP address {ip_address} to interface {interface_name}: {e}") def main(): print("Starting script...") diff --git a/mdd/scripts/nso_to_netbox_interfaces.py b/mdd/scripts/nso_to_netbox_interfaces.py index 3bf6d2f..96a20d8 100644 --- a/mdd/scripts/nso_to_netbox_interfaces.py +++ b/mdd/scripts/nso_to_netbox_interfaces.py @@ -2,30 +2,34 @@ import os import json import urllib3 +import load_env_vars -# Environment setup -NETBOX_URL = os.getenv('NETBOX_URL') -NETBOX_TOKEN = os.getenv('NETBOX_TOKEN') -NSO_URL = os.getenv('NSO_URL') -NSO_USERNAME = os.getenv('NSO_USERNAME') -NSO_PASSWORD = os.getenv('NSO_PASSWORD') +# Disable SSL warnings +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +ENV_VARS = { + "NSO_URL": None, + "NSO_USERNAME": None, + "NSO_PASSWORD": None, + "NETBOX_URL": None, + "NETBOX_TOKEN": None +} + +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) # NetBox headers for API requests netbox_headers = { - "Authorization": f"Token {NETBOX_TOKEN}", + "Authorization": f"Token {ENV_VARS["NETBOX_TOKEN"]}", "Content-Type": "application/json", "Accept": "application/json" } -# Disable SSL warnings -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - def get_netbox_devices(): """Fetch all devices from NetBox.""" print("Fetching devices from NetBox...") - url = f"{NETBOX_URL}/api/dcim/devices/" + url = f"{ENV_VARS["NETBOX_URL"]}/api/dcim/devices/" response = requests.get(url, headers=netbox_headers, verify=False) if response.status_code != 200: @@ -40,8 +44,8 @@ def get_nso_interfaces(device_name): print(f"Fetching interfaces for {device_name} from NSO...") - url = f"{NSO_URL}/restconf/data/tailf-ncs:devices/device={device_name}/live-status/tailf-ned-cisco-ios-stats:interfaces" - response = requests.get(url, auth=(NSO_USERNAME, NSO_PASSWORD), headers={"Accept": "application/yang-data+json"}, verify=False) + url = f"{ENV_VARS['NSO_URL']}/restconf/data/tailf-ncs:devices/device={device_name}/live-status/tailf-ned-cisco-ios-stats:interfaces" + response = requests.get(url, auth=(ENV_VARS['NSO_USERNAME'], ENV_VARS['NSO_PASSWORD']), headers={"Accept": "application/yang-data+json"}, verify=False) if response.status_code != 200: print(f"Failed to fetch interfaces for {device_name} from NSO.") @@ -54,12 +58,12 @@ def get_nso_interfaces(device_name): def type_translation(nso_type): """Translate NSO interface type to NetBox interface type.""" - translation = { + translation_table = { "GigabitEthernet": "1000Base-T", "TenGigabitEthernet": "10GBase-T", "FastEthernet": "100Base-T" } - return translation.get(nso_type, "Other") # Fallback to 'Other' if type not found + return translation_table.get(nso_type, "Other") # Fallback to 'Other' if type not found def add_interface_to_device(device_id, interface): """Add an interface to a device in NetBox.""" @@ -74,7 +78,7 @@ def add_interface_to_device(device_id, interface): "mac_address": interface.get('mac-address', ''), # Additional fields as necessary } - url = f"{NETBOX_URL}/api/dcim/interfaces/" + url = f"{ENV_VARS["NETBOX_URL"]}/api/dcim/interfaces/" response = requests.post(url, headers=netbox_headers, json=data, verify=False) if response.status_code not in [200, 201]: diff --git a/mdd/scripts/nso_to_netbox_inventory.py b/mdd/scripts/nso_to_netbox_inventory.py index d145396..cc75ce7 100644 --- a/mdd/scripts/nso_to_netbox_inventory.py +++ b/mdd/scripts/nso_to_netbox_inventory.py @@ -2,20 +2,25 @@ import os import json import urllib3 +import load_env_vars urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -# Environment setup -NETBOX_URL = os.getenv('NETBOX_URL') -NETBOX_TOKEN = os.getenv('NETBOX_TOKEN') -NSO_URL = os.getenv('NSO_URL') -NSO_USERNAME = os.getenv('NSO_USERNAME') -NSO_PASSWORD = os.getenv('NSO_PASSWORD') +ENV_VARS = { + "NSO_URL": None, + "NSO_USERNAME": None, + "NSO_PASSWORD": None, + "NETBOX_URL": None, + "NETBOX_TOKEN": None +} + +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) + CISCO_MANUFACTURER_ID = '2' netbox_headers = { - "Authorization": f"Token {NETBOX_TOKEN}", + "Authorization": f"Token {ENV_VARS["NETBOX_TOKEN"]}", "Content-Type": "application/json", "Accept": "application/json" } @@ -23,7 +28,7 @@ def get_netbox_devices(): """Fetch all devices from NetBox""" - url = f"{NETBOX_URL}/api/dcim/devices/" + url = f"{ENV_VARS["NETBOX_URL"]}/api/dcim/devices/" response = requests.get(url, headers=netbox_headers, verify=False) return response.json()['results'] if response.status_code == 200 else [] @@ -31,15 +36,15 @@ def get_netbox_devices(): def get_nso_inventory(device_name): """Fetch inventory for a specific device from NSO""" - url = f"{NSO_URL}/restconf/data/tailf-ncs:devices/device={device_name}/live-status/tailf-ned-cisco-ios-stats:inventory" - response = requests.get(url, auth=(NSO_USERNAME, NSO_PASSWORD), headers={"Accept": "application/yang-data+json"}, verify=False) + url = f"{ENV_VARS['NSO_URL']}/restconf/data/tailf-ncs:devices/device={device_name}/live-status/tailf-ned-cisco-ios-stats:inventory" + response = requests.get(url, auth=(ENV_VARS['NSO_USERNAME'], ENV_VARS['NSO_PASWORD']), headers={"Accept": "application/yang-data+json"}, verify=False) return response.json().get('tailf-ned-cisco-ios-stats:inventory', []) if response.status_code == 200 else [] def fetch_existing_serial_numbers(device_id): """Fetch existing serial numbers of inventory items for a given device""" - url = f"{NETBOX_URL}/api/dcim/inventory-items/?device_id={device_id}" + url = f"{ENV_VARS["NETBOX_URL"]}/api/dcim/inventory-items/?device_id={device_id}" response = requests.get(url, headers=netbox_headers, verify=False) return [item['serial'] for item in response.json()['results'] if item['serial']] if response.status_code == 200 else [] @@ -62,7 +67,7 @@ def add_netbox_inventory_item(device_id, device_name, inventory_item, existing_s "part_id": pid, "serial": sn, } - response = requests.post(f"{NETBOX_URL}/api/dcim/inventory-items/", headers=netbox_headers, json=data, verify=False) + response = requests.post(f"{ENV_VARS["NETBOX_URL"]}/api/dcim/inventory-items/", headers=netbox_headers, json=data, verify=False) if response.status_code in [200, 201]: print(f"Inventory item {pid} added to device {device_id}.") diff --git a/mdd/scripts/nso_to_netbox_platform.py b/mdd/scripts/nso_to_netbox_platform.py index 278d1a5..a180a41 100644 --- a/mdd/scripts/nso_to_netbox_platform.py +++ b/mdd/scripts/nso_to_netbox_platform.py @@ -6,17 +6,21 @@ import json import urllib3 from netbox_class import netbox_helper +import load_env_vars + +ENV_VARS = { + "NSO_URL": None, + "NSO_USERNAME": None, + "NSO_PASSWORD": None, + "NETBOX_URL": None, + "NETBOX_TOKEN": None +} -# Environment Variables -NETBOX_URL = os.getenv('NETBOX_URL') -NETBOX_TOKEN = os.getenv('NETBOX_TOKEN') -NSO_URL = os.getenv('NSO_URL') -NSO_USERNAME = os.getenv('NSO_USERNAME') -NSO_PASSWORD = os.getenv('NSO_PASSWORD') +ENV_VARS = load_env_vars.load_env_vars(os.environ, ENV_VARS) # Headers for NetBox netbox_headers = { - "Authorization": f"Token {NETBOX_TOKEN}", + "Authorization": f"Token {ENV_VARS["NETBOX_TOKEN"]}", "Content-Type": "application/json", "Accept": "application/json" } @@ -52,7 +56,7 @@ def fetch_nso_platform_info(device_name): def get_or_create_platform(name, version): """Ensure the platform exists in NetBox, based on the combined name and version, and return its ID.""" platform_identifier = f"{name} {version}" - url = f"{NETBOX_URL}/api/dcim/platforms/?name={platform_identifier}" + url = f"{ENV_VARS["NETBOX_URL"]}/api/dcim/platforms/?name={platform_identifier}" response = requests.get(url, headers=netbox_headers, verify=False) if response.status_code != 200: @@ -68,7 +72,7 @@ def get_or_create_platform(name, version): "name": platform_identifier, "slug": platform_identifier.lower().replace(" ", "-").replace("(", "").replace(")", "").replace(".", "-") } - create_response = requests.post(f"{NETBOX_URL}/api/dcim/platforms/", headers=netbox_headers, data=json.dumps(data), verify=False) + create_response = requests.post(f"{ENV_VARS["NETBOX_URL"]}/api/dcim/platforms/", headers=netbox_headers, data=json.dumps(data), verify=False) if create_response.status_code not in [200, 201]: print(f"Failed to create platform {platform_identifier}: {create_response.text}") @@ -80,7 +84,7 @@ def get_or_create_platform(name, version): def get_or_create_device_type(model): """Ensure the device type exists in NetBox, based on the model from NSO, and return its ID.""" - url = f"{NETBOX_URL}/api/dcim/device-types/?model={model}" + url = f"{ENV_VARS["NETBOX_URL"]}/api/dcim/device-types/?model={model}" response = requests.get(url, headers=netbox_headers, verify=False) if response.status_code != 200 or response.json()['count'] > 0: @@ -93,7 +97,7 @@ def get_or_create_device_type(model): "slug": model.lower().replace(" ", "-").replace("_", "-"), "manufacturer": 1 # Example manufacturer ID, adjust accordingly } - create_response = requests.post(f"{NETBOX_URL}/api/dcim/device-types/", headers=netbox_headers, data=json.dumps(data), verify=False) + create_response = requests.post(f"{ENV_VARS["NETBOX_URL"]}/api/dcim/device-types/", headers=netbox_headers, data=json.dumps(data), verify=False) if create_response.status_code not in [200, 201]: print(f"Failed to create device type {model}: {create_response.text}") @@ -120,7 +124,7 @@ def update_netbox_device(device_id, platform_info): "device_type": device_type_id, "platform": platform_id } - url = f"{NETBOX_URL}/api/dcim/devices/{device_id}/" + url = f"{ENV_VARS["NETBOX_URL"]}/api/dcim/devices/{device_id}/" response = requests.patch(url, headers=netbox_headers, data=json.dumps(data), verify=False) if response.status_code in [200, 201, 204]: @@ -129,7 +133,7 @@ def update_netbox_device(device_id, platform_info): print(f"Failed to update device {device_id}: {response.status_code}, {response.text}") def main(): - netbox=netbox_helper(NETBOX_URL, + netbox=netbox_helper(ENV_VARS["NETBOX_URL"], netbox_headers) for device in netbox.fetch_netbox_devices(): device_name = device.get('name') diff --git a/mdd/scripts/test.py b/mdd/scripts/test.py new file mode 100644 index 0000000..c1fdab9 --- /dev/null +++ b/mdd/scripts/test.py @@ -0,0 +1,15 @@ +def get_sub_dict(dict, level): + """ + Returns reference to a sub dictionary based on first key + Used for inventory where you only have 1 key + """ + sub_dict = dict + for _ in range(level): + key = list(sub_dict.keys())[0] # get first key + sub_dict = sub_dict[key] + + return sub_dict # reference + +s = {"all": {"child": {"my": {}}}} +get_sub_dict(s, 3)['hi'] = 2 +print(s) \ No newline at end of file