Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 163 additions & 22 deletions kintree/search/digikey_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import logging
import os
import digikey
import time
from importlib import import_module

import requests

try:
dk_api_client = import_module('dk_api_client')
except ModuleNotFoundError:
dk_api_client = None

from ..config import settings, config_interface

Expand Down Expand Up @@ -28,6 +36,12 @@
'package_type'
]

TOKEN_URL = 'https://api.digikey.com/v1/oauth2/token'
_token_cache = {
'access_token': None,
'expires_at': 0,
}

os.environ['DIGIKEY_STORAGE_PATH'] = settings.DIGIKEY_STORAGE_PATH
# Check if storage path exists, else create it
if not os.path.exists(os.environ['DIGIKEY_STORAGE_PATH']):
Expand All @@ -36,11 +50,93 @@

def disable_api_logger():
# Digi-Key API logger
logging.getLogger('digikey.v3.api').setLevel(logging.CRITICAL)
logging.getLogger('dk_api_client').setLevel(logging.CRITICAL)
# Disable DEBUG
logging.disable(logging.DEBUG)


def _normalize_object(value):
if value is None:
return None

if hasattr(value, 'to_dict'):
try:
return value.to_dict()
except Exception:
pass

if isinstance(value, dict):
return {k: _normalize_object(v) for k, v in value.items()}

if isinstance(value, list):
return [_normalize_object(item) for item in value]

return value


def _get_access_token() -> str:
now = int(time.time())
if _token_cache['access_token'] and _token_cache['expires_at'] > now + 30:
return _token_cache['access_token']

client_id = os.environ.get('DIGIKEY_CLIENT_ID', '').strip()
client_secret = os.environ.get('DIGIKEY_CLIENT_SECRET', '').strip()

if not client_id or not client_secret:
return ''

response = requests.post(
TOKEN_URL,
data={'grant_type': 'client_credentials'},
auth=(client_id, client_secret),
timeout=20,
)
response.raise_for_status()

payload = response.json() if response.text else {}
access_token = str(payload.get('access_token', '')).strip()
expires_in = int(payload.get('expires_in', 0) or 0)

if not access_token:
return ''

_token_cache['access_token'] = access_token
_token_cache['expires_at'] = now + max(60, expires_in)

return access_token


def _extract_product_from_response(response_dict: dict):
if not isinstance(response_dict, dict):
return None, ''

currency = ''
search_locale = response_dict.get('search_locale_used', {})
if isinstance(search_locale, dict):
currency = str(search_locale.get('currency', '') or '').strip()

if response_dict.get('product'):
return response_dict.get('product'), currency

products = response_dict.get('products')
if isinstance(products, list) and products:
return products[0], currency

if response_dict.get('product_details'):
return response_dict.get('product_details'), currency

return response_dict, currency


def _normalize_url(value):
url = str(value or '').strip()
if not url:
return ''
if url.startswith('//'):
return f'https:{url}'
return url


def check_environment() -> bool:
DIGIKEY_CLIENT_ID = os.environ.get('DIGIKEY_CLIENT_ID', None)
DIGIKEY_CLIENT_SECRET = os.environ.get('DIGIKEY_CLIENT_SECRET', None)
Expand All @@ -65,7 +161,7 @@ def setup_environment(force=False) -> bool:

