diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..57af17a --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,49 @@ +{ + "version": "0.2.0", + "configurations": [{ + "name": "cfecs-update", + "type": "python", + "pythonPath": "${config.python.pythonPath}", + "request": "launch", + "stopOnEntry": true, + "console": "none", + //"program": "${file}", + "program": "${workspaceRoot}/cfecs-update", + "args": [ + "us-west-1", + "default", + "pdf_service", + "--debug", + "--kill-tasks" + ], + "cwd": "${workspaceRoot}", + "debugOptions": [ + "WaitOnAbnormalExit", + "WaitOnNormalExit", + "RedirectOutput" + ], + "env": { + //"name": "value" + } + }, + { + "name": "Current File", + "type": "python", + "pythonPath": "${config.python.pythonPath}", + "request": "launch", + "stopOnEntry": true, + "console": "none", + "program": "${file}", + "args": [], + "cwd": "${workspaceRoot}", + "debugOptions": [ + "WaitOnAbnormalExit", + "WaitOnNormalExit", + "RedirectOutput" + ], + "env": { + //"name": "value" + } + } + ] +} \ No newline at end of file diff --git a/README.md b/README.md index 0dc721f..1bd316f 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,8 @@ steps: ### Usage with docker ```bash -docker run --rm -it -e AWS_ACCESS_KEY_ID=**** -e AWS_SECRET_ACCESS_KEY=**** codefresh/cf-ecs-deploy cfecs-update [options] +docker run --rm -it -e AWS_ACCESS_KEY_ID=**** -e AWS_SECRET_ACCESS_KEY=**** codefresh/cf-ecs-deploy \ + cfecs-update [options] ``` ### cfecs-update -h @@ -101,6 +102,8 @@ optional arguments: -h, --help show this help message and exit --wait Wait for deployment to complete (default) --no-wait No Wait for deployment to complete + --kill-tasks De-register current task definition & stop all running tasks related to service + --no-kill-tasks Leave current running tasks alone, they will need to be manually killed. 
On by default --timeout TIMEOUT deployment wait timeout (default 900s) --max-failed MAX_FAILED max failed tasks to consider deployment as failed diff --git a/cfecs-update b/cfecs-update index 1633f59..08e71cc 100755 --- a/cfecs-update +++ b/cfecs-update @@ -1,6 +1,13 @@ #!/usr/bin/env python -# -import sys, argparse, cfecs, logging, pprint +# -*- coding: utf-8 -*- +"""Fill in Google style docstrings here +""" + +import sys +import argparse +import pprint +import logging +import cfecs def print_usage(): print ("Usage: \n" @@ -8,42 +15,58 @@ def print_usage(): if __name__ == '__main__': - parser = argparse.ArgumentParser(description='Codefresh ECS Deploy') + PARSER = argparse.ArgumentParser(description='Codefresh ECS Deploy') - parser.add_argument('region_name', help="AWS Region, ex. us-east-1") - parser.add_argument('cluster_name', help="ECS Cluster Name") - parser.add_argument('service_name', help="ECS Service Name") + PARSER.add_argument('region_name', help="AWS Region, ex. us-east-1") + PARSER.add_argument('cluster_name', help="ECS Cluster Name") + PARSER.add_argument('service_name', help="ECS Service Name") - image_group = parser.add_argument_group() - image_group.add_argument('-i', '--image-name', action = 'store', dest = 'image_name', help='Image Name in ECS Task Definition to set new tag') - image_group.add_argument('-t', '--image-tag', action = 'store', dest = 'image_tag', help='Tag for the image') + IMAGE_GROUP = PARSER.add_argument_group() + IMAGE_GROUP.add_argument('-i', '--image-name', action='store', dest='image_name', \ + help='Image Name in ECS Task Definition to set new tag') + IMAGE_GROUP.add_argument('-t', '--image-tag', action='store', dest='image_tag', \ + help='Tag for the image') - args_wait_group = parser.add_mutually_exclusive_group() - args_wait_group.add_argument('--wait', action = 'store_true', dest = 'wait', default=True, help='Wait for deployment to complete (default)') - args_wait_group.add_argument('--no-wait', action = 'store_false', 
dest = 'wait', help='No Wait for deployment to complete') + ARGS_WAIT_GROUP = PARSER.add_mutually_exclusive_group() + ARGS_WAIT_GROUP.add_argument('--wait', action='store_true', dest='wait', default=True, \ + help='Wait for deployment to complete (default)') + ARGS_WAIT_GROUP.add_argument('--no-wait', action='store_false', dest='wait', \ + help='No Wait for deployment to complete') - parser.add_argument('--timeout', action = 'store', dest = 'timeout', default = cfecs.DEPLOY_TIMEOUT, help='deployment wait timeout (default 900s)') - parser.add_argument('--max-failed', action = 'store', dest = 'max_failed', default = cfecs.MAX_FAILED_TASKS, help='max failed tasks to consider deployment as failed (default 4)') - parser.add_argument('--debug', action = 'store_true', dest = 'debug', help='show debug messages') + ARGS_KILL_GROUP = PARSER.add_mutually_exclusive_group() + ARGS_KILL_GROUP.add_argument('--no-kill-tasks', action='store_false', dest='kill_tasks', \ + default=False, \ + help='Leave current running tasks alone, they will need to be manually killed') + ARGS_KILL_GROUP.add_argument('--kill-tasks', action='store_true', dest='kill_tasks', \ + default=True, \ + help='De-register current task definition & stop all running tasks related to service') + PARSER.add_argument('--timeout', action='store', dest='timeout', default=cfecs.DEPLOY_TIMEOUT, \ + help='deployment wait timeout (default 900s)') + PARSER.add_argument( \ + '--max-failed', action='store', dest='max_failed', default=cfecs.MAX_FAILED_TASKS, \ + help='max failed tasks to consider deployment as failed (default 4)') + PARSER.add_argument('--debug', action='store_true', dest='debug', \ + help='show debug messages') - args = parser.parse_args() + ARGS = PARSER.parse_args() try: - if args.debug: + if ARGS.debug: cfecs.init_log(logging.DEBUG) - response = cfecs.update_service(args.cluster_name, args.service_name, region_name=args.region_name, - wait=args.wait, deploy_timeout=args.timeout, max_failed=args.max_failed, 
- image_name=args.image_name, image_tag=args.image_tag) + RESPONSE = cfecs.update_service(ARGS.cluster_name, ARGS.service_name, \ + region_name=ARGS.region_name, wait=ARGS.wait, deploy_timeout=ARGS.timeout, \ + max_failed=ARGS.max_failed, image_name=ARGS.image_name, image_tag=ARGS.image_tag, \ + kill_tasks=ARGS.kill_tasks) - if not response.get("status") or response["status"] in (cfecs.C_FAIL, cfecs.C_TIMEOUT): - cfecs.log.error("ERROR: {}".format(pprint.pformat(response))) + if not RESPONSE.get("status") or RESPONSE["status"] in (cfecs.C_FAIL, cfecs.C_TIMEOUT): + cfecs.log.error("ERROR: %s", pprint.pformat(RESPONSE)) sys.exit(1) - cfecs.log.debug(pprint.pformat(response)) - cfecs.log.info("ECS Deploy completed with status {}".format(response.get("status"))) + cfecs.log.debug(pprint.pformat(RESPONSE)) + cfecs.log.info("ECS Deploy completed with status %s", RESPONSE.get("status")) - except Exception as e: - cfecs.log.error("ERROR: {}".format(e.message)) + except Exception as err: + cfecs.log.error("ERROR: %s", err.message) sys.exit(1) diff --git a/cfecs/__init__.py b/cfecs/__init__.py index 976e3fd..4c11708 100644 --- a/cfecs/__init__.py +++ b/cfecs/__init__.py @@ -1,5 +1,15 @@ -import logging, pprint, boto3, time, copy, pytz +# -*- coding: utf-8 -*- +"""Fill in Google style docstrings here +""" + +import time from datetime import datetime +import logging +import pprint +import copy +import boto3 +import pytz +import json C_SUCCESS = 'SUCCESS' C_FAIL = 'FAIL' @@ -11,14 +21,15 @@ def now(): LOGGER_NAME = 'cfecs_logger' def init_log(level=logging.INFO, log_stdout=True): - log = logging.getLogger(LOGGER_NAME) - if not [sh for sh in log.handlers if sh.__class__.__name__ == 'StreamHandler'] and log_stdout: - log.addHandler(logging.StreamHandler()) - - log.setLevel(level) - for h in log.handlers: - h.setLevel(level) - return log + logger = logging.getLogger(LOGGER_NAME) + if not [sh for sh in logger.handlers if sh.__class__.__name__ == 'StreamHandler'] \ + and log_stdout: 
+ logger.addHandler(logging.StreamHandler()) + logger.setLevel(level) + for hdlr in logger.handlers: + hdlr.setLevel(level) + return logger + log = init_log() def get_ecs(**kwargs): @@ -45,10 +56,13 @@ def _ecs_service_dsp(service): del _service["events"] return _service +def _format_json(obj): + return pprint.pformat(obj, indent=4) -WAIT_SLEEP=10 -DEPLOY_TIMEOUT=900 -MAX_FAILED_TASKS=2 +WAIT_SLEEP = 10 +SHORT_SLEEP = 3 +DEPLOY_TIMEOUT = 900 +MAX_FAILED_TASKS = 2 def wait_for_deployment(cluster_name, service_name, ecs=None, **kwargs): """ @@ -60,9 +74,10 @@ def wait_for_deployment(cluster_name, service_name, ecs=None, **kwargs): :return: """ - log.info("\n---------------------\nWaiting For Deployment: cluster = {} , service = {} ...".format(cluster_name, service_name)) + log.info("\n---------------------\nWaiting For Deployment: cluster = {} , \ + service = {} ...".format(cluster_name, service_name)) if not ecs: - ecs = get_ecs(region_name = kwargs.get("region_name")) + ecs = get_ecs(region_name=kwargs.get("region_name")) d_start = datetime.now() deploy_timeout = kwargs.get('deploy_timeout') or DEPLOY_TIMEOUT @@ -71,35 +86,42 @@ def wait_for_deployment(cluster_name, service_name, ecs=None, **kwargs): log.info("Wait until runningCount will be equal to desiredCount for PRIMARY service task ... ") while True: time.sleep(WAIT_SLEEP) - log.info("\n........... {}".format(now())) + log.info("\n........... %s", now()) - service = ecs.describe_services(cluster=cluster_name, services=[service_name])['services'][0] + service = ecs.describe_services( + cluster=cluster_name, services=[service_name])['services'][0] task_definition_arn = service["taskDefinition"] deployments = service["deployments"] deployment_created_at = None - for d in deployments: - log.info( " {} task {} - runningCount = {} , desiredCount = {}, pendingCount = {}". 
- format(d["status"], _ecs_arn_dsp(d["taskDefinition"]), d.get("runningCount"), d.get("desiredCount"), d.get("pendingCount"))) + for dep in deployments: + log.info(" {} task {} - runningCount = {} , desiredCount = {}, pendingCount = {}".format( \ + dep["status"], _ecs_arn_dsp(dep["taskDefinition"]), dep.get("runningCount"), \ + dep.get("desiredCount"), dep.get("pendingCount"))) - if d['status'] == 'PRIMARY': - deployment_created_at = d['createdAt'] + if dep['status'] == 'PRIMARY': + deployment_created_at = dep['createdAt'] - if d['status'] == 'PRIMARY' and d["desiredCount"] == d["runningCount"]: + if dep['status'] == 'PRIMARY' and dep["desiredCount"] == dep["runningCount"]: log.info("Deployment completed Successfully!!!") return {"status": C_SUCCESS, "service": _ecs_service_dsp(service)} # Check for failed tasks every 30s if (datetime.now() - d_start).total_seconds() > 30: failed_tasks = get_failed_tasks(cluster_name, service_name, task_definition_arn, ecs, - max_results=max_failed_tasks, created_after=deployment_created_at) + max_results=max_failed_tasks, \ + created_after=deployment_created_at) if failed_tasks and len(failed_tasks) >= max_failed_tasks: - log.error("ERROR: {} or more ecs tasks failed".format(max_failed_tasks)) - log.error(pprint.pformat(failed_tasks)) + log.error("ERROR: %d or more ecs tasks failed", max_failed_tasks) + log.error(_format_json(failed_tasks)) return {"status": C_FAIL, "failed_tasks": failed_tasks} if (datetime.now() - d_start).total_seconds() > deploy_timeout: - log.error("ERROR: Deploy Timeout {}s reached ".format(deploy_timeout)) - return {"status": C_TIMEOUT, "service": _ecs_service_dsp(service), "failed_tasks": failed_tasks} + log.error("ERROR: Deploy Timeout %ds reached ", deploy_timeout) + return { + "status": C_TIMEOUT, + "service": _ecs_service_dsp(service), + "failed_tasks": failed_tasks + } def get_failed_tasks(cluster_name, service_name, task_definition_arn, ecs=None, **kwargs): """ @@ -114,24 +136,29 @@ def 
get_failed_tasks(cluster_name, service_name, task_definition_arn, ecs=None, """ region_name = kwargs.get("region_name") - created_after = (kwargs.get('created_after') or datetime.fromtimestamp(1)).replace(tzinfo=pytz.utc) + created_after = (kwargs.get('created_after') or \ + datetime.fromtimestamp(1)).replace(tzinfo=pytz.utc) + max_results = kwargs.get('max_results') next_token = kwargs.get('next_token') or "" if not ecs: - ecs = get_ecs(region_name = region_name) - + ecs = get_ecs(region_name=region_name) def is_task_failed(task): if task.get('lastStatus') != 'STOPPED': return False - if task.get('stoppedReason') and 'Scaling activity initiated by' not in task.get('stoppedReason'): + if task.get('stoppedReason') and \ + 'Scaling activity initiated by' not in task.get('stoppedReason'): return True - failed_containers = [t for t in task.get('containers') if t.get('reason') and 'error' in (t['reason']).lower or t.get('exitCode') > 0] + failed_containers = [t for t in task.get('containers') \ + if t.get('reason') and 'error' in (t['reason']).lower or \ + t.get('exitCode') > 0] if failed_containers: return True - task_list_resp = ecs.list_tasks(cluster=cluster_name, serviceName=service_name, desiredStatus='STOPPED', - maxResults=min((max_results or 100), 100), nextToken=next_token) + task_list_resp = ecs.list_tasks(cluster=cluster_name, serviceName=service_name, \ + desiredStatus='STOPPED', maxResults=min((max_results or 100), \ + 100), nextToken=next_token) next_token = task_list_resp.get('nextToken') task_arns_resp = task_list_resp.get("taskArns") if not task_arns_resp: @@ -139,29 +166,71 @@ def is_task_failed(task): tasks_all_resp = ecs.describe_tasks(cluster=cluster_name, tasks=task_arns_resp) tasks_all = tasks_all_resp.get('tasks') - failed_tasks = [t for t in tasks_all if t.get('taskDefinitionArn') == task_definition_arn and t.get('createdAt').replace(tzinfo=pytz.utc) > created_after and is_task_failed(t)] + failed_tasks = [t for t in tasks_all \ + if 
t.get('taskDefinitionArn') == task_definition_arn and \ + t.get('createdAt').replace(tzinfo=pytz.utc) > created_after and is_task_failed(t)] if not next_token or max_results and len(failed_tasks) >= max_results: return failed_tasks else: - return failed_tasks + get_failed_tasks(cluster_name, service_name, task_definition_arn, ecs, created_after=created_after, max_results=max_results, next_token=next_token) + return failed_tasks + get_failed_tasks( \ + cluster_name, service_name, task_definition_arn, ecs, \ + created_after=created_after, max_results=max_results, next_token=next_token) + +def _kill_running_tasks(cluster_name, service, ecs): + service_name = service['serviceName'] + current_task_def_arn = service["taskDefinition"] + log.info("\n---------------------\nDeRegistering: arn = %s, service = %s\n", \ + current_task_def_arn, service_name) + dereg_res = ecs.deregister_task_definition(taskDefinition=current_task_def_arn) + if not dereg_res or not dereg_res.get('taskDefinition'): + raise Exception("ERROR: Invalid response from aws: {}".format(_format_json(dereg_res))) + else: + log.info("De-register task def OK: %s", _format_json(dereg_res)) + time.sleep(SHORT_SLEEP) + log.info("\n........... 
%s", now()) + get_related_tasks_params = { + 'cluster': cluster_name, + 'maxResults': 100, + 'serviceName': service_name, + 'desiredStatus': 'RUNNING' + } + log.info("Get Running Services: %s", _format_json(get_related_tasks_params)) + response = ecs.list_tasks(**get_related_tasks_params) + if not response or not response.get('taskArns'): + raise Exception("ERROR: Invalid response from aws: {}".format(_format_json(response))) + + stop_count = 0 + for task in response.get('taskArns'): + log.info("Stopping Task: %s on cluster = %s", task, cluster_name) + stop_res = ecs.stop_task( \ + cluster=cluster_name, task=task, reason='cfes-update --kill-running-tasks') + if not stop_res or not stop_res.get('task'): + raise Exception("ERROR: Invalid response from aws: {}".format(_format_json(stop_res))) + else: + stop_count += 1 + log.debug("Stop task response: %s", _format_json(stop_res)) + time.sleep(SHORT_SLEEP) + log.info("\n........... %s", now()) + log.info("Tasks %d Stopped", stop_count) def update_service(cluster_name, service_name, ecs=None, **kwargs): - log.info("\n---------------------\nUpdating Service: cluster = {} , service = {}\n" - "{}".format(cluster_name, service_name, pprint.pformat(kwargs, indent=4))) + log.info("\n---------------------\nUpdating Service: cluster = %s , service = %s\n%s\n", \ + cluster_name, service_name, _format_json(kwargs)) if not ecs: - ecs = get_ecs(region_name = kwargs.get("region_name")) + ecs = get_ecs(region_name=kwargs.get("region_name")) services = ecs.describe_services(cluster=cluster_name, services=[service_name]) if not services or not services.get('services'): - raise Exception("ERROR: Cannot find service {} in cluster {}".format(service_name, cluster_name)) + raise Exception("ERROR: Cannot find service {} in cluster {}".format( \ + service_name, cluster_name)) service = services['services'][0] current_task_def_arn = service["taskDefinition"] - log.info('current task definition arn = {}'.format(current_task_def_arn)) + 
log.info("current task definition arn = %s", current_task_def_arn) - task_definition_desc = ecs.describe_task_definition(taskDefinition = current_task_def_arn) + task_definition_desc = ecs.describe_task_definition(taskDefinition=current_task_def_arn) task_definition = task_definition_desc['taskDefinition'] keys_to_remove = ["status", "taskDefinitionArn", "requiresAttributes", "revision"] for k in keys_to_remove: @@ -172,20 +241,25 @@ def update_service(cluster_name, service_name, ecs=None, **kwargs): new_image_name_tag = '{}:{}'.format(image_name, new_image_tag) if image_name and new_image_tag: _found = False - for c in task_definition['containerDefinitions']: - _image_name_split = c.get('image').split(':') + for cur in task_definition['containerDefinitions']: + _image_name_split = cur.get('image').split(':') if _image_name_split[0] == image_name: - log.info("Set new image: {} ( was {} )".format(new_image_name_tag, c.get('image'))) - c['image'] = new_image_name_tag + log.info("Set new image: %s ( was %s )", new_image_name_tag, cur.get('image')) + cur['image'] = new_image_name_tag _found = True break if not _found: - raise Exception("ERROR: Cannot find image {} in service {} of cluster {}".format(image_name, service_name, cluster_name)) + raise Exception("ERROR: Cannot find image {} in service {} of cluster {}".format( \ + image_name, service_name, cluster_name)) register_task_resp = ecs.register_task_definition(**task_definition) new_task_def_arn = register_task_resp['taskDefinition']['taskDefinitionArn'] - log.info("new task definition arn: {}".format(new_task_def_arn)) + log.info("new task definition arn: %s", new_task_def_arn) + + kill_tasks = kwargs.get('kill_tasks') + if kill_tasks: + _kill_running_tasks(cluster_name=cluster_name, service=service, ecs=ecs) update_service_params = { 'cluster': cluster_name, @@ -194,9 +268,11 @@ def update_service(cluster_name, service_name, ecs=None, **kwargs): 'taskDefinition': new_task_def_arn, 'deploymentConfiguration': 
service['deploymentConfiguration'] } - log.info("Updating Service: {}".format(pprint.pformat(update_service_params))) + log.info("Updating Service: %s", _format_json(update_service_params)) response = ecs.update_service(**update_service_params) - if not response or not response.get('service') or not response.get('ResponseMetadata') or response.get('ResponseMetadata').get('HTTPStatusCode') > 299: + if not response or not response.get('service') or \ + not response.get('ResponseMetadata') or \ + response.get('ResponseMetadata').get('HTTPStatusCode') > 299: raise Exception("ERROR: Invalid response from aws: {}".format(response)) wait = kwargs.get('wait')