Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/runners/python_runner/python_runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = '3.3.0'
__version__ = '3.3.2'
113 changes: 71 additions & 42 deletions contrib/runners/python_runner/python_runner/python_action_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import sys
import json
import argparse
import resource

import six

Expand Down Expand Up @@ -83,6 +84,12 @@
READ_STDIN_INPUT_TIMEOUT = 2


def limit_memory(maxsize):
if maxsize:
soft, hard = resource.getrlimit(resource.RLIMIT_AS)
resource.setrlimit(resource.RLIMIT_AS, (maxsize, hard))


class ActionService(object):
"""
Instance of this class is passed to the action instance and exposes "public" methods which can
Expand Down Expand Up @@ -143,7 +150,7 @@ def delete_value(self, name, local=True, scope=SYSTEM_SCOPE):

class PythonActionWrapper(object):
def __init__(self, pack, file_path, config=None, parameters=None, user=None, parent_args=None,
log_level=PYTHON_RUNNER_DEFAULT_LOG_LEVEL):
log_level=PYTHON_RUNNER_DEFAULT_LOG_LEVEL, max_memory=0, max_output_size=0):
"""
:param pack: Name of the pack this action belongs to.
:type pack: ``str``
Expand Down Expand Up @@ -171,6 +178,8 @@ def __init__(self, pack, file_path, config=None, parameters=None, user=None, par
self._user = user
self._parent_args = parent_args or []
self._log_level = log_level
self._max_memory = max_memory # in MB
self._max_output_size = max_output_size # in MB

self._class_name = None
self._logger = logging.getLogger('PythonActionWrapper')
Expand All @@ -189,49 +198,62 @@ def __init__(self, pack, file_path, config=None, parameters=None, user=None, par
self._user = cfg.CONF.system_user.user

def run(self):
action = self._get_action_instance()
output = action.run(**self._parameters)

if isinstance(output, tuple) and len(output) == 2:
# run() method returned status and data - (status, data)
action_status = output[0]
action_result = output[1]
else:
# run() method returned only data, no status (pre StackStorm v1.6)
action_status = None
action_result = output

action_output = {
'result': action_result,
'status': None
}

if action_status is not None and not isinstance(action_status, bool):
sys.stderr.write('Status returned from the action run() method must either be '
'True or False, got: %s\n' % (action_status))
sys.stderr.write(INVALID_STATUS_ERROR_MESSAGE)
sys.exit(PYTHON_RUNNER_INVALID_ACTION_STATUS_EXIT_CODE)

if action_status is not None and isinstance(action_status, bool):
action_output['status'] = action_status

# Special case if result object is not JSON serializable - aka user wanted to return a
# non-simple type (e.g. class instance or other non-JSON serializable type)
try:
json.dumps(action_output['result'])
except TypeError:
action_output['result'] = str(action_output['result'])
# limit the memory of the action
limit_memory(self._max_memory * 1024 * 1024)

try:
print_output = json.dumps(action_output)
except Exception:
print_output = str(action_output)
action = self._get_action_instance()
output = action.run(**self._parameters)

if isinstance(output, tuple) and len(output) == 2:
# run() method returned status and data - (status, data)
action_status = output[0]
action_result = output[1]
else:
# run() method returned only data, no status (pre StackStorm v1.6)
action_status = None
action_result = output

action_output = {
'result': action_result,
'status': None
}

if action_status is not None and not isinstance(action_status, bool):
sys.stderr.write('Status returned from the action run() method must either be '
'True or False, got: %s\n' % (action_status))
sys.stderr.write(INVALID_STATUS_ERROR_MESSAGE)
sys.exit(PYTHON_RUNNER_INVALID_ACTION_STATUS_EXIT_CODE)

if action_status is not None and isinstance(action_status, bool):
action_output['status'] = action_status

# Special case if result object is not JSON serializable - aka user wanted to return a
# non-simple type (e.g. class instance or other non-JSON serializable type)
try:
json.dumps(action_output['result'])
except TypeError:
action_output['result'] = str(action_output['result'])

# Print output to stdout so the parent can capture it
sys.stdout.write(ACTION_OUTPUT_RESULT_DELIMITER)
sys.stdout.write(print_output + '\n')
sys.stdout.write(ACTION_OUTPUT_RESULT_DELIMITER)
sys.stdout.flush()
try:
print_output = json.dumps(action_output)
except Exception:
print_output = str(action_output)

if self._max_output_size and sys.getsizeof(print_output) > self._max_output_size * 1024 * 1024:
sys.stderr.write(f'The action has reached the maximum allowable output size.\n'
f'Maximum allowable output size: {self._max_output_size}MB.\n')
sys.exit(PYTHON_RUNNER_INVALID_ACTION_STATUS_EXIT_CODE)

# Print output to stdout so the parent can capture it
sys.stdout.write(ACTION_OUTPUT_RESULT_DELIMITER)
sys.stdout.write(print_output + '\n')
sys.stdout.write(ACTION_OUTPUT_RESULT_DELIMITER)
sys.stdout.flush()
except MemoryError:
sys.stderr.write(f'The action has reached the maximum allowable memory.\n'
f'Maximum allowable memory: {self._max_memory}MB')
sys.exit(PYTHON_RUNNER_INVALID_ACTION_STATUS_EXIT_CODE)

def _get_action_instance(self):
try:
Expand Down Expand Up @@ -281,12 +303,17 @@ def _get_action_instance(self):
' JSON')
parser.add_argument('--log-level', required=False, default=PYTHON_RUNNER_DEFAULT_LOG_LEVEL,
help='Log level for actions')
parser.add_argument('--max-memory', required=False, default=0, help='Maximum allowed memory')
parser.add_argument('--max-output-size', required=False, default=0, help='Maximum allowed output size')

args = parser.parse_args()

config = json.loads(args.config) if args.config else {}
user = args.user
parent_args = json.loads(args.parent_args) if args.parent_args else []
log_level = args.log_level
max_output_size = int(args.max_output_size)
max_memory = int(args.max_memory)

if not isinstance(config, dict):
raise ValueError('Pack config needs to be a dictionary')
Expand Down Expand Up @@ -329,6 +356,8 @@ def _get_action_instance(self):
parameters=parameters,
user=user,
parent_args=parent_args,
log_level=log_level)
log_level=log_level,
max_memory=max_memory,
max_output_size=max_output_size)

