Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
0ca3475
Adding job moving feature
DasFranck Jul 5, 2017
346e0ef
Checking destination_queue is not null or empty before moving a job (…
DasFranck Jul 5, 2017
2c73b9b
Prompt message modified for move action (dashboard/static/js/views/jo…
DasFranck Jul 5, 2017
a3fc2ec
Factoring move with requeue (basetasks/utils.py)
DasFranck Jul 5, 2017
d1ea418
Remove move member function in Job (mrq/job.py)
DasFranck Jul 5, 2017
d5086ad
Retry count is now shown in the dashboard (dashboard/static/js/views/…
DasFranck Jul 5, 2017
fff76c6
Remove move exception import in Job (mrq/job.py)
DasFranck Jul 5, 2017
efbfffe
Adding group job moving feature
DasFranck Jul 5, 2017
62cd693
Adding a supplementary for move action in groupactions (dashboard/sta…
DasFranck Jul 6, 2017
99e88b6
Adding a UI for worker groups configuration (not working for now)
DasFranck Jul 7, 2017
2ea541f
Adding buttons to delete/add workers groups (dashboard/static/js/view…
DasFranck Jul 7, 2017
2d1b322
Updating POST on /api/workergroups (dashboard/app.py)
DasFranck Jul 10, 2017
9bdea97
Updating UI for worker groups configuration, working and nearly finis…
DasFranck Jul 10, 2017
026592e
Don't send profiles with an empty name (Worker group configuration UI)
DasFranck Jul 10, 2017
d51866a
Don't send workergroups with an empty name (Dashboard - Worker group …
DasFranck Jul 11, 2017
7255838
Adding serial checking to avoid configuration conflict writing (Dashb…
DasFranck Jul 11, 2017
f806295
Adding a test for job killing and sleep test task (tests/{test_kill.p…
DasFranck Jul 12, 2017
4e77a07
Adding a killing feature for jobs (mrq/{job.py,worker.py})
DasFranck Jul 12, 2017
0d0b506
Replacing iteritems (non-existant in Python3) (mrq/dashboard/app.py)
DasFranck Jul 18, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions mrq/basetasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def perform_action(self, action, query, destination_queue):
if list(query.keys()) == ["queue"]:
Queue(query["queue"]).empty()

elif action in ("requeue", "requeue_retry"):
elif action in ("requeue", "requeue_retry", "move"):

# Requeue task by groups of maximum 1k items (if all in the same
# queue)
Expand Down Expand Up @@ -127,7 +127,7 @@ def perform_action(self, action, query, destination_queue):
if destination_queue is not None:
updates["queue"] = destination_queue

if action == "requeue":
if action in ("requeue", "move"):
updates["retry_count"] = 0

self.collection.update({
Expand Down
23 changes: 20 additions & 3 deletions mrq/dashboard/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,34 @@ def get_workers():
def get_workergroups():
collection = connections.mongodb_jobs.mrq_workergroups
data = {"workergroups": {str(row.pop("_id")): row for row in collection.find(sort=[("_id", 1)])}}
for workergroup_id in data["workergroups"]:
if "serial" not in data["workergroups"][workergroup_id]:
data["workergroups"][workergroup_id]["serial"] = str(int(time.time()))
return jsonify(data)


@app.route('/api/workergroups', methods=["POST"])
@requires_auth
def post_workergroups():
workergroups = json.loads(request.form["workergroups"])
for k, v in workergroups.iteritems():
connections.mongodb_jobs.mrq_workergroups.update_one({"_id": k}, {"$set": v}, upsert=True)

return jsonify({"status": "ok"})
# Remove workergroups which hasn't be sent but were present previously (supposed deleted)
workergroup_list_json = list(workergroups.keys())
workergroup_list_mongo = [document["_id"] for document in connections.mongodb_jobs.mrq_workergroups.find()]
workergroup_to_delete_list = list(set(workergroup_list_mongo) - set(workergroup_list_json))
for workergroup_id in workergroup_to_delete_list:
connections.mongodb_jobs.mrq_workergroups.delete_one({"_id": workergroup_id})

outdated_wgcs = []
for k, v in iteritems(workergroups):
if ("serial" not in v or v["serial"] == connections.mongodb_jobs.mrq_workergroups.find_one({"_id": k})["serial"]):
v["serial"] = str(int(time.time()))
connections.mongodb_jobs.mrq_workergroups.update_one({"_id": k}, {"$set": v}, upsert=True)
else:
outdated_wgcs.append(k)

return jsonify({"status": "outdated" if len(outdated_wgcs) else "ok",
"outdated_wgcs": outdated_wgcs})


def build_api_datatables_query(req):
Expand Down
3 changes: 2 additions & 1 deletion mrq/dashboard/static/js/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ require.config({
datatables: "/static/js/vendor/jquery.dataTables.min",
datatablesbs3: "/static/js/vendor/datatables.bs3",
moment: "/static/js/vendor/moment.min",
sparkline: "/static/js/vendor/jquery.sparkline.min"
sparkline: "/static/js/vendor/jquery.sparkline.min",
quicksettings: "/static/js/vendor/quicksettings.min"
},

// urlArgs: "bust=" + (new Date()).getTime(),
Expand Down
1 change: 1 addition & 0 deletions mrq/dashboard/static/js/vendor/quicksettings.min.js

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions mrq/dashboard/static/js/views/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ define(["jquery", "underscore", "views/generic/datatablepage", "models"],functio
var action = $(evt.target).data("action");
var data = _.clone(this.filters);

if (action == "move"){
if (destination_queue == null || destination_queue == "") {
return ;
}
data["destination_queue"] = prompt("Enter destination queue");
}

data["action"] = action;
self.jobaction(evt, data);

Expand Down Expand Up @@ -223,6 +230,18 @@ define(["jquery", "underscore", "views/generic/datatablepage", "models"],functio
});
self.refreshCallStack(job_id);

} else if (action == "move") {

var queue = prompt("Enter destination queue");
if (queue != null && queue != "")
{
self.jobaction(evt, {
"id": job_id,
"action": action,
"destination_queue": queue
});
}

} else {

self.jobaction(evt, {
Expand Down Expand Up @@ -343,6 +362,10 @@ define(["jquery", "underscore", "views/generic/datatablepage", "models"],functio
display.push("cputime "+String(source.time).substring(0,6)+"s ("+source.switches+" switches)");
}

if (source.retry_count) {
display.push("retried " + String(source.retry_count) + " times");
}

return "<small>" + display.join("<br/>") + "</small>";

} else {
Expand Down Expand Up @@ -392,6 +415,7 @@ define(["jquery", "underscore", "views/generic/datatablepage", "models"],functio
"<br/><br/>"+
"<button class='btn btn-xs btn-danger pull-right' data-action='cancel'><span class='glyphicon glyphicon-remove-circle'></span> Cancel</button>"+
"<button class='btn btn-xs btn-warning' data-action='requeue'><span class='glyphicon glyphicon-refresh'></span> Requeue</button>"+
"<button class='btn btn-xs btn-warning' data-action='move'><span class='glyphicon glyphicon-refresh'></span> Move to...</button>"+
"</div>";
}
return "";
Expand Down
163 changes: 148 additions & 15 deletions mrq/dashboard/static/js/views/workergroups.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
define(["jquery", "underscore", "models", "views/generic/page"],function($, _, Models, Page) {
define(["jquery", "underscore", "models", "views/generic/page", "quicksettings"],function($, _, Models, Page, QuickSettings) {

return Page.extend({

Expand All @@ -10,28 +10,161 @@ define(["jquery", "underscore", "models", "views/generic/page"],function($, _, M
"click .submit": "submit"
},

render: function() {
var self = this;
$.get("/api/workergroups").done(function(data) {
self.renderTemplate();
self.$("textarea").val(JSON.stringify(data["workergroups"], null, 8));
});
addCommandPanel: function() {
var _this = this;

if (typeof this.commandPanel === "undefined")
{
this.commandPanel = QuickSettings.create(25, 80, "Actions", $(this.el)[0])
.addButton("Add a Worker Group", function() {
_this.addPanel();
})
.addButton("Save", function() {
_this.save();
})
.addButton("Reload", function() {
_this.reload();
})
.addHTML("Status", "<font color=\"green\">OK</font>")
.setDraggable(false)
.setWidth(150);
}
},

submit: function(el) {
var self = this;
addPanel: function(workergroup = null, workergroupName = "") {
var _this = this;
var panelIndex = this.workergroupPanels.length;
var workerPanel = QuickSettings.create(225 + this.workergroupPanels.length * 350, 80, "Worker group configuration", $(this.el)[0])
.addText("Workgroup Name", workergroupName)
.hideTitle("Workgroup Name")
.addButton("Remove this Worker Group", function() {
_this.workergroupPanels[panelIndex].destroy();
try {
delete _this.workergroupPanels[panelIndex];
}
catch (e) {}
_this.workergroupPanels[panelIndex] = null;
})
.addText("Process Termination Timeout", workergroup ? workergroup["process_termination_timeout"] : 0)
.addButton("Add a Profile", function() {
_this.addProfileToPanel(_this.workergroupPanels[panelIndex]);
})
.setDraggable(false)
.setHeight(850)
.setWidth(300);
workerPanel.profilesNumber = 0;
this.workergroupPanels.push(workerPanel);
if (workergroup != null)
{
this.serials[workergroupName] = workergroup["serial"]
_.forEach(workergroup["profiles"], function(profile, profileName) {
this.addProfileToPanel(workerPanel, profile, profileName);
}, this);
}
},

addProfileToPanel: function(workerPanel, profile = null, profileName = "") {
workerPanel.profilesNumber += 1;
header = "Profile " + String(workerPanel.profilesNumber) + " - ";
workerPanel.addHTML("separator", "<br>")
.hideTitle("separator")
.addText(header + "Profile Name", profileName)
.addText(header + "Memory", profile ? profile["memory"]: 0)
.addText(header + "CPU", profile ? profile["cpu"] : 0)
.addRange(header + "MinCount", 0, 100, profile ? profile["min_count"] : 0, 1)
.addRange(header + "MaxCount", 0, 100, profile ? profile["max_count"] : 0, 1)
.addTextArea(header + "Command", profile ? profile["command"]: "")
.addButton("Remove this Profile");
},

remove_profile: function() {
},

reload: function(force = false) {
if (force || confirm('It will discard every changes that hasn\'t be saved. Are you sure?')) {
_.forEach(this.workergroupPanels, function(panel) {
if (panel !== null && panel !== undefined)
panel.destroy();
delete panel;
})
this.render();
}
},


// Check for "continue" usage instead of nested ifs
save: function() {
var _this = this;
this.commandPanel._controls["Status"].setValue("<font color=\"orange\">Saving...</font>");

self.$("button")[0].innerHTML = "Wait...";
data = {};
_.forEach(this.workergroupPanels, function(panel) {
// Equivalent of python's "not in"
if ($.inArray(panel, [null, undefined]) == -1)
{
panelJSON = panel.getValuesAsJSON();
if ($.inArray(panelJSON["Workgroup Name"], [null, ""]) == -1)
{
workergroup = {
"profiles" : {},
"process_termination_timeout": parseInt(panelJSON["Process Termination Timeout"], 10)
}

var val = self.$("textarea").val();
if (panelJSON["Workgroup Name"] in _this.serials)
workergroup["serial"] = _this.serials[panelJSON["Workgroup Name"]];

$.post("/api/workergroups", {"workergroups": val}).done(function(data) {
if (data.status != "ok") {
_.forEach(_.range(1, panel.profilesNumber + 1), function(index) {
header = "Profile " + String(index) + " - ";
if ($.inArray(panelJSON[header + "Profile Name"], [null, ""]) == -1)
{
profile = {};
profile["memory"] = parseInt(panelJSON[header + "Memory"], 10);
profile["cpu"] = parseInt(panelJSON[header + "CPU"], 10);
profile["min_count"] = parseInt(panelJSON[header + "MinCount"], 10);
profile["max_count"] = parseInt(panelJSON[header + "MaxCount"], 10);
profile["command"] = panelJSON[header + "Command"];
workergroup["profiles"][panelJSON[header + "Profile Name"]] = profile;
}
})
data[panelJSON["Workgroup Name"]] = workergroup;
}
}
})

$.post("/api/workergroups", {"workergroups": JSON.stringify(data)}).done(function(result) {
if (result.status === "ok")
{
_this.commandPanel._controls["Status"].setValue("<font color=\"green\">Saved</font>");
_this.reload(true);
}
else if (result.status === "outdated")
{
string = "";
_.forEach(result.outdated_wgcs, function(wgc) {
string += "- " + wgc + "<br>";
})
_this.commandPanel._controls["Status"].setValue("<font color=\"red\">These configurations were outdated and were not saved:<br>" + string + "</font><br>The others were saved.");
}
else
{
_this.commandPanel._controls["Status"].setValue("<font color=\"red\">FAILED</font>");
return alert("There was an error while saving!");
}
self.$("button")[0].innerHTML = "Save";
});
},

render: function() {
var _this = this;

this.workergroupPanels = [];
this.addCommandPanel();
this.serials = {};

$.get("/api/workergroups").done(function(data) {
_.forEach(data["workergroups"], function(workergroup, workergroupName) {
_this.addPanel(workergroup, workergroupName);
});
});
}
});

});
6 changes: 2 additions & 4 deletions mrq/dashboard/templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,6 @@ <h3>Current I/O operations</h3>

<h3>Worker groups</h3>
<div class="container">
<div class="row">
<textarea style="width:100%;height:400px;font-family:monospace;" onkeydown="if(event.keyCode===9){var v=this.value,s=this.selectionStart,e=this.selectionEnd;this.value=v.substring(0, s)+'\t'+v.substring(e);this.selectionStart=this.selectionEnd=s+1;return false;}"></textarea>
<button class="submit">Save</button>
</div>
</div>
</script>

Expand Down Expand Up @@ -394,6 +390,8 @@ <h4 class="modal-title"></h4>

<div class="pull-right js-jobs-groupactions">
<br/>
<button type="submit" class="btn btn-warning input-sm js-jobs-groupaction" data-action="move" style="padding:5px"><span class='glyphicon glyphicon-refresh'></span> Move these jobs to...</button>
&nbsp;
<button type="submit" class="btn btn-warning input-sm js-jobs-groupaction" data-action="requeue" style="padding:5px"><span class='glyphicon glyphicon-refresh'></span> Requeue these jobs</button>
&nbsp;
<button type="submit" class="btn btn-danger input-sm js-jobs-groupaction" data-action="cancel" style="padding:5px"><span class='glyphicon glyphicon-remove-circle'></span> Cancel these jobs</button>
Expand Down
22 changes: 15 additions & 7 deletions mrq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,14 @@ def abort(self):
self._attach_original_exception(exc)
raise exc

def kill(self):
""" Kill the current job """
context.connections.redis.rpush("{}:wcmd:{}".format(context.get_current_config()["redis_prefix"],
context.get_current_worker()),
"kill {}".format(self.id))
self._save_status("killed")
pass

def cancel(self):
""" Markes the current job as cancelled. Doesn't interrupt it. """
self._save_status("cancel")
Expand Down Expand Up @@ -321,15 +329,15 @@ def perform(self):
# pylint: disable=protected-access

gevent.sleep(0)
current_greenlet = gevent.getcurrent()
self.current_greenlet = gevent.getcurrent()
t = (datetime.datetime.utcnow() - self.datestarted).total_seconds()

context.log.debug(
"Job %s success: %0.6fs total, %0.6fs in greenlet, %s switches" %
(self.id,
t,
current_greenlet._trace_time,
current_greenlet._trace_switches - 1)
self.current_greenlet._trace_time,
self.current_greenlet._trace_switches - 1)
)

else:
Expand Down Expand Up @@ -454,13 +462,13 @@ def _save_status(self, status, updates=None, exception=False, w=None, j=None):
db_updates["totaltime"] = (now - self.datestarted).total_seconds()

if context.get_current_config().get("trace_greenlets"):
current_greenlet = gevent.getcurrent()
self.current_greenlet = gevent.getcurrent()

# TODO are we sure the current job is doing the save_status() on itself?
if hasattr(current_greenlet, "_trace_time"):
if hasattr(self.current_greenlet, "_trace_time"):
# pylint: disable=protected-access
db_updates["time"] = current_greenlet._trace_time
db_updates["switches"] = current_greenlet._trace_switches
db_updates["time"] = self.current_greenlet._trace_time
db_updates["switches"] = self.current_greenlet._trace_switches

if exception:
trace = traceback.format_exc()
Expand Down
Loading