From de2f1722f0d6fa77917b4ea9da97dfab16c59949 Mon Sep 17 00:00:00 2001 From: zewelor Date: Wed, 3 Jun 2026 08:47:06 +0100 Subject: [PATCH] Revert "Remove unused change-detecting trigger runtime" This reverts commit ff358c8ec755114b5d18e691905be5a586af96d4. --- AGENTS.md | 19 +- .../r3x/dashboard/application_helper.rb | 5 +- app/jobs/r3x/change_detection_job.rb | 114 ++++++++++ app/lib/r3x/dashboard/overview.rb | 12 +- app/lib/r3x/dashboard/workflow/catalog.rb | 32 ++- .../r3x/dashboard/workflow/run_enqueuer.rb | 20 +- app/lib/r3x/dashboard/workflow/summaries.rb | 81 +++++-- app/models/dashboard/recurring_task.rb | 8 + app/models/dashboard/run.rb | 7 +- app/models/r3x/trigger_state.rb | 26 +++ .../r3x/dashboard/workflows/index.html.erb | 2 +- .../r3x/dashboard/workflows/show.html.erb | 19 +- .../20260318100000_create_trigger_states.rb | 20 ++ db/schema.rb | 16 ++ db/seeds/support/dashboard_demo_seeder.rb | 30 ++- lib/r3x/recurring_tasks_config.rb | 21 +- lib/r3x/triggers/base.rb | 4 + lib/r3x/triggers/concerns/change_detecting.rb | 15 ++ test/integration/dashboard_test.rb | 96 ++++++++- test/jobs/r3x/change_detection_job_test.rb | 201 ++++++++++++++++++ test/lib/r3x/dashboard/recurring_task_test.rb | 40 +++- .../r3x/dashboard/workflow_catalog_test.rb | 12 +- .../r3x/dashboard/workflow_run_counts_test.rb | 7 + .../dashboard/workflow_run_enqueuer_test.rb | 57 ++++- test/lib/r3x/dashboard/workflow_runs_test.rb | 34 +++ .../r3x/dashboard/workflow_summaries_test.rb | 61 +++++- test/lib/r3x/recurring_tasks_config_test.rb | 27 +++ test/lib/r3x/workflow_test.rb | 25 +++ test/lib/seeds/dashboard_demo_seeder_test.rb | 4 +- test/support/fake_change_detecting_trigger.rb | 30 +++ test/support/test_db_cleanup.rb | 1 + 31 files changed, 987 insertions(+), 59 deletions(-) create mode 100644 app/jobs/r3x/change_detection_job.rb create mode 100644 app/models/r3x/trigger_state.rb create mode 100644 db/migrate/20260318100000_create_trigger_states.rb create mode 100644 lib/r3x/triggers/concerns/change_detecting.rb create mode 100644 test/jobs/r3x/change_detection_job_test.rb create mode 100644 test/support/fake_change_detecting_trigger.rb diff --git a/AGENTS.md b/AGENTS.md index 96d7d69..05b74d4 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -19,7 +19,7 @@ This Rails app uses a small set of preferred libraries for common integration wo - Mission Control Jobs remains available at `/ops/jobs` for queue inspection and operational actions. - `bin/jobs-worker` and `bin/jobs-scheduler` set `R3X_RUNTIME_PROFILE=jobs` before boot. That internal profile keeps production eager load enabled for jobs pods, but trims boot to the Active Model / Active Job / Active Record runtime, excludes the dashboard web stack and web-only gems, removes the app `config/routes.rb` / `config/routes/**/*` files from Rails path registration, keeps `ActionController::Base.include_all_helpers = false` for headless boot, ignores `lib/r3x/workflow/cli.rb` so worker/scheduler processes do not eager-load the Thor wrapper, and leaves the operator-facing commands unchanged. - `bin/workflow` sets `R3X_RUNTIME_PROFILE=workflow_cli` before boot. `workflow_cli` is also headless and skips the web stack, but unlike `jobs` it keeps `lib/r3x/workflow/cli.rb` available so the command can load `R3x::Workflow::Cli`. This profile is command-owned and is not a deployment env knob. -- The dashboard is DB-first: workflow pages and recent runs are derived from current `Solid Queue` recurring task and job tables, so they only show workflows and runs that have persisted runtime artifacts. +- The dashboard is DB-first: workflow pages and recent runs are derived from current `Solid Queue` tables plus `trigger_states`, so they only show workflows and runs that have persisted runtime artifacts. - Dashboard-side queue rows and recurring-task rows are wrapped by app models `Dashboard::Run` and `Dashboard::RecurringTask`; treat those as the dashboard-facing read/write boundary over `solid_queue_jobs` and `solid_queue_recurring_tasks`. - The dashboard can optionally query indexed application logs when `R3X_LOGS_PROVIDER` is configured. The current supported provider is `victorialogs`, which reads from `R3X_VICTORIA_LOGS_URL` via VictoriaLogs native query API. - App log output format is controlled independently by `R3X_LOG_FORMAT`. Set it to `json` for structured logs (required by the dashboard log view), or `plain` for standard Rails flat text. Unsupported values raise on boot. @@ -32,7 +32,7 @@ This Rails app uses a small set of preferred libraries for common integration wo - `app/controllers/r3x/dashboard/overview_controller.rb`: root overview screen that surfaces attention items, recent runs, and workflow shortcuts from persisted runtime data. - `app/views/r3x/dashboard/` + `app/views/layouts/r3x/dashboard.html.erb`: dashboard UI templates and layout. - `app/models/dashboard/`: dashboard-facing models over persisted queue metadata, especially `Dashboard::Run` (`solid_queue_jobs` + execution-table relations), `Dashboard::RecurringTask` (`solid_queue_recurring_tasks` parsing and workflow-key lookups), and `Dashboard::DirectWorkflowEnqueuer` (the deliberate low-level Solid Queue enqueue boundary used when web pods should not load workflow classes). -- `app/lib/r3x/dashboard/`: read-only query objects that build workflow summaries, recent runs, and optional indexed log views from dashboard models and configured log providers. +- `app/lib/r3x/dashboard/`: read-only query objects that build workflow summaries, recent runs, and optional indexed log views from dashboard models, `TriggerState`, and configured log providers. - `app/lib/r3x/dashboard/workflow/`: dashboard workflow composition/query namespace (`Catalog`, `Runs`, `Summaries`, `RunCounts`, `RunEnqueuer`) used by controllers and overview composition. - `app/lib/r3x/dashboard/overview.rb`: assembles the root dashboard overview cards and sections from workflow summaries and persisted runs. - `app/lib/r3x/client/hashi_corp_vault/`: Vault client helpers for config parsing and auth mode implementations (`Config`, `Auth::Token`, `Auth::Kubernetes`). @@ -50,8 +50,8 @@ This Rails app uses a small set of preferred libraries for common integration wo - `app/lib/r3x/client/google/translate.rb`: Google Translate client used by workflows via `ctx.client.google_translate(...)`. - `lib/r3x/workflow/llm_schema.rb`: lazy wrapper around `RubyLLM::Schema` for workflows that need structured LLM output. - `R3x::Client::Google` is a project namespace; when referencing the third-party Google gem namespace, use `::Google` to avoid constant collisions. -- `app/jobs/r3x/`: job entrypoints for workflow runtime support. -- `app/models/r3x/`: runtime support models under the `R3x` namespace. +- `app/jobs/r3x/`: job entrypoints, especially `R3x::ChangeDetectionJob`, which evaluates change-detecting triggers before enqueueing workflow runs directly on the workflow job class. +- `app/models/r3x/`: runtime support models such as `R3x::TriggerState` for per-trigger change-detection state. - `workflows/`: user workflow packs. These are not the framework itself; they are loaded by the framework. - `workflows//test/`: self-contained tests for a specific workflow pack. Keep pack-local tests beside the workflow code, and use `test/fixtures/workflows/` for framework-level fixtures. - `lib/r3x/workflow/boot.rb`: explicit workflow boot helper used by process entrypoints. @@ -74,9 +74,12 @@ This Rails app uses a small set of preferred libraries for common integration wo - `R3x::RecurringTasksConfig` turns schedulable workflow triggers into Solid Queue dynamic recurring tasks via `SolidQueue::RecurringTask`. All triggers have a `unique_key` (based on type + options hash) used for identification and duplicate detection. `schedule_all!` persists dynamic tasks and sweeps stale ones. - Workflow packs are loaded explicitly by process entrypoints, not globally during Rails boot. `R3x::Workflow::Entrypoint` centralizes those boot decisions for the web server hook and `bin/jobs*`. In the current split-process setup, the generic `bin/jobs` entrypoint keeps the default Solid Queue behavior: it only loads workflow classes when `SOLID_QUEUE_IN_PUMA=true`, and otherwise it also schedules recurring tasks before starting the CLI. `bin/jobs-worker` only loads workflow classes, defaults `config/queue.worker.yml`, defaults `SOLID_QUEUE_SKIP_RECURRING=true`, and boots Rails under the internal `jobs` runtime profile. `bin/jobs-scheduler` defaults `config/queue.scheduler.yml`, schedules recurring tasks, and also boots under the `jobs` runtime profile. `bin/workflow` boots Rails under the internal `workflow_cli` runtime profile. Both `jobs` and `workflow_cli` are headless profiles selected by the command itself, not by deployment env: they remove the app route files from Rails path registration before initialization, keep `ActionController::Base.include_all_helpers = false` to avoid helper scans from framework eager-load, and deliberately exclude web-only gems. Only `jobs` also ignores `lib/r3x/workflow/cli.rb` to keep worker/scheduler boots slimmer. The web process also loads workflows on server boot so Mission Control Jobs can validate and enqueue workflow-backed recurring tasks; it only schedules recurring tasks when it is also hosting Solid Queue in-process, which currently means development or `SOLID_QUEUE_IN_PUMA=true`. - Production deployments should prefer separate web and jobs controllers/processes over embedding Solid Queue into the Puma web process. Prefer separate worker and scheduler jobs controllers: worker pods can run `bin/jobs-worker`, while scheduler pods can run `bin/jobs-scheduler`. If `SOLID_QUEUE_IN_PUMA` is enabled, remember that the Puma plugin can fork an additional Solid Queue supervisor plus worker/dispatcher/scheduler processes inside the same pod, the web boot path will also load workflows and schedule recurring tasks, and separate jobs pods should not also try to own recurring scheduling in that mode. -- The dashboard does not require workflow packs to be loaded on the web pod. It reconstructs workflow pages from `solid_queue_recurring_tasks` and `solid_queue_jobs`, via `Dashboard::RecurringTask` and `Dashboard::Run`, and can enqueue `Run now` actions through `POST /workflows/:workflow_key/runs` without loading workflow classes on the web pod: the workflow-level action deliberately uses `Dashboard::DirectWorkflowEnqueuer` to insert the workflow job directly into `Solid Queue` with no trigger arguments so runtime resolves the implicit manual trigger, while trigger-specific scheduled actions enqueue the persisted workflow job class with the recurring task arguments. +- The dashboard does not require workflow packs to be loaded on the web pod. It reconstructs workflow pages from `solid_queue_recurring_tasks`, `solid_queue_jobs`, and `trigger_states`, via `Dashboard::RecurringTask`, `Dashboard::Run`, and `R3x::TriggerState`, and can enqueue `Run now` actions through `POST /workflows/:workflow_key/runs` without loading workflow classes on the web pod: the generic workflow-level `Run now` action deliberately uses `Dashboard::DirectWorkflowEnqueuer` to insert the workflow job directly into `Solid Queue` with no trigger arguments so runtime resolves the implicit manual trigger, while trigger-specific change-detecting actions still use `R3x::ChangeDetectionJob`. +- Change-detecting triggers are file-defined trigger objects that provide `cron`, `unique_key`, and `detect_changes(workflow_key:, state:)`. Their durable runtime state lives in `R3x::TriggerState`. - Workflow entrypoints can be disabled without deletion by adding `# r3x:disable ...` near the top of `workflow.rb`. Disabled entrypoints are skipped by the pack loader and are not registered/scheduled. -- `ApplicationJob` and `R3x::Workflow::Base` add stable tagged log context so indexed logs can be correlated back to run pages. The workflow job itself keeps the per-run tags minimal (`r3x.run_active_job_id` and `r3x.trigger_key`). +- `R3x::ChangeDetectionJob` loads the trigger, fetches/updates `R3x::TriggerState`, and only enqueues the workflow job class itself when the trigger reports a change. +- Because the app currently uses `Solid Queue` as a database-backed backend on the same Active Record database connection, code may intentionally rely on a database transaction covering both `TriggerState` updates and `perform_later`. Do not assume those guarantees survive a future backend or database split. +- `ApplicationJob`, `R3x::Workflow::Base`, and `R3x::ChangeDetectionJob` add stable tagged log context so indexed logs can be correlated back to run pages. The workflow job itself keeps the per-run tags minimal (`r3x.run_active_job_id` and `r3x.trigger_key`), while change-detection orchestration still emits `r3x.workflow_key` for broader workflow-level correlation. - When `R3X_LOG_FORMAT=json`, logs are emitted as structured JSON with explicit `level`, `message`, and tag data so the dashboard can read real log levels directly. When `R3X_LOG_FORMAT=plain` (or unset), logs use standard Rails flat text. - Known limitation: because queued workflow runs persist the concrete workflow class name, renaming or removing a workflow class across deploys can strand older queued runs with job deserialization failures. This is currently an accepted tradeoff for preserving `ActiveJob::Continuable` on the workflow job itself. - The dashboard's run history is DB-first and parses `Solid Queue` / `Active Job` payloads directly. It still accepts the underlying tradeoff that finished runs are retention-bound and that workflows with no persisted runtime artifacts are invisible to the dashboard. @@ -132,13 +135,13 @@ bin/workflow [options] [command] [arguments] ### Operational note -- When refactoring workflow class names, remember that already queued scheduled runs may still point at the old concrete class name. +- When refactoring workflow class names, remember that already queued scheduled or change-detected runs may still point at the old concrete class name. - If a workflow class is renamed or removed, consider cleaning up pending jobs and recurring tasks created under the old class, or accept that older queued runs may fail deserialization. ## Maintenance Warning - Keep this file synchronized with the real codebase. If you change workflow loading, trigger discovery, scheduling flow, top-level directory structure, namespaces, or the framework/user-workflow boundary, update the relevant `AGENTS.md` sections in the same change. -- In particular, update examples and notes here when changing files such as `lib/r3x/workflow.rb`, `lib/r3x/workflow/pack_loader.rb`, `lib/r3x/workflow/registry.rb`, `lib/r3x/workflow/boot.rb`, `lib/r3x/recurring_tasks_config.rb`, `lib/r3x/triggers.rb`, `bin/workflow`, `bin/jobs`, or `config/application.rb`. +- In particular, update examples and notes here when changing files such as `lib/r3x/workflow.rb`, `lib/r3x/workflow/pack_loader.rb`, `lib/r3x/workflow/registry.rb`, `lib/r3x/workflow/boot.rb`, `lib/r3x/recurring_tasks_config.rb`, `lib/r3x/triggers.rb`, `app/jobs/r3x/change_detection_job.rb`, `bin/workflow`, `bin/jobs`, or `config/application.rb`. - Also update this file when changing the shared DSL validation contract in files such as `lib/r3x/dsl/validatable.rb`, `lib/r3x/configuration_error.rb`, or the base classes for workflow-declared objects. - Also update this file when changing Active Job backend semantics, `Solid Queue` database wiring, or any logic that depends on enqueueing being inside the same database transaction as app writes. - When adding a new subsystem or moving code between `lib/r3x/`, `app/lib/r3x/`, `app/jobs/r3x/`, or `workflows/`, refresh the project overview and codebase map so future agents can still orient themselves quickly. diff --git a/app/helpers/r3x/dashboard/application_helper.rb b/app/helpers/r3x/dashboard/application_helper.rb index 84baabb..a57c304 100644 --- a/app/helpers/r3x/dashboard/application_helper.rb +++ b/app/helpers/r3x/dashboard/application_helper.rb @@ -25,7 +25,8 @@ def dashboard_tone_for(value) "idle" => "muted", "queued" => "info", "running" => "info", - "scheduled" => "info" + "scheduled" => "info", + "trigger_error" => "danger" }.fetch(value, "muted") end @@ -137,6 +138,7 @@ def dashboard_trigger_label(trigger_entry) return "Schedule: #{cron}" if cron.present? { + "change_detecting" => "Change detection", "manual" => "Manual", "observed" => "Observed trigger" }.fetch(mode.to_s, mode.to_s.humanize) @@ -144,6 +146,7 @@ def dashboard_trigger_label(trigger_entry) def dashboard_trigger_kind(trigger_entry) { + "change_detecting" => "Change detection", "manual" => "Manual", "observed" => "Observed", "schedule" => "Schedule", diff --git a/app/jobs/r3x/change_detection_job.rb b/app/jobs/r3x/change_detection_job.rb new file mode 100644 index 0000000..f5063df --- /dev/null +++ b/app/jobs/r3x/change_detection_job.rb @@ -0,0 +1,114 @@ +module R3x + class ChangeDetectionJob < ApplicationJob + queue_as :default + + def perform(workflow_key, options = nil) + workflow_key, options = normalize_arguments(workflow_key, options) + trigger_key = options.fetch(:trigger_key) + trigger_state = nil + + with_log_tags( + R3x::Log.tag(R3x::Log::WORKFLOW_KEY_TAG, workflow_key), + R3x::Log.tag(R3x::Log::TRIGGER_KEY_TAG, trigger_key) + ) do + workflow_class = R3x::Workflow::Registry.fetch(workflow_key) + trigger = find_trigger(workflow_class: workflow_class, trigger_key: trigger_key) + logger.info "Checking change-detecting trigger type=#{trigger.type}" + + trigger_state = load_trigger_state(workflow_key: workflow_key, trigger_key: trigger_key, trigger_type: trigger.type) + result = normalize_result( + trigger.detect_changes( + workflow_key: workflow_key, + state: trigger_state.state.deep_symbolize_keys + ) + ) + logger.info "Evaluated change-detecting trigger changed=#{result[:changed]}" + + TriggerState.transaction do + if result[:changed] + with_log_tags(R3x::Log.tag(R3x::Log::JOB_OUTCOME_TAG, "changed")) do + logger.info "Change detected; enqueueing workflow class=#{workflow_class.name}" + end + + workflow_class.perform_later(trigger_key, trigger_payload: result[:payload]) + end + + trigger_state.record_check!(result) + end + end + rescue => e + trigger_state.record_error!(e) if defined?(trigger_state) && trigger_state&.persisted? + + with_log_tags( + R3x::Log.tag(R3x::Log::WORKFLOW_KEY_TAG, workflow_key), + R3x::Log.tag(R3x::Log::TRIGGER_KEY_TAG, defined?(trigger_key) ? trigger_key : nil), + R3x::Log.tag(R3x::Log::JOB_OUTCOME_TAG, "failed") + ) do + structured_error(message: "Change detection failed", error: e) + end + + raise + end + + private + + def find_trigger(workflow_class:, trigger_key:) + trigger = workflow_class.triggers_by_key[trigger_key] + + if trigger.nil? + raise ArgumentError, "Unknown trigger key '#{trigger_key}' for workflow '#{workflow_class.workflow_key}'" + end + + unless trigger.change_detecting? + raise ArgumentError, "Trigger '#{trigger_key}' is not change-detecting" + end + + trigger + end + + def load_trigger_state(workflow_key:, trigger_key:, trigger_type:) + TriggerState.find_or_create_by!( + workflow_key: workflow_key, + trigger_key: trigger_key + ) do |state| + state.trigger_type = trigger_type.to_s + state.state = {} + end + end + + def normalize_arguments(workflow_key, options) + if workflow_key.is_a?(Hash) && options.nil? + options = workflow_key + workflow_key = nil + end + + options = normalize_options_hash(options) + workflow_key ||= options[:workflow_key] + + [ workflow_key.presence || raise(ArgumentError, "Missing workflow_key"), options ] + end + + def normalize_options_hash(options) + case options + when nil + {} + when Hash + options.deep_symbolize_keys + else + raise ArgumentError, "Expected options hash, got #{options.class.name}" + end + end + + def normalize_result(result) + normalized = result.deep_symbolize_keys + + unless normalized.key?(:changed) && normalized.key?(:state) + raise ArgumentError, "Change-detecting trigger must return a hash with :changed and :state" + end + + normalized[:state] ||= {} + normalized[:payload] ||= nil + normalized + end + end +end diff --git a/app/lib/r3x/dashboard/overview.rb b/app/lib/r3x/dashboard/overview.rb index 01e6581..a56dee0 100644 --- a/app/lib/r3x/dashboard/overview.rb +++ b/app/lib/r3x/dashboard/overview.rb @@ -34,7 +34,7 @@ def summary_cards def needs_attention @needs_attention ||= workflows - .select { |workflow| workflow.dig(:health, :status) == "failed" } + .select { |workflow| %w[failed trigger_error].include?(workflow.dig(:health, :status)) } .map do |workflow| workflow.merge( attention_at: attention_time_for(workflow), @@ -65,11 +65,17 @@ def workflows end def attention_time_for(workflow) - workflow.dig(:last_run, :recorded_at) + return workflow.dig(:last_run, :recorded_at) if workflow.dig(:health, :status) == "failed" + + workflow[:trigger_entries] + .filter_map { |entry| entry[:trigger_state]&.last_error_at } + .max end def attention_label_for(workflow) - "Failed" + return "Failed" if workflow.dig(:health, :status) == "failed" + + "Trigger error" end end end diff --git a/app/lib/r3x/dashboard/workflow/catalog.rb b/app/lib/r3x/dashboard/workflow/catalog.rb index be03d88..4b9662d 100644 --- a/app/lib/r3x/dashboard/workflow/catalog.rb +++ b/app/lib/r3x/dashboard/workflow/catalog.rb @@ -3,6 +3,7 @@ module Dashboard module Workflow class Catalog TRIGGER_OBSERVATION_JOB_LIMIT = 250 + CHANGE_DETECTION_CLASS_NAME = ::Dashboard::RecurringTask::CHANGE_DETECTION_CLASS_NAME def all workflow_keys.map { |workflow_key| build_entry(workflow_key) } @@ -17,7 +18,7 @@ def find!(workflow_key) def workflow_keys @workflow_keys ||= begin - keys = workflow_keys_from_recurring_tasks + observed_class_names_to_keys.values + keys = workflow_keys_from_recurring_tasks + workflow_keys_from_trigger_states + observed_class_names_to_keys.values keys.compact.uniq.sort end end @@ -26,6 +27,10 @@ def recurring_tasks_for(workflow_key) recurring_tasks_by_workflow_key.fetch(workflow_key.to_s, []) end + def trigger_states_for(workflow_key) + trigger_states_by_workflow_key.fetch(workflow_key.to_s, []) + end + def class_names_for(workflow_key) class_names_by_workflow_key.fetch(workflow_key.to_s, []) end @@ -45,13 +50,20 @@ def build_entry(workflow_key) end def trigger_keys_for(workflow_key) - recurring_tasks_for(workflow_key).map(&:trigger_key).compact.uniq.sort + task_keys = recurring_tasks_for(workflow_key).map(&:trigger_key) + state_keys = trigger_states_for(workflow_key).map(&:trigger_key) + + (task_keys + state_keys).compact.uniq.sort end def workflow_keys_from_recurring_tasks recurring_tasks_by_workflow_key.keys end + def workflow_keys_from_trigger_states + trigger_states_by_workflow_key.keys + end + def recurring_tasks @recurring_tasks ||= begin ::Dashboard::RecurringTask.workflow_tasks.to_a @@ -64,6 +76,16 @@ def recurring_tasks_by_workflow_key @recurring_tasks_by_workflow_key ||= recurring_tasks.group_by(&:workflow_key) end + def trigger_states_by_workflow_key + @trigger_states_by_workflow_key ||= begin + R3x::TriggerState + .order(:workflow_key, :trigger_key) + .group_by(&:workflow_key) + rescue ActiveRecord::NoDatabaseError, ActiveRecord::StatementInvalid + {} + end + end + def recurring_task_class_names_to_keys @recurring_task_class_names_to_keys ||= recurring_tasks.each_with_object({}) do |task, mapping| class_name = task.direct_workflow_class_name @@ -150,6 +172,12 @@ def trigger_keys_to_workflow_keys mapping[task.trigger_key] << task.workflow_key end + trigger_states_by_workflow_key.each do |workflow_key, states| + states.each do |state| + mapping[state.trigger_key] << workflow_key + end + end + mapping.transform_values { |values| values.compact.uniq } end end diff --git a/app/lib/r3x/dashboard/workflow/run_enqueuer.rb b/app/lib/r3x/dashboard/workflow/run_enqueuer.rb index 36b124d..39cd34f 100644 --- a/app/lib/r3x/dashboard/workflow/run_enqueuer.rb +++ b/app/lib/r3x/dashboard/workflow/run_enqueuer.rb @@ -8,13 +8,24 @@ def initialize(workflow_key:, trigger_key:) end def enqueue! - ::Dashboard::Run.enqueue_direct!(**direct_enqueue_options) + if trigger_key.present? && recurring_task.change_detection? + enqueue_change_detection_job + nil + else + ::Dashboard::Run.enqueue_direct!(**direct_enqueue_options) + end end private attr_reader :trigger_key, :workflow_key + def enqueue_change_detection_job + R3x::ChangeDetectionJob + .set(job_options) + .perform_later(workflow_key, trigger_key: trigger_key) + end + def direct_enqueue_options trigger_key.present? ? trigger_enqueue_options : manual_enqueue_options end @@ -56,6 +67,13 @@ def last_run def catalog @catalog ||= Workflow::Catalog.new end + + def job_options + { + queue: recurring_task.queue_name, + priority: recurring_task.priority + }.compact + end end end end diff --git a/app/lib/r3x/dashboard/workflow/summaries.rb b/app/lib/r3x/dashboard/workflow/summaries.rb index 931e966..b5f03ad 100644 --- a/app/lib/r3x/dashboard/workflow/summaries.rb +++ b/app/lib/r3x/dashboard/workflow/summaries.rb @@ -11,7 +11,7 @@ class Summaries "next_trigger" => "asc", "last_run" => "desc" }.freeze - HEALTH_SORT_ORDER = %w[failed healthy idle].freeze + HEALTH_SORT_ORDER = %w[trigger_error failed healthy idle].freeze attr_reader :direction, :sort @@ -53,8 +53,9 @@ def find!(workflow_key) private def build_summary(workflow_key) + trigger_states = trigger_states_by_workflow_key.fetch(workflow_key, []) recurring_tasks = recurring_tasks_by_workflow_key.fetch(workflow_key, []) - trigger_entries = trigger_entries_for(workflow_key:, recurring_tasks:) + trigger_entries = trigger_entries_for(workflow_key:, trigger_states:, recurring_tasks:) last_run = latest_run_for(workflow_key) preferred_recurring_task = recurring_tasks.find { |task| task.direct_workflow_class_name.present? } || recurring_tasks.first manual_enqueue_options = ::Dashboard::Run.manual_enqueue_options_for( @@ -66,8 +67,8 @@ def build_summary(workflow_key) { class_name: manual_enqueue_options&.fetch(:class_name), - health: health_for(last_run: last_run), - last_seen_at: last_seen_at_for(last_run: last_run), + health: health_for(last_run: last_run, trigger_states: trigger_states), + last_seen_at: last_seen_at_for(last_run: last_run, trigger_states: trigger_states), last_run: last_run && build_run_summary(last_run, workflow_key), mission_control_path: "/ops/jobs", next_trigger_at: trigger_entries.filter_map { |entry| entry[:next_trigger_at] }.min, @@ -149,42 +150,60 @@ def health_rank(status) end def health_timestamp_for(workflow) + return workflow[:last_seen_at] if workflow.dig(:health, :status) == "trigger_error" + workflow.dig(:last_run, :recorded_at) || workflow[:last_seen_at] end - def trigger_entries_for(workflow_key:, recurring_tasks:) + def trigger_entries_for(workflow_key:, trigger_states:, recurring_tasks:) trigger_keys = recurring_tasks.map(&:trigger_key) + trigger_keys |= trigger_states.map(&:trigger_key) recurring_tasks_by_trigger_key = recurring_tasks.index_by(&:trigger_key) + trigger_states_by_trigger_key = trigger_states.index_by(&:trigger_key) trigger_keys.sort.map do |trigger_key| build_trigger_entry( workflow_key: workflow_key, trigger_key: trigger_key, - recurring_task: recurring_tasks_by_trigger_key[trigger_key] + recurring_task: recurring_tasks_by_trigger_key[trigger_key], + trigger_state: trigger_states_by_trigger_key[trigger_key] ) end end - def build_trigger_entry(workflow_key:, trigger_key:, recurring_task:) + def build_trigger_entry(workflow_key:, trigger_key:, recurring_task:, trigger_state:) { + change_detecting: change_detecting_trigger?(recurring_task, trigger_state), cron: recurring_task&.schedule, - mode: trigger_mode_for(recurring_task), + mode: trigger_mode_for(recurring_task, trigger_state), next_trigger_at: next_trigger_at_for(recurring_task), queue_name: recurring_task&.queue_name || latest_queue_name(workflow_key), recurring_task: recurring_task, - run_now_available: recurring_task.present?, + run_now_available: recurring_task.present? && !recurring_task.change_detection?, + trigger_state: trigger_state, unique_key: trigger_key, workflow_key: workflow_key } end - def health_for(last_run:) + def health_for(last_run:, trigger_states:) + trigger_error = latest_trigger_error_for(trigger_states) + return trigger_error_health(trigger_error) if trigger_error.present? + return failed_run_health(last_run) if last_run&.status == "failed" return healthy_health if last_run.present? idle_health end + def trigger_error_health(trigger_error) + { + detail: trigger_error.last_error_message, + label: "Trigger error", + status: "trigger_error" + } + end + def failed_run_health(last_run) { detail: last_run.failed_execution&.error, @@ -209,6 +228,19 @@ def idle_health } end + def latest_trigger_error_for(trigger_states) + latest_trigger_error = nil + + trigger_states.each do |state| + next if state.last_error_at.blank? + next if latest_trigger_error.present? && state.last_error_at <= latest_trigger_error.last_error_at + + latest_trigger_error = state + end + + latest_trigger_error + end + def next_trigger_at_for(recurring_task) return if recurring_task.blank? @@ -231,18 +263,41 @@ def recurring_tasks_by_workflow_key end end + def trigger_states_by_workflow_key + @trigger_states_by_workflow_key ||= begin + R3x::TriggerState + .order(:workflow_key, :trigger_key) + .group_by(&:workflow_key) + rescue ActiveRecord::NoDatabaseError, ActiveRecord::StatementInvalid + {} + end + end + def catalog @catalog ||= Workflow::Catalog.new end - def trigger_mode_for(recurring_task) + def change_detecting_trigger?(recurring_task, trigger_state) + return true if recurring_task&.change_detection? + + trigger_state&.trigger_type.present? && !%w[manual schedule].include?(trigger_state.trigger_type) && recurring_task.blank? + end + + def trigger_mode_for(recurring_task, trigger_state) + return "change_detecting" if change_detecting_trigger?(recurring_task, trigger_state) return "scheduled" if recurring_task.present? + return trigger_state.trigger_type if trigger_state&.trigger_type.present? "observed" end - def last_seen_at_for(last_run:) - last_run&.recorded_at + def last_seen_at_for(last_run:, trigger_states:) + activity_times = [ + last_run&.recorded_at, + *trigger_states.flat_map { |state| [ state.last_checked_at, state.last_triggered_at, state.last_error_at ] } + ] + + activity_times.compact.max end def latest_queue_name(workflow_key) diff --git a/app/models/dashboard/recurring_task.rb b/app/models/dashboard/recurring_task.rb index 54246a3..be2c8ee 100644 --- a/app/models/dashboard/recurring_task.rb +++ b/app/models/dashboard/recurring_task.rb @@ -1,5 +1,7 @@ module Dashboard class RecurringTask < ApplicationRecord + CHANGE_DETECTION_CLASS_NAME = "R3x::ChangeDetectionJob" + self.table_name = "solid_queue_recurring_tasks" serialize :arguments, coder: SolidQueue::RecurringTask::Arguments, default: [] @@ -34,7 +36,13 @@ def trigger_key parsed_key.fetch(:trigger_key) end + def change_detection? + class_name == CHANGE_DETECTION_CLASS_NAME + end + def direct_workflow_class_name + return if change_detection? + class_name.presence end diff --git a/app/models/dashboard/run.rb b/app/models/dashboard/run.rb index b97489f..9d620da 100644 --- a/app/models/dashboard/run.rb +++ b/app/models/dashboard/run.rb @@ -2,6 +2,8 @@ module Dashboard class Run < ApplicationRecord include R3x::Concerns::Logger + CHANGE_DETECTION_CLASS_NAME = "R3x::ChangeDetectionJob" + IGNORED_CLASS_NAMES = [ CHANGE_DETECTION_CLASS_NAME ].freeze STATUSES = %w[blocked failed finished queued running scheduled].freeze LATEST_ACTIVITY_BUCKETS = [ [ "failed", :solid_queue_failed_executions, :created_at ], @@ -25,12 +27,13 @@ class Run < ApplicationRecord has_one :claimed_execution, class_name: "SolidQueue::ClaimedExecution", foreign_key: :job_id scope :with_execution_associations, -> { includes(:failed_execution, :scheduled_execution, :blocked_execution, :ready_execution, :claimed_execution) } + scope :excluding_ignored_classes, -> { where.not(class_name: IGNORED_CLASS_NAMES) } scope :dashboard_visible, ->(class_names) do visible_class_names = Array(class_names).compact_blank visible_class_names.present? ? where(class_name: visible_class_names) : none end - scope :direct_workflows, -> { where("class_name LIKE ?", "Workflows::%") } - scope :observed_triggers, -> { where(class_name: []) } + scope :direct_workflows, -> { excluding_ignored_classes.where("class_name LIKE ?", "Workflows::%") } + scope :observed_triggers, -> { excluding_ignored_classes.where(class_name: []) } scope :unfinished, -> { where(finished_at: nil).where.missing(:failed_execution) } scope :for_status, ->(status) do case status.to_s diff --git a/app/models/r3x/trigger_state.rb b/app/models/r3x/trigger_state.rb new file mode 100644 index 0000000..a8eb24d --- /dev/null +++ b/app/models/r3x/trigger_state.rb @@ -0,0 +1,26 @@ +module R3x + class TriggerState < ApplicationRecord + serialize :state, coder: MultiJSON if ActiveRecord::Base.connection_db_config.adapter.to_s.downcase == "sqlite" + + validates :workflow_key, presence: true + validates :trigger_type, presence: true + validates :trigger_key, presence: true, uniqueness: { scope: :workflow_key } + + def record_check!(result) + update!( + state: result.fetch(:state), + last_checked_at: Time.current, + last_error_at: nil, + last_error_message: nil, + last_triggered_at: result[:changed] ? Time.current : last_triggered_at + ) + end + + def record_error!(error) + update!( + last_error_at: Time.current, + last_error_message: error.message + ) + end + end +end diff --git a/app/views/r3x/dashboard/workflows/index.html.erb b/app/views/r3x/dashboard/workflows/index.html.erb index ade01a4..0aaa86a 100644 --- a/app/views/r3x/dashboard/workflows/index.html.erb +++ b/app/views/r3x/dashboard/workflows/index.html.erb @@ -9,7 +9,7 @@
<% if @workflows.empty? %> -
No workflows are visible yet. They will appear here once recurring tasks or direct workflow runs are written to the database.
+
No workflows are visible yet. They will appear here once recurring tasks, direct workflow runs, or trigger state are written to the database.
<% else %> diff --git a/app/views/r3x/dashboard/workflows/show.html.erb b/app/views/r3x/dashboard/workflows/show.html.erb index ed1433b..a95d360 100644 --- a/app/views/r3x/dashboard/workflows/show.html.erb +++ b/app/views/r3x/dashboard/workflows/show.html.erb @@ -11,7 +11,7 @@ <% @workflow[:trigger_entries].each do |trigger_entry| %> + <% trigger_state = trigger_entry[:trigger_state] %> @@ -90,6 +94,17 @@ <%= dashboard_pill(safe_join([ "Queue", content_tag(:code, trigger_entry[:queue_name]) ], " "), "muted") %> <% end %> + + <% if trigger_state.present? %> +
Last checked <%= dashboard_timestamp(trigger_state.last_checked_at) %>
+
Last triggered <%= dashboard_timestamp(trigger_state.last_triggered_at) %>
+ + <% if trigger_state.last_error_at.present? %> +
Last error <%= dashboard_error_summary(trigger_state.last_error_message) %>
+ <% end %> + <% else %> + No persisted runtime state + <% end %> <% end %> diff --git a/db/migrate/20260318100000_create_trigger_states.rb b/db/migrate/20260318100000_create_trigger_states.rb new file mode 100644 index 0000000..835a098 --- /dev/null +++ b/db/migrate/20260318100000_create_trigger_states.rb @@ -0,0 +1,20 @@ +class CreateTriggerStates < ActiveRecord::Migration[8.1] + def change + create_table :trigger_states do |t| + t.string :workflow_key, null: false + t.string :trigger_key, null: false + t.string :trigger_type, null: false + t.json :state, null: false, default: {} + t.datetime :last_checked_at + t.datetime :last_triggered_at + t.datetime :last_error_at + t.text :last_error_message + + t.timestamps + end + + add_index :trigger_states, [ :workflow_key, :trigger_key ], unique: true + add_index :trigger_states, :workflow_key + add_index :trigger_states, :trigger_type + end +end diff --git a/db/schema.rb b/db/schema.rb index 8669385..5b8530e 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -143,6 +143,22 @@ t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true end + create_table "trigger_states", force: :cascade do |t| + t.datetime "created_at", null: false + t.datetime "last_checked_at" + t.datetime "last_error_at" + t.text "last_error_message" + t.datetime "last_triggered_at" + t.json "state", default: {}, null: false + t.string "trigger_key", null: false + t.string "trigger_type", null: false + t.datetime "updated_at", null: false + t.string "workflow_key", null: false + t.index ["trigger_type"], name: "index_trigger_states_on_trigger_type" + t.index ["workflow_key", "trigger_key"], name: "index_trigger_states_on_workflow_key_and_trigger_key", unique: true + t.index ["workflow_key"], name: "index_trigger_states_on_workflow_key" + end + add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade diff --git a/db/seeds/support/dashboard_demo_seeder.rb b/db/seeds/support/dashboard_demo_seeder.rb index fdea71b..a22ed1d 100644 --- a/db/seeds/support/dashboard_demo_seeder.rb +++ b/db/seeds/support/dashboard_demo_seeder.rb @@ -9,6 +9,7 @@ def seed! demo_definitions.map do |definition| create_recurring_task!(definition) + create_trigger_state!(definition) create_run!(definition) end end @@ -36,7 +37,10 @@ def demo_definitions priority: 0, schedule: "15 * * * *", trigger_key: "schedule:hourly", + trigger_type: "schedule", seed_scheduled_at: now + 2.days, + last_checked_at: now - 18.hours, + last_triggered_at: now - 17.hours, status: "finished", created_at: now - 18.hours, updated_at: now - 17.hours, @@ -49,8 +53,11 @@ def demo_definitions queue_name: "feeds", priority: 10, schedule: "*/10 * * * *", - trigger_key: "schedule:external", + trigger_key: "feed:external", + trigger_type: "feed", seed_scheduled_at: now + 2.days, + last_checked_at: now - 3.hours, + last_triggered_at: now - 3.hours, status: "failed", created_at: now - 3.hours, updated_at: now - 2.hours - 52.minutes, @@ -69,7 +76,10 @@ def demo_definitions priority: 5, schedule: "0 */6 * * *", trigger_key: "schedule:dispatch", + trigger_type: "schedule", seed_scheduled_at: now + 2.days, + last_checked_at: now - 14.minutes, + last_triggered_at: now - 12.minutes, status: "running", created_at: now - 12.minutes, updated_at: now - 11.minutes, @@ -83,7 +93,10 @@ def demo_definitions priority: 20, schedule: "*/5 * * * *", trigger_key: "schedule:inventory", + trigger_type: "schedule", seed_scheduled_at: now + 2.days, + last_checked_at: now - 26.minutes, + last_triggered_at: now - 24.minutes, status: "finished", created_at: now - 24.minutes, updated_at: now - 23.minutes, @@ -97,6 +110,9 @@ def demo_definitions priority: 30, schedule: "30 2 * * *", trigger_key: "schedule:nightly", + trigger_type: "schedule", + last_checked_at: now - 1.hour, + last_triggered_at: now - 1.day, status: "scheduled", created_at: now - 5.minutes, updated_at: now - 5.minutes, @@ -120,6 +136,7 @@ def clear_demo_data! SolidQueue::RecurringTask.where("key LIKE ?", "workflow:#{DEMO_WORKFLOW_PREFIX}%").delete_all SolidQueue::Process.where("name LIKE ?", "#{DEMO_PROCESS_PREFIX}%").delete_all + R3x::TriggerState.where("workflow_key LIKE ?", "#{DEMO_WORKFLOW_PREFIX}%").delete_all end def create_recurring_task!(definition) @@ -134,6 +151,17 @@ def create_recurring_task!(definition) ) end + def create_trigger_state!(definition) + R3x::TriggerState.create!( + workflow_key: definition.fetch(:workflow_key), + trigger_key: definition.fetch(:trigger_key), + trigger_type: definition.fetch(:trigger_type), + state: {}, + last_checked_at: definition[:last_checked_at], + last_triggered_at: definition[:last_triggered_at] + ) + end + def create_run!(definition) job = create_job!(definition) diff --git a/lib/r3x/recurring_tasks_config.rb b/lib/r3x/recurring_tasks_config.rb index 648959e..fa05a16 100644 --- a/lib/r3x/recurring_tasks_config.rb +++ b/lib/r3x/recurring_tasks_config.rb @@ -82,12 +82,21 @@ def namespaced_key(workflow_key, trigger) def task_options_for(workflow_class:, trigger:) queue_name = workflow_class.new.queue_name - { - class: workflow_class.name, - args: [ trigger.unique_key ], - schedule: trigger.schedule, - queue: queue_name - } + if trigger.change_detecting? + { + class: "R3x::ChangeDetectionJob", + args: [ workflow_class.workflow_key, { "trigger_key" => trigger.unique_key } ], + schedule: trigger.schedule, + queue: queue_name + } + else + { + class: workflow_class.name, + args: [ trigger.unique_key ], + schedule: trigger.schedule, + queue: queue_name + } + end end def workflow_and_trigger_for(key) diff --git a/lib/r3x/triggers/base.rb b/lib/r3x/triggers/base.rb index df3eae3..b924d26 100644 --- a/lib/r3x/triggers/base.rb +++ b/lib/r3x/triggers/base.rb @@ -10,6 +10,10 @@ def initialize(type, **options) @options = options end + def change_detecting? + false + end + def cron_schedulable? false end diff --git a/lib/r3x/triggers/concerns/change_detecting.rb b/lib/r3x/triggers/concerns/change_detecting.rb new file mode 100644 index 0000000..13fb95b --- /dev/null +++ b/lib/r3x/triggers/concerns/change_detecting.rb @@ -0,0 +1,15 @@ +module R3x + module Triggers + module Concerns + module ChangeDetecting + def change_detecting? + true + end + + def detect_changes(workflow_key:, state:) + raise NotImplementedError, "#{self.class.name} must implement #detect_changes" + end + end + end + end +end diff --git a/test/integration/dashboard_test.rb b/test/integration/dashboard_test.rb index 6550042..5b5a585 100644 --- a/test/integration/dashboard_test.rb +++ b/test/integration/dashboard_test.rb @@ -27,6 +27,14 @@ class DashboardTest < ActionDispatch::IntegrationTest created_at: 10.minutes.ago, updated_at: 1.minute.ago ) + R3x::TriggerState.create!( + workflow_key: "test_workflow", + trigger_key: @trigger, + trigger_type: "schedule", + state: {}, + last_checked_at: 2.minutes.ago, + last_triggered_at: 1.minute.ago + ) end teardown do @@ -180,6 +188,14 @@ class DashboardTest < ActionDispatch::IntegrationTest run_status: "failed", recorded_at: 2.days.ago ) + R3x::TriggerState.create!( + workflow_key: "stale_failure_workflow", + trigger_key: "schedule:stale", + trigger_type: "schedule", + state: {}, + last_checked_at: 1.minute.ago + ) + create_dashboard_workflow( workflow_key: "recent_failure_workflow", trigger_key: "schedule:recent", @@ -195,7 +211,7 @@ class DashboardTest < ActionDispatch::IntegrationTest assert_includes response.body, "Failed " end - test "workflow detail renders trigger and recent runs" do + test "workflow detail renders trigger state and recent runs" do ENV.delete("R3X_LOGS_PROVIDER") get "/workflows/test_workflow" @@ -207,6 +223,7 @@ class DashboardTest < ActionDispatch::IntegrationTest assert_includes response.body, "Last seen" assert_includes response.body, "Triggers" assert_includes response.body, @trigger + assert_includes response.body, "Last checked" assert_includes response.body, "Run now" refute_includes response.body, "Recent logs" end @@ -286,11 +303,17 @@ class DashboardTest < ActionDispatch::IntegrationTest run_status: "failed", recorded_at: 2.minutes.ago ) + create_dashboard_workflow( + workflow_key: "trigger_error_workflow", + trigger_key: "feed:error", + trigger_error_at: 1.minute.ago + ) + get "/workflows", params: { sort: "health", direction: "desc" } assert_response :success assert_equal( - [ "Idle Workflow", "Healthy Workflow", "Failed Workflow" ], + [ "Idle Workflow", "Healthy Workflow", "Failed Workflow", "Trigger Error Workflow" ], css_select("#workflows-catalog tbody tr .title-link").map { |link| link.text.strip } ) assert_includes response.body, 'href="/workflows?direction=asc&sort=health#workflows-catalog"' @@ -747,6 +770,62 @@ class DashboardTest < ActionDispatch::IntegrationTest refute_includes response.body, "Unavailable" end + test "workflow detail can enqueue run now for change detection task" do + SolidQueue::RecurringTask.create!( + key: "workflow:feed_watch:feed:123", + schedule: "*/5 * * * *", + class_name: "R3x::ChangeDetectionJob", + arguments: [ "feed_watch", { "trigger_key" => "feed:123" } ], + queue_name: "feed", + static: false + ) + + assert_enqueued_jobs 1, only: R3x::ChangeDetectionJob do + post "/workflows/feed_watch/runs", params: { trigger_key: "feed:123" } + end + + assert_redirected_to "/workflows/feed_watch" + end + + test "workflow detail shows generic run now for change-detecting-only workflows" do + clear_tables + SolidQueue::RecurringTask.create!( + key: "workflow:feed_watch:feed:123", + schedule: "*/5 * * * *", + class_name: "R3x::ChangeDetectionJob", + arguments: [ "feed_watch", { "trigger_key" => "feed:123" } ], + queue_name: "feed", + static: false + ) + + get "/workflows/feed_watch" + + assert_response :success + assert_includes response.body, "Run now" + end + + test "workflow detail run now enqueues a manual workflow job for change-detecting-only workflows" do + clear_tables + SolidQueue::RecurringTask.create!( + key: "workflow:feed_watch:feed:123", + schedule: "*/5 * * * *", + class_name: "R3x::ChangeDetectionJob", + arguments: [ "feed_watch", { "trigger_key" => "feed:123" } ], + queue_name: "feed", + priority: 4, + static: false + ) + + post "/workflows/feed_watch/runs" + + job = SolidQueue::Job.order(:id).last + assert_redirected_to "/workflow-runs/#{job.id}" + assert_equal "Workflows::FeedWatch", job.class_name + assert_equal "feed", job.queue_name + assert_equal 4, job.priority + assert_equal [], Dashboard::Run.find(job.id).workflow_arguments + end + test "workflow detail run now returns not found for unknown workflow keys" do clear_tables @@ -775,7 +854,7 @@ def claim_job!(job) SolidQueue::ClaimedExecution.create!(job_id: job.id, process_id: process.id, created_at: 30.seconds.ago) end - def create_dashboard_workflow(workflow_key:, trigger_key:, run_status: nil, recorded_at: nil) + def create_dashboard_workflow(workflow_key:, trigger_key:, run_status: nil, recorded_at: nil, trigger_error_at: nil) job_class_name = DashboardTestWorkflows.ensure_class(workflow_key.camelize) SolidQueue::RecurringTask.create!( @@ -787,6 +866,17 @@ def create_dashboard_workflow(workflow_key:, trigger_key:, run_status: nil, reco static: false ) + if trigger_error_at.present? + R3x::TriggerState.create!( + workflow_key: workflow_key, + trigger_key: trigger_key, + trigger_type: "feed", + state: {}, + last_error_at: trigger_error_at, + last_error_message: "#{workflow_key} error" + ) + end + return if run_status.blank? job = DashboardJobRows.create_job!( diff --git a/test/jobs/r3x/change_detection_job_test.rb b/test/jobs/r3x/change_detection_job_test.rb new file mode 100644 index 0000000..746a008 --- /dev/null +++ b/test/jobs/r3x/change_detection_job_test.rb @@ -0,0 +1,201 @@ +require "test_helper" +require_relative "../../support/fake_change_detecting_trigger" + +module R3x + class ChangeDetectionJobTest < ActiveSupport::TestCase + include ActiveJob::TestHelper + + setup do + @original_workflow_paths = ENV["R3X_WORKFLOW_PATHS"] + ENV["R3X_WORKFLOW_PATHS"] = Rails.root.join("test/fixtures/workflows").to_s + clear_enqueued_jobs + R3x::TriggerState.delete_all + end + + teardown do + clear_enqueued_jobs + R3x::TriggerState.delete_all + ENV["R3X_WORKFLOW_PATHS"] = @original_workflow_paths + Workflow::Registry.reset! + end + + test "creates trigger state and does not enqueue workflow when unchanged" do + fake_trigger = R3x::TestSupport::FakeChangeDetectingTrigger.new( + identity: "feed", + detector: ->(workflow_key:, state:) do + assert_equal "test_change_detecting_feed", workflow_key + assert_equal({}, state) + { changed: false, state: { cursor: "v1" }, payload: nil } + end + ) + + register_change_detecting_workflow(fake_trigger) + + assert_no_enqueued_jobs do + ChangeDetectionJob.perform_now("test_change_detecting_feed", { "trigger_key" => fake_trigger.unique_key }) + end + + state = R3x::TriggerState.find_by!(workflow_key: "test_change_detecting_feed", trigger_key: fake_trigger.unique_key) + assert_equal "fake_change_detecting", state.trigger_type + assert_equal({ "cursor" => "v1" }, state.state) + assert state.last_checked_at.present? + assert_nil state.last_triggered_at + assert_nil state.last_error_at + end + + test "enqueues workflow and records last_triggered_at when changed" do + fake_trigger = R3x::TestSupport::FakeChangeDetectingTrigger.new( + identity: "feed", + detector: ->(workflow_key:, state:) do + { changed: true, state: state.merge(cursor: "v2"), payload: { "entries" => [ { "title" => "Hello" } ] } } + end + ) + + workflow_class = register_change_detecting_workflow(fake_trigger) + + assert_enqueued_jobs 1, only: workflow_class do + ChangeDetectionJob.perform_now("test_change_detecting_feed", { "trigger_key" => fake_trigger.unique_key }) + end + + enqueued_job = enqueued_jobs.last + assert_equal workflow_class, enqueued_job[:job] + assert_equal fake_trigger.unique_key, enqueued_job[:args][0] + payload = enqueued_job[:args][1]["trigger_payload"] + assert_equal 1, payload["entries"].length + assert_equal "Hello", payload["entries"].first["title"] + + state = R3x::TriggerState.find_by!(workflow_key: "test_change_detecting_feed", trigger_key: fake_trigger.unique_key) + assert_equal({ "cursor" => "v2" }, state.state) + assert state.last_triggered_at.present? + end + + test "does not advance trigger state when enqueue fails" do + fake_trigger = R3x::TestSupport::FakeChangeDetectingTrigger.new( + identity: "feed", + detector: ->(workflow_key:, state:) do + { changed: true, state: state.merge(cursor: "v2"), payload: { "entries" => [ { "title" => "Hello" } ] } } + end + ) + + workflow_class = register_change_detecting_workflow(fake_trigger) + + workflow_class.stubs(:perform_later).raises(ActiveJob::EnqueueError, "enqueue failed") + + error = assert_raises(ActiveJob::EnqueueError) do + ChangeDetectionJob.perform_now("test_change_detecting_feed", { "trigger_key" => fake_trigger.unique_key }) + end + + assert_equal "enqueue failed", error.message + + state = R3x::TriggerState.find_by!(workflow_key: "test_change_detecting_feed", trigger_key: fake_trigger.unique_key) + + assert_equal({}, state.state) + assert_nil state.last_checked_at + assert_nil state.last_triggered_at + assert state.last_error_at.present? + assert_equal "enqueue failed", state.last_error_message + end + + test "persists last error details when detection fails" do + fake_trigger = R3x::TestSupport::FakeChangeDetectingTrigger.new( + identity: "feed", + detector: ->(workflow_key:, state:) do + raise ArgumentError, "detection failed" + end + ) + + register_change_detecting_workflow(fake_trigger) + + error = assert_raises(ArgumentError) do + ChangeDetectionJob.perform_now("test_change_detecting_feed", trigger_key: fake_trigger.unique_key) + end + + assert_equal "detection failed", error.message + + state = R3x::TriggerState.find_by!(workflow_key: "test_change_detecting_feed", trigger_key: fake_trigger.unique_key) + assert state.last_error_at.present? + assert_equal "detection failed", state.last_error_message + end + + test "tags change detection logs with workflow and trigger identifiers" do + fake_trigger = R3x::TestSupport::FakeChangeDetectingTrigger.new( + identity: "feed", + detector: ->(**) do + Rails.logger.info("checking trigger") + { changed: false, state: {}, payload: nil } + end + ) + + register_change_detecting_workflow(fake_trigger) + + output = capture_logged_output do + ChangeDetectionJob.perform_now("test_change_detecting_feed", trigger_key: fake_trigger.unique_key) + end + + assert_includes output, "r3x.workflow_key=test_change_detecting_feed" + assert_includes output, "r3x.trigger_key=#{fake_trigger.unique_key}" + assert_includes output, "Checking change-detecting trigger type=fake_change_detecting" + assert_includes output, "Evaluated change-detecting trigger changed=false" + assert_includes output, "checking trigger" + end + + test "logs changed outcome before enqueueing workflow" do + fake_trigger = R3x::TestSupport::FakeChangeDetectingTrigger.new( + identity: "feed", + detector: ->(workflow_key:, state:) do + { changed: true, state: state.merge(cursor: "v2"), payload: { "entries" => [ { "title" => "Hello" } ] } } + end + ) + + register_change_detecting_workflow(fake_trigger) + + output = capture_logged_output do + ChangeDetectionJob.perform_now("test_change_detecting_feed", trigger_key: fake_trigger.unique_key) + end + + assert_includes output, "Change detected; enqueueing workflow class=TestChangeDetectingFeed" + assert_includes output, "r3x.job_outcome=changed" + end + + test "logs failed outcome when change detection raises" do + fake_trigger = R3x::TestSupport::FakeChangeDetectingTrigger.new( + identity: "feed", + detector: ->(**) do + raise ArgumentError, "detection failed" + end + ) + + register_change_detecting_workflow(fake_trigger) + + output = capture_logged_output do + assert_raises(ArgumentError) do + ChangeDetectionJob.perform_now("test_change_detecting_feed", trigger_key: fake_trigger.unique_key) + end + end + + assert_includes output, "r3x.job_outcome=failed" + assert_includes output, "Change detection failed" + assert_includes output, "\"error_class\":\"ArgumentError\"" + assert_includes output, "\"error_message\":\"detection failed\"" + assert_includes output, "\"backtrace\":[" + end + + private + + def register_change_detecting_workflow(fake_trigger) + workflow_class = Class.new(R3x::Workflow::Base) do + def self.name + "TestChangeDetectingFeed" + end + + def run + ctx.trigger.payload + end + end + + workflow_class.stubs(:triggers_by_key).returns({ fake_trigger.unique_key => fake_trigger }) + Workflow::Registry.register(workflow_class) + workflow_class + end + end +end diff --git a/test/lib/r3x/dashboard/recurring_task_test.rb b/test/lib/r3x/dashboard/recurring_task_test.rb index ae000e5..6a7551c 100644 --- a/test/lib/r3x/dashboard/recurring_task_test.rb +++ b/test/lib/r3x/dashboard/recurring_task_test.rb @@ -9,8 +9,8 @@ class Dashboard::RecurringTaskTest < ActiveSupport::TestCase TestDbCleanup.clear_runtime_tables! end - test "parses workflow and trigger keys for recurring workflow tasks" do - task = Dashboard::RecurringTask.create!( + test "parses workflow and trigger keys for direct and change-detection tasks" do + direct_task = Dashboard::RecurringTask.create!( key: "workflow:test_workflow:schedule:123", schedule: "0 * * * *", class_name: "Workflows::TestWorkflow", @@ -18,10 +18,24 @@ class Dashboard::RecurringTaskTest < ActiveSupport::TestCase queue_name: "default", static: false ) + change_detection_task = Dashboard::RecurringTask.create!( + key: "workflow:feed_watch:feed:123", + schedule: "*/5 * * * *", + class_name: Dashboard::RecurringTask::CHANGE_DETECTION_CLASS_NAME, + arguments: [ "feed_watch", { "trigger_key" => "feed:123" } ], + queue_name: "feeds", + static: false + ) + + assert_equal "test_workflow", direct_task.workflow_key + assert_equal "schedule:123", direct_task.trigger_key + assert_equal "Workflows::TestWorkflow", direct_task.direct_workflow_class_name + refute direct_task.change_detection? - assert_equal "test_workflow", task.workflow_key - assert_equal "schedule:123", task.trigger_key - assert_equal "Workflows::TestWorkflow", task.direct_workflow_class_name + assert_equal "feed_watch", change_detection_task.workflow_key + assert_equal "feed:123", change_detection_task.trigger_key + assert_nil change_detection_task.direct_workflow_class_name + assert change_detection_task.change_detection? end test "workflow key lookup matches literal underscores and percents" do @@ -58,8 +72,16 @@ class Dashboard::RecurringTaskTest < ActiveSupport::TestCase assert_equal [ "workflow:foo%bar:schedule:1" ], Dashboard::RecurringTask.for_workflow_key("foo%bar").pluck(:key) end - test "preferred_for_workflow returns a matching recurring workflow task" do - task = Dashboard::RecurringTask.create!( + test "preferred_for_workflow prefers direct workflow classes over change detection tasks" do + change_detection_task = Dashboard::RecurringTask.create!( + key: "workflow:test_workflow:feed:123", + schedule: "*/5 * * * *", + class_name: Dashboard::RecurringTask::CHANGE_DETECTION_CLASS_NAME, + arguments: [ "test_workflow", { "trigger_key" => "feed:123" } ], + queue_name: "feeds", + static: false + ) + direct_task = Dashboard::RecurringTask.create!( key: "workflow:test_workflow:schedule:123", schedule: "0 * * * *", class_name: "Workflows::TestWorkflow", @@ -68,7 +90,7 @@ class Dashboard::RecurringTaskTest < ActiveSupport::TestCase static: false ) - assert_equal task, Dashboard::RecurringTask.preferred_for_workflow("test_workflow") - assert_equal task, Dashboard::RecurringTask.find_by_workflow_and_trigger_key!(workflow_key: "test_workflow", trigger_key: "schedule:123") + assert_equal direct_task, Dashboard::RecurringTask.preferred_for_workflow("test_workflow") + assert_equal change_detection_task, Dashboard::RecurringTask.find_by_workflow_and_trigger_key!(workflow_key: "test_workflow", trigger_key: "feed:123") end end diff --git a/test/lib/r3x/dashboard/workflow_catalog_test.rb b/test/lib/r3x/dashboard/workflow_catalog_test.rb index 3135146..5c2a264 100644 --- a/test/lib/r3x/dashboard/workflow_catalog_test.rb +++ b/test/lib/r3x/dashboard/workflow_catalog_test.rb @@ -13,7 +13,7 @@ class WorkflowCatalogTest < ActiveSupport::TestCase TestDbCleanup.clear_runtime_tables! end - test "collects workflow keys from recurring tasks and direct workflow runs" do + test "collects workflow keys from recurring tasks trigger states and direct workflow runs" do SolidQueue::RecurringTask.create!( key: "workflow:scheduled_workflow:schedule:123", schedule: "0 * * * *", @@ -73,7 +73,7 @@ class WorkflowCatalogTest < ActiveSupport::TestCase assert_includes catalog.class_names_for("manual_only_workflow"), "Workflows::ManualOnlyWorkflow" end - test "maps concrete workflow class names to workflow keys and excludes unrelated jobs" do + test "maps concrete workflow class names to workflow keys and excludes unrelated and change detection jobs" do SolidQueue::RecurringTask.create!( key: "workflow:test_workflow:schedule:123", schedule: "0 * * * *", @@ -82,6 +82,14 @@ class WorkflowCatalogTest < ActiveSupport::TestCase queue_name: "default", static: false ) + SolidQueue::RecurringTask.create!( + key: "workflow:test_workflow:feed:123", + schedule: "*/5 * * * *", + class_name: Workflow::Catalog::CHANGE_DETECTION_CLASS_NAME, + arguments: [ "test_workflow", { "trigger_key" => "feed:123" } ], + queue_name: "feeds", + static: false + ) DashboardJobRows.create_job!( job_class_name: "CleanupJob", arguments: [ "tmp/cache" ], diff --git a/test/lib/r3x/dashboard/workflow_run_counts_test.rb b/test/lib/r3x/dashboard/workflow_run_counts_test.rb index 0d042c7..c9357c0 100644 --- a/test/lib/r3x/dashboard/workflow_run_counts_test.rb +++ b/test/lib/r3x/dashboard/workflow_run_counts_test.rb @@ -78,6 +78,13 @@ class WorkflowRunCountsTest < ActiveSupport::TestCase updated_at: 2.days.ago ) + DashboardJobRows.create_job!( + job_class_name: "R3x::ChangeDetectionJob", + arguments: [ "test_workflow", { trigger_key: "feed:123" } ], + created_at: 10.minutes.ago, + updated_at: 10.minutes.ago + ) + counts = Workflow::RunCounts.new assert_equal 1, counts.running_count diff --git a/test/lib/r3x/dashboard/workflow_run_enqueuer_test.rb b/test/lib/r3x/dashboard/workflow_run_enqueuer_test.rb index 201e976..272c9c8 100644 --- a/test/lib/r3x/dashboard/workflow_run_enqueuer_test.rb +++ b/test/lib/r3x/dashboard/workflow_run_enqueuer_test.rb @@ -71,7 +71,14 @@ class WorkflowRunEnqueuerTest < ActiveSupport::TestCase assert_equal 7, job.priority end - test "enqueue without trigger key falls back to the last visible direct workflow run metadata" do + test "enqueue without trigger key falls back to the last visible direct workflow run metadata only" do + R3x::TriggerState.create!( + workflow_key: "test_workflow", + trigger_key: "feed:123", + trigger_type: "feed", + state: {}, + last_checked_at: 1.minute.ago + ) DashboardJobRows.create_job!( job_class_name: WORKFLOW_JOB_CLASS_NAME, arguments: [ "feed:123", { trigger_payload: { "id" => "42" } } ], @@ -94,6 +101,30 @@ class WorkflowRunEnqueuerTest < ActiveSupport::TestCase assert_equal [], ::Dashboard::Run.find(job.id).workflow_arguments end + test "enqueue without trigger key derives workflow class for change-detecting-only workflows" do + SolidQueue::RecurringTask.create!( + key: "workflow:feed_watch:feed:123", + schedule: "*/5 * * * *", + class_name: "R3x::ChangeDetectionJob", + arguments: [ "feed_watch", { "trigger_key" => "feed:123" } ], + queue_name: "feeds", + priority: 3, + static: false + ) + + result = nil + assert_difference -> { SolidQueue::Job.where(class_name: "Workflows::FeedWatch").count }, 1 do + result = Workflow::RunEnqueuer.new(workflow_key: "feed_watch", trigger_key: nil).enqueue! + end + + assert_instance_of ::Dashboard::Run, result + job = SolidQueue::Job.order(:id).last + assert_equal "Workflows::FeedWatch", job.class_name + assert_equal "feeds", job.queue_name + assert_equal 3, job.priority + assert_equal [], ::Dashboard::Run.find(job.id).workflow_arguments + end + test "enqueue without trigger key matches recurring tasks for workflow key literally" do ensure_workflow_job_class("FooBar") ensure_workflow_job_class("Foo1bar") @@ -152,6 +183,30 @@ class WorkflowRunEnqueuerTest < ActiveSupport::TestCase assert_equal [ "schedule:123" ], ::Dashboard::Run.find(job.id).workflow_arguments end + test "enqueue with change-detection task uses change detection job" do + SolidQueue::RecurringTask.create!( + key: "workflow:test_workflow:feed:123", + schedule: "*/5 * * * *", + class_name: "R3x::ChangeDetectionJob", + arguments: [ "test_workflow", { "trigger_key" => "feed:123" } ], + queue_name: "feeds", + priority: 3, + static: false + ) + + result = nil + assert_enqueued_with( + job: R3x::ChangeDetectionJob, + args: [ "test_workflow", { trigger_key: "feed:123" } ], + queue: "feeds", + priority: 3 + ) do + result = Workflow::RunEnqueuer.new(workflow_key: "test_workflow", trigger_key: "feed:123").enqueue! + end + + assert_nil result + end + test "enqueue without direct target raises a key error" do assert_raises(KeyError) do Workflow::RunEnqueuer.new(workflow_key: "missing_workflow", trigger_key: nil).enqueue! diff --git a/test/lib/r3x/dashboard/workflow_runs_test.rb b/test/lib/r3x/dashboard/workflow_runs_test.rb index 600a24d..4553aee 100644 --- a/test/lib/r3x/dashboard/workflow_runs_test.rb +++ b/test/lib/r3x/dashboard/workflow_runs_test.rb @@ -86,6 +86,27 @@ class WorkflowRunsTest < ActiveSupport::TestCase assert_equal "boom", run[:error] end + test "maps change-detection-driven workflow runs through trigger state observations" do + R3x::TriggerState.create!( + workflow_key: "feed_workflow", + trigger_key: "feed:123", + trigger_type: "feed", + state: {} + ) + job = DashboardJobRows.create_job!( + job_class_name: "Workflows::FeedWorkflow", + arguments: [ "feed:123", { trigger_payload: { "id" => "99" } } ], + finished_at: 1.minute.ago, + created_at: 5.minutes.ago, + updated_at: 1.minute.ago + ) + + run = Workflow::Runs.new(workflow_key: "feed_workflow").all.find { |entry| entry[:job_id] == job.id } + + assert_equal "feed_workflow", run[:workflow_key] + assert_equal "feed:123", run[:trigger_key] + end + test "maps manual-only workflow runs from direct workflow class names without trigger metadata" do job = DashboardJobRows.create_job!( job_class_name: "Workflows::ManualOnlyWorkflow", @@ -175,6 +196,19 @@ class WorkflowRunsTest < ActiveSupport::TestCase assert_equal [ failed_job.id ], runs.map { |run| run[:job_id] } end + test "ignores change detection jobs in workflow run history" do + DashboardJobRows.create_job!( + job_class_name: "R3x::ChangeDetectionJob", + arguments: [ "test_workflow", { "trigger_key" => "feed:123" } ], + created_at: 5.minutes.ago, + updated_at: 1.minute.ago + ) + + runs = Workflow::Runs.new.all + + assert_empty runs + end + private def seed_runtime_catalog SolidQueue::RecurringTask.create!( diff --git a/test/lib/r3x/dashboard/workflow_summaries_test.rb b/test/lib/r3x/dashboard/workflow_summaries_test.rb index 2dff421..7894bf5 100644 --- a/test/lib/r3x/dashboard/workflow_summaries_test.rb +++ b/test/lib/r3x/dashboard/workflow_summaries_test.rb @@ -36,6 +36,22 @@ class WorkflowSummariesTest < ActiveSupport::TestCase assert summary[:trigger_entries].first[:run_now_available] end + test "shows generic run now when only change detection metadata exists" do + SolidQueue::RecurringTask.create!( + key: "workflow:test_workflow:feed:abc123", + schedule: "*/5 * * * *", + class_name: Workflow::Catalog::CHANGE_DETECTION_CLASS_NAME, + arguments: [ "test_workflow", { "trigger_key" => "feed:abc123" } ], + queue_name: "feeds", + static: false + ) + + summary = Workflow::Summaries.new.find!("test_workflow") + + assert summary[:run_now_available] + refute summary[:trigger_entries].first[:run_now_available] + end + test "manual-only direct workflow runs stay visible without trigger metadata" do DashboardJobRows.create_job!( job_class_name: "Workflows::ManualOnlyWorkflow", @@ -52,6 +68,22 @@ class WorkflowSummariesTest < ActiveSupport::TestCase assert_equal "Workflows::ManualOnlyWorkflow", summary[:class_name] end + test "prefers trigger error health over last run status" do + R3x::TriggerState.create!( + workflow_key: "test_dashboard_change_feed", + trigger_key: "feed:123", + trigger_type: "fake_change_detecting", + state: {}, + last_error_at: Time.current, + last_error_message: "feed offline" + ) + + summary = Workflow::Summaries.new.find!("test_dashboard_change_feed") + + assert_equal "Trigger error", summary[:health][:label] + assert_equal "feed offline", summary[:health][:detail] + end + test "orders the catalog by health severity by default" do create_dashboard_workflow(workflow_key: "idle_workflow", trigger_key: "schedule:idle") create_dashboard_workflow( @@ -66,9 +98,15 @@ class WorkflowSummariesTest < ActiveSupport::TestCase run_status: "failed", recorded_at: 2.minutes.ago ) + create_dashboard_workflow( + workflow_key: "trigger_error_workflow", + trigger_key: "feed:error", + trigger_error_at: 1.minute.ago + ) + summaries = Workflow::Summaries.new.all - assert_equal %w[failed_workflow healthy_workflow idle_workflow], summaries.map { |summary| summary[:workflow_key] } + assert_equal %w[trigger_error_workflow failed_workflow healthy_workflow idle_workflow], summaries.map { |summary| summary[:workflow_key] } end test "reverses health severity order when sorted descending" do @@ -85,9 +123,15 @@ class WorkflowSummariesTest < ActiveSupport::TestCase run_status: "failed", recorded_at: 2.minutes.ago ) + create_dashboard_workflow( + workflow_key: "trigger_error_workflow", + trigger_key: "feed:error", + trigger_error_at: 1.minute.ago + ) + summaries = Workflow::Summaries.new(sort: "health", direction: "desc").all - assert_equal %w[idle_workflow healthy_workflow failed_workflow], summaries.map { |summary| summary[:workflow_key] } + assert_equal %w[idle_workflow healthy_workflow failed_workflow trigger_error_workflow], summaries.map { |summary| summary[:workflow_key] } end test "supports workflow and last run sorting" do @@ -177,7 +221,7 @@ def clear_tables TestDbCleanup.clear_runtime_tables! end - def create_dashboard_workflow(workflow_key:, trigger_key:, run_status: nil, recorded_at: nil) + def create_dashboard_workflow(workflow_key:, trigger_key:, run_status: nil, recorded_at: nil, trigger_error_at: nil) job_class_name = DashboardTestWorkflows.ensure_class(workflow_key.camelize) SolidQueue::RecurringTask.create!( @@ -189,6 +233,17 @@ def create_dashboard_workflow(workflow_key:, trigger_key:, run_status: nil, reco static: false ) + if trigger_error_at.present? + R3x::TriggerState.create!( + workflow_key: workflow_key, + trigger_key: trigger_key, + trigger_type: "feed", + state: {}, + last_error_at: trigger_error_at, + last_error_message: "#{workflow_key} error" + ) + end + return if run_status.blank? job = DashboardJobRows.create_job!( diff --git a/test/lib/r3x/recurring_tasks_config_test.rb b/test/lib/r3x/recurring_tasks_config_test.rb index 81100e8..90edb1d 100644 --- a/test/lib/r3x/recurring_tasks_config_test.rb +++ b/test/lib/r3x/recurring_tasks_config_test.rb @@ -1,4 +1,5 @@ require "test_helper" +require_relative "../../support/fake_change_detecting_trigger" module R3x class RecurringTasksConfigTest < ActiveSupport::TestCase @@ -44,6 +45,32 @@ def self.name Workflow::Registry.reset! end + test "generates change detection tasks for change-detecting triggers" do + fake_trigger = R3x::TestSupport::FakeChangeDetectingTrigger.new(identity: "feed") + workflow_class = Class.new(R3x::Workflow::Base) do + def self.name + "Workflows::ChangeDetectingFeed" + end + + stubs(:triggers).returns([ fake_trigger ]) + stubs(:schedulable_triggers).returns([ fake_trigger ]) + end + + R3x::Workflow::Registry.register(workflow_class) + + tasks = RecurringTasksConfig.to_h + expected_key = "workflow:change_detecting_feed:#{fake_trigger.unique_key}" + task = tasks.fetch(expected_key) + + assert_equal "R3x::ChangeDetectionJob", task["class"] + assert_equal [ "change_detecting_feed", { "trigger_key" => fake_trigger.unique_key } ], task["args"] + assert_equal "every 15 minutes", task["schedule"] + assert_equal "default", task["queue"] + ensure + Workflow::Registry.reset! + Workflow::PackLoader.load!(force: true) + end + test "schedule_all! persists dynamic tasks via SolidQueue" do ENV.delete("R3X_TIMEZONE") diff --git a/test/lib/r3x/workflow_test.rb b/test/lib/r3x/workflow_test.rb index ae6343a..2f0e3bd 100644 --- a/test/lib/r3x/workflow_test.rb +++ b/test/lib/r3x/workflow_test.rb @@ -1,4 +1,5 @@ require "test_helper" +require_relative "../../support/fake_change_detecting_trigger" module R3x class WorkflowTest < ActiveSupport::TestCase @@ -295,6 +296,30 @@ def self.name assert_match(/Supported types:.*:schedule/, error.message) end + test "rejects duplicate change-detecting trigger keys in one workflow" do + R3x::Triggers.stubs(:resolve).returns(R3x::TestSupport::FakeChangeDetectingTrigger) + + error = assert_raises(ArgumentError) do + Class.new(R3x::Workflow::Base) do + def self.name + "Workflows::DuplicateChangeDetecting" + end + + trigger :fake_change_detecting, identity: "same" + trigger :fake_change_detecting, identity: "same", cron: "every hour" + end + end + + assert_match(/Trigger with key .* already exists/, error.message) + end + + test "change-detecting trigger key does not change when only cron changes" do + trigger_one = R3x::TestSupport::FakeChangeDetectingTrigger.new(identity: "feed", cron: "every 15 minutes") + trigger_two = R3x::TestSupport::FakeChangeDetectingTrigger.new(identity: "feed", cron: "every hour") + + assert_equal trigger_one.unique_key, trigger_two.unique_key + end + # Default trigger behavior tests test "returns default Manual trigger when no triggers declared" do diff --git a/test/lib/seeds/dashboard_demo_seeder_test.rb b/test/lib/seeds/dashboard_demo_seeder_test.rb index 87f043b..60eb1f6 100644 --- a/test/lib/seeds/dashboard_demo_seeder_test.rb +++ b/test/lib/seeds/dashboard_demo_seeder_test.rb @@ -31,6 +31,7 @@ class DashboardDemoSeederTest < ActiveSupport::TestCase assert_equal %w[failed finished finished running scheduled], runs.map { |run| run[:status] }.sort assert_equal 5, SolidQueue::RecurringTask.where("key LIKE ?", "workflow:demo_%").count + assert_equal 5, R3x::TriggerState.where("workflow_key LIKE ?", "demo_%").count end test "re-seeding replaces demo records instead of duplicating them" do @@ -71,7 +72,8 @@ def runtime_counts processes: SolidQueue::Process.count, ready: SolidQueue::ReadyExecution.count, recurring_tasks: SolidQueue::RecurringTask.count, - scheduled: SolidQueue::ScheduledExecution.count + scheduled: SolidQueue::ScheduledExecution.count, + trigger_states: R3x::TriggerState.count } end end diff --git a/test/support/fake_change_detecting_trigger.rb b/test/support/fake_change_detecting_trigger.rb new file mode 100644 index 0000000..9262838 --- /dev/null +++ b/test/support/fake_change_detecting_trigger.rb @@ -0,0 +1,30 @@ +module R3x + module TestSupport + class FakeChangeDetectingTrigger < R3x::Triggers::Base + include R3x::Triggers::Concerns::CronSchedulable + include R3x::Triggers::Concerns::ChangeDetecting + + def initialize(identity:, cron: "every 15 minutes", detector: nil, **options) + @detector = detector || ->(workflow_key:, state:) { { changed: false, state: state, payload: nil } } + super(:fake_change_detecting, identity: identity, cron: cron, **options) + end + + def validate!(**) + true + end + + def cron + options[:cron] + end + + def unique_key + # Identity-based key - doesn't change when cron changes + "fake_change_detecting:#{options[:identity]}" + end + + def detect_changes(workflow_key:, state:) + @detector.call(workflow_key:, state:) + end + end + end +end diff --git a/test/support/test_db_cleanup.rb b/test/support/test_db_cleanup.rb index d06744a..33fec0c 100644 --- a/test/support/test_db_cleanup.rb +++ b/test/support/test_db_cleanup.rb @@ -14,6 +14,7 @@ def clear_runtime_tables! SolidQueue::Job.delete_all SolidQueue::Process.delete_all SolidQueue::RecurringTask.delete_all + R3x::TriggerState.delete_all rescue ActiveRecord::StatementTimeout => error attempts += 1 raise error unless error.message.include?("database is locked") && attempts < 6
<%= dashboard_trigger_kind(trigger_entry) %>
<%= dashboard_trigger_details(trigger_entry) %>
+ <% if trigger_entry[:change_detecting] %> + <%= dashboard_pill("Change detecting", "info") %> + <% end %> <% if trigger_entry[:recurring_task].blank? %> - <%= dashboard_pill("Observed from runtime", "muted") %> + <%= dashboard_pill("Observed from runtime state", "muted") %> <% end %>