obj.run()
15 changes: 15 additions & 0 deletions contrib/runners/python_runner/python_runner/python_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from st2common.util.shell import quote_unix
from st2common.services.action import store_execution_output_data
from st2common.runners.utils import make_read_and_store_stream_func
import st2common.config as common_config

from python_runner import python_action_wrapper

Expand Down Expand Up @@ -82,6 +83,18 @@
WRAPPER_SCRIPT_PATH = os.path.join(BASE_DIR, WRAPPER_SCRIPT_NAME)


action_performance_opts = [
cfg.IntOpt(
'action_max_memory_mb', default=0,
help='Action maximum allowable memory (in MB) - 0 is unlimited'),
cfg.IntOpt(
'action_max_output_size_mb', default=0,
help='Action maximum allowable output size (in MB) - 0 is unlimited'),
]

common_config.do_register_opts(action_performance_opts, 'performance', True)


class PythonRunner(GitWorktreeActionRunner):

def __init__(self, runner_id, config=None, timeout=PYTHON_RUNNER_DEFAULT_ACTION_TIMEOUT,
Expand Down Expand Up @@ -167,6 +180,8 @@ def run(self, action_parameters):
'--file-path=%s' % (self.entry_point),
'--user=%s' % (user),
'--parent-args=%s' % (parent_args),
'--max-memory=%s' % cfg.CONF.performance.action_max_memory_mb,
'--max-output-size=%s' % cfg.CONF.performance.action_max_output_size_mb,
]

subprocess = concurrency.get_subprocess_module()
Expand Down