Skip to content

Latest commit

 

History

History
192 lines (127 loc) · 16.2 KB

File metadata and controls

192 lines (127 loc) · 16.2 KB

discord_activity_tracker.services

Module path: discord_activity_tracker.services Description: Discord servers, channels, messages, and reactions. Single place for all writes to discord_activity_tracker models. Discord user profiles live in cppa_user_tracker.DiscordProfile.

Type notation: Model types refer to discord_activity_tracker.models unless noted. DiscordProfile refers to cppa_user_tracker.models.DiscordProfile.


Service contract

  • get_or_create pattern: get_or_create_discord_server and get_or_create_discord_channel return tuple[Model, bool] where the bool is Django's created flag (a new row was inserted on this call).
  • update_or_create pattern: create_or_update_discord_message and add_or_update_reaction return tuple[Model, bool] with Django update_or_create semantics for created.
  • Partial updates: On existing rows, server and channel helpers use save(update_fields=[...]) when metadata changed; mark_message_deleted updates is_deleted, deleted_at, and updated_at via update_fields.
  • Bulk upsert: bulk_upsert_discord_messages and bulk_upsert_discord_reactions use bulk_create(..., update_conflicts=True, unique_fields=..., update_fields=...). bulk_upsert_discord_users uses per-row queries and get_or_create_discord_profile because DiscordProfile uses multi-table inheritance (no bulk_create(update_conflicts=True)).
  • Transactions: bulk_process_message_batch wraps user → message → reaction upserts in a single transaction.atomic(); an unhandled exception rolls back all phases.
  • bulk_process_message_batch return value: Returns len(message_data_list) when the input list is non-empty, not the count of rows successfully written. Individual messages may still be skipped inside bulk_upsert_discord_messages (see below).

Raises and edge behavior

  • discord_activity_tracker.services does not intentionally raise ValueError for invalid arguments; validate inputs at sync/staging boundaries where appropriate.
  • bulk_upsert_discord_users: Each dict must include user_id (and keys used in the loop); malformed payloads can raise KeyError.
  • bulk_upsert_discord_messages: If user_map has no profile for message_data["author"]["user_id"], that message is skipped and a warning is logged (no exception). If every message in the batch is skipped, no bulk insert runs and {} is returned.
  • bulk_upsert_discord_reactions: If message_map has no message for discord_message_id, that reaction is skipped silently. Duplicate (message, emoji) pairs in one batch keep the last entry.
  • ORM: Functions may propagate Django database exceptions (e.g. IntegrityError, OperationalError) under concurrency or infrastructure faults.

CollectorFailureCategory

discord_activity_tracker.services performs database I/O only. It does not call Discord HTTP APIs and does not assign CollectorFailureCategory values.

Collectors, management commands, and sync layers classify failures with classify_failure when handling exceptions (e.g. DiscordChatExporter subprocess failures wrapped in CommandError, discord.py HTTP errors, rate limits). If ORM errors are passed through classify_failure, mapping follows core/errors.py (for example django.core.exceptions.ValidationError may map to VALIDATION in typical paths).


DiscordServer

Function Parameter types Return type Description
get_or_create_discord_server server_id: int, server_name: str, icon_url: str = "" tuple[DiscordServer, bool] Get or create server; update name/icon if changed.

DiscordChannel

New fields (migration 0005): category_id: BigIntegerField | null, category_name: CharField.

Function Parameter types Return type Description
get_or_create_discord_channel server: DiscordServer, channel_id: int, channel_name: str, channel_type: str, topic: str = "", position: int = 0, category_id: int | None = None, category_name: str = "" tuple[DiscordChannel, bool] Get or create channel; update all fields (incl. category) if changed.
get_channel_latest_message_at channel: DiscordChannel datetime | None Max message_created_at among non-deleted DiscordMessage rows for the channel.
queryset_channels_with_recent_messages server: DiscordServer, cutoff: datetime, channel_ids: list[int] | None = None QuerySet[DiscordChannel] Channels on the server with at least one non-deleted message at or after cutoff; optional Discord channel_id allowlist.

DiscordMessage

New fields (migration 0005): message_type: CharField (default "Default"), is_pinned: BooleanField (default False).

Function Parameter types Return type Description
create_or_update_discord_message message_id: int, channel: DiscordChannel, author: DiscordProfile, content: str, message_created_at: datetime, message_edited_at: datetime | None = None, reply_to_message_id: int | None = None, attachment_urls: list | None = None, message_type: str = "Default", is_pinned: bool = False tuple[DiscordMessage, bool] Create or update message.
mark_message_deleted message: DiscordMessage, deleted_at: datetime | None = None DiscordMessage Mark message as deleted.

DiscordReaction

Function Parameter types Return type Description
add_or_update_reaction message: DiscordMessage, emoji: str, count: int tuple[DiscordReaction, bool] Add or update reaction.

Bulk operations

Message and reaction upserts use bulk_create(update_conflicts=True) on DiscordMessage and DiscordReaction. bulk_upsert_discord_users does not: DiscordProfile uses multi-table inheritance, so users are deduplicated and updated with targeted queries / get_or_create_discord_profile per missing row (see services.py).

Inputs are lists of pre-normalised message dicts (from sync.messages._prepare_message_data or sync.chat_exporter.convert_exporter_message_to_dict).

