-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy path02_asyncio_console_basic_graph.py
More file actions
87 lines (66 loc) · 2.32 KB
/
02_asyncio_console_basic_graph.py
File metadata and controls
87 lines (66 loc) · 2.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import platform
from typing import TypedDict
from langchain_deepseek import ChatDeepSeek
from langgraph.graph import StateGraph
import asyncio
joke_model = ChatDeepSeek(model="deepseek-reasoner", max_tokens=1000)
poem_model = ChatDeepSeek(model="deepseek-chat", max_tokens=1000)
class State(TypedDict):
topic: str
joke: str
poem: str
async def generate_joke(state, config):
topic = state["topic"]
print("Writing joke...\n")
joke_response = await joke_model.ainvoke(
[{"role": "user", "content": f"Write a joke about {topic}"}],
config,
)
print()
return {"joke": joke_response.content}
async def generate_poem(state, config):
topic = state["topic"]
print("\nWriting poem...\n")
poem_response = await poem_model.ainvoke(
[{"role": "user", "content": f"Write a short poem about {topic}"}],
config,
)
print()
return {"poem": poem_response.content}
def create_graph():
workflow = StateGraph(State)
# Add nodes
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("generate_poem", generate_poem)
# Set the entry point
workflow.set_entry_point("generate_joke")
# Add edges
workflow.add_edge("generate_joke", "generate_poem")
# Set the final node
workflow.set_finish_point("generate_poem")
return workflow.compile()
graph = create_graph()
async def main():
thinking_started = False
async for msg, metadata in graph.astream(
{"topic": "cats"},
stream_mode="messages",
):
node = metadata["langgraph_node"]
if node == "generate_joke":
if msg.content:
if thinking_started:
print("\n</thinking>\n")
thinking_started = False
print(msg.content, end="", flush=True)
if "reasoning_content" in msg.additional_kwargs:
if not thinking_started:
print("<thinking>")
thinking_started = True
print(msg.additional_kwargs["reasoning_content"], end="", flush=True)
if node == "generate_poem":
print(msg.content, end="", flush=True)
if __name__ == "__main__":
if platform.system() == "Windows":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())