-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtwisted_kqueue.py
More file actions
171 lines (135 loc) · 5.43 KB
/
twisted_kqueue.py
File metadata and controls
171 lines (135 loc) · 5.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
A kqueue()/kevent() based implementation of the Twisted main loop.
To install the event loop (and you should do this before any connections,
listeners or connectors are added)::
| import twisted_kqueue
| twisted_kqueue.install()
This reactor only works on FreeBSD and requires Python 2.6+
"""
import errno, sys
from zope.interface import implements
from select import kqueue, kevent
from select import KQ_FILTER_READ, KQ_FILTER_WRITE, KQ_EV_ADD, KQ_EV_DELETE
from twisted.internet.interfaces import IReactorFDSet
from twisted.python import log, failure
from twisted.internet import main, posixbase
class KQueueReactor(posixbase.PosixReactorBase):
"""
A reactor that uses kqueue(2)/kevent(2).
@ivar _kq: A L{kqueue} which will be used to check for I/O readiness.
@ivar _selectables: A dictionary mapping integer file descriptors to
instances of L{FileDescriptor} which have been registered with the
reactor. All L{FileDescriptors} which are currently receiving read or
write readiness notifications will be present as values in this
dictionary.
@ivar _reads: A dictionary mapping integer file descriptors to arbitrary
values (this is essentially a set). Keys in this dictionary will be
registered with C{_kq} for read readiness notifications which will be
dispatched to the corresponding L{FileDescriptor} instances in
C{_selectables}.
@ivar _writes: A dictionary mapping integer file descriptors to arbitrary
values (this is essentially a set). Keys in this dictionary will be
registered with C{_kq} for write readiness notifications which will be
dispatched to the corresponding L{FileDescriptor} instances in
C{_selectables}.
"""
implements(IReactorFDSet)
def __init__(self):
"""
Initialize kqueue object, file descriptor tracking dictionaries, and the
base class.
"""
self._kq = kqueue()
self._reads = {}
self._writes = {}
self._selectables = {}
posixbase.PosixReactorBase.__init__(self)
def _updateRegistration(self, *args):
self._kq.control([kevent(*args)], 0, 0)
def addReader(self, reader):
"""Add a FileDescriptor for notification of data available to read.
"""
fd = reader.fileno()
if fd not in self._reads:
self._selectables[fd] = reader
self._reads[fd] = 1
self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
def addWriter(self, writer):
"""Add a FileDescriptor for notification of data available to write.
"""
fd = writer.fileno()
if fd not in self._writes:
self._selectables[fd] = writer
self._writes[fd] = 1
self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)
def removeReader(self, reader):
"""Remove a Selectable for notification of data available to read.
"""
fd = reader.fileno()
if fd in self._reads:
del self._reads[fd]
if fd not in self._writes:
del self._selectables[fd]
self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE)
def removeWriter(self, writer):
"""Remove a Selectable for notification of data available to write.
"""
fd = writer.fileno()
if fd in self._writes:
del self._writes[fd]
if fd not in self._reads:
del self._selectables[fd]
self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE)
def removeAll(self):
"""
Remove all selectables, and return a list of them.
"""
return self._removeAll(
[self._selectables[fd] for fd in self._reads],
[self._selectables[fd] for fd in self._writes])
def getReaders(self):
return [self._selectables[fd] for fd in self._reads]
def getWriters(self):
return [self._selectables[fd] for fd in self._writes]
def doKEvent(self, timeout):
"""Poll the kqueue for new events."""
try:
l = self._kq.control([], len(self._selectables), timeout)
except OSError, e:
if e[0] == errno.EINTR:
return
else:
raise
_drdw = self._doWriteOrRead
for event in l:
why = None
fd, filter = event.ident, event.filter
try:
selectable = self._selectables[fd]
except KeyError:
# Handles the infrequent case where one selectable's
# handler disconnects another.
continue
log.callWithLogger(selectable, _drdw, selectable, fd, filter)
def _doWriteOrRead(self, selectable, fd, filter):
try:
if filter == KQ_FILTER_READ:
why = selectable.doRead()
if filter == KQ_FILTER_WRITE:
why = selectable.doWrite()
if not selectable.fileno() == fd:
why = main.CONNECTION_LOST
except:
why = sys.exc_info()[1]
log.deferr()
if why:
self.removeReader(selectable)
self.removeWriter(selectable)
selectable.connectionLost(failure.Failure(why))
doIteration = doKEvent
def install():
k = KQueueReactor()
main.installReactor(k)
__all__ = ["KQueueReactor", "install"]