def get_default_search_keys():
return [
'product_description',
'manufacturer_product_number',
'product_description',
'revision',
'keywords',
Expand All @@ -82,11 +178,19 @@ def find_categories(part_details: str):
''' Find categories '''
category = part_details.get('category')
subcategory = None
if category:
subcategory = category.get('child_categories')[0]
if isinstance(category, dict):
children = category.get('child_categories') or category.get('children') or []
if isinstance(children, list) and children:
first_child = children[0]
if isinstance(first_child, dict):
subcategory = first_child.get('name')
else:
subcategory = str(first_child)
category = category.get('name')
elif category:
category = str(category)
if subcategory:
subcategory = subcategory.get('name')
subcategory = str(subcategory)
return category, subcategory


Expand All @@ -100,16 +204,34 @@ def fetch_part_info(part_number: str) -> dict:
cprint('[INFO]\tWarning: DigiKey API settings are not configured')
return part_info

if dk_api_client is None:
from ..common.tools import cprint
cprint('[INFO]\tWarning: digikey-apiv4 package is not installed')
return part_info

# THIS METHOD CAN SOMETIMES RETURN INCORRECT MATCH
# Added logic to check the result in the GUI flow
@timeout(dec_timeout=20)
def digikey_search_timeout():
return digikey.product_details(
access_token = _get_access_token()
if not access_token:
return None

configuration = dk_api_client.Configuration()
configuration.api_key['X-DIGIKEY-Client-Id'] = os.environ['DIGIKEY_CLIENT_ID']
configuration.access_token = access_token

api_client = dk_api_client.ApiClient(configuration)
api_instance = dk_api_client.ProductSearchApi(api_client)

result = api_instance.product_details(
part_number,
x_digikey_client_id=os.environ['DIGIKEY_CLIENT_ID'],
x_digikey_locale_site=os.environ['DIGIKEY_LOCAL_SITE'],
x_digikey_locale_language=os.environ['DIGIKEY_LOCAL_LANGUAGE'],
x_digikey_locale_currency=os.environ['DIGIKEY_LOCAL_CURRENCY'],
).to_dict()
)
return _normalize_object(result)

# Method to process price breaks
def process_price_break(product_variation):
Expand All @@ -122,22 +244,23 @@ def process_price_break(product_variation):
# Query part number
try:
part = digikey_search_timeout()
except:
except Exception:
part = None

if not part:
return part_info
if 'product' not in part or not part['product']:

part, currency = _extract_product_from_response(part)
if not part:
return part_info

part_info['currency'] = part['search_locale_used']['currency']
part = part['product']
part_info['currency'] = currency or os.environ.get('DIGIKEY_LOCAL_CURRENCY', 'USD')

category, subcategory = find_categories(part)
try:
part_info['category'] = category
part_info['subcategory'] = subcategory
except:
except Exception:
part_info['category'] = ''
part_info['subcategory'] = ''

Expand All @@ -146,20 +269,37 @@ def process_price_break(product_variation):
for key in part:
if key in headers:
if key == 'manufacturer':
part_info[key] = part['manufacturer'].get('name')
manufacturer = part.get('manufacturer', {})
part_info[key] = manufacturer.get('name') if isinstance(manufacturer, dict) else str(manufacturer)
elif key == 'description':
part_info['product_description'] = part['description'].get('product_description')
part_info['detailed_description'] = part['description'].get('detailed_description')
description = part.get('description', {})
if isinstance(description, dict):
product_name = part.get('name') or description.get('name') or description.get('product_name') or ''
product_description = description.get('product_description') or description.get('detailed_description') or product_name
detailed = description.get('detailed_description') or product_description
else:
product_name = str(description or '')
product_description = product_name
detailed = ''
product_name = str(product_name or '')
product_description = str(product_description or product_name)
detailed = str(detailed or product_name)
part_info['product_name'] = product_name
part_info['product_description'] = product_description
part_info['detailed_description'] = detailed
else:
part_info[key] = part[key]
value = part[key]
if key in {'datasheet_url', 'photo_url', 'product_url'}:
value = _normalize_url(value)
part_info[key] = value

# Parameters
part_info['parameters'] = {}
[parameter_key, name_key, value_key] = PARAMETERS_MAP

for parameter in part[parameter_key]:
parameter_name = parameter.get(name_key, '')
parameter_value = parameter.get(value_key, '')
for parameter in part.get(parameter_key, []):
parameter_name = parameter.get(name_key) or parameter.get('name', '')
parameter_value = parameter.get(value_key) or parameter.get('value', '')
# Append to parameters dictionary
part_info['parameters'][parameter_name] = parameter_value
# process classifications as parameters
Expand All @@ -175,13 +315,14 @@ def process_price_break(product_variation):
price_key,
package_key] = PRICING_MAP

variations = part[variations_key]
variations = part.get(variations_key, [])
if len(variations) == 1:
process_price_break(variations[0])
else:
for variation in variations:
# we try to get the not TR or Digi-Reel option
package_type = variation.get(package_key).get('id')
package = variation.get(package_key, {})
package_type = package.get('id') if isinstance(package, dict) else None
if all(package_type != x for x in [1, 243]):
process_price_break(variation)
break
Expand Down
Loading
Loading