44
55"""
66import pika
7+ import functools
8+ import threading
79
810from message_queue import logger
911from message_queue .adapters import BaseAdapter
@@ -24,6 +26,7 @@ def __init__(self, host='localhost', port=5672, user='guest', password='guest',
2426 :param string vhost: Server virutal host
2527
2628 """
29+ self .threads = []
2730 self .queue = None
2831 self ._host = host
2932 self ._credentials = pika .PlainCredentials (user , password )
@@ -143,38 +146,49 @@ def consume(self, worker):
143146 :param function worker: Method that consume the message
144147
145148 """
146- callback = self .consume_callback ( worker )
149+ callback = functools . partial ( self .consume_callback , worker = worker )
147150 self .channel .basic_consume (callback , self .queue )
148151
149152 try :
150153 self .channel .start_consuming ()
151154
152155 except KeyboardInterrupt :
153156 self .channel .stop_consuming ()
154- self .close ()
155157
156- def consume_callback ( self , worker ) :
157- """Decorate worker to exectue on consume callback.
158+ for thread in self . threads :
159+ thread . join ()
158160
159- :param function worker: Worker to execture in the consume callback
161+ self . close ()
160162
161- """
162- def callback (channel , method , properties , body ):
163- """Message consume callback.
163+ def consume_callback (self , channel , method , properties , body , worker ):
164+ """Create a new thred.
164165
165- :param pika.channel.Channel channel: The channel object
166- :param pika.Spec.Basic.Deliver method: basic_deliver method
167- :param pika.Spec.BasicProperties properties: properties
168- :param str|unicode body: The message body
166+ :param pika.channel.Channel channel: The channel object
167+ :param pika.Spec.Basic.Deliver method: basic_deliver method
168+ :param pika.Spec.BasicProperties properties: properties
169+ :param str|unicode body: The message body
170+ :param function worker: Worker to execture in the consume callback
171+ """
172+ thread = threading .Thread (target = self .do_work , args = (channel , method , properties , body , worker ))
173+ thread .start ()
174+ self .threads .append (thread )
169175
170- """
171- # Execute the worker
172- acknowledge = worker (channel , method , properties , body )
176+ def do_work (self , channel , method , properties , body , worker ):
177+ """Execute worker
173178
174- # Acknowledge the message or not
175- self ._consume_acknowledge (channel , method .delivery_tag , acknowledge )
179+ :param pika.channel.Channel channel: The channel object
180+ :param pika.Spec.Basic.Deliver method: basic_deliver method
181+ :param pika.Spec.BasicProperties properties: properties
182+ :param str|unicode body: The message body
183+ :param function worker: Worker to execture in the consume callback
184+ """
185+ thread_id = threading .current_thread ().ident
186+ tag = method .delivery_tag
187+ LOGGER .debug ('Thread id: %r Delivery tag: %r Message body: %r' , thread_id , tag , body )
176188
177- return callback
189+ acknowledge = worker (channel , method , properties , body )
190+ callback = functools .partial (self ._consume_acknowledge , channel , tag , acknowledge )
191+ self .connection .add_callback_threadsafe (callback )
178192
179193 def _consume_acknowledge (self , channel , tag , acknowledge = True ):
180194 """Message acknowledge.
0 commit comments