diff --git a/contrib/runners/python_runner/python_runner/__init__.py b/contrib/runners/python_runner/python_runner/__init__.py index 0db4ef8096..2eaa3f4225 100644 --- a/contrib/runners/python_runner/python_runner/__init__.py +++ b/contrib/runners/python_runner/python_runner/__init__.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = '3.3.0' +__version__ = '3.3.2' diff --git a/contrib/runners/python_runner/python_runner/python_action_wrapper.py b/contrib/runners/python_runner/python_runner/python_action_wrapper.py index 119f6bdf84..77f5e93210 100644 --- a/contrib/runners/python_runner/python_runner/python_action_wrapper.py +++ b/contrib/runners/python_runner/python_runner/python_action_wrapper.py @@ -47,6 +47,7 @@ import sys import json import argparse +import resource import six @@ -83,6 +84,12 @@ READ_STDIN_INPUT_TIMEOUT = 2 +def limit_memory(maxsize): + if maxsize: + soft, hard = resource.getrlimit(resource.RLIMIT_AS) + resource.setrlimit(resource.RLIMIT_AS, (maxsize, hard)) + + class ActionService(object): """ Instance of this class is passed to the action instance and exposes "public" methods which can @@ -143,7 +150,7 @@ def delete_value(self, name, local=True, scope=SYSTEM_SCOPE): class PythonActionWrapper(object): def __init__(self, pack, file_path, config=None, parameters=None, user=None, parent_args=None, - log_level=PYTHON_RUNNER_DEFAULT_LOG_LEVEL): + log_level=PYTHON_RUNNER_DEFAULT_LOG_LEVEL, max_memory=0, max_output_size=0): """ :param pack: Name of the pack this action belongs to. :type pack: ``str`` @@ -171,6 +178,8 @@ def __init__(self, pack, file_path, config=None, parameters=None, user=None, par self._user = user self._parent_args = parent_args or [] self._log_level = log_level + self._max_memory = max_memory # in MB + self._max_output_size = max_output_size # in MB self._class_name = None self._logger = logging.getLogger('PythonActionWrapper') @@ -189,49 +198,62 @@ def __init__(self, pack, file_path, config=None, parameters=None, user=None, par self._user = cfg.CONF.system_user.user def run(self): - action = self._get_action_instance() - output = action.run(**self._parameters) - - if isinstance(output, tuple) and len(output) == 2: - # run() method returned status and data - (status, data) - action_status = output[0] - action_result = output[1] - else: - # run() method returned only data, no status (pre StackStorm v1.6) - action_status = None - action_result = output - - action_output = { - 'result': action_result, - 'status': None - } - - if action_status is not None and not isinstance(action_status, bool): - sys.stderr.write('Status returned from the action run() method must either be ' - 'True or False, got: %s\n' % (action_status)) - sys.stderr.write(INVALID_STATUS_ERROR_MESSAGE) - sys.exit(PYTHON_RUNNER_INVALID_ACTION_STATUS_EXIT_CODE) - - if action_status is not None and isinstance(action_status, bool): - action_output['status'] = action_status - - # Special case if result object is not JSON serializable - aka user wanted to return a - # non-simple type (e.g. class instance or other non-JSON serializable type) - try: - json.dumps(action_output['result']) - except TypeError: - action_output['result'] = str(action_output['result']) + # limit the memory of the action + limit_memory(self._max_memory * 1024 * 1024) try: - print_output = json.dumps(action_output) - except Exception: - print_output = str(action_output) + action = self._get_action_instance() + output = action.run(**self._parameters) + + if isinstance(output, tuple) and len(output) == 2: + # run() method returned status and data - (status, data) + action_status = output[0] + action_result = output[1] + else: + # run() method returned only data, no status (pre StackStorm v1.6) + action_status = None + action_result = output + + action_output = { + 'result': action_result, + 'status': None + } + + if action_status is not None and not isinstance(action_status, bool): + sys.stderr.write('Status returned from the action run() method must either be ' + 'True or False, got: %s\n' % (action_status)) + sys.stderr.write(INVALID_STATUS_ERROR_MESSAGE) + sys.exit(PYTHON_RUNNER_INVALID_ACTION_STATUS_EXIT_CODE) + + if action_status is not None and isinstance(action_status, bool): + action_output['status'] = action_status + + # Special case if result object is not JSON serializable - aka user wanted to return a + # non-simple type (e.g. class instance or other non-JSON serializable type) + try: + json.dumps(action_output['result']) + except TypeError: + action_output['result'] = str(action_output['result']) - # Print output to stdout so the parent can capture it - sys.stdout.write(ACTION_OUTPUT_RESULT_DELIMITER) - sys.stdout.write(print_output + '\n') - sys.stdout.write(ACTION_OUTPUT_RESULT_DELIMITER) - sys.stdout.flush() + try: + print_output = json.dumps(action_output) + except Exception: + print_output = str(action_output) + + if self._max_output_size and sys.getsizeof(print_output) > self._max_output_size * 1024 * 1024: + sys.stderr.write(f'The action has reached the maximum allowable output size.\n' + f'Maximum allowable output size: {self._max_output_size}MB.\n') + sys.exit(PYTHON_RUNNER_INVALID_ACTION_STATUS_EXIT_CODE) + + # Print output to stdout so the parent can capture it + sys.stdout.write(ACTION_OUTPUT_RESULT_DELIMITER) + sys.stdout.write(print_output + '\n') + sys.stdout.write(ACTION_OUTPUT_RESULT_DELIMITER) + sys.stdout.flush() + except MemoryError: + sys.stderr.write(f'The action has reached the maximum allowable memory.\n' + f'Maximum allowable memory: {self._max_memory}MB') + sys.exit(PYTHON_RUNNER_INVALID_ACTION_STATUS_EXIT_CODE) def _get_action_instance(self): try: @@ -281,12 +303,17 @@ def _get_action_instance(self): ' JSON') parser.add_argument('--log-level', required=False, default=PYTHON_RUNNER_DEFAULT_LOG_LEVEL, help='Log level for actions') + parser.add_argument('--max-memory', required=False, default=0, help='Maximum allowed memory') + parser.add_argument('--max-output-size', required=False, default=0, help='Maximum allowed output size') + args = parser.parse_args() config = json.loads(args.config) if args.config else {} user = args.user parent_args = json.loads(args.parent_args) if args.parent_args else [] log_level = args.log_level + max_output_size = int(args.max_output_size) + max_memory = int(args.max_memory) if not isinstance(config, dict): raise ValueError('Pack config needs to be a dictionary') @@ -329,6 +356,8 @@ def _get_action_instance(self): parameters=parameters, user=user, parent_args=parent_args, - log_level=log_level) + log_level=log_level, + max_memory=max_memory, + max_output_size=max_output_size) obj.run() diff --git a/contrib/runners/python_runner/python_runner/python_runner.py b/contrib/runners/python_runner/python_runner/python_runner.py index fd412c890e..1c5840b3ad 100644 --- a/contrib/runners/python_runner/python_runner/python_runner.py +++ b/contrib/runners/python_runner/python_runner/python_runner.py @@ -54,6 +54,7 @@ from st2common.util.shell import quote_unix from st2common.services.action import store_execution_output_data from st2common.runners.utils import make_read_and_store_stream_func +import st2common.config as common_config from python_runner import python_action_wrapper @@ -82,6 +83,18 @@ WRAPPER_SCRIPT_PATH = os.path.join(BASE_DIR, WRAPPER_SCRIPT_NAME) +action_performance_opts = [ + cfg.IntOpt( + 'action_max_memory_mb', default=0, + help='Action maximum allowable memory (in MB) - 0 is unlimited'), + cfg.IntOpt( + 'action_max_output_size_mb', default=0, + help='Action maximum allowable output size (in MB) - 0 is unlimited'), + ] + +common_config.do_register_opts(action_performance_opts, 'performance', True) + + class PythonRunner(GitWorktreeActionRunner): def __init__(self, runner_id, config=None, timeout=PYTHON_RUNNER_DEFAULT_ACTION_TIMEOUT, @@ -167,6 +180,8 @@ def run(self, action_parameters): '--file-path=%s' % (self.entry_point), '--user=%s' % (user), '--parent-args=%s' % (parent_args), + '--max-memory=%s' % cfg.CONF.performance.action_max_memory_mb, + '--max-output-size=%s' % cfg.CONF.performance.action_max_output_size_mb, ] subprocess = concurrency.get_subprocess_module()