Skip to content

Commit 4a6fe2e

Browse files
committed
fix: address all PR #504 review findings - security, performance, and robustness improvements
This commit addresses all 14 issues identified in PR #504 review, including must-fix, strongly recommended, and nice-to-have items. ### Security Enhancements (Issues 3.1, 3.2, 3.3, 3.4)
1 parent 60c6613 commit 4a6fe2e

8 files changed

Lines changed: 787 additions & 57 deletions

okta/config/config_setter.py

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@
88
# See the License for the specific language governing permissions and limitations under the License.
99
# coding: utf-8
1010

11+
import copy
1112
import logging
1213
import os
1314

1415
import yaml
1516

1617
from okta.constants import _GLOBAL_YAML_PATH, _LOCAL_YAML_PATH
17-
from okta.utils import flatten_dict, unflatten_dict, remove_empty_values
18+
from okta.utils import flatten_dict, unflatten_dict, remove_empty_values, deep_merge
1819

1920

2021
class ConfigSetter:
@@ -46,22 +47,26 @@ class ConfigSetter:
4647

4748
def __init__(self):
4849
"""
49-
Consructor for Configuration Setter class. Sets default config
50+
Constructor for Configuration Setter class. Sets default config
5051
and checks for configuration settings to update config.
5152
"""
52-
# Create configuration using default config
53-
self._config = ConfigSetter._DEFAULT_CONFIG
53+
# Create configuration using deep copy of default config
54+
# This prevents shared mutable state across instances
55+
self._config = copy.deepcopy(ConfigSetter._DEFAULT_CONFIG)
5456
# Update configuration
5557
self._update_config()
5658

5759
def get_config(self):
5860
"""
59-
Return Okta client configuration
61+
Return Okta client configuration.
62+
63+
Returns a deep copy to prevent external modification of internal state
64+
and to avoid holding references to sensitive values.
6065
6166
Returns:
62-
dict -- Dictionary containing the client configuration
67+
dict -- Deep copy of the client configuration dictionary
6368
"""
64-
return self._config
69+
return copy.deepcopy(self._config)
6570

6671
def _prune_config(self, config):
6772
"""
@@ -113,32 +118,39 @@ def _apply_config(self, new_config: dict):
113118
overwriting values and adding new entries (if present).
114119
115120
Arguments:
116-
config {dict} -- A dictionary of client configuration details
121+
new_config {dict} -- A dictionary of client configuration details
117122
"""
118-
# Update current configuration with new configuration
119-
# Flatten both dictionaries to account for nested dictionary values
120-
flat_current_client = flatten_dict(self._config["client"], delimiter="::")
121-
flat_current_testing = flatten_dict(self._config["testing"], delimiter="::")
122-
123-
flat_new_client = flatten_dict(new_config.get("client", {}), delimiter="::")
124-
flat_new_testing = flatten_dict(new_config.get("testing", {}), delimiter="::")
125-
126-
# Update with new values
127-
flat_current_client.update(flat_new_client)
128-
flat_current_testing.update(flat_new_testing)
129-
130-
# Update values in current config and unflatten
131-
self._config = {
132-
"client": unflatten_dict(flat_current_client, delimiter="::"),
133-
"testing": unflatten_dict(flat_current_testing, delimiter="::"),
134-
}
123+
# Update current configuration with new configuration using deep merge
124+
# Use 'or {}' to handle None values from YAML (e.g., "client: null")
125+
if "client" in new_config:
126+
self._config["client"] = deep_merge(
127+
self._config["client"],
128+
new_config.get("client") or {}
129+
)
130+
if "testing" in new_config:
131+
self._config["testing"] = deep_merge(
132+
self._config["testing"],
133+
new_config.get("testing") or {}
134+
)
135135

136136
def _apply_yaml_config(self, path: str):
137137
"""This method applies a YAML configuration to the Okta Client Config
138138
139139
Arguments:
140140
path {string} -- The filepath of the corresponding YAML file
141+
142+
Raises:
143+
ValueError: If config file exceeds maximum allowed size
141144
"""
145+
# Check file size before loading (prevent memory exhaustion)
146+
MAX_CONFIG_SIZE = 1_048_576 # 1 MB - generous for config files
147+
file_size = os.path.getsize(path)
148+
if file_size > MAX_CONFIG_SIZE:
149+
raise ValueError(
150+
f"Config file {path} ({file_size} bytes) exceeds maximum "
151+
f"allowed size of {MAX_CONFIG_SIZE} bytes"
152+
)
153+
142154
# Start with empty config
143155
config = {}
144156
with open(path, "r") as file:

