Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions apluslms_shepherd/build/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ class Build(db.Model):
number = db.Column(db.Integer, primary_key=True)
start_time = db.Column(db.DateTime)
end_time = db.Column(db.DateTime)
state = db.Column(db.Enum(BuildState))
step = db.Column(db.Enum(BuildStep))
result = db.Column(db.Enum(BuildState), default=0)
instance = db.relationship('CourseInstance', backref=db.backref('builds', cascade="save-update, merge, "
"delete"))

Expand All @@ -39,7 +38,7 @@ class BuildLog(db.Model):
number = db.Column(db.Integer, primary_key=True)
step = db.Column(db.Enum(BuildStep), primary_key=True)
roman_step = db.Column(db.String, primary_key=True, default="Roman is not running")
result = db.Column(db.Boolean)
result = db.Column(db.Enum(BuildState))
start_time = db.Column(db.DateTime)
end_time = db.Column(db.DateTime)
log_text = db.Column(db.Text)
Expand Down
2 changes: 1 addition & 1 deletion apluslms_shepherd/build/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from apluslms_shepherd.observer.observer import ShepherdObserver

build_observer = ShepherdObserver()
observer = ShepherdObserver()
107 changes: 39 additions & 68 deletions apluslms_shepherd/build/tasks/signals.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from datetime import datetime

from celery.signals import task_prerun, task_postrun, task_failure, before_task_publish
from celery.utils.log import get_task_logger
from celery.worker.control import revoke

from apluslms_shepherd.build.models import Build, BuildStep, BuildState
from apluslms_shepherd.build.tasks import build_observer
from apluslms_shepherd.courses.models import CourseInstance
from apluslms_shepherd.extensions import celery
from apluslms_shepherd.extensions import celery, db

logger = get_task_logger(__name__)

Expand Down Expand Up @@ -40,18 +41,12 @@ def clone_task_before_publish(sender=None, headers=None, body=None, **kwargs):
logger.warning('No such course instance in the database')
revoke(info["id"], terminate=True)
return
build_observer.update_database(ins.id, current_build_number, BuildStep.CLONE, BuildState.PUBLISH)
build = Build(course_id=ins.id, number=current_build_number, start_time=datetime.utcnow(), result=BuildState.NONE)
db.session.add(build)
db.session.commit()
logger.warning('clone_log')
logger.warning('Task sent')
build_observer.enter_prepare()
build_observer.state_update(ins.id, current_build_number, BuildStep.CLONE, BuildState.PUBLISH,
"-------------------------------------------------New Build Start-------------------------------------------------\n "
"Instance with course_key:{}, instance_key:{} entering task queue, this is build No.{} \n".format(
course_key,
instance_key,
current_build_number))
logger.warning('Current state sent to frontend')
build_observer.step_pending(BuildStep.CLONE)


@task_prerun.connect
Expand All @@ -63,76 +58,54 @@ def task_prerun(task_id=None, sender=None, *args, **kwargs):
# using the task protocol version 2.
if sender.__name__ not in build_tasks:
return
if sender.__name__ == 'pull_repo':
build_observer.enter_prepare()
else:
build_observer.enter_build()
build_observer.step_running(task_step_mapping[sender.__name__])
logger.warning(sender.__name__ + ' pre_run')
current_build_number = kwargs['args'][-1]
step = task_step_mapping[sender.__name__]
instance_key = kwargs['args'][-2]
course_key = kwargs['args'][-3]
logger.info('task_prerun for task id {}'.format(
task_id
))
logger.info('course_key:{}, instance_key:{}'.format(course_key, instance_key))
log_txt = 'task_prerun for task id {}'.format(task_id)
logger.info(log_txt)
log_txt = 'course_key:{}, instance_key:{}'.format(course_key, instance_key)
logger.info(log_txt)
with celery.app.app_context():
# Get course instance by course key and instance key
ins = CourseInstance.query.filter_by(course_key=course_key, instance_key=instance_key).first()
# If no such instance in database, stop the task
if ins is None:
logger.error('No such course instance inthe database')
logger.error('No such course instance in the database')
revoke(task_id, terminate=True)
return
build_observer.update_database(ins.id, current_build_number, task_step_mapping[sender.__name__],
BuildState.RUNNING)
log_txt = 'Task {} for course_key:{}, instance_key:{} starts running, current step: {}'.format(
sender.__name__, course_key,
instance_key, step.name)
logger.info(log_txt)
# Send the state to frontend
build_observer.state_update(ins.id, current_build_number, task_step_mapping[sender.__name__],
BuildState.RUNNING,
'Task {} for course_key:{}, instance_key:{} starts running\n'.format(
sender.__name__, course_key,
instance_key))
build_observer.step_running(task_step_mapping[sender.__name__])


