diff --git a/README.md b/README.md index de64100..372a5cc 100644 --- a/README.md +++ b/README.md @@ -74,13 +74,14 @@ configure Inviso on the NameNode/ResourceManager host. ``` 6. Build virtual environment and index some jobs - + ```bash > virtualenv venv > source venv/bin/activate > pip install -r inviso/jes/requirements.txt > cd inviso/jes/ - > cp settings_default.py settings.py + # Edit conf/inviso.conf to match your environment. + > vim conf/inviso.conf > python jes.py > python index_cluster_stats.py diff --git a/jes/conf/inviso.conf b/jes/conf/inviso.conf new file mode 100644 index 0000000..a8c99a5 --- /dev/null +++ b/jes/conf/inviso.conf @@ -0,0 +1,17 @@ +[inviso] +inviso_host: localhost:5858 +genie_host: localhost:5858 + +[elastic_search] +host: localhost +port: 9200 + +[cluster] +cluster_id: cluster_1 +cluster_name: cluster_1 +namenode: hadoop-training-namenode.s.com +namenode_port: 8020 +history_server_dir: /mr-history/done +resource_manager: hadoop-training-resourcemanager.s.com +resource_manager_port: 8088 +effective_user: hdfs diff --git a/jes/index_cluster_stats.py b/jes/index_cluster_stats.py index d6bbd59..f3a59d2 100755 --- a/jes/index_cluster_stats.py +++ b/jes/index_cluster_stats.py @@ -14,7 +14,7 @@ es_index = 'inviso-cluster' def index_apps(es, cluster, info): - apps = requests.get('http://%s:%s/ws/v1/cluster/apps?state=RUNNING' % (cluster.host, '9026'), headers = {'ACCEPT':'application/json'}).json().get('apps') + apps = requests.get('http://%s:%s/ws/v1/cluster/apps?state=RUNNING' % (cluster.host, cluster.port), headers = {'ACCEPT':'application/json'}).json().get('apps') if not apps: log.info(cluster.name + ': no applications running.') @@ -51,7 +51,7 @@ def index_apps(es, cluster, info): log.debug(bulk(es, documents, stats_only=True)); def index_metrics(es, cluster, info): - metrics = requests.get('http://%s:%s/ws/v1/cluster/metrics' % (cluster.host, '9026'), headers = {'ACCEPT':'application/json'}).json()['clusterMetrics'] + metrics = 
requests.get('http://%s:%s/ws/v1/cluster/metrics' % (cluster.host, cluster.port), headers = {'ACCEPT':'application/json'}).json()['clusterMetrics'] metrics.update(info) r = es.index(index=es_index, @@ -63,7 +63,7 @@ def index_metrics(es, cluster, info): log.debug(r) def index_scheduler(es, cluster, info): - scheduler = requests.get('http://%s:%s/ws/v1/cluster/scheduler' % (cluster.host, '9026'), headers = {'ACCEPT':'application/json'}).json()['scheduler']['schedulerInfo']['rootQueue'] + scheduler = requests.get('http://%s:%s/ws/v1/cluster/scheduler' % (cluster.host, cluster.port), headers = {'ACCEPT':'application/json'}).json()['scheduler']['schedulerInfo']['rootQueue'] scheduler.update(info) r = es.index(index=es_index, @@ -84,7 +84,8 @@ def index_stats(clusters): 'timestamp': timestamp, 'cluster': cluster.name, 'cluster.id': cluster.id, - 'host': cluster.host + 'host': cluster.host, + 'port': cluster.port } index_apps(es, cluster, info) diff --git a/jes/inviso/conf_reader.py b/jes/inviso/conf_reader.py new file mode 100644 index 0000000..a3d3e3c --- /dev/null +++ b/jes/inviso/conf_reader.py @@ -0,0 +1,9 @@ +#!/usr/bin/python +import ConfigParser +def getConfElement(section,element): + config = ConfigParser.ConfigParser() + fd = config.read("./conf/inviso.conf") + if(len(fd)==0): + fd = config.read("../conf/inviso.conf") + return config.get(section,element) + diff --git a/jes/inviso/monitor.py b/jes/inviso/monitor.py index 097e3a0..d0475d3 100644 --- a/jes/inviso/monitor.py +++ b/jes/inviso/monitor.py @@ -17,10 +17,14 @@ class Cluster: - def __init__(self, id, name, host): + def __init__(self, id, name, host, port,namenode,namenode_port,history_server): self.id = id self.name = name self.host = host + self.port = port + self.namenode = namenode + self.namenode_port = namenode_port + self.history_server = history_server class Monitor(object): def __init__(self, publisher=None, **kwargs): @@ -286,7 +290,6 @@ def __init__(self, self.host = host self.port = port 
self.log_path = log_path - def run(self): c = Client(self.host, self.port) diff --git a/jes/jes.py b/jes/jes.py index e736da9..da3eb71 100755 --- a/jes/jes.py +++ b/jes/jes.py @@ -26,7 +26,9 @@ def main(): monitors.append(HdfsMr2LogMonitor(jobflow=cluster.id, cluster_id=cluster.id, cluster_name=cluster.name, - host=cluster.host, + host=cluster.namenode, + port=cluster.namenode_port, + log_path=cluster.history_server, publisher=publisher, elasticsearch=settings.elasticsearch)) @@ -41,4 +43,4 @@ def main(): if __name__ == "__main__": - sys.exit(main()) \ No newline at end of file + sys.exit(main()) diff --git a/jes/settings_default.py b/jes/settings_default.py index 5a7fd6d..dd5d48d 100644 --- a/jes/settings_default.py +++ b/jes/settings_default.py @@ -11,6 +11,7 @@ from snakebite import channel from snakebite.channel import log_protobuf_message from snakebite.protobuf.IpcConnectionContext_pb2 import IpcConnectionContextProto +from inviso.conf_reader import getConfElement log = util.get_logger("inviso-settings") @@ -18,7 +19,7 @@ def create_hadoop_connection_context(self): '''Creates and seriazlies a IpcConnectionContextProto (not delimited)''' context = IpcConnectionContextProto() - context.userInfo.effectiveUser = 'hadoop' + context.userInfo.effectiveUser = getConfElement('cluster','effective_user') context.protocol = "org.apache.hadoop.hdfs.protocol.ClientProtocol" s_context = context.SerializeToString() @@ -60,12 +61,12 @@ def inviso_trace(job_id, uri, version='mr1', summary=True): return response.json() -inviso_host = 'localhost:8080' -genie_host = 'localhost:8080' -elasticsearch_hosts = [{'host': 'localhost', 'port': 9200}] +inviso_host = getConfElement('inviso','inviso_host') +genie_host = getConfElement('inviso','genie_host') +elasticsearch_hosts = [{'host': getConfElement('elastic_search','host'), 'port': int(getConfElement('elastic_search','port'))}] elasticsearch = Elasticsearch(elasticsearch_hosts) clusters = [ - Cluster(id='cluster_1', name='cluster_1', 
host=socket.getfqdn()) + Cluster(id=getConfElement('cluster','cluster_id'), name=getConfElement('cluster','cluster_name'), host=socket.getfqdn(getConfElement('cluster','resource_manager')), port=getConfElement('cluster','resource_manager_port'),namenode=getConfElement('cluster','namenode'),namenode_port=int(getConfElement('cluster','namenode_port')),history_server=getConfElement('cluster','history_server_dir')) ] handler = IndexHandler(trace=inviso_trace, elasticsearch=elasticsearch)