diff --git a/README.md b/README.md index 7944856..cde18c6 100644 --- a/README.md +++ b/README.md @@ -4,20 +4,20 @@ PyApollo - Python Client for Ctrip's Apollo [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) 方便Python接入配置中心框架 [Apollo](https://github.com/ctripcorp/apollo) 所开发的Python版本客户端。 -Tested with python 2.7 & 3.6 +Tested with python 3 + +基于https://github.com/filamoon/pyapollo修改 Installation ------------ ``` shell -python setup.py install +pip install apollo-client ``` # Features * 实时同步配置 * 灰度配置 - -# Missing Features * 客户端容灾 # Usage @@ -35,8 +35,8 @@ client.start() ``` # Contribution - * Source Code: https://github.com/filamoon/pyapollo/ - * Issue Tracker: https://github.com/filamoon/pyapollo/issues + * Source Code: https://github.com/BruceWW/pyapollo + * Issue Tracker: https://github.com/BruceWW/pyapollo/issues # License The project is licensed under the [Apache 2 license](https://github.com/zouyx/agollo/blob/master/LICENSE). diff --git a/pyapollo/__init__.py b/pyapollo/__init__.py index 34eb45e..02623af 100644 --- a/pyapollo/__init__.py +++ b/pyapollo/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- from .apollo_client import ApolloClient -__version__ = "0.0.1.dev1" -__author__ = 'filamoon' -__email__ = 'filamoon@gmail.com' +__version__ = "0.8" +__author__ = 'Bruce.Liu' +__email__ = '15869300264@163.com' diff --git a/pyapollo/apollo_client.py b/pyapollo/apollo_client.py index e8acd20..f531ddf 100644 --- a/pyapollo/apollo_client.py +++ b/pyapollo/apollo_client.py @@ -1,6 +1,9 @@ +# !/usr/bin/env python # -*- coding: utf-8 -*- +import hashlib import json import logging +import os import sys import threading import time @@ -9,22 +12,44 @@ class ApolloClient(object): - def __init__(self, app_id, cluster='default', config_server_url='http://localhost:8080', timeout=35, ip=None): + def __init__(self, app_id, cluster='default', config_server_url='http://localhost:8080', timeout=60, ip=None, + cycle_time=300, cache_file_path=None): + """ + + :param app_id: + :param cluster: + :param config_server_url: + :param timeout: + :param ip: the deploy ip for grey release + :param cycle_time: the cycle time to update configuration content from server + :param cache_file_path: local cache file store path + """ self.config_server_url = config_server_url - self.appId = app_id + self.app_id = app_id self.cluster = cluster self.timeout = timeout self.stopped = False - self.init_ip(ip) + self.ip = self.init_ip(ip) self._stopping = False self._cache = {} self._notification_map = {'application': -1} - - def init_ip(self, ip): - if ip: - self.ip = ip + self._cycle_time = cycle_time + self._hash = dict() + if cache_file_path is None: + self._cache_file_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'config') else: + self._cache_file_path = cache_file_path + self._path_checker() + + @staticmethod + def init_ip(ip): + """ + for grey release + :param ip: + :return: + """ + if ip is None: import socket try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -32,10 +57,17 @@ def init_ip(self, ip): ip = s.getsockname()[0] finally: s.close() - self.ip = ip + return ip - # Main method def get_value(self, key, default_val=None, namespace='application', auto_fetch_on_cache_miss=False): + """ + get the configuration value + :param key: + :param default_val: + :param namespace: + :param auto_fetch_on_cache_miss: + :return: + """ if namespace not in self._notification_map: self._notification_map[namespace] = -1 logging.getLogger(__name__).info("Add namespace '%s' to local notification map", namespace) @@ -54,16 +86,22 @@ def get_value(self, key, default_val=None, namespace='application', auto_fetch_o else: return default_val - # Start the long polling loop. Two modes are provided: - # 1: thread mode (default), create a worker thread to do the loop. Call self.stop() to quit the loop - # 2: eventlet mode (recommended), no need to call the .stop() since it is async - def start(self, use_eventlet=False, eventlet_monkey_patch=False, catch_signals=True): - # First do a blocking long poll to populate the local cache, otherwise we may get racing problems + def start(self, use_event_let=False, event_let_monkey_patch=False, catch_signals=True): + """ + Start the long polling loop. Two modes are provided: + 1: thread mode (default), create a worker thread to do the loop. Call self.stop() to quit the loop + 2: event_let mode (recommended), no need to call the .stop() since it is async + First do a blocking long poll to populate the local cache, otherwise we may get racing problems + :param use_event_let: + :param event_let_monkey_patch: + :param catch_signals: + :return: + """ if len(self._cache) == 0: self._long_poll() - if use_eventlet: + if use_event_let: import eventlet - if eventlet_monkey_patch: + if event_let_monkey_patch: eventlet.monkey_patch() eventlet.spawn(self._listener) else: @@ -76,38 +114,114 @@ def start(self, use_eventlet=False, eventlet_monkey_patch=False, catch_signals=T t.start() def stop(self): + """ + stop the client + :return: + """ self._stopping = True logging.getLogger(__name__).info("Stopping listener...") def _cached_http_get(self, key, default_val, namespace='application'): - url = '{}/configfiles/json/{}/{}/{}?ip={}'.format(self.config_server_url, self.appId, self.cluster, namespace, self.ip) - r = requests.get(url) - if r.ok: - data = r.json() - self._cache[namespace] = data - logging.getLogger(__name__).info('Updated local cache for namespace %s', namespace) - else: - data = self._cache[namespace] - - if key in data: - return data[key] - else: - return default_val + """ + get the configuration content from server with cache + :param key: + :param default_val: + :param namespace: + :return: + """ + url = '{}/configfiles/json/{}/{}/{}?ip={}'.format(self.config_server_url, self.app_id, self.cluster, namespace, + self.ip) + data = dict() + try: + r = requests.get(url) + if r.ok: + data = r.json() + self._cache[namespace] = data + logging.getLogger(__name__).info('Updated local cache for namespace %s', namespace) + self._update_local_cache(data, namespace) + else: + if self._cache[namespace] is None or len(self._cache[namespace]) == 0: + logging.getLogger(__name__).info('cached http get configuration from local cache file') + data = self._get_local_cache(namespace) + else: + data = self._cache[namespace] + except BaseException as e: + logging.getLogger(__name__).warning(str(e)) + data = self._get_local_cache(namespace) + finally: + if key in data: + return data[key] + else: + return default_val def _uncached_http_get(self, namespace='application'): - url = '{}/configs/{}/{}/{}?ip={}'.format(self.config_server_url, self.appId, self.cluster, namespace, self.ip) - r = requests.get(url) - if r.status_code == 200: - data = r.json() + """ + get thr configuration content from server without cache + :param namespace: + :return: + """ + url = '{}/configs/{}/{}/{}?ip={}'.format(self.config_server_url, self.app_id, self.cluster, namespace, self.ip) + try: + r = requests.get(url) + if r.status_code == 200: + data = r.json() + self._cache[namespace] = data['configurations'] + logging.getLogger(__name__).info('Updated local cache for namespace %s release key %s: %s', + namespace, data['releaseKey'], + repr(self._cache[namespace])) + self._update_local_cache(data, namespace) + else: + data = self._get_local_cache(namespace) + logging.getLogger(__name__).info('uncached http get configuration from local cache file') + self._cache[namespace] = data['configurations'] + except BaseException as e: + logging.getLogger(__name__).warning(str(e)) + data = self._get_local_cache(namespace) self._cache[namespace] = data['configurations'] - logging.getLogger(__name__).info('Updated local cache for namespace %s release key %s: %s', - namespace, data['releaseKey'], - repr(self._cache[namespace])) - def _signal_handler(self, signal, frame): + def _signal_handler(self): logging.getLogger(__name__).info('You pressed Ctrl+C!') self._stopping = True + def _path_checker(self): + """ + create configuration cache file directory if not exits + :return: + """ + if not os.path.isdir(self._cache_file_path): + os.mkdir(self._cache_file_path) + + def _update_local_cache(self, data, namespace='application'): + """ + if local cache file exits, update the content + if local cache file not exits, create a version + :param data: new configuration content + :param namespace::s + :return: + """ + new_string = json.dumps(data) + new_hash = hashlib.md5(new_string.encode('utf-8')).hexdigest() + if self._hash.get(namespace) == new_hash: + pass + else: + with open(os.path.join(self._cache_file_path, 'configuration_%s.txt' % namespace), 'w') as f: + f.write(new_string) + self._hash[namespace] = new_hash + + def _get_local_cache(self, namespace='application'): + """ + get configuration from local cache file + if local cache file not exits than return empty dict + :param namespace: + :return: + """ + cache_file_path = os.path.join(self._cache_file_path, 'configuration_%s.txt' % namespace) + if os.path.isfile(cache_file_path): + with open(cache_file_path, 'r') as f: + result = json.loads(f.readline()) + return result + return dict() + def _long_poll(self): url = '{}/notifications/v2'.format(self.config_server_url) notifications = [] @@ -117,36 +231,62 @@ def _long_poll(self): 'namespaceName': key, 'notificationId': notification_id }) + try: + r = requests.get(url=url, params={ + 'appId': self.app_id, + 'cluster': self.cluster, + 'notifications': json.dumps(notifications, ensure_ascii=False) + }, timeout=self.timeout) - r = requests.get(url=url, params={ - 'appId': self.appId, - 'cluster': self.cluster, - 'notifications': json.dumps(notifications, ensure_ascii=False) - }, timeout=self.timeout) - - logging.getLogger(__name__).debug('Long polling returns %d: url=%s', r.status_code, r.request.url) - - if r.status_code == 304: - # no change, loop - logging.getLogger(__name__).debug('No change, loop...') - return - - if r.status_code == 200: - data = r.json() - for entry in data: - ns = entry['namespaceName'] - nid = entry['notificationId'] - logging.getLogger(__name__).info("%s has changes: notificationId=%d", ns, nid) - self._uncached_http_get(ns) - self._notification_map[ns] = nid - else: - logging.getLogger(__name__).warn('Sleep...') - time.sleep(self.timeout) + logging.getLogger(__name__).debug('Long polling returns %d: url=%s', r.status_code, r.request.url) + + if r.status_code == 304: + # no change, loop + logging.getLogger(__name__).debug('No change, loop...') + return + + if r.status_code == 200: + data = r.json() + for entry in data: + ns = entry['namespaceName'] + nid = entry['notificationId'] + logging.getLogger(__name__).info("%s has changes: notificationId=%d", ns, nid) + self._uncached_http_get(ns) + self._notification_map[ns] = nid + return + else: + logging.getLogger(__name__).warning('Sleep...') + time.sleep(self.timeout) + return + except requests.exceptions.ReadTimeout as e: + logging.getLogger(__name__).warning(str(e)) + except requests.exceptions.ConnectionError as e: + logging.getLogger(__name__).warning(str(e)) + self._load_local_cache_file() + + def _load_local_cache_file(self): + """ + load all cached files from local path + is only used while apollo server is unreachable + :return: + """ + for file in os.listdir(self._cache_file_path): + file_path = os.path.join(self._cache_file_path, file) + if os.path.isfile(file_path): + namespace = file.split('.')[0].split('_')[1] + with open(file_path) as f: + self._cache[namespace] = json.loads(f.read())['configurations'] + return True def _listener(self): + """ + + :return: + """ logging.getLogger(__name__).info('Entering listener loop...') while not self._stopping: self._long_poll() + time.sleep(self._cycle_time) logging.getLogger(__name__).info("Listener stopped!") self.stopped = True diff --git a/setup.py b/setup.py index 9d257eb..ab30904 100644 --- a/setup.py +++ b/setup.py @@ -7,24 +7,24 @@ from setuptools import setup, find_packages import pyapollo -SHORT=u'pyapollo' +SHORT = u'a client for apollo' setup( - name='pyapollo', + name='apollo-client', version=pyapollo.__version__, packages=find_packages(), install_requires=[ 'requests' ], - url='https://code.aliyun.com/cashbusrisk/pyapollo', + url='', author=pyapollo.__author__, - author_email='filamoon@gmail.com', + author_email=pyapollo.__email__, classifiers=[ 'Programming Language :: Python', 'Programming Language :: Python :: 2.7', ], include_package_data=True, - package_data={'': ['*.py','*.pyc']}, + package_data={'': ['*.py', '*.pyc']}, zip_safe=False, platforms='any',