@task_postrun.connect
def task_postrun(task_id=None, sender=None, state=None, retval=None, *args, **kwargs):
def task_postrun(task_id=None, sender=None, task_result=None, retval=None, *args, **kwargs):
"""
Triggered when task is finished
Triggered when task is finished, in this step , save the task result to database.
"""
# information about task are located in headers for task messages
# using the task protocol version 2.
if sender.__name__ not in build_tasks:
return
logger.warning(sender.__name__ + ' post_run')
current_build_number = kwargs['args'][-1]
step = task_step_mapping[sender.__name__]
instance_key = kwargs['args'][-2]
course_key = kwargs['args'][-3]
logger.info('task_postrun for task id {}'.format(
task_id
))
logger.info('course_key:{}, instance_key:{}'.format(course_key, instance_key))
with celery.app.app_context():
# Get the instance id
course_id = CourseInstance.query.filter_by(course_key=course_key, instance_key=instance_key).first().id
# add end time for build entry and buildlog entry, change build state
logger.warning('finished')
build = Build.query.filter_by(course_id=course_id,
number=current_build_number).first()
# The state code is in the beginning, divided with main part by "|"
log_text = retval['msg']
if retval['code'] == 0:
state = BuildState.SUCCESS
elif retval['code'] == 5:
state = BuildState.CANCELED
else:
state = BuildState.FAILED
build_observer.update_database(course_id, current_build_number, task_step_mapping[sender.__name__], state,
log_text)
build_observer.state_update(course_id, current_build_number, task_step_mapping[sender.__name__],
build.state,
log_text.replace('\\r', '\r').replace('\\n', '\n') + '\n')
build_observer.step_succeeded(task_step_mapping[sender.__name__])
if retval['code'] == 0:
task_result = BuildState.SUCCESS
elif retval['code'] == 5:
task_result = BuildState.CANCELED
else:
task_result = BuildState.FAILED
log_text = 'task id {} with course_key: {}, instance_key: {}, step:{} finished. result: {}'.format(task_id,
course_key,
instance_key,
step.name,
task_result.name)
logger.info(log_text)


@task_failure.connect
Expand All @@ -146,16 +119,14 @@ def task_failure(task_id=None, sender=None, *args, **kwargs):
logger.info('task_failure for task id {}'.format(
task_id
))
current_build_number = kwargs['args'][-1]
instance_key = kwargs['args'][-2]
course_key = kwargs['args'][-3]
step = task_step_mapping[sender.__name__]
with celery.app.app_context():
course_id = CourseInstance.query.filter_by(course_key=course_key, instance_key=instance_key).first().id

logger.warning('finished')
build_observer.update_database(course_id, current_build_number, task_step_mapping[sender.__name__],
BuildState.SUCCESS, )
build_observer.state_update(course_id, current_build_number, task_step_mapping[sender.__name__],
BuildState.FAILED,
'Task {} is Failed.\n'.format(sender.__name__))
build_observer.step_failed(task_step_mapping[sender.__name__])
logger.warning('failed')
log_text = 'task_failed for task id {}, course_key: {}, instance_key: {} at step {}. result: {}'.format(task_id,
course_key,
instance_key,
step.name,
BuildState.FAILED.name)
logger.error(log_text)
Loading