Function Parameter types Return type Description
bulk_upsert_discord_users user_data_list: list[dict] dict[int, DiscordProfile] Upsert DiscordProfile rows; returns {discord_user_id: profile}.
bulk_upsert_discord_messages message_data_list: list[dict], channel: DiscordChannel, user_map: dict[int, DiscordProfile] dict[int, DiscordMessage] Upsert DiscordMessage rows incl. message_type and is_pinned; returns {message_id: msg}.
bulk_upsert_discord_reactions reaction_data_list: list[dict], message_map: dict[int, DiscordMessage] None Upsert DiscordReaction rows.
bulk_process_message_batch message_data_list: list[dict], channel: DiscordChannel int Runs users → messages → reactions inside one transaction.atomic(). Return value is len(message_data_list) when non-empty (not the count of rows actually upserted); see Raises and edge behavior above.

Query helpers

Function Parameter types Return type Description
get_active_channels server: DiscordServer, days: int = 30, channel_ids: list[int] | None = None QuerySet Same as queryset_channels_with_recent_messages with cutoff = now - days.

Sync package (discord_activity_tracker.sync)

Module / symbol Role
sync/chat_exporter.py Runs DiscordChatExporter (exportguild, etc.), date bounds in UTC, filters JSON paths. Used by run_discord_activity_tracker.
sync/messages.py _prepare_message_data, _process_messages_in_batches (calls bulk_process_message_batch). Also exposes discord.py helpers (DiscordSyncClient, sync_all_channels, …) for Bot API–style sync; those entry points are not wired to run_discord_activity_tracker today (that command uses the exporter + user token only).
sync/client.py DiscordSyncClient — discord.py wrapper (intents, fetch guild/channel/messages).
sync/exporter_window.py latest_message_created_at_for_guild — lower bound for incremental exporter runs when --since is omitted.
sync/utils.py Parsing helpers shared by exporter and message pipelines.
sync/export.py Markdown export from DB (used downstream of sync; see command help for DISCORD_CONTEXT_* settings).

Ingestion commands

Two management commands handle message ingestion. Both follow the CollectorBase pattern with four phases: fetch → db_sync → save_raw → pinecone_sync.

run_discord_activity_tracker — incremental / scheduled

Uses DiscordChatExporter CLI with the user token. Setup (download, install path, env vars): DiscordChatExporter operations doc.

Fetches into a staging directory, persists to the database, then archives JSON under:

{WORKSPACE_DIR}/raw/discord_activity_tracker/<server_id>/<channel_id>/

Date bounds passed to the exporter use UTC (see sync/chat_exporter.py). When --since is omitted, the lower bound is the latest stored message time for this guild (and channel allowlist). If the database has no matching rows, no --after filter is applied (full history). When --until is omitted, there is no upper bound (export through the present). If --since and --until are both set but since is after until, the command logs a warning and treats both as unset, then recomputes bounds from the rules above.

python manage.py run_discord_activity_tracker [options]

Options:
  --dry-run                 No fetch, export, push, or Pinecone writes; planned steps logged at INFO
  --skip-discord-sync       Skip DiscordChatExporter, DB upserts, and raw JSON
  --skip-markdown-export    Skip writing Markdown from DB to DISCORD_CONTEXT_REPO_PATH
  --skip-remote-push        Skip git commit/push after export (see DISCORD_CONTEXT_AUTO_COMMIT)
  --skip-pinecone           Skip run_cppa_pinecone_sync
  --ignore-pinecone         Deprecated alias for --skip-pinecone
  --since, --until          ISO or YYYY-MM-DD window (UTC; aliases: --from-date, --to-date, --start-time, --end-time). Omit `--since` to continue from latest DB message; omit `--until` for no upper bound.
  --channels IDS            Comma-separated channel ID override
  --task {sync,export,all}  Deprecated: maps to the skip flags (prefer --skip-*)

backfill_discord_activity_tracker — import JSON from workspace

Imports existing DiscordChatExporter JSON files from:

{WORKSPACE_DIR}/discord_activity_tracker/Discussion - c-cpp-discussion/

(recursively; skips macOS ._*.json sidecars). Each file is parsed, upserted into the database, then deleted after a successful import so it is not processed again. Does not invoke DiscordChatExporter itself — export JSON elsewhere or manually, then drop it into that folder.

python manage.py backfill_discord_activity_tracker [options]

Options:
  --skip-pinecone          Skip Pinecone sync after import
  --ignore-pinecone        Deprecated alias for --skip-pinecone
  --dry-run                List files that would be imported; no DB writes or deletes

Channel allowlist

run_discord_activity_tracker respects DISCORD_CHANNEL_IDS in settings.py (from the DISCORD_CHANNEL_IDS env var, comma-separated snowflake IDs). The --channels CLI argument overrides the setting for a single run.

backfill_discord_activity_tracker imports every JSON file under the drop folder; it does not filter by DISCORD_CHANNEL_IDS.


Pinecone integration

discord_activity_tracker/preprocessor.py exposes preprocess_discord_for_pinecone(failed_ids, final_sync_at) which:

  1. Queries DiscordMessage rows (incremental: updated_at after final_sync_at, plus any failed_ids retry; first run with no watermark indexes all non-deleted messages).
  2. Groups messages into reply chains (reply_to_message_id linking).
  3. Filters documents with fewer than PINECONE_MIN_TEXT_LENGTH (default 20) characters.
  4. Emits {"content": str, "metadata": {...}} dicts with metadata keys: doc_id, type, channel_id, channel_name, server_id, server_name, author, timestamp, is_reply_chain, source_ids.

Settings:

Setting Default Description
PINECONE_DISCORD_APP_TYPE (empty skips sync) Passed to run_cppa_pinecone_sync as --app-type. If unset/empty, Pinecone sync is skipped.
PINECONE_DISCORD_NAMESPACE (empty skips sync) Pinecone namespace. If unset/empty, Pinecone sync is skipped.

Related