Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,14 @@ configure Inviso on the NameNode/ResourceManager host.
```

6. Build virtual environment and index some jobs

```bash
> virtualenv venv
> source venv/bin/activate
> pip install -r inviso/jes/requirements.txt
> cd inviso/jes/
> cp settings_default.py settings.py
# Edit conf/inviso.conf (you are now in inviso/jes/) to match your cluster.
> vim conf/inviso.conf
> python jes.py
> python index_cluster_stats.py

Expand Down
17 changes: 17 additions & 0 deletions jes/conf/inviso.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[inviso]
inviso_host: localhost:5858
genie_host: localhost:5858

[elastic_search]
host: localhost
port: 9200

[cluster]
cluster_id: cluster_1
cluster_name: cluster_1
namenode: hadoop-training-namenode.s.com
namenode_port: 8020
history_server_dir: /mr-history/done
resource_manager: hadoop-training-resourcemanager.s.com
resource_manager_port: 8088
effective_user: hdfs
9 changes: 5 additions & 4 deletions jes/index_cluster_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
es_index = 'inviso-cluster'

def index_apps(es, cluster, info):
apps = requests.get('http://%s:%s/ws/v1/cluster/apps?state=RUNNING' % (cluster.host, '9026'), headers = {'ACCEPT':'application/json'}).json().get('apps')
apps = requests.get('http://%s:%s/ws/v1/cluster/apps?state=RUNNING' % (cluster.host, cluster.port), headers = {'ACCEPT':'application/json'}).json().get('apps')

if not apps:
log.info(cluster.name + ': no applications running.')
Expand Down Expand Up @@ -51,7 +51,7 @@ def index_apps(es, cluster, info):
log.debug(bulk(es, documents, stats_only=True));

def index_metrics(es, cluster, info):
metrics = requests.get('http://%s:%s/ws/v1/cluster/metrics' % (cluster.host, '9026'), headers = {'ACCEPT':'application/json'}).json()['clusterMetrics']
metrics = requests.get('http://%s:%s/ws/v1/cluster/metrics' % (cluster.host, cluster.port), headers = {'ACCEPT':'application/json'}).json()['clusterMetrics']
metrics.update(info)

