diff --git a/examples/proactive-messaging/README.md b/examples/proactive-messaging/README.md new file mode 100644 index 00000000..2b1ece12 --- /dev/null +++ b/examples/proactive-messaging/README.md @@ -0,0 +1,57 @@ +# Proactive Messaging Example + +This example demonstrates how to send proactive messages to Teams users without running a server. This is useful for: +- Scheduled notifications +- Alert systems +- Background jobs that need to notify users +- Webhook handlers that send messages + +## Key Concepts + +- Uses `app.initialize()` instead of `app.start()` (no HTTP server) +- Directly sends messages using `app.send()` +- Requires a conversation ID (from previous interactions or from the Teams API) + +## How It Works + +The example shows the separation of activity sending from HTTP transport: + +1. **Initialize without server**: `await app.initialize()` sets up credentials, token manager, and activity sender without starting the HTTP server +2. **Send messages**: `await app.send(conversation_id, message)` sends messages directly using the ActivitySender +3. **No HTTP server**: Perfect for background jobs, scheduled tasks, or webhook handlers + +## Usage + +```bash +# Set up your environment variables +export CLIENT_ID=your_app_id +export CLIENT_SECRET=your_app_secret +export TENANT_ID=your_tenant_id + +# Run the example with a conversation ID +uv run src/main.py +``` + +## Getting a Conversation ID + +You need a conversation ID to send proactive messages. You can get this from: + +1. **Previous bot interactions**: Store the conversation ID when users first interact with your bot +2. **Teams API**: Use the Microsoft Teams API to create or get conversation references +3. **Testing**: Use an existing bot conversation and extract the conversation ID from the activity + +## Example Output + +``` +Initializing app (without starting server)... +āœ“ App initialized + +Sending proactive message to conversation: 19:... +Message: Hello! This is a proactive message sent without a running server šŸš€ +āœ“ Message sent successfully! Activity ID: 1234567890 + +Sending proactive card to conversation: 19:... +āœ“ Card sent successfully! Activity ID: 1234567891 + +āœ“ All proactive messages sent successfully! +``` diff --git a/examples/proactive-messaging/pyproject.toml b/examples/proactive-messaging/pyproject.toml new file mode 100644 index 00000000..437354e7 --- /dev/null +++ b/examples/proactive-messaging/pyproject.toml @@ -0,0 +1,13 @@ +[project] +name = "proactive-messaging" +version = "0.1.0" +description = "Example showing proactive messaging without running a server" +readme = "README.md" +requires-python = ">=3.12,<3.14" +dependencies = [ + "dotenv>=0.9.9", + "microsoft-teams-apps", +] + +[tool.uv.sources] +microsoft-teams-apps = { workspace = true } diff --git a/examples/proactive-messaging/src/main.py b/examples/proactive-messaging/src/main.py new file mode 100644 index 00000000..006723a9 --- /dev/null +++ b/examples/proactive-messaging/src/main.py @@ -0,0 +1,110 @@ +""" +Copyright (c) Microsoft Corporation. All rights reserved. +Licensed under the MIT License. + +Proactive Messaging Example +=========================== +This example demonstrates how to send proactive messages to Teams users +without running a server. This is useful for: +- Scheduled notifications +- Alert systems +- Background jobs that need to notify users +- Webhook handlers that send messages + +Key points: +- Uses app.initialize() instead of app.start() (no HTTP server) +- Directly sends messages using app.send() +- Requires a conversation ID (from previous interactions or from the Teams API) +""" + +import argparse +import asyncio + +from microsoft_teams.apps import App +from microsoft_teams.cards import ActionSet, AdaptiveCard, OpenUrlAction, TextBlock + + +async def send_proactive_message(app: App, conversation_id: str, message: str) -> None: + """ + Send a proactive message to a Teams conversation. + + Args: + app: The initialized App instance + conversation_id: The Teams conversation ID to send the message to + message: The message text to send + """ + print(f"Sending proactive message to conversation: {conversation_id}") + print(f"Message: {message}") + + # Send the message + result = await app.send(conversation_id, message) + + print(f"āœ“ Message sent successfully! Activity ID: {result.id}") + + +async def send_proactive_card(app: App, conversation_id: str) -> None: + """ + Send a proactive Adaptive Card to a Teams conversation. + + Args: + app: The initialized App instance + conversation_id: The Teams conversation ID to send the card to + """ + # Create an Adaptive Card + card = AdaptiveCard( + schema="http://adaptivecards.io/schemas/adaptive-card.json", + body=[ + TextBlock(text="Proactive Notification", size="Large", weight="Bolder"), + TextBlock(text="This message was sent proactively without a server running!", wrap=True), + TextBlock(text="Status: Active • Priority: High • Time: Now", wrap=True, is_subtle=True), + ActionSet(actions=[OpenUrlAction(title="Learn More", url="https://aka.ms/teams-sdk")]), + ], + ) + + print(f"Sending proactive card to conversation: {conversation_id}") + + result = await app.send(conversation_id, card) + + print(f"āœ“ Card sent successfully! Activity ID: {result.id}") + + +async def main(): + """ + Main function demonstrating proactive messaging. + + In a real application, you would: + 1. Store conversation IDs when users first interact with your bot + 2. Use those IDs later to send proactive messages + 3. Get conversation IDs from the Teams API or from previous interactions + """ + parser = argparse.ArgumentParser( + description="Send proactive messages to a Teams conversation without running a server" + ) + parser.add_argument("conversation_id", help="The Teams conversation ID to send messages to") + args = parser.parse_args() + + # Create app (no plugins needed for sending only) + app = App() + + # Initialize the app without starting the HTTP server + # This sets up credentials, token manager, and activity sender + print("Initializing app (without starting server)...") + await app.initialize() + print("āœ“ App initialized\n") + + # Example 1: Send a simple text message + await send_proactive_message( + app, args.conversation_id, "Hello! This is a proactive message sent without a running server šŸš€" + ) + + # Wait a bit between messages + await asyncio.sleep(2) + + # Example 2: Send an Adaptive Card + await send_proactive_card(app, args.conversation_id) + + print("\nāœ“ All proactive messages sent successfully!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/packages/apps/src/microsoft_teams/apps/activity_sender.py b/packages/apps/src/microsoft_teams/apps/activity_sender.py new file mode 100644 index 00000000..3cd3f763 --- /dev/null +++ b/packages/apps/src/microsoft_teams/apps/activity_sender.py @@ -0,0 +1,76 @@ +""" +Copyright (c) Microsoft Corporation. All rights reserved. +Licensed under the MIT License. +""" + +from logging import Logger +from typing import Optional + +from microsoft_teams.api import ( + ActivityParams, + ApiClient, + ConversationReference, + SentActivity, +) +from microsoft_teams.common import Client, ConsoleLogger + +from .http_stream import HttpStream +from .plugins.streamer import StreamerProtocol + + +class ActivitySender: + """ + Handles sending activities to the Bot Framework. + Separate from transport concerns (HTTP, WebSocket, etc.) + """ + + def __init__(self, client: Client, logger: Optional[Logger] = None): + """ + Initialize ActivitySender. + + Args: + client: HTTP client with token provider configured + logger: Optional logger instance for debugging. If not provided, creates a default console logger. + """ + self._client = client + self._logger = logger or ConsoleLogger().create_logger("@teams/activity-sender") + + async def send(self, activity: ActivityParams, ref: ConversationReference) -> SentActivity: + """ + Send an activity to the Bot Framework. + + Args: + activity: The activity to send + ref: The conversation reference + + Returns: + The sent activity with id and other server-populated fields + """ + # Create API client for this conversation's service URL + api = ApiClient(service_url=ref.service_url, options=self._client) + + # Merge activity with conversation reference + activity.from_ = ref.bot + activity.conversation = ref.conversation + + # Decide create vs update + if hasattr(activity, "id") and activity.id: + res = await api.conversations.activities(ref.conversation.id).update(activity.id, activity) + return SentActivity.merge(activity, res) + + res = await api.conversations.activities(ref.conversation.id).create(activity) + return SentActivity.merge(activity, res) + + def create_stream(self, ref: ConversationReference) -> StreamerProtocol: + """ + Create a new activity stream for real-time updates. + + Args: + ref: The conversation reference + + Returns: + A new streaming instance + """ + # Create API client for this conversation's service URL + api = ApiClient(ref.service_url, self._client) + return HttpStream(api, ref, self._logger) diff --git a/packages/apps/src/microsoft_teams/apps/app.py b/packages/apps/src/microsoft_teams/apps/app.py index 8cebefa0..29aa887f 100644 --- a/packages/apps/src/microsoft_teams/apps/app.py +++ b/packages/apps/src/microsoft_teams/apps/app.py @@ -29,6 +29,7 @@ from microsoft_teams.cards import AdaptiveCard from microsoft_teams.common import Client, ClientOptions, ConsoleLogger, EventEmitter, LocalStorage +from .activity_sender import ActivitySender from .app_events import EventManager from .app_oauth import OauthHandlers from .app_plugins import PluginProcessor @@ -122,6 +123,12 @@ def __init__(self, **options: Unpack[AppOptions]): self._port: Optional[int] = None self._running = False + self._initialized = False + + # initialize ActivitySender for sending activities + self.activity_sender = ActivitySender( + self.http_client.clone(ClientOptions(token=self._get_bot_token)), self.log + ) # initialize all event, activity, and plugin processors self.activity_processor = ActivityProcessor( @@ -133,6 +140,7 @@ def __init__(self, **options: Unpack[AppOptions]): self.http_client, self._token_manager, self.options.api_client_settings, + self.activity_sender, ) self.event_manager = EventManager(self._events) self.activity_processor.event_manager = self.event_manager @@ -187,6 +195,32 @@ def id(self) -> Optional[str]: return None return self.credentials.client_id + async def initialize(self) -> None: + """ + Initialize the Teams application without starting the HTTP server. + + This method sets up credentials, token manager, activity sender, and plugins, + allowing you to use app.send() for proactive messaging without running a server. + """ + if self._initialized: + self.log.warning("App is already initialized") + return + + try: + for plugin in self.plugins: + # Inject the dependencies + self._plugin_processor.inject(plugin) + if hasattr(plugin, "on_init") and callable(plugin.on_init): + await plugin.on_init() + + self._initialized = True + self.log.info("Teams app initialized successfully (without HTTP server)") + + except Exception as error: + self.log.error(f"Failed to initialize app: {error}") + self._events.emit("error", ErrorEvent(error, context={"method": "initialize"})) + raise + async def start(self, port: Optional[int] = None) -> None: """ Start the Teams application and begin serving HTTP requests. @@ -204,11 +238,9 @@ async def start(self, port: Optional[int] = None) -> None: self._port = port or int(os.getenv("PORT", "3978")) try: - for plugin in self.plugins: - # Inject the dependencies - self._plugin_processor.inject(plugin) - if hasattr(plugin, "on_init") and callable(plugin.on_init): - await plugin.on_init() + # Initialize the app if not already initialized + if not self._initialized: + await self.initialize() # Set callback and start HTTP plugin async def on_http_ready() -> None: @@ -262,8 +294,11 @@ async def on_http_stopped() -> None: async def send(self, conversation_id: str, activity: str | ActivityParams | AdaptiveCard): """Send an activity proactively.""" + if not self._initialized: + raise ValueError("app not initialized - call app.initialize() or app.start() first") + if self.id is None: - raise ValueError("app not started") + raise ValueError("app credentials not configured") conversation_ref = ConversationReference( channel_id="msteams", @@ -279,7 +314,7 @@ async def send(self, conversation_id: str, activity: str | ActivityParams | Adap else: activity = activity - return await self.http.send(activity, conversation_ref) + return await self.activity_sender.send(activity, conversation_ref) def use(self, middleware: Callable[[ActivityContext[ActivityBase]], Awaitable[None]]) -> None: """Add middleware to run on all activities.""" @@ -471,7 +506,7 @@ async def call_next(r: Request) -> Any: ctx = FunctionContext( id=self.id, api=self.api, - http=self.http, + activity_sender=self.activity_sender, log=self.log, data=await r.json(), **r.state.context.__dict__, diff --git a/packages/apps/src/microsoft_teams/apps/app_events.py b/packages/apps/src/microsoft_teams/apps/app_events.py index 97f72856..88eb3fe6 100644 --- a/packages/apps/src/microsoft_teams/apps/app_events.py +++ b/packages/apps/src/microsoft_teams/apps/app_events.py @@ -8,7 +8,7 @@ from microsoft_teams.common import EventEmitter from .events import ActivityEvent, ActivityResponseEvent, ActivitySentEvent, ErrorEvent, EventType -from .plugins import PluginActivityResponseEvent, PluginActivitySentEvent, PluginBase, PluginErrorEvent, Sender +from .plugins import PluginActivityResponseEvent, PluginActivitySentEvent, PluginBase, PluginErrorEvent class EventManager: @@ -18,34 +18,29 @@ def __init__(self, event_emitter: EventEmitter[EventType]): async def on_error(self, event: ErrorEvent, plugins: List[PluginBase]) -> None: for plugin in plugins: if hasattr(plugin, "on_error_event") and callable(plugin.on_error): - await plugin.on_error(PluginErrorEvent(error=event.error, sender=plugin, activity=event.activity)) + await plugin.on_error(PluginErrorEvent(error=event.error, activity=event.activity)) self.event_emitter.emit("error", event) async def on_activity(self, event: ActivityEvent) -> None: self.event_emitter.emit("activity", event) - async def on_activity_sent(self, sender: Sender, event: ActivitySentEvent, plugins: List[PluginBase]) -> None: + async def on_activity_sent(self, event: ActivitySentEvent, plugins: List[PluginBase]) -> None: for plugin in plugins: if callable(plugin.on_activity_sent): await plugin.on_activity_sent( - PluginActivitySentEvent( - sender=event.sender, activity=event.activity, conversation_ref=event.conversation_ref - ) + PluginActivitySentEvent(activity=event.activity, conversation_ref=event.conversation_ref) ) - self.event_emitter.emit("activity_sent", {"event": event, "sender": sender}) + self.event_emitter.emit("activity_sent", event) - async def on_activity_response( - self, sender: Sender, event: ActivityResponseEvent, plugins: List[PluginBase] - ) -> None: + async def on_activity_response(self, event: ActivityResponseEvent, plugins: List[PluginBase]) -> None: for plugin in plugins: if callable(plugin.on_activity_response): await plugin.on_activity_response( PluginActivityResponseEvent( activity=event.activity, - sender=sender, response=event.response, conversation_ref=event.conversation_ref, ) ) - self.event_emitter.emit("activity_response", {"event": event, "sender": sender}) + self.event_emitter.emit("activity_response", event) diff --git a/packages/apps/src/microsoft_teams/apps/app_plugins.py b/packages/apps/src/microsoft_teams/apps/app_plugins.py index 0f3486b6..6050a324 100644 --- a/packages/apps/src/microsoft_teams/apps/app_plugins.py +++ b/packages/apps/src/microsoft_teams/apps/app_plugins.py @@ -17,10 +17,8 @@ from .plugins import ( DependencyMetadata, EventMetadata, - PluginActivityEvent, PluginBase, PluginErrorEvent, - Sender, get_metadata, ) @@ -104,19 +102,15 @@ def _inject_event(self, meta: EventMetadata, plugin: PluginBase, field_name: str async def error_handler(event: PluginErrorEvent) -> None: activity = cast(Activity, event.activity) - await self.event_manager.on_error( - ErrorEvent(error=event.error, activity=activity, sender=plugin), self.plugins - ) + await self.event_manager.on_error(ErrorEvent(error=event.error, activity=activity), self.plugins) setattr(plugin, field_name, error_handler) elif meta.name == "activity": self.logger.debug("Injecting the activity event") - async def activity_handler(event: PluginActivityEvent) -> InvokeResponse[Any]: - sender = cast(Sender, plugin) - activity_event = ActivityEvent(activity=event.activity, sender=sender, token=event.token) - await self.event_manager.on_activity(activity_event) - return await self.activity_processor.process_activity(self.plugins, sender, activity_event) + async def activity_handler(event: ActivityEvent) -> InvokeResponse[Any]: + await self.event_manager.on_activity(event) + return await self.activity_processor.process_activity(self.plugins, event) setattr(plugin, field_name, activity_handler) elif meta.name == "custom": diff --git a/packages/apps/src/microsoft_teams/apps/app_process.py b/packages/apps/src/microsoft_teams/apps/app_process.py index 21d2df51..5a466d01 100644 --- a/packages/apps/src/microsoft_teams/apps/app_process.py +++ b/packages/apps/src/microsoft_teams/apps/app_process.py @@ -9,6 +9,7 @@ from microsoft_teams.api import ( ActivityBase, ActivityParams, + ActivityTypeAdapter, ApiClient, ApiClientSettings, ConversationReference, @@ -23,8 +24,10 @@ if TYPE_CHECKING: from .app_events import EventManager + +from .activity_sender import ActivitySender from .events import ActivityEvent, ActivityResponseEvent, ActivitySentEvent, ErrorEvent -from .plugins import PluginActivityEvent, PluginBase, Sender +from .plugins import PluginActivityEvent, PluginBase from .routing.activity_context import ActivityContext from .routing.router import ActivityHandler, ActivityRouter from .token_manager import TokenManager @@ -44,6 +47,7 @@ def __init__( http_client: Client, token_manager: TokenManager, api_client_settings: Optional[ApiClientSettings], + activity_sender: ActivitySender, ) -> None: self.router = router self.logger = logger @@ -53,6 +57,7 @@ def __init__( self.http_client = http_client self.token_manager = token_manager self.api_client_settings = api_client_settings + self.activity_sender = activity_sender # This will be set after the EventManager is initialized due to # a circular dependency @@ -63,7 +68,6 @@ async def _build_context( activity: ActivityBase, token: TokenProtocol, plugins: List[PluginBase], - sender: Sender, ) -> ActivityContext[ActivityBase]: """Build the context object for activity processing. @@ -121,7 +125,7 @@ async def _build_context( conversation_ref, is_signed_in, self.default_connection_name, - sender, + activity_sender=self.activity_sender, app_token=lambda: self.token_manager.get_graph_token(tenant_id), ) @@ -140,8 +144,7 @@ async def updated_send( ref = conversation_ref or activityCtx.conversation_ref await self.event_manager.on_activity_sent( - sender, - ActivitySentEvent(sender=sender, activity=res, conversation_ref=ref), + ActivitySentEvent(activity=res, conversation_ref=ref), plugins=plugins, ) return res @@ -151,16 +154,14 @@ async def updated_send( async def handle_chunk(chunk_activity: SentActivity): if self.event_manager: await self.event_manager.on_activity_sent( - sender, - ActivitySentEvent(sender=sender, activity=chunk_activity, conversation_ref=conversation_ref), + ActivitySentEvent(activity=chunk_activity, conversation_ref=conversation_ref), plugins=plugins, ) async def handle_close(close_activity: SentActivity): if self.event_manager: await self.event_manager.on_activity_sent( - sender, - ActivitySentEvent(sender=sender, activity=close_activity, conversation_ref=conversation_ref), + ActivitySentEvent(activity=close_activity, conversation_ref=conversation_ref), plugins=plugins, ) @@ -169,10 +170,11 @@ async def handle_close(close_activity: SentActivity): return activityCtx - async def process_activity( - self, plugins: List[PluginBase], sender: Sender, event: ActivityEvent - ) -> InvokeResponse[Any]: - activityCtx = await self._build_context(event.activity, event.token, plugins, sender) + async def process_activity(self, plugins: List[PluginBase], event: ActivityEvent) -> InvokeResponse[Any]: + activity_dict = event.body.model_dump(by_alias=True, exclude_none=True) + activity = ActivityTypeAdapter.validate_python(activity_dict) + + activityCtx = await self._build_context(activity, event.token, plugins) self.logger.debug(f"Received activity: {activityCtx.activity}") @@ -183,8 +185,7 @@ def create_route(plugin: PluginBase) -> ActivityHandler: async def route(ctx: ActivityContext[ActivityBase]) -> Optional[Any]: await plugin.on_activity( PluginActivityEvent( - sender=sender, - activity=event.activity, + activity=activity, token=event.token, conversation_ref=activityCtx.conversation_ref, ) @@ -217,16 +218,15 @@ async def route(ctx: ActivityContext[ActivityBase]) -> Optional[Any]: response = InvokeResponse[Any](status=200, body=middleware_result) await self.event_manager.on_activity_response( - sender, ActivityResponseEvent( - activity=event.activity, + activity=activity, response=response, conversation_ref=activityCtx.conversation_ref, ), plugins=plugins, ) except Exception as error: - await self.event_manager.on_error(ErrorEvent(error=error, activity=event.activity, sender=sender), plugins) + await self.event_manager.on_error(ErrorEvent(error=error, activity=activity), plugins) raise error self.logger.debug("Completed processing activity") diff --git a/packages/apps/src/microsoft_teams/apps/contexts/function_context.py b/packages/apps/src/microsoft_teams/apps/contexts/function_context.py index 8766ba2e..beca1cf7 100644 --- a/packages/apps/src/microsoft_teams/apps/contexts/function_context.py +++ b/packages/apps/src/microsoft_teams/apps/contexts/function_context.py @@ -21,7 +21,7 @@ ) from microsoft_teams.cards import AdaptiveCard -from ..http_plugin import HttpPlugin +from ..activity_sender import ActivitySender from .client_context import ClientContext T = TypeVar("T") @@ -42,8 +42,8 @@ class FunctionContext(ClientContext, Generic[T]): api: ApiClient """The API client instance for conversation client.""" - http: HttpPlugin - """The HTTP plugin instance for sending messages.""" + activity_sender: ActivitySender + """The activity sender instance for sending messages.""" log: Logger """The app logger instance.""" @@ -80,7 +80,7 @@ async def send(self, activity: str | ActivityParams | AdaptiveCard) -> Optional[ else: activity = activity - return await self.http.send(activity, conversation_ref) + return await self.activity_sender.send(activity, conversation_ref) async def _resolve_conversation_id(self, activity: str | ActivityParams | AdaptiveCard) -> Optional[str]: """Resolve or create a conversation ID for the current user/context. diff --git a/packages/apps/src/microsoft_teams/apps/events/__init__.py b/packages/apps/src/microsoft_teams/apps/events/__init__.py index fa0b3f62..75004759 100644 --- a/packages/apps/src/microsoft_teams/apps/events/__init__.py +++ b/packages/apps/src/microsoft_teams/apps/events/__init__.py @@ -9,6 +9,7 @@ ActivityEvent, ActivityResponseEvent, ActivitySentEvent, + CoreActivity, ErrorEvent, SignInEvent, StartEvent, @@ -27,4 +28,5 @@ "is_registered_event", "ActivitySentEvent", "ActivityResponseEvent", + "CoreActivity", ] diff --git a/packages/apps/src/microsoft_teams/apps/events/types.py b/packages/apps/src/microsoft_teams/apps/events/types.py index bc041744..821e326f 100644 --- a/packages/apps/src/microsoft_teams/apps/events/types.py +++ b/packages/apps/src/microsoft_teams/apps/events/types.py @@ -16,21 +16,38 @@ TokenProtocol, TokenResponse, ) +from pydantic import BaseModel, ConfigDict -from ..plugins import PluginBase, Sender from ..routing import ActivityContext +class CoreActivity(BaseModel): + """ + Core activity fields that all transports need to know about. + Extensible for protocol-specific fields via extra="allow". + """ + + model_config = ConfigDict(extra="allow") + + service_url: Optional[str] = None + """Service URL for routing""" + + id: Optional[str] = None + """Activity ID for correlation""" + + type: Optional[str] = None + """Activity type for basic routing""" + + @dataclass class ActivityEvent: """Event emitted when an activity is processed.""" - activity: Activity - sender: Sender + body: CoreActivity token: TokenProtocol def __repr__(self) -> str: - return f"ActivityEvent(activity={self.activity}, token={self.token}, sender={self.sender})" + return f"ActivityEvent(body={self.body}, token={self.token})" @dataclass @@ -40,29 +57,24 @@ class ErrorEvent: error: Exception context: Optional[Dict[str, Any]] = None activity: Optional[Activity] = None - sender: Optional[PluginBase] = None def __post_init__(self) -> None: if self.context is None: self.context = {} def __repr__(self) -> str: - return f"ErrorEvent(error={self.error}, context={self.context}, activity={self.activity}, sender={self.sender})" + return f"ErrorEvent(error={self.error}, context={self.context}, activity={self.activity})" @dataclass class ActivitySentEvent: - """Event emitted by a plugin when an activity is sent.""" + """Event emitted when an activity is sent.""" - sender: Sender activity: SentActivity conversation_ref: ConversationReference def __repr__(self) -> str: - return ( - f"ActivitySentEvent(sender={self.sender}, activity={self.activity}, " - + f"conversation_ref={self.conversation_ref})" - ) + return f"ActivitySentEvent(activity={self.activity}, conversation_ref={self.conversation_ref})" @dataclass diff --git a/packages/apps/src/microsoft_teams/apps/http_plugin.py b/packages/apps/src/microsoft_teams/apps/http_plugin.py index 21358f3d..2f79654a 100644 --- a/packages/apps/src/microsoft_teams/apps/http_plugin.py +++ b/packages/apps/src/microsoft_teams/apps/http_plugin.py @@ -27,32 +27,24 @@ from fastapi import FastAPI, Request, Response from fastapi.staticfiles import StaticFiles from microsoft_teams.api import ( - Activity, - ActivityParams, - ActivityTypeAdapter, - ApiClient, - ConversationReference, Credentials, InvokeResponse, - SentActivity, TokenProtocol, ) -from microsoft_teams.apps.http_stream import HttpStream -from microsoft_teams.common import Client, ClientOptions, ConsoleLogger, Token -from pydantic import BaseModel, ValidationError +from microsoft_teams.common import ConsoleLogger +from pydantic import BaseModel from starlette.applications import Starlette from starlette.types import Lifespan from .auth import create_jwt_validation_middleware -from .events import ActivityEvent, ErrorEvent +from .events import ActivityEvent, CoreActivity, ErrorEvent from .plugins import ( DependencyMetadata, EventMetadata, LoggerDependencyOptions, PluginActivityResponseEvent, + PluginBase, PluginStartEvent, - Sender, - StreamerProtocol, ) from .plugins.metadata import Plugin @@ -67,10 +59,11 @@ class HttpPluginOptions(TypedDict, total=False): server_factory: Callable[[FastAPI], uvicorn.Server] -@Plugin(name="http", version=version, description="the default plugin for sending/receiving activities") -class HttpPlugin(Sender): +@Plugin(name="http", version=version, description="the default plugin for receiving activities via HTTP") +class HttpPlugin(PluginBase): """ Basic HTTP plugin that provides a FastAPI server for Teams activities. + Handles HTTP server setup, routing, and authentication. """ logger: Annotated[Logger, LoggerDependencyOptions()] @@ -79,10 +72,6 @@ class HttpPlugin(Sender): on_error_event: Annotated[Callable[[ErrorEvent], None], EventMetadata(name="error")] on_activity_event: Annotated[Callable[[ActivityEvent], InvokeResponse[Any]], EventMetadata(name="activity")] - client: Annotated[Client, DependencyMetadata()] - - bot_token: Annotated[Optional[Callable[[], Token]], DependencyMetadata(optional=True)] - lifespans: list[Lifespan[Starlette]] = [] def __init__(self, **options: Unpack[HttpPluginOptions]): @@ -236,33 +225,17 @@ async def on_activity_response(self, event: PluginActivityResponseEvent) -> None """ self.logger.debug(f"Completing activity response for {event.activity.id}") - async def send(self, activity: ActivityParams, ref: ConversationReference) -> SentActivity: - api = ApiClient(service_url=ref.service_url, options=self.client.clone(ClientOptions(token=self.bot_token))) - - activity.from_ = ref.bot - activity.conversation = ref.conversation - - if hasattr(activity, "id") and activity.id: - res = await api.conversations.activities(ref.conversation.id).update(activity.id, activity) - return SentActivity.merge(activity, res) - - res = await api.conversations.activities(ref.conversation.id).create(activity) - return SentActivity.merge(activity, res) - - async def _process_activity( - self, activity: Activity, activity_id: str, token: TokenProtocol - ) -> InvokeResponse[Any]: + async def _process_activity(self, core_activity: CoreActivity, token: TokenProtocol) -> InvokeResponse[Any]: """ Process an activity via the registered handler. Args: - activity: The Teams activity data + core_activity: The core activity payload token: The authorization token (if any) - activity_id: The activity ID for response coordination """ result: InvokeResponse[Any] try: - event = ActivityEvent(activity=activity, sender=self, token=token) + event = ActivityEvent(body=core_activity, token=token) if asyncio.iscoroutinefunction(self.on_activity_event): result = await self.on_activity_event(event) else: @@ -274,50 +247,6 @@ async def _process_activity( return result - async def _handle_activity_request(self, request: Request) -> Any: - """ - Process the activity request and coordinate response. - - Args: - request: The FastAPI request object (token is in request.state.validated_token) - - Returns: - The activity processing result - """ - # Parse activity data - body = await request.json() - - # Get validated token from middleware (if present - will be missing if skip_auth is True) - if hasattr(request.state, "validated_token") and request.state.validated_token: - token = request.state.validated_token - else: - token = cast( - TokenProtocol, - SimpleNamespace( - app_id="", - app_display_name="", - tenant_id="", - service_url=body.get("serviceUrl", ""), - from_="azure", - from_id="", - is_expired=lambda: False, - ), - ) - - activity_type = body.get("type", "unknown") - activity_id = body.get("id", "unknown") - - self.logger.debug(f"Received activity: {activity_type} (ID: {activity_id})") - - try: - activity = ActivityTypeAdapter.validate_python(body) - except ValidationError as e: - self.logger.error(e.errors()) - raise - - self.logger.debug(f"Processing activity {activity_id} via handler...") - return await self._process_activity(activity, activity_id, token) - def _handle_activity_response(self, response: Response, result: Any) -> Union[Response, Dict[str, object]]: """ Handle the activity response formatting. @@ -353,10 +282,33 @@ def _handle_activity_response(self, response: Response, result: Any) -> Union[Re self.logger.debug("Returning empty body") return response - async def on_activity_request(self, request: Request, response: Response) -> Any: + async def on_activity_request(self, core_activity: CoreActivity, request: Request, response: Response) -> Any: """Handle incoming Teams activity.""" - # Process the activity (token validation handled by middleware) - result = await self._handle_activity_request(request) + # Get validated token from middleware (if present - will be missing if skip_auth is True) + if hasattr(request.state, "validated_token") and request.state.validated_token: + token = request.state.validated_token + else: + token = cast( + TokenProtocol, + SimpleNamespace( + app_id="", + app_display_name="", + tenant_id="", + service_url=core_activity.service_url or "", + from_="azure", + from_id="", + is_expired=lambda: False, + ), + ) + + activity_type = core_activity.type or "unknown" + activity_id = core_activity.id or "unknown" + + self.logger.debug(f"Received activity: {activity_type} (ID: {activity_id})") + self.logger.debug(f"Processing activity {activity_id} via handler...") + + # Process the activity + result = await self._process_activity(core_activity, token) return self._handle_activity_response(response, result) def _setup_routes(self) -> None: @@ -370,13 +322,6 @@ async def health_check() -> Dict[str, Any]: self.app.get("/")(health_check) - def create_stream(self, ref: ConversationReference) -> StreamerProtocol: - """Create a new streaming instance.""" - - api = ApiClient(ref.service_url, self.client.clone(ClientOptions(token=self.bot_token))) - - return HttpStream(api, ref, self.logger) - def mount(self, name: str, dir_path: Path | str, page_path: Optional[str] = None) -> None: """ Serve a static page at the given path. diff --git a/packages/apps/src/microsoft_teams/apps/plugins/__init__.py b/packages/apps/src/microsoft_teams/apps/plugins/__init__.py index 1b5b0034..84071dc5 100644 --- a/packages/apps/src/microsoft_teams/apps/plugins/__init__.py +++ b/packages/apps/src/microsoft_teams/apps/plugins/__init__.py @@ -10,12 +10,10 @@ from .plugin_base import PluginBase from .plugin_error_event import PluginErrorEvent from .plugin_start_event import PluginStartEvent -from .sender import Sender from .streamer import StreamerProtocol __all__ = [ "PluginBase", - "Sender", "StreamerProtocol", "PluginActivityEvent", "PluginActivityResponseEvent", diff --git a/packages/apps/src/microsoft_teams/apps/plugins/plugin_activity_event.py b/packages/apps/src/microsoft_teams/apps/plugins/plugin_activity_event.py index 40c1d7f6..df4d5264 100644 --- a/packages/apps/src/microsoft_teams/apps/plugins/plugin_activity_event.py +++ b/packages/apps/src/microsoft_teams/apps/plugins/plugin_activity_event.py @@ -3,13 +3,10 @@ Licensed under the MIT License. """ -from typing import TYPE_CHECKING, NamedTuple, Optional +from typing import NamedTuple from microsoft_teams.api import Activity, ConversationReference, TokenProtocol -if TYPE_CHECKING: - from .sender import Sender - class PluginActivityEvent(NamedTuple): """Event emitted by a plugin when an activity is received.""" @@ -22,6 +19,3 @@ class PluginActivityEvent(NamedTuple): conversation_ref: ConversationReference """The conversation reference for the activity""" - - sender: Optional["Sender"] = None - """The sender""" diff --git a/packages/apps/src/microsoft_teams/apps/plugins/plugin_activity_response_event.py b/packages/apps/src/microsoft_teams/apps/plugins/plugin_activity_response_event.py index f9347b86..e26435f8 100644 --- a/packages/apps/src/microsoft_teams/apps/plugins/plugin_activity_response_event.py +++ b/packages/apps/src/microsoft_teams/apps/plugins/plugin_activity_response_event.py @@ -3,19 +3,13 @@ Licensed under the MIT License. """ -from typing import TYPE_CHECKING, Any, NamedTuple, Optional +from typing import Any, NamedTuple, Optional from microsoft_teams.api import Activity, ConversationReference, InvokeResponse -if TYPE_CHECKING: - from .sender import Sender - class PluginActivityResponseEvent(NamedTuple): - """Event emitted by a plugin before an activity response is sent""" - - sender: "Sender" - """The sender""" + """Event emitted before an activity response is sent""" activity: Activity """The inbound request activity payload""" diff --git a/packages/apps/src/microsoft_teams/apps/plugins/plugin_activity_sent_event.py b/packages/apps/src/microsoft_teams/apps/plugins/plugin_activity_sent_event.py index 1cab011c..f8916625 100644 --- a/packages/apps/src/microsoft_teams/apps/plugins/plugin_activity_sent_event.py +++ b/packages/apps/src/microsoft_teams/apps/plugins/plugin_activity_sent_event.py @@ -3,20 +3,14 @@ Licensed under the MIT License. """ -from typing import TYPE_CHECKING, NamedTuple +from typing import NamedTuple from microsoft_teams.api.activities import SentActivity from microsoft_teams.api.models import ConversationReference -if TYPE_CHECKING: - from .sender import Sender - class PluginActivitySentEvent(NamedTuple): - """Event emitted by a plugin when an activity is sent.""" - - sender: "Sender" - """The sender of the activity""" + """Event emitted when an activity is sent.""" activity: SentActivity """The sent activity""" diff --git a/packages/apps/src/microsoft_teams/apps/plugins/plugin_base.py b/packages/apps/src/microsoft_teams/apps/plugins/plugin_base.py index b63103c9..51c7e932 100644 --- a/packages/apps/src/microsoft_teams/apps/plugins/plugin_base.py +++ b/packages/apps/src/microsoft_teams/apps/plugins/plugin_base.py @@ -3,15 +3,11 @@ Licensed under the MIT License. """ -from microsoft_teams.api.activities import ActivityParams, SentActivity -from microsoft_teams.api.models.conversation import ConversationReference - from .plugin_activity_event import PluginActivityEvent from .plugin_activity_response_event import PluginActivityResponseEvent from .plugin_activity_sent_event import PluginActivitySentEvent from .plugin_error_event import PluginErrorEvent from .plugin_start_event import PluginStartEvent -from .streamer import StreamerProtocol class PluginBase: @@ -44,11 +40,3 @@ async def on_activity_sent(self, event: PluginActivitySentEvent) -> None: async def on_activity_response(self, event: PluginActivityResponseEvent) -> None: """Called by the App when an activity response is sent.""" ... - - async def send(self, activity: ActivityParams, ref: ConversationReference) -> SentActivity: - """Called by the App to send an activity""" - ... - - def create_stream(self, ref: ConversationReference) -> StreamerProtocol: - """Called by the App to create a new activity stream""" - ... diff --git a/packages/apps/src/microsoft_teams/apps/plugins/plugin_error_event.py b/packages/apps/src/microsoft_teams/apps/plugins/plugin_error_event.py index 5dc82ee3..6720c2de 100644 --- a/packages/apps/src/microsoft_teams/apps/plugins/plugin_error_event.py +++ b/packages/apps/src/microsoft_teams/apps/plugins/plugin_error_event.py @@ -3,13 +3,10 @@ Licensed under the MIT License. """ -from typing import TYPE_CHECKING, NamedTuple, Optional +from typing import NamedTuple, Optional from microsoft_teams.api import Activity -if TYPE_CHECKING: - from .plugin_base import PluginBase - class PluginErrorEvent(NamedTuple): """Event emitted when an error occurs.""" @@ -17,8 +14,5 @@ class PluginErrorEvent(NamedTuple): error: Exception """The error""" - sender: Optional["PluginBase"] = None - """The sender""" - activity: Optional[Activity] = None """The activity""" diff --git a/packages/apps/src/microsoft_teams/apps/plugins/sender.py b/packages/apps/src/microsoft_teams/apps/plugins/sender.py deleted file mode 100644 index 31450b5a..00000000 --- a/packages/apps/src/microsoft_teams/apps/plugins/sender.py +++ /dev/null @@ -1,26 +0,0 @@ -""" -Copyright (c) Microsoft Corporation. All rights reserved. -Licensed under the MIT License. -""" - -from abc import abstractmethod - -from microsoft_teams.api.activities import ActivityParams, SentActivity -from microsoft_teams.api.models import ConversationReference - -from .plugin_base import PluginBase -from .streamer import StreamerProtocol - - -class Sender(PluginBase): - """A plugin that can send activities""" - - @abstractmethod - async def send(self, activity: ActivityParams, ref: ConversationReference) -> SentActivity: - """Called by the App to send an activity""" - pass - - @abstractmethod - def create_stream(self, ref: ConversationReference) -> StreamerProtocol: - """Called by the App to create a new activity stream""" - pass diff --git a/packages/apps/src/microsoft_teams/apps/routing/activity_context.py b/packages/apps/src/microsoft_teams/apps/routing/activity_context.py index 42109562..943ea6ea 100644 --- a/packages/apps/src/microsoft_teams/apps/routing/activity_context.py +++ b/packages/apps/src/microsoft_teams/apps/routing/activity_context.py @@ -36,15 +36,13 @@ from microsoft_teams.common import Storage from microsoft_teams.common.http.client_token import Token +from ..activity_sender import ActivitySender + if TYPE_CHECKING: from msgraph.graph_service_client import GraphServiceClient -from ..plugins import Sender - T = TypeVar("T", bound=ActivityBase, contravariant=True) -SendCallable = Callable[[str | ActivityParams | AdaptiveCard], Awaitable[SentActivity]] - def _get_graph_client(token: Token): """Lazy import and call get_graph_client when needed.""" @@ -96,7 +94,7 @@ def __init__( conversation_ref: ConversationReference, is_signed_in: bool, connection_name: str, - sender: Sender, + activity_sender: ActivitySender, app_token: Token, ): self.activity = activity @@ -108,9 +106,9 @@ def __init__( self.user_token = user_token self.connection_name = connection_name self.is_signed_in = is_signed_in - self._plugin = sender + self._activity_sender = activity_sender self._app_token = app_token - self.stream = self._plugin.create_stream(self.conversation_ref) + self.stream = activity_sender.create_stream(conversation_ref) self._next_handler: Optional[Callable[[], Awaitable[None]]] = None @@ -177,7 +175,7 @@ async def send( Args: message: The message to send, can be a string, ActivityParams, or AdaptiveCard - conversation_id: Optional conversation ID to override the current conversation reference + conversation_ref: Optional conversation reference to override the current conversation reference """ if isinstance(message, str): activity = MessageActivityInput(text=message) @@ -187,7 +185,7 @@ async def send( activity = message ref = conversation_ref or self.conversation_ref - res = await self._plugin.send(activity, ref) + res = await self._activity_sender.send(activity, ref) return res async def reply(self, input: str | ActivityParams) -> SentActivity: diff --git a/packages/apps/tests/test_app.py b/packages/apps/tests/test_app.py index 902359e0..93277278 100644 --- a/packages/apps/tests/test_app.py +++ b/packages/apps/tests/test_app.py @@ -179,24 +179,15 @@ async def handle_activity(event: ActivityEvent) -> None: activity_events.append(event) event_received.set() - from_account = Account(id="bot-123", name="Test Bot", role="bot") - recipient = Account(id="user-456", name="Test User", role="user") - conversation = ConversationAccount(id="conv-789", conversation_type="personal") + from microsoft_teams.apps.events import CoreActivity - activity = MessageActivity( + core_activity = CoreActivity( type="message", id="test-activity-id", - text="Hello, world!", - from_=from_account, - recipient=recipient, - conversation=conversation, - channel_id="msteams", ) - plugin = app_with_activity_handler.http - await app_with_activity_handler.event_manager.on_activity( - ActivityEvent(activity=activity, sender=plugin, token=FakeToken()) + ActivityEvent(body=core_activity, token=FakeToken()) ) # Wait for the async event handler to complete @@ -205,12 +196,9 @@ async def handle_activity(event: ActivityEvent) -> None: # Verify event was emitted assert len(activity_events) == 1 assert isinstance(activity_events[0], ActivityEvent) - # The event contains the parsed output model, not the input model - assert activity_events[0].activity.id == activity.id - assert activity_events[0].activity.type == activity.type - # Check text only if it's a MessageActivity - if hasattr(activity_events[0].activity, "text"): - assert activity_events[0].activity.text == activity.text + # The event contains the core activity + assert activity_events[0].body.id == core_activity.id + assert activity_events[0].body.type == core_activity.type @pytest.mark.asyncio async def test_multiple_event_handlers(self, app_with_options: App) -> None: @@ -236,22 +224,15 @@ async def handle_activity_2(event: ActivityEvent) -> None: if received_count == 2: both_received.set() - from_account = Account(id="bot-123", name="Test Bot", role="bot") - recipient = Account(id="user-456", name="Test User", role="user") - conversation = ConversationAccount(id="conv-789", conversation_type="personal") + from microsoft_teams.apps.events import CoreActivity - activity = MessageActivity( + core_activity = CoreActivity( type="message", id="test-activity-id", - text="Hello, world!", - from_=from_account, - recipient=recipient, - conversation=conversation, - channel_id="msteams", ) await app_with_options.event_manager.on_activity( - ActivityEvent(activity=activity, sender=app_with_options.http, token=FakeToken()) + ActivityEvent(body=core_activity, token=FakeToken()) ) # Wait for both async event handlers to complete @@ -260,8 +241,8 @@ async def handle_activity_2(event: ActivityEvent) -> None: # Both handlers should have received the event assert len(activity_events_1) == 1 assert len(activity_events_2) == 1 - assert activity_events_1[0].activity == activity - assert activity_events_2[0].activity == activity + assert activity_events_1[0].body == core_activity + assert activity_events_2[0].body == core_activity # Generated Handler Tests diff --git a/packages/apps/tests/test_app_events.py b/packages/apps/tests/test_app_events.py index 7b5ea232..a4caad5d 100644 --- a/packages/apps/tests/test_app_events.py +++ b/packages/apps/tests/test_app_events.py @@ -7,16 +7,16 @@ from unittest.mock import AsyncMock, MagicMock import pytest -from microsoft_teams.api import Activity, ConversationReference, TokenProtocol +from microsoft_teams.api import Activity, ConversationReference, SentActivity, TokenProtocol from microsoft_teams.apps import ( ActivityEvent, ActivityResponseEvent, ActivitySentEvent, ErrorEvent, HttpPlugin, - Sender, ) from microsoft_teams.apps.app_events import EventManager +from microsoft_teams.apps.events import CoreActivity from microsoft_teams.common import EventEmitter @@ -57,9 +57,7 @@ async def test_on_error(self, event_manager, mock_event_emitter, mock_plugins): @pytest.mark.asyncio async def test_on_activity(self, event_manager, mock_event_emitter): """Test the on_activity method.""" - activity_event = ActivityEvent( - sender=Sender(), activity=MagicMock(spec=Activity), token=MagicMock(spec=TokenProtocol) - ) + activity_event = ActivityEvent(body=CoreActivity(), token=MagicMock(spec=TokenProtocol)) await event_manager.on_activity(activity_event) @@ -68,33 +66,27 @@ async def test_on_activity(self, event_manager, mock_event_emitter): @pytest.mark.asyncio async def test_on_activity_sent(self, event_manager, mock_event_emitter, mock_plugins): """Test the on_activity_sent method.""" - sender = Sender() activity_sent_event = ActivitySentEvent( - sender=sender, activity=MagicMock(spec=Activity), conversation_ref=MagicMock(spec=ConversationReference) + activity=MagicMock(spec=SentActivity), conversation_ref=MagicMock(spec=ConversationReference) ) - await event_manager.on_activity_sent(sender, activity_sent_event, mock_plugins) + await event_manager.on_activity_sent(activity_sent_event, mock_plugins) for plugin in mock_plugins: if callable(plugin.on_activity_sent): plugin.on_activity_sent.assert_called() - mock_event_emitter.emit.assert_called_once_with( - "activity_sent", {"event": activity_sent_event, "sender": sender} - ) + mock_event_emitter.emit.assert_called_once_with("activity_sent", activity_sent_event) @pytest.mark.asyncio async def test_on_activity_response(self, event_manager, mock_event_emitter, mock_plugins): """Test the on_activity_response method.""" - sender = Sender() activity_response_event = ActivityResponseEvent( activity=MagicMock(spec=Activity), response=MagicMock(), conversation_ref=MagicMock() ) - await event_manager.on_activity_response(sender, activity_response_event, mock_plugins) + await event_manager.on_activity_response(activity_response_event, mock_plugins) for plugin in mock_plugins: if callable(plugin.on_activity_response): plugin.on_activity_response.assert_called() - mock_event_emitter.emit.assert_called_once_with( - "activity_response", {"event": activity_response_event, "sender": sender} - ) + mock_event_emitter.emit.assert_called_once_with("activity_response", activity_response_event) diff --git a/packages/apps/tests/test_app_process.py b/packages/apps/tests/test_app_process.py index eab61cb8..27a66fa0 100644 --- a/packages/apps/tests/test_app_process.py +++ b/packages/apps/tests/test_app_process.py @@ -9,18 +9,17 @@ import pytest from microsoft_teams.api import ( - Account, Activity, ActivityBase, - ConversationAccount, ConversationReference, InvokeResponse, - MessageActivity, TokenProtocol, ) -from microsoft_teams.apps import ActivityContext, ActivityEvent, Sender +from microsoft_teams.apps import ActivityContext, ActivityEvent +from microsoft_teams.apps.activity_sender import ActivitySender from microsoft_teams.apps.app_events import EventManager from microsoft_teams.apps.app_process import ActivityProcessor +from microsoft_teams.apps.events import CoreActivity from microsoft_teams.apps.routing.router import ActivityHandler, ActivityRouter from microsoft_teams.apps.token_manager import TokenManager from microsoft_teams.common import Client, LocalStorage @@ -43,6 +42,11 @@ def activity_processor(self, mock_logger, mock_http_client): mock_storage = MagicMock(spec=LocalStorage) mock_activity_router = MagicMock(spec=ActivityRouter) mock_token_manager = MagicMock(spec=TokenManager) + mock_activity_sender = MagicMock(spec=ActivitySender) + # Mock the stream object with async close + mock_stream = MagicMock() + mock_stream.close = AsyncMock() + mock_activity_sender.create_stream.return_value = mock_stream return ActivityProcessor( mock_activity_router, mock_logger, @@ -52,6 +56,7 @@ def activity_processor(self, mock_logger, mock_http_client): mock_http_client, mock_token_manager, None, + mock_activity_sender, ) @pytest.mark.asyncio @@ -66,6 +71,8 @@ async def test_execute_middleware_chain_with_no_handlers(self, activity_processo @pytest.mark.asyncio async def test_execute_middleware_chain_with_two_handlers(self, activity_processor, mock_http_client, mock_logger): """Test the execute_middleware_chain method with two handlers.""" + mock_activity_sender = MagicMock(spec=ActivitySender) + mock_activity_sender.create_stream.return_value = MagicMock() context = ActivityContext( activity=MagicMock(spec=ActivityBase), app_id="app_id", @@ -76,8 +83,8 @@ async def test_execute_middleware_chain_with_two_handlers(self, activity_process conversation_ref=MagicMock(spec=ConversationReference), is_signed_in=True, connection_name="default_connection", - sender=MagicMock(spec=Sender), - app_token=None, + activity_sender=mock_activity_sender, + app_token=lambda: None, ) handler_one = AsyncMock(spec=ActivityHandler) @@ -118,35 +125,32 @@ async def test_process_activity_middleware_results(self, activity_processor, mid """Test process_activity with different middleware return values.""" # Setup mocks mock_plugins = [] - mock_sender = MagicMock() - stream = MagicMock() - stream.close = AsyncMock() - mock_sender.create_stream.return_value = stream - - # Create real activity and event - mock_account = Account(id="user-123", name="Test User") - mock_conversation = ConversationAccount(id="conv-789") - mock_bot = Account(id="bot-456", name="Test Bot") - activity = MessageActivity( + + # Create core activity with required fields for MessageActivity + core_activity = CoreActivity( type="message", - text="Test message", - from_=mock_account, - conversation=mock_conversation, - recipient=mock_bot, id="activity-123", service_url="https://service.url", + **{ + "from": {"id": "user-123", "name": "Test User"}, + "conversation": {"id": "conv-789"}, + "recipient": {"id": "bot-456", "name": "Test Bot"}, + "channelId": "msteams", + } ) mock_token = MagicMock(spec=TokenProtocol) - mock_activity_event = ActivityEvent(activity=activity, sender=mock_sender, token=mock_token) + mock_token.service_url = "https://service.url" + mock_activity_event = ActivityEvent(body=core_activity, token=mock_token) # Setup processor mocks activity_processor.router.select_handlers = MagicMock(return_value=[]) activity_processor.execute_middleware_chain = AsyncMock(return_value=middleware_result) activity_processor.event_manager = MagicMock() activity_processor.event_manager.on_activity_response = AsyncMock() + activity_processor.event_manager.on_error = AsyncMock() # Act - result = await activity_processor.process_activity(mock_plugins, mock_sender, mock_activity_event) + result = await activity_processor.process_activity(mock_plugins, mock_activity_event) # Assert assert result.status == expected_result.status @@ -157,26 +161,22 @@ async def test_process_activity_raises_exception(self, activity_processor): """Test process_activity raises exception when middleware chain fails.""" # Setup mocks mock_plugins = [] - mock_sender = MagicMock() - stream = MagicMock() - stream.close = AsyncMock() - mock_sender.create_stream.return_value = stream - - # Create real activity and event - mock_account = Account(id="user-123", name="Test User") - mock_conversation = ConversationAccount(id="conv-789") - mock_bot = Account(id="bot-456", name="Test Bot") - activity = MessageActivity( + + # Create core activity with required fields for MessageActivity + core_activity = CoreActivity( type="message", - text="Test message", - from_=mock_account, - conversation=mock_conversation, - recipient=mock_bot, id="activity-123", service_url="https://service.url", + **{ + "from": {"id": "user-123", "name": "Test User"}, + "conversation": {"id": "conv-789"}, + "recipient": {"id": "bot-456", "name": "Test Bot"}, + "channelId": "msteams", + } ) mock_token = MagicMock(spec=TokenProtocol) - mock_activity_event = ActivityEvent(activity=activity, sender=mock_sender, token=mock_token) + mock_token.service_url = "https://service.url" + mock_activity_event = ActivityEvent(body=core_activity, token=mock_token) # Setup processor mocks activity_processor.router.select_handlers = MagicMock(return_value=[]) @@ -188,7 +188,7 @@ async def test_process_activity_raises_exception(self, activity_processor): # Act & Assert - expect exception to be raised with pytest.raises(Exception, match="Test exception"): - await activity_processor.process_activity(mock_plugins, mock_sender, mock_activity_event) + await activity_processor.process_activity(mock_plugins, mock_activity_event) # Assert error event was called assert activity_processor.event_manager.on_error.called diff --git a/packages/apps/tests/test_function_context.py b/packages/apps/tests/test_function_context.py index 93ed9bd9..a6c37bbf 100644 --- a/packages/apps/tests/test_function_context.py +++ b/packages/apps/tests/test_function_context.py @@ -49,7 +49,7 @@ def function_context(self, mock_api: ApiClient, mock_http: Any, mock_logger: Any id="bot-123", name="Test Bot", api=mock_api, - http=mock_http, + activity_sender=mock_http, log=mock_logger, data={"some": "payload"}, app_session_id="dummy-session", diff --git a/packages/apps/tests/test_http_plugin.py b/packages/apps/tests/test_http_plugin.py index 41aed482..4ff0aaa6 100644 --- a/packages/apps/tests/test_http_plugin.py +++ b/packages/apps/tests/test_http_plugin.py @@ -77,7 +77,7 @@ async def test_on_activity_response(self, plugin_without_validator, mock_account response_data = InvokeResponse(body=cast(ConfigResponse, {"status": "success"}), status=200) await plugin_without_validator.on_activity_response( PluginActivityResponseEvent( - sender=plugin_without_validator, + activity=mock_activity, response=response_data, conversation_ref=mock_reference, @@ -97,7 +97,7 @@ async def test_on_error(self, plugin_with_validator, mock_account, mock_logger): error = ValueError("Test error") await plugin_with_validator.on_error( - PluginErrorEvent(sender=plugin_with_validator, activity=mock_activity, error=error) + PluginErrorEvent(activity=mock_activity, error=error) ) @pytest.mark.asyncio @@ -207,18 +207,22 @@ async def test_on_activity_request_success(self, plugin_without_validator, mock_ recipient=mock_account, ), ) + from microsoft_teams.apps.events import CoreActivity + mock_request.json.return_value = activity.model_dump() mock_request.state = MagicMock() mock_request.state.validated_token = None mock_response = MagicMock(spec=Response) - result = await plugin_without_validator.on_activity_request(mock_request, mock_response) + # Convert activity to CoreActivity + core_activity = CoreActivity(**activity.model_dump()) + + result = await plugin_without_validator.on_activity_request(core_activity, mock_request, mock_response) mock_handler.assert_called_once() call_args = mock_handler.call_args[0][0] assert isinstance(call_args, ActivityEvent) - assert call_args.sender == plugin_without_validator assert result == expected_body @@ -243,13 +247,18 @@ async def test_on_activity_request_exception(self, plugin_without_validator, moc recipient=mock_account, ), ) + from microsoft_teams.apps.events import CoreActivity + mock_request.json.return_value = activity.model_dump() mock_request.state = MagicMock() mock_request.state.validated_token = None mock_response = MagicMock(spec=Response) - result = await plugin_without_validator.on_activity_request(mock_request, mock_response) + # Convert activity to CoreActivity + core_activity = CoreActivity(**activity.model_dump()) + + result = await plugin_without_validator.on_activity_request(core_activity, mock_request, mock_response) mock_handler.assert_called_once() # Exception is logged directly at exception site diff --git a/packages/apps/tests/test_optional_graph_dependencies.py b/packages/apps/tests/test_optional_graph_dependencies.py index aa52e33c..2e48b5b4 100644 --- a/packages/apps/tests/test_optional_graph_dependencies.py +++ b/packages/apps/tests/test_optional_graph_dependencies.py @@ -22,7 +22,8 @@ def _create_activity_context(self) -> ActivityContext[Any]: mock_storage = MagicMock() mock_api = MagicMock() mock_conversation_ref = MagicMock() - mock_sender = MagicMock() + mock_activity_sender = MagicMock() + mock_activity_sender.create_stream.return_value = MagicMock() mock_app_token = MagicMock() # Provide an app token for graph access return ActivityContext( @@ -35,7 +36,7 @@ def _create_activity_context(self) -> ActivityContext[Any]: conversation_ref=mock_conversation_ref, is_signed_in=False, connection_name="test-connection", - sender=mock_sender, + activity_sender=mock_activity_sender, app_token=mock_app_token, # This is needed for app_graph to work ) @@ -83,7 +84,7 @@ def test_user_graph_property_not_signed_in(self) -> None: conversation_ref=MagicMock(), is_signed_in=False, # Not signed in connection_name="test-connection", - sender=MagicMock(), + activity_sender=MagicMock(), app_token=None, ) @@ -103,7 +104,7 @@ def test_user_graph_property_no_token(self) -> None: conversation_ref=MagicMock(), is_signed_in=True, # Signed in but no token connection_name="test-connection", - sender=MagicMock(), + activity_sender=MagicMock(), app_token=None, ) @@ -123,7 +124,7 @@ def test_app_graph_property_no_token(self) -> None: conversation_ref=MagicMock(), is_signed_in=False, connection_name="test-connection", - sender=MagicMock(), + activity_sender=MagicMock(), app_token=None, # No app token ) diff --git a/packages/botbuilder/src/microsoft_teams/botbuilder/botbuilder_plugin.py b/packages/botbuilder/src/microsoft_teams/botbuilder/botbuilder_plugin.py index ddc79f1f..bc0aae80 100644 --- a/packages/botbuilder/src/microsoft_teams/botbuilder/botbuilder_plugin.py +++ b/packages/botbuilder/src/microsoft_teams/botbuilder/botbuilder_plugin.py @@ -16,6 +16,7 @@ LoggerDependencyOptions, Plugin, ) +from microsoft_teams.apps.events import CoreActivity from microsoft_teams.apps.http_plugin import HttpPluginOptions from botbuilder.core import ( @@ -92,13 +93,13 @@ async def on_init(self) -> None: self.logger.debug("BotBuilder plugin initialized successfully") - async def on_activity_request(self, request: Request, response: Response) -> Any: + async def on_activity_request(self, core_activity: CoreActivity, request: Request, response: Response) -> Any: """ Handles an incoming activity. Overrides the base HTTP plugin behavior to: 1. Process the activity using the Bot Framework adapter/handler. - 2. Then pass the request to the Teams plugin pipeline (_handle_activity_request). + 2. Then pass the request to the parent Teams plugin pipeline. Returns the final HTTP response. """ @@ -106,9 +107,9 @@ async def on_activity_request(self, request: Request, response: Response) -> Any raise RuntimeError("plugin not registered") try: - # Parse activity data - body = await request.json() - activity_bf = cast(Activity, Activity().deserialize(body)) + # Parse activity data from core_activity + activity_dict = core_activity.model_dump(by_alias=True, exclude_none=True) + activity_bf = cast(Activity, Activity().deserialize(activity_dict)) # A POST request must contain an Activity if not activity_bf.type: @@ -126,9 +127,8 @@ async def logic(turn_context: TurnContext): auth_header = request.headers["Authorization"] if "Authorization" in request.headers else "" await self.adapter.process_activity(auth_header, activity_bf, logic) - # Call HTTP plugin to handle activity request - result = await self._handle_activity_request(request) - return self._handle_activity_response(response, result) + # Call parent's on_activity_request to handle Teams processing + return await super().on_activity_request(core_activity, request, response) except HTTPException as http_err: self.logger.error(f"HTTP error processing activity: {http_err}", exc_info=True) diff --git a/packages/botbuilder/tests/test_botbuilder_plugin.py b/packages/botbuilder/tests/test_botbuilder_plugin.py index 463a2600..0f604b10 100644 --- a/packages/botbuilder/tests/test_botbuilder_plugin.py +++ b/packages/botbuilder/tests/test_botbuilder_plugin.py @@ -68,6 +68,8 @@ async def test_on_activity_request_calls_adapter_and_handler(self, plugin_with_a "conversation": {"id": "conv1"}, "serviceUrl": "https://service.url", } + from microsoft_teams.apps.events import CoreActivity + request = AsyncMock(spec=Request) request.json.return_value = activity_data request.headers = {"Authorization": "Bearer token"} @@ -83,7 +85,10 @@ async def fake_process_activity(auth_header, activity, logic): # type: ignore plugin_with_adapter.adapter.process_activity = AsyncMock(side_effect=fake_process_activity) - await plugin_with_adapter.on_activity_request(request, response) + # Convert activity_data to CoreActivity + core_activity = CoreActivity(**activity_data) + + await plugin_with_adapter.on_activity_request(core_activity, request, response) # Ensure adapter.process_activity called with correct auth and activity plugin_with_adapter.adapter.process_activity.assert_called_once() @@ -98,6 +103,8 @@ async def fake_process_activity(auth_header, activity, logic): # type: ignore async def test_on_activity_request_raises_http_exception_on_adapter_error( self, plugin_with_adapter: BotBuilderPlugin ): + from microsoft_teams.apps.events import CoreActivity + activity_data = {"type": "message", "id": "activity-id"} request = AsyncMock(spec=Request) request.json.return_value = activity_data @@ -108,6 +115,9 @@ async def test_on_activity_request_raises_http_exception_on_adapter_error( plugin_with_adapter.adapter.process_activity = AsyncMock(side_effect=Exception("fail")) + # Convert activity_data to CoreActivity + core_activity = CoreActivity(**activity_data) + with pytest.raises(HTTPException) as exc: - await plugin_with_adapter.on_activity_request(request, response) + await plugin_with_adapter.on_activity_request(core_activity, request, response) assert exc.value.status_code == 500 diff --git a/packages/devtools/src/microsoft_teams/devtools/devtools_plugin.py b/packages/devtools/src/microsoft_teams/devtools/devtools_plugin.py index e905ed9d..46202899 100644 --- a/packages/devtools/src/microsoft_teams/devtools/devtools_plugin.py +++ b/packages/devtools/src/microsoft_teams/devtools/devtools_plugin.py @@ -16,9 +16,10 @@ from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.routing import APIRouter from fastapi.staticfiles import StaticFiles -from microsoft_teams.api import Activity, ActivityParams, ConversationReference, SentActivity, TokenProtocol +from microsoft_teams.api import Activity, TokenProtocol from microsoft_teams.apps import ( ActivityEvent, + CoreActivity, DependencyMetadata, ErrorEvent, EventMetadata, @@ -28,10 +29,9 @@ PluginActivityEvent, PluginActivityResponseEvent, PluginActivitySentEvent, + PluginBase, PluginErrorEvent, PluginStartEvent, - Sender, - StreamerProtocol, ) from .event import DevToolsActivityEvent, DevToolsActivityReceivedEvent, DevToolsActivitySentEvent @@ -46,7 +46,7 @@ version=version, description="set of tools to make development of Teams apps faster and simpler", ) -class DevToolsPlugin(Sender): +class DevToolsPlugin(PluginBase): logger: Annotated[Logger, LoggerDependencyOptions()] id: Annotated[Optional[TokenProtocol], DependencyMetadata(optional=True)] http: Annotated[HttpPlugin, DependencyMetadata()] @@ -128,7 +128,10 @@ async def process(token: TokenProtocol, activity: Activity): response_future = asyncio.get_event_loop().create_future() self.pending[activity.id] = response_future try: - result = self.on_activity_event(ActivityEvent(token=token, activity=activity, sender=self.http)) + # Convert Activity to CoreActivity + activity_dict = activity.model_dump(by_alias=True, exclude_none=True) + core_activity = CoreActivity.model_validate(activity_dict) + result = self.on_activity_event(ActivityEvent(body=core_activity, token=token)) # If the handler is a coroutine, schedule it if asyncio.iscoroutine(result): asyncio.create_task(result) @@ -225,12 +228,6 @@ async def on_activity_response(self, event: PluginActivityResponseEvent): promise.set_result(event.response) del self.pending[event.activity.id] - async def send(self, activity: ActivityParams, ref: ConversationReference) -> SentActivity: - return await self.http.send(activity, ref) - - def create_stream(self, ref: ConversationReference) -> StreamerProtocol: - return self.http.create_stream(ref) - async def emit_activity_to_sockets(self, event: DevToolsActivityEvent): data = event.model_dump(mode="json", exclude_none=True) for socket_id, websocket in self.sockets.items(): diff --git a/uv.lock b/uv.lock index 9a30754a..ff1bed31 100644 --- a/uv.lock +++ b/uv.lock @@ -30,6 +30,7 @@ members = [ "microsoft-teams-mcpplugin", "microsoft-teams-openai", "oauth", + "proactive-messaging", "stream", "tab", ] @@ -2099,6 +2100,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5b/a5/987a405322d78a73b66e39e4a90e4ef156fd7141bf71df987e50717c321b/pre_commit-4.3.0-py2.py3-none-any.whl", hash = "sha256:2b0747ad7e6e967169136edffee14c16e148a778a54e4f967921aa1ebf2308d8", size = 220965, upload-time = "2025-08-09T18:56:13.192Z" }, ] +[[package]] +name = "proactive-messaging" +version = "0.1.0" +source = { virtual = "examples/proactive-messaging" } +dependencies = [ + { name = "dotenv" }, + { name = "microsoft-teams-apps" }, +] + +[package.metadata] +requires-dist = [ + { name = "dotenv", specifier = ">=0.9.9" }, + { name = "microsoft-teams-apps", editable = "packages/apps" }, +] + [[package]] name = "propcache" version = "0.4.1"