From 5e40094892726dd2909b2842d907a4aec0d70f53 Mon Sep 17 00:00:00 2001 From: kylemvz Date: Wed, 11 Jan 2017 10:48:39 -0800 Subject: [PATCH 1/8] Initial commit of Spark-ified version of MarathonSpawner.py --- l41_nbhub/MarathonSparkSpawner.py | 224 ++++++++++++++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 l41_nbhub/MarathonSparkSpawner.py diff --git a/l41_nbhub/MarathonSparkSpawner.py b/l41_nbhub/MarathonSparkSpawner.py new file mode 100644 index 0000000..6adbb96 --- /dev/null +++ b/l41_nbhub/MarathonSparkSpawner.py @@ -0,0 +1,224 @@ +import time +import os +import json +import requests +from traitlets import Int, List, Unicode +from tornado import gen +from tornado.web import HTTPError + +from jupyterhub.spawner import Spawner +from .QueryUser import query_user +from .marathon import Marathon +from .GPUResourceAllocator import GPUResourceAllocator + + +class MarathonSparkSpawner(Spawner): + ''' + + resource_file_name = Unicode('resources', + help="File describing GPU resources available", + config=True) + status_file_name = Unicode('status.json', + help="File Describing the current state of allocations", + config=True) + ''' + + base_port = Int( + 10000, + help='Base int for port calculation in get_notebook_port()', + config=True + ) + base_mod = Int( + 1000, + help='Mod int for port calculation in get_notebook_port()', + config=True + ) + + home_basepath = Unicode('/home', + help="Basepath for user home directories", + config=True) + env_url = Unicode('', + help="URL containing JSON environment variables to push to notebook server", + config=True) + network_mode = Unicode('HOST', + help="Whether to use BRIDGE or HOST netowrking", + config=True) + marathon_group = Unicode('spark-notebooks', + help="Marathon group name (folder) prefix for container names", + config=True) + mem_limit = Int( + 4096, + help='Memory limit in MB', + config=True) + volumes = List( + [], + help='Volumes to mount as Read-write. If a single string is entered then it is mounted in same path.' + 'If a tuple is specified then first item is hostPath and the 2nd is the containerPath', + config=True) + ports = List( + [8888], + help='Ports to expose externally', + config=True) + marathon_constraints = List([], + help='Constraints to be passed through to Marathon', + config=True) + hub_ip_connect = Unicode(u'', + help="Public IP address of the hub", + config=True) + marathon_host = Unicode(u'', + help="Hostname of Marathon server", + config=True) + docker_image_name = Unicode(u'', + help="Name of the docker image", + config=True) + + def get_notebook_port(self): + port = (int(self.user_id) % self.base_mod) + int(self.base_port) + return port + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # All traitlets configurables are configured by now + self.marathon = Marathon(self.marathon_host) + ''' + self.gpu_resources = GPUResourceAllocator(self.resource_file_name, + self.status_file_name) + ''' + + def _expand_user_vars(self, string): + """ + Expand user related variables in a given string + + Currently expands: + {USERNAME} -> Name of the user + {USERID} -> UserID + """ + return string.format( + USERNAME=self.user.name, + USERID=self._user_id_default() + ) + + def get_state(self): + state = super().get_state() + state['container_name'] = self.get_container_name() + return state + + def load_state(self, state): + if 'container_name' in state: + pass + + def get_env(self): + env = super().get_env() + env.update(dict( + # User Info + USER=self.user.name, + USER_ID=str(self._user_id_default()), + HOME='/home/%s'%self.user.name, + + # Container info + CONTAINER_NAME=self.docker_image_name, + NOTEBOOK_PORT=str(self.ports[0]), + + # Jupyter Hub config + JPY_USER=self.user.name, + JPY_COOKIE_NAME=self.user.server.cookie_name, + JPY_BASE_URL=self.user.server.base_url, + JPY_HUB_PREFIX=self.hub.server.base_url, + JPY_HUB_API_URL = 'http://%s:8081/hub/api'%self.hub_ip_connect, + )) + + if len(self.env_url) > 0: + # get content + try: + parsed_data = requests.get(self.env_url, verify=False).json() + except: + parsed_data = json.loads(open(self.env_url).read()) + + for env_variable in parsed_data: + env[env_variable] = parsed_data[env_variable] + + pyspark_submit_args = [] + for var in os.environ: + if var.lower().startswith("l41"): + pyspark_submit_args.append("--conf spark.executorEnv.%s=%s" % (var, os.environ[var])) + env["PYSPARK_SUBMIT_ARGS"] = " ".join(pyspark_submit_args) + return env + + def get_container_name(self): + return '/%s/%s-notebook'%(self.marathon_group, self.user.name) + + @gen.coroutine + def start(self): + print('HUB URI:', self.hub.api_url) + container_name = self.get_container_name() + #hostname, gpu_id = self.gpu_resources.get_host_id(self.user.name) + #driver_version = self.gpu_resources.get_driver_version(hostname) + #print('Hostname: {} GPU ID: {}'.format(hostname, gpu_id)) + hostname = "l41-srv-gpu01.b.internal" + constraint = [['hostname', 'LIKE', hostname]] + parameters = [{'key':'workdir', 'value':os.path.join(self.home_basepath, self.user.name)}] + #parameters.append({'key': 'device', 'value': '/dev/nvidiactl'}) + #parameters.append({'key': 'device', 'value': '/dev/nvidia-uvm'}) + #parameters.append({'key': 'device', 'value': '/dev/nvidia%d'%gpu_id}) + #parameters.append({'key': 'volume-driver', 'value': 'nvidia-docker'}) + #parameters.append({'key': 'volume', 'value': 'nvidia_driver_{}:/usr/local/nvidia:ro'.format(driver_version)}) + cmd = "/bin/bash /srv/ganymede_nbserver/ganymede_nbserver.sh" + self.marathon.start_container(container_name, + self.docker_image_name, + cmd, + constraints=constraint, + env=self.get_env(), + parameters = parameters, + mem_limit=self.mem_limit, + volumes=self.volumes, + ports=self.ports, + network_mode=self.network_mode) + + for i in range(self.start_timeout): + is_up = yield self.poll() + if is_up is None: + time.sleep(1) + ip, port = self.marathon.get_ip_and_port(container_name) + self.user.server.ip=ip + self.user.server.port = port + print('IP/PORT', ip, port) + return (ip, port) + time.sleep(1) + + return None + + @gen.coroutine + def stop(self): + container_name = self.get_container_name() + self.marathon.stop_container(container_name) + #self.gpu_resources.release_resource(self.user.name) + + @gen.coroutine + def get_ip_and_port(self): + container_name = self.get_container_name() + print('IP/PORT: {}'.format(self.marathon.get_ip_and_port(container_name))) + ip, port = self.marathon.get_ip_and_port(container_name) + port = self.get_notebook_port() + return ip, port + + @gen.coroutine + def poll(self): + container_info = self.marathon.get_container_status(self.get_container_name()) + + if container_info is None: + return "" + + if 'tasks' in container_info and len(container_info['tasks']) == 1: + return None + else: + print('Container Not Found') + return "" + + def _user_id_default(self): + """ + Query the REST user client running on a local socket. + """ + response = query_user(self.user.name) + if "uid" not in response: + raise HTTPError(403) + return response['uid'] From 14d1ec9759f35176e7d950a9e4ef2e4696271fe7 Mon Sep 17 00:00:00 2001 From: kylemvz Date: Wed, 11 Jan 2017 11:29:33 -0800 Subject: [PATCH 2/8] Update jupyterhub_config.py to use MarathonSparkSpawner class. --- jupyterhub_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jupyterhub_config.py b/jupyterhub_config.py index e0abc71..1e886a7 100644 --- a/jupyterhub_config.py +++ b/jupyterhub_config.py @@ -1,7 +1,7 @@ import os import ast -c.JupyterHub.spawner_class = 'l41_nbhub.MarathonSpawner.MarathonSpawner' +c.JupyterHub.spawner_class = 'l41_nbhub.MarathonSpawner.MarathonSparkSpawner' c.JupyterHub.authenticator_class = 'l41_nbhub.L41OAuthenticator.L41OAuthenticator' c.Authenticator.admin_users = [name.strip() for name in os.environ['JH_ADMIN_USERS'].split(",")] From 5ad02f64b8ab8395659a50b4668f34a4a84baea6 Mon Sep 17 00:00:00 2001 From: kylemvz Date: Wed, 11 Jan 2017 11:46:15 -0800 Subject: [PATCH 3/8] Fix reference to MarathonSparkSpawner in jupyterhub_config.py --- jupyterhub_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jupyterhub_config.py b/jupyterhub_config.py index 1e886a7..9959cc7 100644 --- a/jupyterhub_config.py +++ b/jupyterhub_config.py @@ -1,7 +1,7 @@ import os import ast -c.JupyterHub.spawner_class = 'l41_nbhub.MarathonSpawner.MarathonSparkSpawner' +c.JupyterHub.spawner_class = 'l41_nbhub.MarathonSparkSpawner.MarathonSparkSpawner' c.JupyterHub.authenticator_class = 'l41_nbhub.L41OAuthenticator.L41OAuthenticator' c.Authenticator.admin_users = [name.strip() for name in os.environ['JH_ADMIN_USERS'].split(",")] From 34bca0b98eee781dd7765e757cdbcb49d6d57def Mon Sep 17 00:00:00 2001 From: kylemvz Date: Wed, 11 Jan 2017 11:47:36 -0800 Subject: [PATCH 4/8] Add PYSPARK_SUBMIT_ARGS as env var on hub machine for debugging. --- l41_nbhub/MarathonSparkSpawner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/l41_nbhub/MarathonSparkSpawner.py b/l41_nbhub/MarathonSparkSpawner.py index 6adbb96..d3a350f 100644 --- a/l41_nbhub/MarathonSparkSpawner.py +++ b/l41_nbhub/MarathonSparkSpawner.py @@ -141,6 +141,7 @@ def get_env(self): for var in os.environ: if var.lower().startswith("l41"): pyspark_submit_args.append("--conf spark.executorEnv.%s=%s" % (var, os.environ[var])) + os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join(pyspark_submit_args) env["PYSPARK_SUBMIT_ARGS"] = " ".join(pyspark_submit_args) return env From 6834f5429cf1d523cba9f2419496be74d918e629 Mon Sep 17 00:00:00 2001 From: kylemvz Date: Thu, 12 Jan 2017 19:28:14 +0000 Subject: [PATCH 5/8] Initial working version of MarathonSparkSpawner. --- l41_nbhub/MarathonSparkSpawner.py | 27 +++-- l41_nbhub/sparkmarathon.py | 157 ++++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+), 8 deletions(-) create mode 100644 l41_nbhub/sparkmarathon.py diff --git a/l41_nbhub/MarathonSparkSpawner.py b/l41_nbhub/MarathonSparkSpawner.py index d3a350f..17b74f8 100644 --- a/l41_nbhub/MarathonSparkSpawner.py +++ b/l41_nbhub/MarathonSparkSpawner.py @@ -8,9 +8,12 @@ from jupyterhub.spawner import Spawner from .QueryUser import query_user -from .marathon import Marathon +from .sparkmarathon import Marathon from .GPUResourceAllocator import GPUResourceAllocator +import logging + +logger = logging.getLogger(__name__) class MarathonSparkSpawner(Spawner): ''' @@ -73,7 +76,7 @@ class MarathonSparkSpawner(Spawner): config=True) def get_notebook_port(self): - port = (int(self.user_id) % self.base_mod) + int(self.base_port) + port = (int(self._user_id_default()) % self.base_mod) + int(self.base_port) return port def __init__(self, *args, **kwargs): @@ -117,7 +120,7 @@ def get_env(self): # Container info CONTAINER_NAME=self.docker_image_name, - NOTEBOOK_PORT=str(self.ports[0]), + NOTEBOOK_PORT=str(self.get_notebook_port()), # Jupyter Hub config JPY_USER=self.user.name, @@ -146,7 +149,7 @@ def get_env(self): return env def get_container_name(self): - return '/%s/%s-notebook'%(self.marathon_group, self.user.name) + return '%s/%s-notebook'%(self.marathon_group, self.user.name) @gen.coroutine def start(self): @@ -155,8 +158,6 @@ def start(self): #hostname, gpu_id = self.gpu_resources.get_host_id(self.user.name) #driver_version = self.gpu_resources.get_driver_version(hostname) #print('Hostname: {} GPU ID: {}'.format(hostname, gpu_id)) - hostname = "l41-srv-gpu01.b.internal" - constraint = [['hostname', 'LIKE', hostname]] parameters = [{'key':'workdir', 'value':os.path.join(self.home_basepath, self.user.name)}] #parameters.append({'key': 'device', 'value': '/dev/nvidiactl'}) #parameters.append({'key': 'device', 'value': '/dev/nvidia-uvm'}) @@ -164,10 +165,11 @@ def start(self): #parameters.append({'key': 'volume-driver', 'value': 'nvidia-docker'}) #parameters.append({'key': 'volume', 'value': 'nvidia_driver_{}:/usr/local/nvidia:ro'.format(driver_version)}) cmd = "/bin/bash /srv/ganymede_nbserver/ganymede_nbserver.sh" + self.ports = [self.get_notebook_port()] self.marathon.start_container(container_name, self.docker_image_name, cmd, - constraints=constraint, + constraints=self.marathon_constraints, env=self.get_env(), parameters = parameters, mem_limit=self.mem_limit, @@ -176,10 +178,14 @@ def start(self): network_mode=self.network_mode) for i in range(self.start_timeout): + #logger.warning("Starting poll()") is_up = yield self.poll() + #logger.warning("Finished poll(): %s" % is_up) if is_up is None: time.sleep(1) + logger.warning("Calling marathon get_ip_and_port()") ip, port = self.marathon.get_ip_and_port(container_name) + logger.warning("IP: %s, PORT: %s" % (ip, port)) self.user.server.ip=ip self.user.server.port = port print('IP/PORT', ip, port) @@ -204,7 +210,12 @@ def get_ip_and_port(self): @gen.coroutine def poll(self): - container_info = self.marathon.get_container_status(self.get_container_name()) + #logger.warning("Calling container_name()") + name = self.get_container_name() + #logger.warning("Name: %s" % name) + #logger.warning("Calling container_status()") + container_info = self.marathon.get_container_status(name) + logger.warning("Info: %s" % container_info) if container_info is None: return "" diff --git a/l41_nbhub/sparkmarathon.py b/l41_nbhub/sparkmarathon.py new file mode 100644 index 0000000..edc0ef6 --- /dev/null +++ b/l41_nbhub/sparkmarathon.py @@ -0,0 +1,157 @@ +import os +import pprint +import requests +import socket +from copy import deepcopy + +import logging + +logger = logging.getLogger(__name__) + +container_type = 'docker' + +default_container = { + "volumes": [], + container_type: { + "network": "BRIDGE", + "portMappings":[ ] + } + } + +default_request = { + "cpus": 1, + "disk": 0, + "instances": 1, + "constraints": [] +} + +class Marathon: + def __init__(self, hostname): + self.hostname = hostname + + def _make_request(self, type, endpoint, data=None, json_data=None): + url = os.path.join(self.hostname, endpoint) + if type.lower() == 'get': + return requests.get(url) + elif type.lower() == 'post': + r = requests.post(url, json=json_data) + return r + elif type.lower() == 'delete': + return requests.delete(url) + + + def start_container(self, + container_name, + image_name, + entry_point, + env={}, + constraints=[], + parameters= [{}], + resources=None, + mem_limit=128, + volumes=[], + ports=[], + network_mode='BRIDGE'): + new_request = deepcopy(default_request) + if container_name.startswith('/'): + new_request['id'] = container_name + else: + new_request['id'] = '/' + container_name + + if len(entry_point.strip()) > 0: + new_request['cmd'] = entry_point + new_request['mem'] = mem_limit + new_request['env'] = {} + new_request['constraints'] = constraints + for key in env: + new_request['env'][key] = env[key] + + new_container = deepcopy(default_container) + if container_type == 'docker': + new_container['docker']['image'] = image_name + new_container['docker']['parameters'] = parameters + else: + new_container['mesos']['image']['type'] = 'DOCKER' + new_container['mesos']['image']['docker'] = {'name':image_name} + + # Map Volumes + for item in volumes: + if isinstance(item, tuple): + hostPath = item[0] + containerPath = item[1] + else: + hostPath = item + containerPath = item + + volume = { + 'containerPath': containerPath, + 'hostPath': hostPath, + 'mode': 'RW' + } + new_container['volumes'].append(volume) + + # Map Ports + for item in ports: + new_port = { + 'containerPort': item, + 'hostPort': 0 + } + new_container['docker']['portMappings'].append(new_port) + + del new_container['docker']['portMappings'] + logger.warning("Ports: %s" % ports) + #new_container['docker']['ports'] = [int(port) for port in ports] + #new_request['requirePorts'] = True + new_request['ports'] = [int(port) for port in ports] + #new_request['portDefinitions'] = [{"port": int(port)} for port in ports] + + new_container['docker']['network'] = network_mode + new_request['container'] = new_container + pprint.pprint(new_request) + logger.warning("%s" % new_request) + response = self._make_request('POST', 'v2/apps', json_data=new_request) + if response.status_code == 201: + return None + else: + raise ValueError(response.text) + + def stop_container(self, container_name): + response = self._make_request('DELETE', 'v2/apps/%s'%container_name) + if response.status_code == 200: + return None + else: + raise ValueError(response.text) + + def get_container_env_variable(self, container_name, env_variable): + response = self._make_request('GET', 'v2/apps/%s'%container_name) + if response.status_code != 200: + return None + container = response.json()['app'] + return container['env'][env_variable] + + def get_ip_and_port(self, container_name): + response = self._make_request('GET', 'v2/apps/%s'%container_name) + if response.status_code != 200: + return None + container = response.json()['app'] + + # There should only be one instance of our container + assert len(container['tasks']) == 1 + running_task = container['tasks'][0] + + hostname =running_task['host'] + ip = hostname + port = container["env"]["NOTEBOOK_PORT"] + return (ip, port) + + def get_container_status(self, container_name): + response = self._make_request('GET', 'v2/apps/%s'%container_name) + if response.status_code != 200: + return None + container = response.json()['app'] + return container + + def get_running_containers(self): + response = self._make_request('GET', 'v2/apps') + return response.json()['apps'] + From d03401639c5e7e822e444b1e44f53f72e225dfeb Mon Sep 17 00:00:00 2001 From: kylemvz Date: Thu, 12 Jan 2017 13:57:54 -0800 Subject: [PATCH 6/8] Remove debug logs and other unnecessary comments from MarathonSparkSpawner.py --- l41_nbhub/MarathonSparkSpawner.py | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/l41_nbhub/MarathonSparkSpawner.py b/l41_nbhub/MarathonSparkSpawner.py index 17b74f8..5932009 100644 --- a/l41_nbhub/MarathonSparkSpawner.py +++ b/l41_nbhub/MarathonSparkSpawner.py @@ -9,11 +9,6 @@ from jupyterhub.spawner import Spawner from .QueryUser import query_user from .sparkmarathon import Marathon -from .GPUResourceAllocator import GPUResourceAllocator - -import logging - -logger = logging.getLogger(__name__) class MarathonSparkSpawner(Spawner): ''' @@ -155,15 +150,7 @@ def get_container_name(self): def start(self): print('HUB URI:', self.hub.api_url) container_name = self.get_container_name() - #hostname, gpu_id = self.gpu_resources.get_host_id(self.user.name) - #driver_version = self.gpu_resources.get_driver_version(hostname) - #print('Hostname: {} GPU ID: {}'.format(hostname, gpu_id)) parameters = [{'key':'workdir', 'value':os.path.join(self.home_basepath, self.user.name)}] - #parameters.append({'key': 'device', 'value': '/dev/nvidiactl'}) - #parameters.append({'key': 'device', 'value': '/dev/nvidia-uvm'}) - #parameters.append({'key': 'device', 'value': '/dev/nvidia%d'%gpu_id}) - #parameters.append({'key': 'volume-driver', 'value': 'nvidia-docker'}) - #parameters.append({'key': 'volume', 'value': 'nvidia_driver_{}:/usr/local/nvidia:ro'.format(driver_version)}) cmd = "/bin/bash /srv/ganymede_nbserver/ganymede_nbserver.sh" self.ports = [self.get_notebook_port()] self.marathon.start_container(container_name, @@ -178,14 +165,10 @@ def start(self): network_mode=self.network_mode) for i in range(self.start_timeout): - #logger.warning("Starting poll()") is_up = yield self.poll() - #logger.warning("Finished poll(): %s" % is_up) if is_up is None: time.sleep(1) - logger.warning("Calling marathon get_ip_and_port()") ip, port = self.marathon.get_ip_and_port(container_name) - logger.warning("IP: %s, PORT: %s" % (ip, port)) self.user.server.ip=ip self.user.server.port = port print('IP/PORT', ip, port) @@ -210,12 +193,8 @@ def get_ip_and_port(self): @gen.coroutine def poll(self): - #logger.warning("Calling container_name()") name = self.get_container_name() - #logger.warning("Name: %s" % name) - #logger.warning("Calling container_status()") container_info = self.marathon.get_container_status(name) - logger.warning("Info: %s" % container_info) if container_info is None: return "" From 3b15ebc5d052a11343ac9c2f61431e0970cad1ee Mon Sep 17 00:00:00 2001 From: kylemvz Date: Fri, 13 Jan 2017 00:34:01 +0000 Subject: [PATCH 7/8] Revert jupyterhub_config.py. --- jupyterhub_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jupyterhub_config.py b/jupyterhub_config.py index 9959cc7..e0abc71 100644 --- a/jupyterhub_config.py +++ b/jupyterhub_config.py @@ -1,7 +1,7 @@ import os import ast -c.JupyterHub.spawner_class = 'l41_nbhub.MarathonSparkSpawner.MarathonSparkSpawner' +c.JupyterHub.spawner_class = 'l41_nbhub.MarathonSpawner.MarathonSpawner' c.JupyterHub.authenticator_class = 'l41_nbhub.L41OAuthenticator.L41OAuthenticator' c.Authenticator.admin_users = [name.strip() for name in os.environ['JH_ADMIN_USERS'].split(",")] From d0f1339db4125526b723c14390017afcce40ce59 Mon Sep 17 00:00:00 2001 From: kylemvz Date: Sat, 14 Jan 2017 00:26:11 +0000 Subject: [PATCH 8/8] Combining sparkmarathon.py and marathon.py to reduce duplication of code. --- l41_nbhub/MarathonSparkSpawner.py | 4 +- l41_nbhub/marathon.py | 14 ++- l41_nbhub/sparkmarathon.py | 157 ------------------------------ 3 files changed, 12 insertions(+), 163 deletions(-) delete mode 100644 l41_nbhub/sparkmarathon.py diff --git a/l41_nbhub/MarathonSparkSpawner.py b/l41_nbhub/MarathonSparkSpawner.py index 5932009..c455c2b 100644 --- a/l41_nbhub/MarathonSparkSpawner.py +++ b/l41_nbhub/MarathonSparkSpawner.py @@ -8,7 +8,7 @@ from jupyterhub.spawner import Spawner from .QueryUser import query_user -from .sparkmarathon import Marathon +from .marathon import Marathon class MarathonSparkSpawner(Spawner): ''' @@ -77,7 +77,7 @@ def get_notebook_port(self): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # All traitlets configurables are configured by now - self.marathon = Marathon(self.marathon_host) + self.marathon = Marathon(self.marathon_host, network_type=self.network_mode) ''' self.gpu_resources = GPUResourceAllocator(self.resource_file_name, self.status_file_name) diff --git a/l41_nbhub/marathon.py b/l41_nbhub/marathon.py index 4f14a34..b0ff638 100644 --- a/l41_nbhub/marathon.py +++ b/l41_nbhub/marathon.py @@ -23,8 +23,9 @@ } class Marathon: - def __init__(self, hostname): + def __init__(self, hostname, network_type="BRIDGE"): self.hostname = hostname + self.network_type = network_type def _make_request(self, type, endpoint, data=None, json_data=None): url = os.path.join(self.hostname, endpoint) @@ -130,9 +131,14 @@ def get_ip_and_port(self, container_name): hostname =running_task['host'] # Resolve hostname to ip - ip = socket.gethostbyname(hostname) - - return (ip, running_task['ports'][0]) + if self.network_type == "BRIDGE": + ip = socket.gethostbyname(hostname) + port = running_task['ports'][0] + elif self.network_type == "HOST": + ip = hostname + port = container["env"]["NOTEBOOK_PORT"] + + return (ip, port) def get_container_status(self, container_name): response = self._make_request('GET', 'v2/apps/%s'%container_name) diff --git a/l41_nbhub/sparkmarathon.py b/l41_nbhub/sparkmarathon.py deleted file mode 100644 index edc0ef6..0000000 --- a/l41_nbhub/sparkmarathon.py +++ /dev/null @@ -1,157 +0,0 @@ -import os -import pprint -import requests -import socket -from copy import deepcopy - -import logging - -logger = logging.getLogger(__name__) - -container_type = 'docker' - -default_container = { - "volumes": [], - container_type: { - "network": "BRIDGE", - "portMappings":[ ] - } - } - -default_request = { - "cpus": 1, - "disk": 0, - "instances": 1, - "constraints": [] -} - -class Marathon: - def __init__(self, hostname): - self.hostname = hostname - - def _make_request(self, type, endpoint, data=None, json_data=None): - url = os.path.join(self.hostname, endpoint) - if type.lower() == 'get': - return requests.get(url) - elif type.lower() == 'post': - r = requests.post(url, json=json_data) - return r - elif type.lower() == 'delete': - return requests.delete(url) - - - def start_container(self, - container_name, - image_name, - entry_point, - env={}, - constraints=[], - parameters= [{}], - resources=None, - mem_limit=128, - volumes=[], - ports=[], - network_mode='BRIDGE'): - new_request = deepcopy(default_request) - if container_name.startswith('/'): - new_request['id'] = container_name - else: - new_request['id'] = '/' + container_name - - if len(entry_point.strip()) > 0: - new_request['cmd'] = entry_point - new_request['mem'] = mem_limit - new_request['env'] = {} - new_request['constraints'] = constraints - for key in env: - new_request['env'][key] = env[key] - - new_container = deepcopy(default_container) - if container_type == 'docker': - new_container['docker']['image'] = image_name - new_container['docker']['parameters'] = parameters - else: - new_container['mesos']['image']['type'] = 'DOCKER' - new_container['mesos']['image']['docker'] = {'name':image_name} - - # Map Volumes - for item in volumes: - if isinstance(item, tuple): - hostPath = item[0] - containerPath = item[1] - else: - hostPath = item - containerPath = item - - volume = { - 'containerPath': containerPath, - 'hostPath': hostPath, - 'mode': 'RW' - } - new_container['volumes'].append(volume) - - # Map Ports - for item in ports: - new_port = { - 'containerPort': item, - 'hostPort': 0 - } - new_container['docker']['portMappings'].append(new_port) - - del new_container['docker']['portMappings'] - logger.warning("Ports: %s" % ports) - #new_container['docker']['ports'] = [int(port) for port in ports] - #new_request['requirePorts'] = True - new_request['ports'] = [int(port) for port in ports] - #new_request['portDefinitions'] = [{"port": int(port)} for port in ports] - - new_container['docker']['network'] = network_mode - new_request['container'] = new_container - pprint.pprint(new_request) - logger.warning("%s" % new_request) - response = self._make_request('POST', 'v2/apps', json_data=new_request) - if response.status_code == 201: - return None - else: - raise ValueError(response.text) - - def stop_container(self, container_name): - response = self._make_request('DELETE', 'v2/apps/%s'%container_name) - if response.status_code == 200: - return None - else: - raise ValueError(response.text) - - def get_container_env_variable(self, container_name, env_variable): - response = self._make_request('GET', 'v2/apps/%s'%container_name) - if response.status_code != 200: - return None - container = response.json()['app'] - return container['env'][env_variable] - - def get_ip_and_port(self, container_name): - response = self._make_request('GET', 'v2/apps/%s'%container_name) - if response.status_code != 200: - return None - container = response.json()['app'] - - # There should only be one instance of our container - assert len(container['tasks']) == 1 - running_task = container['tasks'][0] - - hostname =running_task['host'] - ip = hostname - port = container["env"]["NOTEBOOK_PORT"] - return (ip, port) - - def get_container_status(self, container_name): - response = self._make_request('GET', 'v2/apps/%s'%container_name) - if response.status_code != 200: - return None - container = response.json()['app'] - return container - - def get_running_containers(self): - response = self._make_request('GET', 'v2/apps') - return response.json()['apps'] -