-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy path03_streams.py
More file actions
48 lines (32 loc) · 1.28 KB
/
03_streams.py
File metadata and controls
48 lines (32 loc) · 1.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from __future__ import annotations
import asyncio
import rsloop
class EchoServerProtocol(asyncio.Protocol):
    """Server-side protocol that answers every chunk with its upper-cased copy.

    Despite the "echo" in the name, the payload is not returned verbatim:
    each received chunk is passed through ``bytes.upper()`` before it is
    written back on the same transport.
    """

    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        """Remember the transport so later callbacks can write replies to it."""
        self.transport = transport

    def data_received(self, data: bytes) -> None:
        """Write the upper-cased form of ``data`` back to the peer."""
        self.transport.write(data.upper())
class ClientProtocol(asyncio.Protocol):
def __init__(self, done: asyncio.Future[str]) -> None:
self.done = done
def connection_made(self, transport: asyncio.BaseTransport) -> None:
self.transport = transport
transport.write(b"hello over tcp")
def data_received(self, data: bytes) -> None:
if not self.done.done():
self.done.set_result(data.decode())
self.transport.close()
async def main() -> None:
    """Run one request/reply round trip over a local TCP connection.

    Starts a server built from ``EchoServerProtocol`` on 127.0.0.1 with
    port 0, reads the actual address from the first listening socket,
    connects a ``ClientProtocol`` to it and prints the reply.

    Raises:
        TimeoutError: if no reply arrives within one second.
    """
    loop = asyncio.get_running_loop()
    server = await loop.create_server(EchoServerProtocol, "127.0.0.1", 0)
    try:
        # Port 0 was requested, so the real port must be read back from the socket.
        host, port = server.sockets[0].getsockname()[:2]
        done: asyncio.Future[str] = loop.create_future()

        transport, _ = await loop.create_connection(
            lambda: ClientProtocol(done), host, port
        )
        try:
            reply = await asyncio.wait_for(done, 1.0)
            print("create_connection/create_server:", reply)
        finally:
            # Close the client side even when the wait timed out or was cancelled.
            transport.close()
    finally:
        # Always release the listening socket, including on connect failures.
        server.close()
        await server.wait_closed()
if __name__ == "__main__":
    # Script entry point: hand the demo coroutine to rsloop's runner.
    rsloop.run(main())