senders: resolve and use a random address on websockets connect#173
senders: resolve and use a random address on websockets connect#173hnousiainen wants to merge 6 commits intomasterfrom
Conversation
3a3330d to
392c136
Compare
journalpump/senders/websocket.py
Outdated
| gai_result[4][0] | ||
| for gai_result in await self.websocket_loop.getaddrinfo( | ||
| url_parsed.hostname, 0, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP | ||
| ) |
There was a problem hiding this comment.
I feel the [4][0] look a bit "magic" and/or obscure even though I suppose these are "well known":
Could the readability of this be improved with destructuring assignment, such as
| gai_result[4][0] | |
| for gai_result in await self.websocket_loop.getaddrinfo( | |
| url_parsed.hostname, 0, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP | |
| ) | |
| sockaddr[0] # Get IP address only | |
| for _, _, _, _, sockaddr in await self.websocket_loop.getaddrinfo( | |
| url_parsed.hostname, 0, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP | |
| ) |
or even:
| gai_result[4][0] | |
| for gai_result in await self.websocket_loop.getaddrinfo( | |
| url_parsed.hostname, 0, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP | |
| ) | |
| ip_address | |
| for _, _, _, _, (ip_address, *_) in await self.websocket_loop.getaddrinfo( | |
| url_parsed.hostname, 0, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP | |
| ) |
There was a problem hiding this comment.
The improvement works fine as long as they don't decide to add a 6th element to the tuple. Maybe an extra *_ would avoid that risk.
There was a problem hiding this comment.
I adjusted the code with a comment and the first proposed form. getaddrinfo() is used quite widely, changes to the return types would be quite disruptive so I think it's good with the parsing of the 5-tuple.
392c136 to
7285d4b
Compare
Kafka-python has dropped RETRY_ERROR_TYPES definition. Catch generic KafkaError and check from the exception itself whether the error is retriable. Re-raise fatal errors.
Replace legacy websockets.exceptions.InvalidStatusCode with websockets.exceptions.InvalidHandshake. Reorder exceptions to catch more specific timeout errors.
7285d4b to
8680735
Compare
journalpump/senders/websocket.py
Outdated
| host=preferred_host, | ||
| ssl=ssl_context, | ||
| compression=ws_compr, | ||
| extra_headers=headers, |
There was a problem hiding this comment.
Seems like the tests are failing here, they're being run with a new python-websockets version where extra_headers has been replaced with additional_headers. https://websockets.readthedocs.io/en/stable/faq/client.html#how-do-i-set-http-headers
There was a problem hiding this comment.
Yup, I opened a can of worms with the modification. Fedora still ships 12.0 while PyPI carries 15.0.1. I added a compat layer on the keyword args.
8680735 to
2825aa4
Compare
Websockets library has changed interfaces somewhat from 12.0 version (shipped with Fedora) and the most recent 15.0 (available via PyPI). Dynamically figure out the correct keyword arguments to use between the known versions.
8d4cccd to
4a55ddd
Compare
The path argument in server conneciton handlers was removed in 10.1 with backwards compatibility removed in 13.0.
Resolve websocket remote address and pick a random one as an explicit host to connect. The combination of asyncio websockets and asyncio create_connect() have some overlapping timeouts that lead to connection attempts to run through the whole list of targets returned by getaddrinfo(). This breaks connectivity in some dual stack IPv6/IPv4 cases, when either one of the address family has e.g. routing problems. We can fairly safely pick one at random here; we have retry logic built around the connections anyway that eventually lead to connected state.
755fcc6 to
6b1066d
Compare
Resolve websocket remote address and pick a random one as an explicit host to connect. The combination of asyncio websockets and asyncio create_connect() have some overlapping timeouts that lead to connection attempts to run through the whole list of targets returned by getaddrinfo(). This breaks connectivity in some dual stack IPv6/IPv4 cases, when either one of the address family has e.g. routing problems.
We can fairly safely pick one at random here; we have retry logic built around the connections anyway that eventually lead to connected state.