diff --git a/apluslms_shepherd/build/models.py b/apluslms_shepherd/build/models.py index dba63bf..7261d1f 100644 --- a/apluslms_shepherd/build/models.py +++ b/apluslms_shepherd/build/models.py @@ -27,8 +27,7 @@ class Build(db.Model): number = db.Column(db.Integer, primary_key=True) start_time = db.Column(db.DateTime) end_time = db.Column(db.DateTime) - state = db.Column(db.Enum(BuildState)) - step = db.Column(db.Enum(BuildStep)) + result = db.Column(db.Enum(BuildState), default=0) instance = db.relationship('CourseInstance', backref=db.backref('builds', cascade="save-update, merge, " "delete")) @@ -39,7 +38,7 @@ class BuildLog(db.Model): number = db.Column(db.Integer, primary_key=True) step = db.Column(db.Enum(BuildStep), primary_key=True) roman_step = db.Column(db.String, primary_key=True, default="Roman is not running") - result = db.Column(db.Boolean) + result = db.Column(db.Enum(BuildState)) start_time = db.Column(db.DateTime) end_time = db.Column(db.DateTime) log_text = db.Column(db.Text) diff --git a/apluslms_shepherd/build/tasks/__init__.py b/apluslms_shepherd/build/tasks/__init__.py index 48078a2..eb427bd 100644 --- a/apluslms_shepherd/build/tasks/__init__.py +++ b/apluslms_shepherd/build/tasks/__init__.py @@ -1,3 +1,3 @@ from apluslms_shepherd.observer.observer import ShepherdObserver -build_observer = ShepherdObserver() +observer = ShepherdObserver() diff --git a/apluslms_shepherd/build/tasks/signals.py b/apluslms_shepherd/build/tasks/signals.py index 7da491a..3f1beb1 100644 --- a/apluslms_shepherd/build/tasks/signals.py +++ b/apluslms_shepherd/build/tasks/signals.py @@ -1,11 +1,12 @@ +from datetime import datetime + from celery.signals import task_prerun, task_postrun, task_failure, before_task_publish from celery.utils.log import get_task_logger from celery.worker.control import revoke from apluslms_shepherd.build.models import Build, BuildStep, BuildState -from apluslms_shepherd.build.tasks import build_observer from apluslms_shepherd.courses.models import CourseInstance -from apluslms_shepherd.extensions import celery +from apluslms_shepherd.extensions import celery, db logger = get_task_logger(__name__) @@ -40,18 +41,12 @@ def clone_task_before_publish(sender=None, headers=None, body=None, **kwargs): logger.warning('No such course instance in the database') revoke(info["id"], terminate=True) return - build_observer.update_database(ins.id, current_build_number, BuildStep.CLONE, BuildState.PUBLISH) + build = Build(course_id=ins.id, number=current_build_number, start_time=datetime.utcnow(), result=BuildState.NONE) + db.session.add(build) + db.session.commit() logger.warning('clone_log') logger.warning('Task sent') - build_observer.enter_prepare() - build_observer.state_update(ins.id, current_build_number, BuildStep.CLONE, BuildState.PUBLISH, - "-------------------------------------------------New Build Start-------------------------------------------------\n " - "Instance with course_key:{}, instance_key:{} entering task queue, this is build No.{} \n".format( - course_key, - instance_key, - current_build_number)) logger.warning('Current state sent to frontend') - build_observer.step_pending(BuildStep.CLONE) @task_prerun.connect @@ -63,76 +58,54 @@ def task_prerun(task_id=None, sender=None, *args, **kwargs): # using the task protocol version 2. if sender.__name__ not in build_tasks: return - if sender.__name__ == 'pull_repo': - build_observer.enter_prepare() - else: - build_observer.enter_build() - build_observer.step_running(task_step_mapping[sender.__name__]) logger.warning(sender.__name__ + ' pre_run') - current_build_number = kwargs['args'][-1] + step = task_step_mapping[sender.__name__] instance_key = kwargs['args'][-2] course_key = kwargs['args'][-3] - logger.info('task_prerun for task id {}'.format( - task_id - )) - logger.info('course_key:{}, instance_key:{}'.format(course_key, instance_key)) + log_txt = 'task_prerun for task id {}'.format(task_id) + logger.info(log_txt) + log_txt = 'course_key:{}, instance_key:{}'.format(course_key, instance_key) + logger.info(log_txt) with celery.app.app_context(): # Get course instance by course key and instance key ins = CourseInstance.query.filter_by(course_key=course_key, instance_key=instance_key).first() # If no such instance in database, stop the task if ins is None: - logger.error('No such course instance inthe database') + logger.error('No such course instance in the database') revoke(task_id, terminate=True) return - build_observer.update_database(ins.id, current_build_number, task_step_mapping[sender.__name__], - BuildState.RUNNING) + log_txt = 'Task {} for course_key:{}, instance_key:{} starts running, current step: {}'.format( + sender.__name__, course_key, + instance_key, step.name) + logger.info(log_txt) # Send the state to frontend - build_observer.state_update(ins.id, current_build_number, task_step_mapping[sender.__name__], - BuildState.RUNNING, - 'Task {} for course_key:{}, instance_key:{} starts running\n'.format( - sender.__name__, course_key, - instance_key)) - build_observer.step_running(task_step_mapping[sender.__name__]) @task_postrun.connect -def task_postrun(task_id=None, sender=None, state=None, retval=None, *args, **kwargs): +def task_postrun(task_id=None, sender=None, task_result=None, retval=None, *args, **kwargs): """ - Triggered when task is finished + Triggered when task is finished, in this step , save the task result to database. """ # information about task are located in headers for task messages # using the task protocol version 2. if sender.__name__ not in build_tasks: return logger.warning(sender.__name__ + ' post_run') - current_build_number = kwargs['args'][-1] + step = task_step_mapping[sender.__name__] instance_key = kwargs['args'][-2] course_key = kwargs['args'][-3] - logger.info('task_postrun for task id {}'.format( - task_id - )) - logger.info('course_key:{}, instance_key:{}'.format(course_key, instance_key)) - with celery.app.app_context(): - # Get the instance id - course_id = CourseInstance.query.filter_by(course_key=course_key, instance_key=instance_key).first().id - # add end time for build entry and buildlog entry, change build state - logger.warning('finished') - build = Build.query.filter_by(course_id=course_id, - number=current_build_number).first() - # The state code is in the beginning, divided with main part by "|" - log_text = retval['msg'] - if retval['code'] == 0: - state = BuildState.SUCCESS - elif retval['code'] == 5: - state = BuildState.CANCELED - else: - state = BuildState.FAILED - build_observer.update_database(course_id, current_build_number, task_step_mapping[sender.__name__], state, - log_text) - build_observer.state_update(course_id, current_build_number, task_step_mapping[sender.__name__], - build.state, - log_text.replace('\\r', '\r').replace('\\n', '\n') + '\n') - build_observer.step_succeeded(task_step_mapping[sender.__name__]) + if retval['code'] == 0: + task_result = BuildState.SUCCESS + elif retval['code'] == 5: + task_result = BuildState.CANCELED + else: + task_result = BuildState.FAILED + log_text = 'task id {} with course_key: {}, instance_key: {}, step:{} finished. result: {}'.format(task_id, + course_key, + instance_key, + step.name, + task_result.name) + logger.info(log_text) @task_failure.connect @@ -146,16 +119,14 @@ def task_failure(task_id=None, sender=None, *args, **kwargs): logger.info('task_failure for task id {}'.format( task_id )) - current_build_number = kwargs['args'][-1] instance_key = kwargs['args'][-2] course_key = kwargs['args'][-3] + step = task_step_mapping[sender.__name__] with celery.app.app_context(): - course_id = CourseInstance.query.filter_by(course_key=course_key, instance_key=instance_key).first().id - - logger.warning('finished') - build_observer.update_database(course_id, current_build_number, task_step_mapping[sender.__name__], - BuildState.SUCCESS, ) - build_observer.state_update(course_id, current_build_number, task_step_mapping[sender.__name__], - BuildState.FAILED, - 'Task {} is Failed.\n'.format(sender.__name__)) - build_observer.step_failed(task_step_mapping[sender.__name__]) + logger.warning('failed') + log_text = 'task_failed for task id {}, course_key: {}, instance_key: {} at step {}. result: {}'.format(task_id, + course_key, + instance_key, + step.name, + BuildState.FAILED.name) + logger.error(log_text) diff --git a/apluslms_shepherd/build/tasks/tasks.py b/apluslms_shepherd/build/tasks/tasks.py index 5636c1e..fbce3ad 100644 --- a/apluslms_shepherd/build/tasks/tasks.py +++ b/apluslms_shepherd/build/tasks/tasks.py @@ -1,9 +1,11 @@ import os import shutil -from apluslms_shepherd.build.tasks import build_observer -from apluslms_shepherd.build.tasks.signals import task_step_mapping +from datetime import datetime + from apluslms_shepherd.build.tasks.utils import bare_clone, get_current_build_number_list, roman_build, slugify +from apluslms_shepherd.observer.observer import ShepherdObserver +from apluslms_shepherd.observer.utils import get_logger, BrokerClient try: from subprocess import DEVNULL # Python 3 @@ -13,16 +15,15 @@ from celery.result import AsyncResult from celery.utils.log import get_task_logger -from apluslms_shepherd.build.models import BuildState -from apluslms_shepherd.courses.models import CourseInstance -from apluslms_shepherd.extensions import celery +from apluslms_shepherd.build.models import BuildState, BuildStep, Build +from apluslms_shepherd.extensions import celery, db from apluslms_shepherd.config import DevelopmentConfig logger = get_task_logger(__name__) @celery.task -def update_state(course_id, build_number, state, action, log): +def update_state(course_id, build_number, state, step, roman_step, log): """ Take the updated state to MQ, this task is not going to the worker """ @@ -30,96 +31,149 @@ def update_state(course_id, build_number, state, action, log): @celery.task -def pull_repo(base_path, url, branch, course_key, instance_key, build_number): +def pull_repo(base_path, url, branch, course_id, course_key, instance_key, build_number): """ Clone bear repo to local, or update local one, generate working tree. """ - build_observer.enter_prepare() + build = Build(course_id=course_id, number=build_number, start_time=datetime.utcnow(), result=BuildState.NONE) + db.session.add(build) + db.session.commit() + course_info = {'course_id': course_id, 'number': build_number} + observer_logger = get_logger(db.session, course_info) + observer = ShepherdObserver(observer_logger, BrokerClient('apluslms_shepherd.celery_tasks.build.tasks.update_state', + 'celery_state', course_info)) + observer.enter_prepare() + observer.shepherd_step_start(BuildStep.CLONE) + observer.shepherd_msg(BuildStep.CLONE, BuildState.RUNNING, + 'url:%s, branch:%s course_key:%s instance_key:%s' % (url, branch, course_key, instance_key)) logger.info('url:%s, branch:%s course_key:%s instance_key:%s', url, branch, course_key, instance_key) + observer.shepherd_msg(BuildStep.CLONE, BuildState.RUNNING, + "Pulling from %s" % url) logger.info("Pulling from %s", url) args = [base_path, url, course_key, instance_key, branch, build_number, - os.path.join(DevelopmentConfig.REPO_KEYS_PATH, slugify(url), 'private.pem'), build_observer] + os.path.join(DevelopmentConfig.REPO_KEYS_PATH, slugify(url), 'private.pem'), observer] return_code = bare_clone(*args) + observer.shepherd_step_end(BuildStep.CLONE, BuildState.SUCCESS if return_code == 0 else BuildState.FAILED) return {'code': return_code, 'msg': ''} @celery.task -def build_repo(pull_result, base_path, course_key, instance_key, build_number): +def build_repo(pull_result, base_path, course_id, course_key, instance_key, build_number): """ build the course material with roman """ + course_info = {'course_id': course_id, 'number': build_number} + observer_logger = get_logger(db.session, course_info) + observer = ShepherdObserver(observer_logger, BrokerClient('apluslms_shepherd.celery_tasks.build.tasks.update_state', + 'celery_state', course_info)) + observer.enter_build() + observer.shepherd_step_start(BuildStep.BUILD) logger.info("pull_repo result: %s", str(pull_result)) - build_observer.enter_build() + observer.shepherd_msg(BuildStep.BUILD, BuildState.RUNNING, "pull_repo result: %s" % pull_result) # Check the result of last step if not pull_result['code'] == 0: logger.error('The clone task was failed, aborting the build task') + observer.shepherd_msg(BuildStep.BUILD, BuildState.CANCELED, + 'The clone task was failed, aborting the build task') + observer.shepherd_step_end(BuildStep.BUILD, BuildState.CANCELED) return {'code': -1, 'msg': 'The clone task was failed, aborting the build task.'} - log = "The repo has been pulled, Building the course, course key:{}, branch:{}\n".format(course_key, instance_key) + log = "The repo has been pulled, Building the course, course key:{}, branch:{}".format(course_key, instance_key) logger.info(log) - ins = CourseInstance.query.filter_by(course_key=course_key, instance_key=instance_key).first() - build_observer.state_update(ins.id, build_number, task_step_mapping['build_repo'], BuildState.RUNNING, - log) + observer.shepherd_msg(BuildStep.BUILD, BuildState.RUNNING, log) number_list = get_current_build_number_list() - log = "Current build task number of this instance in the queue:{}\n".format(number_list) + log = "Current build task number of this instance in the queue:{}".format(number_list) + observer.shepherd_msg(BuildStep.BUILD, BuildState.RUNNING, log) logger.info(log) - build_observer.state_update(ins.id, build_number, task_step_mapping['build_repo'], BuildState.RUNNING, - log) try: if int(build_number) < max(number_list): - logger.warning( - "Already have newer version in the task queue, task with build number %s aborted.", build_number) + logger.warning("Already have newer version in the task queue, task with build number %s aborted.", + build_number) + observer.shepherd_msg(BuildStep.BUILD, BuildState.CANCELED, + "Already have newer version in the task queue, task with build number %s aborted." % build_number) + observer.shepherd_step_end(BuildStep.BUILD, BuildState.CANCELED) return {'code': 5, 'msg': "Already have newer version in the task queue, task with build number {} " "aborted.".format(build_number)} except (ValueError, TypeError): - logger.error("Cannot compare current build number with max number in the queue") - code = roman_build(base_path, ins.id, course_key, instance_key, build_number) - return {'code': code, 'msg': ''} + log = "Cannot compare current build number with max number in the queue" + logger.error(log) + observer.shepherd_msg(BuildStep.BUILD, BuildState.FAILED, + log) + observer.shepherd_step_end(BuildStep.BUILD, BuildState.FAILED) + return {'code': 1, 'msg': log} + return_code = roman_build(base_path, course_key, instance_key, build_number, observer) + observer.shepherd_step_end(BuildStep.BUILD, BuildState.SUCCESS if return_code == 0 else BuildState.FAILED) + return {'code': return_code, 'msg': 'Build success'} @celery.task -def deploy(build_result, deploy_base_path, base_path, course_key, instance_key, build_number): +def deploy(build_result, deploy_base_path, base_path, course_id, course_key, instance_key, build_number): """ Copy the build filed to deploy folder TODO: Support remote deploy location(Cloud .etc) """ + course_info = {'course_id': course_id, 'number': build_number} + observer_logger = get_logger(db.session, course_info) + observer = ShepherdObserver(observer_logger, BrokerClient('apluslms_shepherd.celery_tasks.build.tasks.update_state', + 'celery_state', course_info)) + observer.enter_build() + observer.shepherd_step_start(BuildStep.DEPLOY) # Check the last step logger.info("build_repo result %s", build_result) if not build_result['code'] == 0: logger.error('The build task was failed, aborting the deployment task') - return {'code': -1, 'msg': 'The clone task was failed or aborted, aborting the build task'} + observer.shepherd_step_end(BuildStep.DEPLOY, BuildState.CANCELED) + return {'code': 5, 'msg': 'The clone task was failed or aborted, aborting the build task'} # Check is there has a newer version in the queue.If true, cancel the task and start cleaning number_list = get_current_build_number_list() if int(build_number) < max(number_list): logger.warning("Already have newer version in the task queue, task with build number %s aborted.", build_number) + observer.shepherd_step_end(BuildStep.DEPLOY, BuildState.CANCELED) return {'code': 5, 'msg': "Newer version in the task queue, task with build number {} aborted. Cleaning the local " "repo".format(build_number)} - logger.info( - "The repo has been build, deploying the course, course key: %s, branch: %s", course_key, instance_key) + log = "The repo has been build, deploying the course, course key: %s, branch: %s" % (course_key, instance_key) + observer.shepherd_msg(BuildStep.DEPLOY, BuildState.RUNNING, log) try: build_path = os.path.join(base_path, 'builds', course_key, instance_key, build_number, "_build") deploy_path = os.path.join(deploy_base_path, course_key, instance_key, build_number) shutil.move(build_path, deploy_path) except (FileNotFoundError, OSError, IOError) as why: - logger.error('Error: %', why.strerror) + log = 'Error: %', why.strerror + logger.error(log) + observer.shepherd_msg(BuildStep.DEPLOY, BuildState.FAILED, log) + observer.shepherd_step_end(BuildStep.DEPLOY, BuildState.FAILED) return {'code': 1, 'msg': 'Error when deploying files'} - return {'code': 0, 'msg': 'successfully moved to deployment folder.'} + log = "Successfully moved files to deployment folder." + observer.shepherd_msg(BuildStep.DEPLOY, BuildState.RUNNING, log) + observer.shepherd_step_end(BuildStep.DEPLOY, BuildState.SUCCESS) + return {'code': 0, 'msg': log} @celery.task -def clean(res, base_path, course_key, instance_key, build_number): +def clean(res, base_path, course_id, course_key, instance_key, build_number): """ Clean the generated working tree. """ + course_info = {'course_id': course_id, 'number': build_number} + observer_logger = get_logger(db.session, course_info) + observer = ShepherdObserver(observer_logger, BrokerClient('apluslms_shepherd.celery_tasks.build.tasks.update_state', + 'celery_state', course_info)) + observer.done() + observer.shepherd_step_start(BuildStep.CLEAN) logger.warning('Cleaning repo') path = os.path.join(base_path, 'builds', course_key, instance_key, build_number) try: - logger.warning("Local work tree of build number %s deleted", build_number) - build_observer.done() + log = "Local work tree of build number %s deleted" % build_number + observer.shepherd_msg(BuildStep.CLEAN, BuildState.RUNNING, log) + logger.warning(log) shutil.rmtree(path) + observer.shepherd_msg(BuildStep.CLEAN, BuildState.RUNNING, "Worktree cleaned, keep bare repo.") + observer.shepherd_step_end(BuildStep.CLEAN, BuildState.SUCCESS) return {'code': 0, 'msg': 'Worktree cleaned.'} except (FileNotFoundError, IOError, OSError) as why: logger.info('Error: %s', why.strerror) + observer.shepherd_msg(BuildStep.CLEAN, BuildState.FAILED, why.strerror) + observer.shepherd_step_end(BuildStep.CLEAN, BuildState.FAILED) return {'code': 1, 'msg': 'Error when cleaning local worktree files,'} diff --git a/apluslms_shepherd/build/tasks/utils.py b/apluslms_shepherd/build/tasks/utils.py index 58db05e..df124e3 100644 --- a/apluslms_shepherd/build/tasks/utils.py +++ b/apluslms_shepherd/build/tasks/utils.py @@ -10,8 +10,6 @@ from apluslms_shepherd import celery from apluslms_shepherd.build.models import BuildStep, BuildState -from apluslms_shepherd.courses.models import CourseInstance -from apluslms_shepherd.observer.observer import ShepherdObserver logger = get_task_logger(__name__) @@ -40,14 +38,15 @@ def bare_clone(base_path, origin, course, instance, branch, number, key_path, ob :return: boolean, if is succeeded """ has_private_key = isfile(key_path) - course_id = CourseInstance.query.filter_by(course_key=course, instance_key=instance).first().id if has_private_key: logger.info("Private key detected on {}.".format(key_path)) + observer.shepherd_msg(BuildStep.CLONE, BuildState.RUNNING,"Private key detected on {}.".format(key_path)) env = dict(SSH_ASKPASS="echo", GIT_TERMINAL_PROMPT="0", GIT_SSH_COMMAND="ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i {}".format( key_path)) else: logger.info("Private key cannot be found.") + observer.shepherd_msg(BuildStep.CLONE, BuildState.RUNNING, "Private key cannot be found.") env = dict(SSH_ASKPASS="echo", GIT_TERMINAL_PROMPT="0") repo_folder = join(base_path, quote(origin).split('/')[-1]) # Check is using clone or fetch @@ -55,24 +54,29 @@ def bare_clone(base_path, origin, course, instance, branch, number, key_path, ob logger.info("Find local repo, update") proc = subprocess.run(['git', 'fetch', 'origin', branch + ':' + branch], env=env, cwd=repo_folder, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - logger.info(proc.stdout) - observer.state_update(course_id, number, BuildStep.CLONE, BuildState.RUNNING, + observer.shepherd_msg(BuildStep.CLONE, BuildState.RUNNING, proc.stdout.decode('utf-8')) if proc.returncode != 0: logger.error(proc.returncode, proc.stdout) + observer.shepherd_msg(BuildStep.CLONE, BuildState.FAILED, + proc.stdout.decode('utf-8')) return proc.returncode else: logger.info('No local repo can be found, cloning from remote at ' + origin) + observer.shepherd_msg(BuildStep.CLONE, BuildState.RUNNING, + 'No local repo can be found, cloning from remote at %s' % origin) proc = subprocess.run(['git', 'clone', '--bare', origin], env=env, cwd=base_path, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) logger.info(proc.stdout) - observer.state_update(course_id, number, BuildStep.CLONE, BuildState.RUNNING, + observer.shepherd_msg(BuildStep.CLONE, BuildState.RUNNING, proc.stdout.decode('utf-8')) if proc.returncode != 0: logger.error('Error in cloning, program terminated. Code:', str(proc.returncode)) + observer.shepherd_msg(BuildStep.CLONE, BuildState.FAILED, 'Error in cloning, program terminated. Code: %s' % proc.returncode) return proc.returncode if not exists(repo_folder): logger.error('Cannot find cloned repo, terminated') + observer.shepherd_msg(BuildStep.CLONE, BuildState.FAILED, 'Cannot find cloned repo, terminated') return 1 # Generate worktree @@ -81,61 +85,94 @@ def bare_clone(base_path, origin, course, instance, branch, number, key_path, ob proc = subprocess.run(['git', 'worktree', 'add', '-f', worktree_path, branch], env=env, cwd=repo_folder, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) logger.info(proc.stdout) + observer.shepherd_msg(BuildStep.CLONE, BuildState.RUNNING, + proc.stdout.decode('utf-8')) if proc.returncode != 0: logger.error('Error in generating worktree, program terminated. Code:', str(proc.returncode)) return proc.returncode proc = subprocess.run("git submodule init && git submodule update --depth 1", shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=worktree_path) - logger.info(proc.stdout) + observer.shepherd_msg(BuildStep.CLONE, BuildState.RUNNING, + proc.stdout.decode('utf-8')) if proc.returncode != 0: logger.error('Error in generating worktree, program terminated. Code:', str(proc.returncode)) + observer.shepherd_msg(BuildStep.CLONE, BuildState.FAILED, + 'Error in generating worktree, program terminated. Code: %s' % proc.returncode) return proc.returncode return 0 -def roman_build(base_path, course_id, course_key, instance_key, build_number, config_filename=None): - shepherd_builder_observer = ShepherdObserver([course_id, build_number, BuildStep.BUILD.name, None, ""]) +def roman_build(base_path, course_key, instance_key, build_number, observer, config_filename=None): + observer.enter_build() source_path = join(base_path, 'builds', course_key, instance_key, build_number) if not exists(source_path): - logger.error("Cannot find source file for building at %s", source_path) + log = "Cannot find source file for building at %s" % source_path + logger.error(log) + observer.shepherd_msg(BuildStep.BUILD, BuildState.FAILED, + log) return 1 # Get config try: project_config = ProjectConfig.find_from(source_path) except ValidationError as e: + log = render_error(e) logger.error(render_error(e)) + observer.shepherd_msg(BuildStep.BUILD, BuildState.FAILED, + log) return 1 except ProjectConfigError as e: + log = render_error(e) logger.error('Invalid project configuration: %s', e) + observer.shepherd_msg(BuildStep.BUILD, BuildState.FAILED, + log) return 1 # Get global settings try: global_settings = GlobalSettings.load( config_filename if config_filename is not None else GlobalSettings.get_config_path(), allow_missing=True) except ValidationError as e: + log = render_error(e) logger.error(render_error(e)) + observer.shepherd_msg(BuildStep.BUILD, BuildState.FAILED, + log) return 1 except OSError as e: + log = render_error(e) logger.error(str(e)) + observer.shepherd_msg(BuildStep.BUILD, BuildState.FAILED, + log) return 1 # Get engine try: engine = Engine(settings=global_settings) except ImportError: - logger.error("Unable to find backend %s", str(global_settings)) + log = "Unable to find backend %s"% global_settings + logger.error(log) + observer.shepherd_msg(BuildStep.BUILD, BuildState.FAILED, + log) return 1 - builder = engine.create_builder(project_config, shepherd_builder_observer) + builder = engine.create_builder(project_config, observer) if not project_config.steps: - logger.error("Nothing to build.") + log = "Nothing to build." + logger.error(log) + observer.shepherd_msg(BuildStep.BUILD, BuildState.FAILED, + log) return 1 try: result = builder.build() - except KeyError as err: - logger.error("No step named %s", err.args[0]) + except KeyError as e: + log = "No step named %s", str(e.args[0]) + logger.error(log) + observer.shepherd_msg(BuildStep.BUILD, BuildState.FAILED, + log) return 1 - except IndexError as err: - logger.error("Index %s is out of range. There are %d steps. Indexing begins ar 0.", err.args[0], - len(project_config.steps)) + except IndexError as e: + log = "Index %s is out of range. There are %d steps. Indexing begins ar 0."% (e.args[0], len(project_config.steps)) + logger.error(log) + observer.shepherd_msg(BuildStep.BUILD, BuildState.FAILED, + log) + observer.shepherd_msg(BuildStep.BUILD, BuildState.SUCCESS, + "Roman build success") return result.code diff --git a/apluslms_shepherd/message_broker/broker.py b/apluslms_shepherd/message_broker/broker.py index cb1d429..6cf979c 100644 --- a/apluslms_shepherd/message_broker/broker.py +++ b/apluslms_shepherd/message_broker/broker.py @@ -23,12 +23,13 @@ async def get_state(): print(received_message.json()) # get JSON from incoming messages easily received_message.ack() await sio.emit('update', { - 'course_id': received_message.json()[0][0], - 'build_number': received_message.json()[0][1], - 'current_step': received_message.json()[0][2], - 'current_state': received_message.json()[0][3]}) + 'course_id': received_message.json()[1]['course_id'], + 'build_number': received_message.json()[1]['number'], + 'current_step': received_message.json()[1]['step'], + 'roman_step': received_message.json()[1]['roman_step'], + 'current_state': received_message.json()[1]['state']}) - await sio.emit(received_message.json()[0][0], {'log': received_message.json()[0][4]}) + await sio.emit(received_message.json()[1]['course_id'], {'log': received_message.json()[1]['log']}) except AttributeError: pass diff --git a/apluslms_shepherd/observer/observer.py b/apluslms_shepherd/observer/observer.py index d55c01c..beaa1e3 100644 --- a/apluslms_shepherd/observer/observer.py +++ b/apluslms_shepherd/observer/observer.py @@ -1,109 +1,82 @@ -import sys from datetime import datetime -from enum import Enum +from typing import Callable -from apluslms_roman.observer import BuildObserver, Phase, Message +from apluslms_roman.observer import BuildObserver, Message -from apluslms_shepherd.build.models import BuildStep, BuildState, Build, BuildLog -from apluslms_shepherd.extensions import celery, db +from apluslms_shepherd.build.models import BuildStep, BuildState +from apluslms_shepherd.observer.utils import BrokerClient, ShepherdMessage +from celery.utils.log import get_task_logger - -class ShepherdMessage(Enum): - ENTER = 0 - START_STEP = 1 - END_STEP = 2 - MANAGER_MSG = 11 - CONTAINER_MSG = 12 - STATE_UPDATE = 13 +celery_logger = get_task_logger(__name__) class ShepherdObserver(BuildObserver): - def __init__(self, data=None, stream=None): + def __init__(self, logger: Callable = None, broker: BrokerClient = None): super().__init__() - self._phase = Phase.NONE - self.stream = stream or sys.stdout - self.data = data + self.logger = logger + self.broker = broker + self.starts = {} + self.log_cache = {} + self.logs = {} + + def set_logger(self, logger: Callable): + self.logger = logger + + def set_broker(self, broker: BrokerClient): + self.broker = broker def _message(self, phase, type_, step=None, state=None, data=None): - print('New msg:phase {}, type:{} step: {} state: {}'.format(phase, type_, step, state)) - if type_ == ShepherdMessage.ENTER: return - phase_s = '{} {}'.format(phase.name, step) if step is not None else phase.name - if type_ == ShepherdMessage.CONTAINER_MSG: - fmt = '{} >> {}' - elif type_ == ShepherdMessage.MANAGER_MSG: - fmt = '{} : {}' - elif type_ == ShepherdMessage.STATE_UPDATE: + if type_ == ShepherdMessage.START: + self.starts[step] = datetime.utcnow() + if type_ == ShepherdMessage.END: + end = datetime.utcnow() + start = self.starts.get(step, 0) + # Send all left message to frontend logger + self.broker.notify(self.log_cache[step]) + # Append these sent log to logs, which has all previous logs. + self.logs.setdefault(step, []).extend(self.log_cache[step]) + # The log of each step should be like {'step1':[{'state': 'state for log 1', 'step': 'step for log1', + # 'log': 'log 1 content' }, {same dict for log2}, ...]} + logs = self.logs.get(step, []) + # Write all log in this step log entry to db + self.logger(step, start, end, state, '\n'.join([l['log'] for l in logs])) + self.log_cache[step] = [] + if type_ == ShepherdMessage.UPDATE: + # Only update frontend, no log. + frontend_msg = [{'step': step.name, 'state': state.name, 'roman_step': None, 'log': ''}] + self.broker.notify(frontend_msg) + if type_ == ShepherdMessage.MSG: fmt = '{} {}' - celery.send_task('apluslms_shepherd.celery_tasks.build.tasks.update_state', queue='celery_state', - args=data) - elif type_ in [Message.CONTAINER_MSG, type_ == Message.MANAGER_MSG]: - # This is a task from roman. + # Composing a log entry + log_txt = fmt.format(phase.name, str(data).rstrip()) + frontend_msg = {'step': step.name, 'state': state.name, 'roman_step': None, 'log': log_txt} + self.log_cache.setdefault(step, []).append(frontend_msg) + # We store the course info in logger function + # Notify broker every 10 lines of log + if len(self.log_cache[step]) > 10: + self.broker.notify(self.log_cache[step]) + # Append log in cache to logs, which has all previous logs for every step. + self.logs.setdefault(step, []).extend(self.log_cache[step]) + # Empty cache + self.log_cache[step] = [] + if type_ == Message.CONTAINER_MSG: fmt = '{} >> {}' - output = fmt.format(phase_s, str(data).rstrip()) + '\n' - self.data[-1] = output - celery.send_task('apluslms_shepherd.celery_tasks.build.tasks.update_state', queue='celery_state', - args=self.data) - else: - fmt = '{} {}' - if not data: data = type_.name.lower() - self.stream.write(fmt.format(phase_s, str(data).rstrip()) + '\n') + log_txt = fmt.format(phase.name, str(data).rstrip()) + celery_logger.info(log_txt) + frontend_msg = {'step': BuildStep.BUILD.name, 'state': BuildState.RUNNING.name, 'roman_step': str(step), + 'log': log_txt} + self.log_cache.setdefault(BuildStep.BUILD, []).append(frontend_msg) + # Notify broker every 10 lines of log + if len(self.log_cache[BuildStep.BUILD]) > 10: + self.broker.notify(self.log_cache[BuildStep.BUILD]) + self.logs.setdefault(BuildStep.BUILD, []).extend(self.log_cache[BuildStep.BUILD]) + self.log_cache[BuildStep.BUILD] = [] - def update_database(self, course_id, number, step, state, log=None): - now = datetime.utcnow() - build_log = None - build = None - # We create new Build and BuildLog when it is the start of the first step, and only the first step has - # publish state. + def shepherd_step_start(self, step): + self._message(self._phase, ShepherdMessage.START, step) - # We don't catch the task publish signal form the second task (after task CLONE). That's because the celery - # will be publish all taks before the first task runs, thus the state in of Build table will be changed too - # early. In this case, we create the BuildLog and change the state in the Build table of Build task in this - # function. - if step == BuildStep.CLONE and state == BuildState.PUBLISH: - build = Build(course_id=course_id, start_time=now, - state=BuildState.PUBLISH, - step=BuildStep.CLONE, number=number) - build_log = BuildLog( - course_id=course_id, - start_time=now, - number=number, - step=BuildStep.CLONE, - log_text=log - ) - # We create new build log when it comes to next step - elif state == BuildState.RUNNING and step != BuildStep.CLONE: - build_log = BuildLog( - course_id=course_id, - start_time=now, - number=number, - step=step, - log_text=log - ) - build = Build.query.filter_by(course_id=course_id, number=number).first() - build.state = state - build.step = step - # If task finished, set end time for BuildLog, if the current step is the last step, - # set end time for Build as well - elif state == BuildState.SUCCESS or BuildState.FAILED: - build = Build.query.filter_by(course_id=course_id, - number=number).first() - # Get current build_log, filter condition "step" is different according to the task - build_log = BuildLog.query.filter_by(course_id=course_id, - number=number, - step=step).first() - build.state = state - build_log.log_text = log - build.end_time = now if step == BuildStep.CLEAN else None - build_log.end_time = now - if build_log is not None and build is not None: - # Submit the changes to db - db.session.add(build_log) - db.session.add(build) - db.session.commit() - self.stream.write('Current state write to database: build id:{}, build number: {}, step: {}, state: {}' - .format(course_id, number, step.name, state.name)) + def shepherd_step_end(self, step, state): + self._message(self._phase, ShepherdMessage.END, step, state) - def state_update(self, course_id, build_number, step, state, log=None): - state_name = state.name if state is not None else None - state_list = [course_id, build_number, step.name, state_name, log] - self._message(self._phase, ShepherdMessage.STATE_UPDATE, step, state, state_list) + def shepherd_msg(self, step, state, log): + self._message(self._phase, ShepherdMessage.MSG, step, state, log) diff --git a/apluslms_shepherd/observer/utils.py b/apluslms_shepherd/observer/utils.py new file mode 100644 index 0000000..15020a4 --- /dev/null +++ b/apluslms_shepherd/observer/utils.py @@ -0,0 +1,50 @@ +from enum import Enum + +from apluslms_shepherd import celery +from apluslms_shepherd.build.models import BuildLog, Build, BuildState, BuildStep + + +class ShepherdMessage(Enum): + START = 0 + UPDATE = 9 + MSG = 9 + END = 10 + + +class BrokerClient: + def __init__(self, task: str, queue: str, course_info: dict): + self.queue = queue + self.task = task + self.course_info = course_info + + def notify(self, log): + """ + Send log to frontend + :param log: List of dict which contains state and log + :return: + """ + for each in log: + log_context = {**self.course_info, **each} + celery.send_task(self.task, queue=self.queue, kwargs=log_context) + + +def get_logger(session, build_info: dict): + """ + This function is for observer to write the data to database, will be called in in the end of each steps. + :param session: Database session + :param build_info: a dictionary, key must include: course_id, number. + :return: + """ + + def log(step, start, end, result: BuildState, logs: str, roman_step=None): + # If it is the last step, then write end time and result to Build. + # Build entry is created in publish signal, while each buildlog only be created in then end of steps. + if step == BuildStep.CLEAN: + build = Build(**build_info, end_time=end, result=result) + session.merge(build) + session.commit() + log_entry = BuildLog(**build_info, step=step, start_time=start, end_time=end, result=result, log_text=logs, + roman_step=roman_step) + session.add(log_entry) + session.commit() + return log diff --git a/apluslms_shepherd/static/socketio.js b/apluslms_shepherd/static/socketio.js index ed197ab..ec91bdf 100644 --- a/apluslms_shepherd/static/socketio.js +++ b/apluslms_shepherd/static/socketio.js @@ -2,11 +2,17 @@ var socket = io.connect('http://' + document.domain + ':' + 5001); socket.on('update', function(data) { console.log(data) - $("#"+data.instance_id+".table-number").html(data.build_number); - $("#"+data.instance_id+".table-action").html(data.current_action); + $("#"+data.course_id+".table-number").html(data.build_number); + $("#"+data.course_id+".table-step").html(data.current_step); if (data.current_state !== null) { - $("#" + data.instance_id + ".table-state").html(data.current_state); + $("#" + data.course_id + ".table-state").html(data.current_state); } + if (data.roman_step === null) { + $("#" + data.course_id + ".table-roman-step").html("Not Running"); + } else { + $("#" + data.course_id + ".table-roman-step").html("Running Step " + data.roman_step); + } + }); socket.on('connect', function() { console.log('connect') diff --git a/apluslms_shepherd/templates/builds/instance_log.html b/apluslms_shepherd/templates/builds/instance_log.html index b284353..46be5cf 100644 --- a/apluslms_shepherd/templates/builds/instance_log.html +++ b/apluslms_shepherd/templates/builds/instance_log.html @@ -36,8 +36,7 @@