r = es.index(index=es_index,
Expand All @@ -63,7 +63,7 @@ def index_metrics(es, cluster, info):
log.debug(r)

def index_scheduler(es, cluster, info):
scheduler = requests.get('http://%s:%s/ws/v1/cluster/scheduler' % (cluster.host, '9026'), headers = {'ACCEPT':'application/json'}).json()['scheduler']['schedulerInfo']['rootQueue']
scheduler = requests.get('http://%s:%s/ws/v1/cluster/scheduler' % (cluster.host, cluster.port), headers = {'ACCEPT':'application/json'}).json()['scheduler']['schedulerInfo']['rootQueue']
scheduler.update(info)

r = es.index(index=es_index,
Expand All @@ -84,7 +84,8 @@ def index_stats(clusters):
'timestamp': timestamp,
'cluster': cluster.name,
'cluster.id': cluster.id,
'host': cluster.host
'host': cluster.host,
'port': cluster.port
}

index_apps(es, cluster, info)
Expand Down
9 changes: 9 additions & 0 deletions jes/inviso/conf_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/python
import ConfigParser
def getConfElement(section, element):
    """Return one option from the Inviso configuration file.

    The file is looked up, in order, at:

    1. ``./conf/inviso.conf`` (relative to the current working directory),
    2. ``../conf/inviso.conf`` (relative to the current working directory),
    3. ``conf/inviso.conf`` in the parent directory of this module, so the
       lookup still works when the process is not started from ``jes/``.

    The first location that can be parsed wins; later ones are not read.
    The file is re-read on every call.

    :param section: name of the section, e.g. ``'cluster'``.
    :param element: name of the option inside ``section``.
    :returns: the option value as a string.
    :raises IOError: if no configuration file could be read from any of
        the locations above.
    :raises ConfigParser.NoSectionError: if ``section`` is missing.
    :raises ConfigParser.NoOptionError: if ``element`` is missing.
    """
    import os

    module_dir = os.path.dirname(os.path.abspath(__file__))
    candidates = [
        "./conf/inviso.conf",
        "../conf/inviso.conf",
        os.path.join(module_dir, os.pardir, "conf", "inviso.conf"),
    ]

    config = ConfigParser.ConfigParser()
    for path in candidates:
        # read() returns the list of files it actually parsed; an empty
        # list means this candidate was missing or unreadable.
        if config.read(path):
            break
    else:
        # Fail with the real cause instead of letting config.get() report
        # a misleading "no section" error on an empty parser.
        raise IOError(
            "inviso.conf not found; looked in: %s" % ", ".join(candidates)
        )

    return config.get(section, element)

7 changes: 5 additions & 2 deletions jes/inviso/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@


class Cluster(object):
    """Connection details for one Hadoop cluster handled by Inviso.

    Plain value holder; every constructor argument is stored unchanged on
    the attribute of the same name.

    :param id: cluster identifier.
    :param name: human-readable cluster name.
    :param host: host used together with ``port`` to build the
        ``/ws/v1/cluster/...`` REST URLs in ``index_cluster_stats.py``.
    :param port: port paired with ``host`` for those REST URLs.
    :param namenode: passed as ``host`` to ``HdfsMr2LogMonitor`` in
        ``jes.py``.
    :param namenode_port: passed as ``port`` to ``HdfsMr2LogMonitor``.
    :param history_server: passed as ``log_path`` to
        ``HdfsMr2LogMonitor``.
    """

    def __init__(self, id, name, host, port, namenode, namenode_port,
                 history_server):
        # ``id`` shadows the builtin, but callers pass it by keyword, so
        # the parameter name has to stay.
        self.id = id
        self.name = name
        self.host = host
        self.port = port
        self.namenode = namenode
        self.namenode_port = namenode_port
        self.history_server = history_server

class Monitor(object):
def __init__(self, publisher=None, **kwargs):
Expand Down Expand Up @@ -286,7 +290,6 @@ def __init__(self,
self.host = host
self.port = port
self.log_path = log_path

def run(self):
c = Client(self.host, self.port)

Expand Down
6 changes: 4 additions & 2 deletions jes/jes.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ def main():
monitors.append(HdfsMr2LogMonitor(jobflow=cluster.id,
cluster_id=cluster.id,
cluster_name=cluster.name,
host=cluster.host,
host=cluster.namenode,
port=cluster.namenode_port,
log_path=cluster.history_server,
publisher=publisher,
elasticsearch=settings.elasticsearch))

Expand All @@ -41,4 +43,4 @@ def main():


if __name__ == "__main__":
sys.exit(main())
sys.exit(main())
11 changes: 6 additions & 5 deletions jes/settings_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
from snakebite import channel
from snakebite.channel import log_protobuf_message
from snakebite.protobuf.IpcConnectionContext_pb2 import IpcConnectionContextProto
from inviso.conf_reader import getConfElement

log = util.get_logger("inviso-settings")

# A monkey patch to make us run as hadoop
def create_hadoop_connection_context(self):
'''Creates and seriazlies a IpcConnectionContextProto (not delimited)'''
context = IpcConnectionContextProto()
context.userInfo.effectiveUser = 'hadoop'
context.userInfo.effectiveUser = getConfElement('cluster','effective_user')
context.protocol = "org.apache.hadoop.hdfs.protocol.ClientProtocol"

s_context = context.SerializeToString()
Expand Down Expand Up @@ -60,12 +61,12 @@ def inviso_trace(job_id, uri, version='mr1', summary=True):

return response.json()

inviso_host = 'localhost:8080'
genie_host = 'localhost:8080'
elasticsearch_hosts = [{'host': 'localhost', 'port': 9200}]
inviso_host = getConfElement('inviso','inviso_host')
genie_host = getConfElement('inviso','genie_host')
elasticsearch_hosts = [{'host': getConfElement('elastic_search','host'), 'port': int(getConfElement('elastic_search','port'))}]
elasticsearch = Elasticsearch(elasticsearch_hosts)
clusters = [
Cluster(id='cluster_1', name='cluster_1', host=socket.getfqdn())
Cluster(id=getConfElement('cluster','cluster_id'), name=getConfElement('cluster','cluster_name'), host=socket.getfqdn(getConfElement('cluster','resource_manager')), port=getConfElement('cluster','resource_manager_port'),namenode=getConfElement('cluster','namenode'),namenode_port=int(getConfElement('cluster','namenode_port')),history_server=getConfElement('cluster','history_server_dir'))
]

handler = IndexHandler(trace=inviso_trace, elasticsearch=elasticsearch)
Expand Down