-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathmixpanel.py
More file actions
123 lines (97 loc) · 3.32 KB
/
mixpanel.py
File metadata and controls
123 lines (97 loc) · 3.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#! /usr/bin/env python
import hashlib
import json
import sys
import time
import urllib
from contextlib import closing

try:
    import urllib2
    from urllib import urlencode
    from urllib2 import urlopen
except ImportError:
    # Python 3: urllib2 no longer exists; the same helpers live in
    # urllib.parse and urllib.request.
    from urllib.parse import urlencode
    from urllib.request import urlopen
"""
Enter your Mixpanel API key and secret in the variables below.
This script prints, for each of your most popular events, the total count for the last minute.
"""
# Credentials used to sign every API request; fill both in before running.
API_KEY = ''      # public API key
API_SECRET = ''   # secret mixed into each request signature
class Mixpanel(object):
    """Small client for Mixpanel's signed HTTP data API.

    Each request carries the API key, an expiry timestamp and an MD5
    signature computed from the request parameters plus the API secret
    (see ``hash_args``).  The class is written to run unchanged on both
    Python 2 and Python 3.
    """

    # NOTE(review): requests go over plain HTTP, so the API key and the
    # signature travel unencrypted -- confirm whether this endpoint can be
    # switched to https.
    ENDPOINT = 'http://mixpanel.com/api'
    VERSION = '2.0'

    # The text (non-bytes) string type: ``unicode`` on Python 2, ``str`` on 3.
    _text_type = type(u'')

    def __init__(self, api_key, api_secret):
        """Store the credentials used to sign each request."""
        self.api_key = api_key
        self.api_secret = api_secret

    @classmethod
    def _utf8(cls, value):
        """Return ``value`` as a byte string, UTF-8 encoding text if needed."""
        if isinstance(value, bytes):
            return value
        if not isinstance(value, cls._text_type):
            value = str(value)
            if isinstance(value, bytes):
                # Python 2: str() already produced a byte string.
                return value
        return value.encode('utf-8')

    def request(self, methods, params, format='json'):
        """Perform a signed GET request and return the decoded JSON reply.

        methods - Sequence of path segments to be joined, e.g.
                  ['events', 'properties', 'values'] will give us
                  http://mixpanel.com/api/2.0/events/properties/values/
        params  - Extra parameters associated with the method.  The mapping
                  is copied, so the caller's object is left untouched.
        format  - Value sent as the ``format`` request parameter.

        Errors raised by ``urlopen`` (network failures, HTTP errors) and by
        ``json.loads`` (malformed body) propagate to the caller.
        """
        # Work on a copy: signing must not leak api_key/expire/sig into the
        # dictionary owned by the caller.
        signed = dict(params)
        signed['api_key'] = self.api_key
        signed['expire'] = int(time.time()) + 600  # Grant this request 10 minutes.
        signed['format'] = format
        # A stale signature must never take part in the new signature.
        signed.pop('sig', None)
        signed['sig'] = self.hash_args(signed)

        path = '/'.join([self.ENDPOINT, str(self.VERSION)] + list(methods))
        request_url = path + '/?' + self.unicode_urlencode(signed)

        # closing() guarantees the connection is released on both Python
        # versions (Python 2 responses are not context managers).
        with closing(urlopen(request_url, timeout=120)) as response:
            data = response.read()
        return json.loads(data)

    def unicode_urlencode(self, params):
        """URL-encode ``params`` into a query string.

        ``params`` is a dict or an iterable of ``(key, value)`` pairs.  List
        values are sent as JSON-encoded strings and text keys/values are
        encoded as UTF-8 before quoting.  The input is not modified.
        """
        if isinstance(params, dict):
            params = params.items()

        pairs = []
        for key, value in params:
            if isinstance(value, list):
                value = json.dumps(value)
            if isinstance(key, self._text_type):
                key = key.encode('utf-8')
            if isinstance(value, self._text_type):
                value = value.encode('utf-8')
            pairs.append((key, value))
        return urlencode(pairs)

    def hash_args(self, args, secret=None):
        """Return the MD5 hex digest that signs ``args``.

        The ``key=value`` pairs are concatenated in sorted key order with no
        separator between pairs, the secret is appended, and the result is
        hashed.  List values are hashed in their JSON-encoded form.  ``args``
        is not modified.

        secret - Overrides ``self.api_secret`` when given and non-empty.
        """
        pieces = []
        for key in sorted(args):
            value = args[key]
            if isinstance(value, list):
                value = json.dumps(value)
            pieces.append(self._utf8(key) + b'=' + self._utf8(value))

        digest = hashlib.md5(b''.join(pieces))
        if secret:
            digest.update(self._utf8(secret))
        elif self.api_secret:
            digest.update(self._utf8(self.api_secret))
        return digest.hexdigest()
def main():
    """Print per-event totals for the last minute and return an exit status.

    Fetches the event names, then for every event sums the values reported
    for a one-minute interval.  On success a single line of the form
    ``OK | name=value;;;; name=value;;;; `` is printed and 0 is returned
    (the layout looks like monitoring-plugin performance data -- presumably
    this is consumed by a Nagios-style check).  If anything fails while
    talking to the API, ``Connection failed! <error>`` is printed and 2 is
    returned.
    """
    metrics = {}
    try:
        api = Mixpanel(
            api_key=API_KEY,
            api_secret=API_SECRET
        )
        # The requests live inside the try block: they are the calls that
        # actually touch the network, the constructor only stores the keys.
        event_list = api.request(['events', 'names'], {})
        for event in event_list:
            data = api.request(['events'], {
                'event': [event],
                'unit': 'minute',
                'interval': 1,
                'type': 'general'
            })
            event_sum = sum(data['data']['values'][event].values())
            # Metric names are lower-cased with spaces turned into dots.
            metrics[event.replace(' ', '.').lower()] = event_sum
    except Exception as e:
        print("Connection failed! %s" % e)
        return 2

    output = "OK | " + ''.join(
        '%s=%s;;;; ' % (name, total) for name, total in metrics.items()
    )
    print(output)
    return 0


if __name__ == '__main__':
    sys.exit(main())