okta/config/config_validator.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ def validate_config(self):
4949
ValueError: A configuration provided needs to be corrected.
5050
"""
5151
errors = []
52-
client = self._config.get("client")
52+
# Defensive: default to empty dict if sections are missing
53+
client = self._config.get("client", {})
54+
_ = self._config.get("testing", {})
55+
5356
# check org url
5457
errors += self._validate_org_url(client.get("orgUrl", ""))
5558
# check proxy settings if provided
@@ -143,8 +146,10 @@ def _validate_org_url(self, url: str):
143146
if not url:
144147
url_errors.append(ERROR_MESSAGE_ORG_URL_MISSING)
145148
# if url is not https (in non-testing env)
149+
# Defensive: get testing section with default
150+
testing = self._config.get("testing", {})
146151
if url and not (
147-
self._config["testing"]["testingDisableHttpsCheck"]
152+
testing.get("testingDisableHttpsCheck", False)
148153
or url.startswith("https")
149154
):
150155
url_errors.append(

okta/utils.py

Lines changed: 102 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from datetime import datetime as dt
1616
from enum import Enum
17-
from typing import Any, Dict
17+
from typing import Any
1818
from urllib.parse import urlsplit, urlunsplit
1919

2020
from okta.constants import DATETIME_FORMAT, EPOCH_DAY, EPOCH_MONTH, EPOCH_YEAR
@@ -85,18 +85,30 @@ def convert_absolute_url_into_relative_url(absolute_url):
8585
# These replace the external flatdict dependency
8686

8787

88-
def flatten_dict(d: Dict[str, Any], parent_key: str = '', delimiter: str = '::') -> Dict[str, Any]:
88+
def flatten_dict(
89+
d: dict[str, Any],
90+
parent_key: str = '',
91+
delimiter: str = '::',
92+
_depth: int = 0,
93+
max_depth: int = 100
94+
) -> dict[str, Any]:
8995
"""
9096
Flatten a nested dictionary into a single-level dictionary.
9197
9298
Args:
9399
d: The dictionary to flatten
94100
parent_key: The base key to prepend to all keys (used in recursion)
95101
delimiter: The delimiter to use when joining keys (default '::' to avoid collision with snake_case)
102+
_depth: Internal recursion depth counter (do not set manually)
103+
max_depth: Maximum allowed nesting depth (default 100)
96104
97105
Returns:
98106
A flattened dictionary with delimited keys
99107
108+
Raises:
109+
TypeError: If d is not a dictionary
110+
ValueError: If nesting depth exceeds max_depth
111+
100112
Examples:
101113
>>> flatten_dict({'a': {'b': 1, 'c': 2}})
102114
{'a::b': 1, 'a::c': 2}
@@ -107,17 +119,26 @@ def flatten_dict(d: Dict[str, Any], parent_key: str = '', delimiter: str = '::')
107119
>>> flatten_dict({'user_name': {'first_name': 'John'}})
108120
{'user_name::first_name': 'John'}
109121
"""
110-
items = []
122+
if not isinstance(d, dict):
123+
raise TypeError(f"flatten_dict expects dict, got {type(d).__name__}")
124+
125+
if _depth > max_depth:
126+
raise ValueError(
127+
f"Dictionary nesting depth exceeds maximum allowed depth of {max_depth}. "
128+
f"This may indicate a circular reference or malformed configuration."
129+
)
130+
131+
result = {}
111132
for key, value in d.items():
112133
new_key = f"{parent_key}{delimiter}{key}" if parent_key else key
113134
if isinstance(value, dict):
114-
items.extend(flatten_dict(value, new_key, delimiter).items())
135+
result.update(flatten_dict(value, new_key, delimiter, _depth + 1, max_depth))
115136
else:
116-
items.append((new_key, value))
117-
return dict(items)
137+
result[new_key] = value
138+
return result
118139

119140

120-
def unflatten_dict(d: Dict[str, Any], delimiter: str = '::') -> Dict[str, Any]:
141+
def unflatten_dict(d: dict[str, Any], delimiter: str = '::') -> dict[str, Any]:
121142
"""
122143
Unflatten a dictionary with delimited keys into a nested structure.
123144
@@ -128,62 +149,119 @@ def unflatten_dict(d: Dict[str, Any], delimiter: str = '::') -> Dict[str, Any]:
128149
Returns:
129150
A nested dictionary
130151
152+
Raises:
153+
TypeError: If d is not a dictionary
154+
ValueError: If there are conflicting keys (key is both a leaf value and a nested dict)
155+
131156
Examples:
132-
>>> unflatten_dict({'a_b': 1, 'a_c': 2})
157+
>>> unflatten_dict({'a::b': 1, 'a::c': 2})
133158
{'a': {'b': 1, 'c': 2}}
134159
135-
>>> unflatten_dict({'x_y_z': 3})
160+
>>> unflatten_dict({'x::y::z': 3})
136161
{'x': {'y': {'z': 3}}}
162+
163+
>>> unflatten_dict({'a_b': 1, 'a_c': 2}, delimiter='_')
164+
{'a': {'b': 1, 'c': 2}}
137165
"""
138-
result: Dict[str, Any] = {}
166+
if not isinstance(d, dict):
167+
raise TypeError(f"unflatten_dict expects dict, got {type(d).__name__}")
168+
169+
result: dict[str, Any] = {}
139170
for key, value in d.items():
140171
parts = key.split(delimiter)
141172
current = result
142-
for part in parts[:-1]:
173+
for i, part in enumerate(parts[:-1]):
143174
if part not in current:
144175
current[part] = {}
176+
elif not isinstance(current[part], dict):
177+
# Conflict: trying to traverse into a leaf value
178+
conflicting_key = delimiter.join(parts[:i + 1])
179+
raise ValueError(
180+
f"Key conflict in unflatten_dict: '{conflicting_key}' is both "
181+
f"a leaf value and a nested dictionary"
182+
)
145183
current = current[part]
184+
185+
# Check final key conflict
186+
if parts[-1] in current and isinstance(current[parts[-1]], dict):
187+
raise ValueError(
188+
f"Key conflict in unflatten_dict: '{key}' would overwrite "
189+
f"existing nested dictionary"
190+
)
191+
146192
current[parts[-1]] = value
147193
return result
148194

149195

150-
def deep_merge(base: Dict[str, Any], updates: Dict[str, Any]) -> Dict[str, Any]:
196+
def deep_merge(
197+
base: dict[str, Any],
198+
updates: dict[str, Any],
199+
_depth: int = 0,
200+
max_depth: int = 100
201+
) -> dict[str, Any]:
151202
"""
152203
Deep merge two dictionaries, with updates overriding base values.
153204
154205
Args:
155206
base: The base dictionary
156207
updates: Dictionary with values to merge/override
208+
_depth: Internal recursion depth counter (do not set manually)
209+
max_depth: Maximum allowed nesting depth (default 100)
157210
158211
Returns:
159212
A new dictionary with merged values
160213
214+
Raises:
215+
TypeError: If base or updates is not a dictionary
216+
ValueError: If nesting depth exceeds max_depth
217+
161218
Examples:
162219
>>> deep_merge({'a': 1, 'b': 2}, {'b': 3, 'c': 4})
163220
{'a': 1, 'b': 3, 'c': 4}
164221
165222
>>> deep_merge({'a': {'b': 1}}, {'a': {'c': 2}})
166223
{'a': {'b': 1, 'c': 2}}
167224
"""
225+
if not isinstance(base, dict):
226+
raise TypeError(f"deep_merge base expects dict, got {type(base).__name__}")
227+
if not isinstance(updates, dict):
228+
raise TypeError(f"deep_merge updates expects dict, got {type(updates).__name__}")
229+
230+
if _depth > max_depth:
231+
raise ValueError(
232+
f"Dictionary nesting depth exceeds maximum allowed depth of {max_depth}. "
233+
f"This may indicate a circular reference or malformed configuration."
234+
)
235+
168236
result = base.copy()
169237
for key, value in updates.items():
170238
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
171-
result[key] = deep_merge(result[key], value)
239+
result[key] = deep_merge(result[key], value, _depth + 1, max_depth)
172240
else:
173241
result[key] = value
174242
return result
175243

176244

177-
def remove_empty_values(d: Dict[str, Any]) -> Dict[str, Any]:
245+
def remove_empty_values(
246+
d: dict[str, Any],
247+
_depth: int = 0,
248+
max_depth: int = 100
249+
) -> dict[str, Any]:
178250
"""
179251
Recursively remove empty string values from a nested dictionary.
180252
181253
Args:
182254
d: The dictionary to clean
255+
_depth: Internal recursion depth counter (do not set manually)
256+
max_depth: Maximum allowed nesting depth (default 100)
183257
184258
Returns:
185259
A new dictionary with empty strings removed
186260
261+
Raises:
262+
TypeError: If d is not a dictionary
263+
ValueError: If nesting depth exceeds max_depth
264+
187265
Examples:
188266
>>> remove_empty_values({'a': '', 'b': 'value'})
189267
{'b': 'value'}
@@ -194,10 +272,19 @@ def remove_empty_values(d: Dict[str, Any]) -> Dict[str, Any]:
194272
Note:
195273
Only removes empty strings (""), not other falsy values like None, 0, False, []
196274
"""
275+
if not isinstance(d, dict):
276+
raise TypeError(f"remove_empty_values expects dict, got {type(d).__name__}")
277+
278+
if _depth > max_depth:
279+
raise ValueError(
280+
f"Dictionary nesting depth exceeds maximum allowed depth of {max_depth}. "
281+
f"This may indicate a circular reference or malformed configuration."
282+
)
283+
197284
result = {}
198285
for key, value in d.items():
199286
if isinstance(value, dict):
200-
nested = remove_empty_values(value)
287+
nested = remove_empty_values(value, _depth + 1, max_depth)
201288
if nested: # Only add if nested dict is not empty after cleaning
202289
result[key] = nested
203290
elif value != "":

tests/test_config_setter.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
"""
1414

1515
import os
16-
import pytest
17-
import tempfile
1816
import yaml
1917

2018
from okta.config.config_setter import ConfigSetter
@@ -61,7 +59,7 @@ def test_apply_config_merges_nested_dicts(self):
6159
config_setter = ConfigSetter()
6260

6361
# Get initial config
64-
initial_token = config_setter._config["client"].get("token", "")
62+
_ = config_setter._config["client"].get("token", "")
6563

6664
# Apply new config
6765
new_config = {

0 commit comments

Comments
 (0)