Skip to content
Open

V0.8 #17

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ PyApollo - Python Client for Ctrip's Apollo
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)

方便Python接入配置中心框架 [Apollo](https://github.com/ctripcorp/apollo) 所开发的Python版本客户端。
Tested with python 2.7 & 3.6
Tested with python 3

基于https://github.com/filamoon/pyapollo修改

Installation
------------

``` shell
python setup.py install
pip install apollo-client
```

# Features
* 实时同步配置
* 灰度配置

# Missing Features
* 客户端容灾

# Usage
Expand All @@ -35,8 +35,8 @@ client.start()
```

# Contribution
* Source Code: https://github.com/filamoon/pyapollo/
* Issue Tracker: https://github.com/filamoon/pyapollo/issues
* Source Code: https://github.com/BruceWW/pyapollo
* Issue Tracker: https://github.com/BruceWW/pyapollo/issues

# License
The project is licensed under the [Apache 2 license](https://github.com/zouyx/agollo/blob/master/LICENSE).
Expand Down
6 changes: 3 additions & 3 deletions pyapollo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
from .apollo_client import ApolloClient

__version__ = "0.0.1.dev1"
__author__ = 'filamoon'
__email__ = 'filamoon@gmail.com'
__version__ = "0.8"
__author__ = 'Bruce.Liu'
__email__ = '15869300264@163.com'
262 changes: 201 additions & 61 deletions pyapollo/apollo_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# !/usr/bin/env python
# -*- coding: utf-8 -*-
import hashlib
import json
import logging
import os
import sys
import threading
import time
Expand All @@ -9,33 +12,62 @@


class ApolloClient(object):
def __init__(self, app_id, cluster='default', config_server_url='http://localhost:8080', timeout=35, ip=None):
def __init__(self, app_id, cluster='default', config_server_url='http://localhost:8080', timeout=60, ip=None,
cycle_time=300, cache_file_path=None):
"""

:param app_id:
:param cluster:
:param config_server_url:
:param timeout:
:param ip: the deploy ip for grey release
:param cycle_time: the cycle time to update configuration content from server
:param cache_file_path: local cache file store path
"""
self.config_server_url = config_server_url
self.appId = app_id
self.app_id = app_id
self.cluster = cluster
self.timeout = timeout
self.stopped = False
self.init_ip(ip)
self.ip = self.init_ip(ip)

self._stopping = False
self._cache = {}
self._notification_map = {'application': -1}

def init_ip(self, ip):
if ip:
self.ip = ip
self._cycle_time = cycle_time
self._hash = dict()
if cache_file_path is None:
self._cache_file_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'config')
else:
self._cache_file_path = cache_file_path
self._path_checker()

@staticmethod
def init_ip(ip):
"""
for grey release
:param ip:
:return:
"""
if ip is None:
import socket
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 53))
ip = s.getsockname()[0]
finally:
s.close()
self.ip = ip
return ip

# Main method
def get_value(self, key, default_val=None, namespace='application', auto_fetch_on_cache_miss=False):
"""
get the configuration value
:param key:
:param default_val:
:param namespace:
:param auto_fetch_on_cache_miss:
:return:
"""
if namespace not in self._notification_map:
self._notification_map[namespace] = -1
logging.getLogger(__name__).info("Add namespace '%s' to local notification map", namespace)
Expand All @@ -54,16 +86,22 @@ def get_value(self, key, default_val=None, namespace='application', auto_fetch_o
else:
return default_val

# Start the long polling loop. Two modes are provided:
# 1: thread mode (default), create a worker thread to do the loop. Call self.stop() to quit the loop
# 2: eventlet mode (recommended), no need to call the .stop() since it is async
def start(self, use_eventlet=False, eventlet_monkey_patch=False, catch_signals=True):
# First do a blocking long poll to populate the local cache, otherwise we may get racing problems
def start(self, use_event_let=False, event_let_monkey_patch=False, catch_signals=True):
"""
Start the long polling loop. Two modes are provided:
1: thread mode (default), create a worker thread to do the loop. Call self.stop() to quit the loop
2: event_let mode (recommended), no need to call the .stop() since it is async
First do a blocking long poll to populate the local cache, otherwise we may get racing problems
:param use_event_let:
:param event_let_monkey_patch:
:param catch_signals:
:return:
"""
if len(self._cache) == 0:
self._long_poll()
if use_eventlet:
if use_event_let:
import eventlet
if eventlet_monkey_patch:
if event_let_monkey_patch:
eventlet.monkey_patch()
eventlet.spawn(self._listener)
else:
Expand All @@ -76,38 +114,114 @@ def start(self, use_eventlet=False, eventlet_monkey_patch=False, catch_signals=T
t.start()

def stop(self):
"""
stop the client
:return:
"""
self._stopping = True
logging.getLogger(__name__).info("Stopping listener...")

def _cached_http_get(self, key, default_val, namespace='application'):
url = '{}/configfiles/json/{}/{}/{}?ip={}'.format(self.config_server_url, self.appId, self.cluster, namespace, self.ip)
r = requests.get(url)
if r.ok:
data = r.json()
self._cache[namespace] = data
logging.getLogger(__name__).info('Updated local cache for namespace %s', namespace)
else:
data = self._cache[namespace]

if key in data:
return data[key]
else:
return default_val
"""
get the configuration content from server with cache
:param key:
:param default_val:
:param namespace:
:return:
"""
url = '{}/configfiles/json/{}/{}/{}?ip={}'.format(self.config_server_url, self.app_id, self.cluster, namespace,
self.ip)
data = dict()
try:
r = requests.get(url)
if r.ok:
data = r.json()
self._cache[namespace] = data
logging.getLogger(__name__).info('Updated local cache for namespace %s', namespace)
self._update_local_cache(data, namespace)
else:
if self._cache[namespace] is None or len(self._cache[namespace]) == 0:
logging.getLogger(__name__).info('cached http get configuration from local cache file')
data = self._get_local_cache(namespace)
else:
data = self._cache[namespace]
except BaseException as e:
logging.getLogger(__name__).warning(str(e))
data = self._get_local_cache(namespace)
finally:
if key in data:
return data[key]
else:
return default_val

def _uncached_http_get(self, namespace='application'):
url = '{}/configs/{}/{}/{}?ip={}'.format(self.config_server_url, self.appId, self.cluster, namespace, self.ip)
r = requests.get(url)
if r.status_code == 200:
data = r.json()
"""
get thr configuration content from server without cache
:param namespace:
:return:
"""
url = '{}/configs/{}/{}/{}?ip={}'.format(self.config_server_url, self.app_id, self.cluster, namespace, self.ip)
try:
r = requests.get(url)
if r.status_code == 200:
data = r.json()
self._cache[namespace] = data['configurations']
logging.getLogger(__name__).info('Updated local cache for namespace %s release key %s: %s',
namespace, data['releaseKey'],
repr(self._cache[namespace]))
self._update_local_cache(data, namespace)
else:
data = self._get_local_cache(namespace)
logging.getLogger(__name__).info('uncached http get configuration from local cache file')
self._cache[namespace] = data['configurations']
except BaseException as e:
logging.getLogger(__name__).warning(str(e))
data = self._get_local_cache(namespace)
self._cache[namespace] = data['configurations']
logging.getLogger(__name__).info('Updated local cache for namespace %s release key %s: %s',
namespace, data['releaseKey'],
repr(self._cache[namespace]))

def _signal_handler(self, signal, frame):
def _signal_handler(self):
logging.getLogger(__name__).info('You pressed Ctrl+C!')
self._stopping = True

def _path_checker(self):
"""
create configuration cache file directory if not exits
:return:
"""
if not os.path.isdir(self._cache_file_path):
os.mkdir(self._cache_file_path)

def _update_local_cache(self, data, namespace='application'):
"""
if local cache file exits, update the content
if local cache file not exits, create a version
:param data: new configuration content
:param namespace::s
:return:
"""
new_string = json.dumps(data)
new_hash = hashlib.md5(new_string.encode('utf-8')).hexdigest()
if self._hash.get(namespace) == new_hash:
pass
else:
with open(os.path.join(self._cache_file_path, 'configuration_%s.txt' % namespace), 'w') as f:
f.write(new_string)
self._hash[namespace] = new_hash

def _get_local_cache(self, namespace='application'):
"""
get configuration from local cache file
if local cache file not exits than return empty dict
:param namespace:
:return:
"""
cache_file_path = os.path.join(self._cache_file_path, 'configuration_%s.txt' % namespace)
if os.path.isfile(cache_file_path):
with open(cache_file_path, 'r') as f:
result = json.loads(f.readline())
return result
return dict()

def _long_poll(self):
url = '{}/notifications/v2'.format(self.config_server_url)
notifications = []
Expand All @@ -117,36 +231,62 @@ def _long_poll(self):
'namespaceName': key,
'notificationId': notification_id
})
try:
r = requests.get(url=url, params={
'appId': self.app_id,
'cluster': self.cluster,
'notifications': json.dumps(notifications, ensure_ascii=False)
}, timeout=self.timeout)

r = requests.get(url=url, params={
'appId': self.appId,
'cluster': self.cluster,
'notifications': json.dumps(notifications, ensure_ascii=False)
}, timeout=self.timeout)

logging.getLogger(__name__).debug('Long polling returns %d: url=%s', r.status_code, r.request.url)

if r.status_code == 304:
# no change, loop
logging.getLogger(__name__).debug('No change, loop...')
return

if r.status_code == 200:
data = r.json()
for entry in data:
ns = entry['namespaceName']
nid = entry['notificationId']
logging.getLogger(__name__).info("%s has changes: notificationId=%d", ns, nid)
self._uncached_http_get(ns)
self._notification_map[ns] = nid
else:
logging.getLogger(__name__).warn('Sleep...')
time.sleep(self.timeout)
logging.getLogger(__name__).debug('Long polling returns %d: url=%s', r.status_code, r.request.url)

if r.status_code == 304:
# no change, loop
logging.getLogger(__name__).debug('No change, loop...')
return

if r.status_code == 200:
data = r.json()
for entry in data:
ns = entry['namespaceName']
nid = entry['notificationId']
logging.getLogger(__name__).info("%s has changes: notificationId=%d", ns, nid)
self._uncached_http_get(ns)
self._notification_map[ns] = nid
return
else:
logging.getLogger(__name__).warning('Sleep...')
time.sleep(self.timeout)
return
except requests.exceptions.ReadTimeout as e:
logging.getLogger(__name__).warning(str(e))
except requests.exceptions.ConnectionError as e:
logging.getLogger(__name__).warning(str(e))
self._load_local_cache_file()

def _load_local_cache_file(self):
"""
load all cached files from local path
is only used while apollo server is unreachable
:return:
"""
for file in os.listdir(self._cache_file_path):
file_path = os.path.join(self._cache_file_path, file)
if os.path.isfile(file_path):
namespace = file.split('.')[0].split('_')[1]
with open(file_path) as f:
self._cache[namespace] = json.loads(f.read())['configurations']
return True

def _listener(self):
"""

:return:
"""
logging.getLogger(__name__).info('Entering listener loop...')
while not self._stopping:
self._long_poll()
time.sleep(self._cycle_time)

logging.getLogger(__name__).info("Listener stopped!")
self.stopped = True
Expand Down
Loading