From c822f183df57a2a589910c7aa27faf715a36df99 Mon Sep 17 00:00:00 2001 From: zewelor Date: Sat, 21 Mar 2026 11:49:22 +0000 Subject: [PATCH] Add rss trigger --- Gemfile | 3 + Gemfile.lock | 4 + lib/r3x/triggers/rss.rb | 122 ++++++++ .../workflows/rss_test_workflow/workflow.rb | 9 + test/lib/r3x/triggers/rss_test.rb | 269 ++++++++++++++++++ 5 files changed, 407 insertions(+) create mode 100644 lib/r3x/triggers/rss.rb create mode 100644 test/fixtures/workflows/rss_test_workflow/workflow.rb create mode 100644 test/lib/r3x/triggers/rss_test.rb diff --git a/Gemfile b/Gemfile index 260d6fc..412535c 100644 --- a/Gemfile +++ b/Gemfile @@ -16,6 +16,9 @@ gem "multi_json" # HTML/XML parsing gem "nokogiri" +# RSS/Atom feed parsing +gem "rss" + # LLM integration gem "ruby_llm" diff --git a/Gemfile.lock b/Gemfile.lock index d94ff99..c6831d1 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -398,6 +398,8 @@ GEM uber (< 0.2.0) retriable (3.4.1) rexml (3.4.4) + rss (0.3.2) + rexml rubocop (1.85.1) json (~> 2.3) language_server-protocol (~> 3.17.0.2) @@ -520,6 +522,7 @@ DEPENDENCIES propshaft puma (>= 5.0) rails (~> 8.1.2) + rss rubocop-rails-omakase ruby_llm solid_cache @@ -670,6 +673,7 @@ CHECKSUMS representable (3.2.0) sha256=cc29bf7eebc31653586849371a43ffe36c60b54b0a6365b5f7d95ec34d1ebace retriable (3.4.1) sha256=fb3f114b7d492121c158c01f3d5152b5a615c5b70d5877d0bc08c7ec3725c3bc rexml (3.4.4) sha256=19e0a2c3425dfbf2d4fc1189747bdb2f849b6c5e74180401b15734bc97b5d142 + rss (0.3.2) sha256=3bd0446d32d832cda00ba07f4b179401f903b52ea1fdaac0f1f08de61a501efa rubocop (1.85.1) sha256=3dbcf9e961baa4c376eeeb2a03913dca5e3987033b04d38fa538aa1e7406cc77 rubocop-ast (1.49.1) sha256=4412f3ee70f6fe4546cc489548e0f6fcf76cafcfa80fa03af67098ffed755035 rubocop-performance (1.26.1) sha256=cd19b936ff196df85829d264b522fd4f98b6c89ad271fa52744a8c11b8f71834 diff --git a/lib/r3x/triggers/rss.rb b/lib/r3x/triggers/rss.rb new file mode 100644 index 0000000..8712f8d --- /dev/null +++ b/lib/r3x/triggers/rss.rb @@ -0,0 
require "rss"

module R3x
  module Triggers
    # Trigger that polls an RSS or Atom feed and reports the entries whose
    # identifier (link, falling back to guid/id) was not in the previously
    # stored state.
    #
    # The feed is fetched with Faraday and parsed with the `rss` gem. The state
    # returned from every poll is the list of identifiers currently present in
    # the feed.
    class Rss < Base
      include Concerns::CronSchedulable
      include Concerns::ChangeDetecting

      validates :url, presence: true
      validates_with Validators::Url, url_field: :url

      # @param url [String, nil] feed URL; surrounding whitespace is stripped
      # @param cron [String, nil] schedule expression; surrounding whitespace is stripped
      # @param options [Hash] any further options, forwarded unchanged to Base
      def initialize(url: nil, cron: nil, **options)
        normalized_url = url.is_a?(String) ? url.strip : url
        normalized_cron = cron.is_a?(String) ? cron.strip : cron
        super(:rss, url: normalized_url, cron: normalized_cron, **options)
      end

      # @return [String, nil] the configured feed URL
      def url
        options[:url]
      end

      # @return [String, nil] the configured schedule expression
      def cron
        options[:cron]
      end

      # Key derived from the URL only, so changing the schedule does not change it.
      #
      # @return [String] "rss:" followed by the first 16 hex chars of the URL's SHA-256
      def unique_key
        "rss:#{Digest::SHA256.hexdigest(url)[0..15]}"
      end

      # Fetches the feed and compares its entries with the stored state.
      #
      # @param workflow_key [Object] accepted for interface compatibility; not used here
      # @param state [Hash] previous state; `state[:seen_links]` holds the identifiers
      #   recorded by the last poll (may be absent)
      # @return [Hash] `{ changed:, state: { seen_links: }, payload: }`; `payload` is nil
      #   when nothing is new, otherwise it carries `feed_title`, `feed_url`, `new_items`
      # @raise [Faraday::Error] when the HTTP response is not successful
      # @raise [RSS::Error] when the body is not an RSS/Atom feed
      def detect_changes(workflow_key:, state:)
        feed = fetch_feed

        # Resolve each item's identifier once; items without one are skipped.
        linked_items = feed.items.filter_map do |item|
          link = extract_link(item)
          [ link, item ] if link
        end

        current_links = linked_items.map(&:first)
        seen_links = (state[:seen_links] || []).map(&:to_s)
        new_items = linked_items.reject { |link, _item| seen_links.include?(link) }

        next_state = { seen_links: current_links }
        return { changed: false, state: next_state, payload: nil } if new_items.empty?

        {
          changed: true,
          state: next_state,
          payload: {
            feed_title: extract_feed_title(feed),
            feed_url: url,
            new_items: new_items.map { |link, item| build_item_payload(item, link) }
          }
        }
      end

      private

      # Downloads and parses the feed.
      def fetch_feed
        response = Faraday.get(url)
        raise Faraday::Error, "HTTP #{response.status}" unless response.success?

        # Validation is disabled (second argument) so slightly non-conforming
        # feeds still parse. The parser hands back nil when the document is
        # well-formed XML but not a feed at all, e.g. an HTML page.
        feed = RSS::Parser.parse(response.body, false)
        raise RSS::Error, "Response from #{url} is not an RSS or Atom feed" if feed.nil?

        feed
      end

      # Builds the payload hash for a single new entry.
      def build_item_payload(item, link)
        {
          title: extract_title(item),
          link: link,
          published_at: extract_published(item),
          description: extract_description(item)
        }
      end

      # Returns the entry's link, or its guid/id when the link is missing or blank.
      def extract_link(item)
        link = item.link if item.respond_to?(:link)
        href =
          case link
          when RSS::Atom::Feed::Link then link.href
          when String then link
          end

        href.presence || extract_identifier(item)
      end

      # Fallback identifier: RSS items expose `guid`, Atom entries expose `id`.
      def extract_identifier(item)
        if item.respond_to?(:guid) && item.guid
          item.guid.content
        elsif item.respond_to?(:id) && item.id
          item.id.content
        end
      end

      def extract_feed_title(feed)
        title = if feed.respond_to?(:channel) && feed.channel
          feed.channel.title
        else
          feed.title
        end
        extract_text(title)
      end

      def extract_title(item)
        extract_text(item.title)
      end

      # Unwraps element objects (which expose `content`) and stringifies anything else.
      def extract_text(value)
        return nil if value.nil?
        value.respond_to?(:content) ? value.content : value.to_s
      end

      # Returns the first available timestamp (pubDate, published, updated) as a String.
      def extract_published(item)
        %i[pubDate published updated].each do |field|
          value = item.public_send(field) if item.respond_to?(field)
          next unless value

          # Atom wraps the timestamp in an element; calling to_s on the wrapper
          # would serialize the element instead of the time itself.
          timestamp = value.respond_to?(:content) ? value.content : value
          return timestamp&.to_s
        end
        nil
      end

      # Returns the first available body text (description, content, summary).
      def extract_description(item)
        %i[description content summary].each do |field|
          value = item.public_send(field) if item.respond_to?(field)
          # Atom content/summary are element objects; reduce them to plain text.
          return extract_text(value) if value
        end
        nil
      end
    end
  end
end
post + Wed, 03 Jan 2024 12:00:00 GMT + + + First Post + https://example.com/first + Description of first post + Mon, 01 Jan 2024 12:00:00 GMT + + + + XML + + SAMPLE_ATOM_FEED = <<~XML + + + Test Atom Feed + + + Atom Entry + + https://example.com/atom-entry + 2024-01-01T12:00:00Z + + + XML + + # Validation tests + + test "validates presence of url" do + error = assert_raises(ConfigurationError) do + Class.new(R3x::Workflow::Base) do + def self.name + "Test" + end + trigger :rss, cron: "every 15 minutes" + end + end + + assert_includes error.message, "Url can't be blank" + end + + test "validates presence of cron" do + error = assert_raises(ConfigurationError) do + Class.new(R3x::Workflow::Base) do + def self.name + "Test" + end + trigger :rss, url: "https://example.com/feed.xml" + end + end + + assert_includes error.message, "Cron can't be blank" + end + + test "validates url format" do + error = assert_raises(ConfigurationError) do + Class.new(R3x::Workflow::Base) do + def self.name + "Test" + end + trigger :rss, url: "not a url", cron: "every 15 minutes" + end + end + + assert_includes error.message, "is not a valid HTTP/HTTPS URL" + end + + test "validates cron format" do + error = assert_raises(ConfigurationError) do + Class.new(R3x::Workflow::Base) do + def self.name + "Test" + end + trigger :rss, url: "https://example.com/feed.xml", cron: "bad cron" + end + end + + assert_includes error.message, "Cron is not a valid cron expression" + end + + test "accepts valid rss trigger configuration" do + klass = Class.new(R3x::Workflow::Base) do + def self.name + "Test" + end + trigger :rss, url: "https://example.com/feed.xml", cron: "every 15 minutes" + end + + trigger = klass.schedulable_triggers.first + assert trigger + assert_equal :rss, trigger.type + assert_equal "https://example.com/feed.xml", trigger.url + assert trigger.cron_schedulable? + assert trigger.change_detecting? 
+ end + + # unique_key tests + + test "unique_key is based on url" do + trigger = R3x::Triggers::Rss.new(url: "https://example.com/feed.xml", cron: "every 15 minutes") + assert_match(/\Arss:[a-f0-9]{16}\z/, trigger.unique_key) + end + + test "unique_key does not change when cron changes" do + trigger_one = R3x::Triggers::Rss.new(url: "https://example.com/feed.xml", cron: "every 15 minutes") + trigger_two = R3x::Triggers::Rss.new(url: "https://example.com/feed.xml", cron: "every hour") + + assert_equal trigger_one.unique_key, trigger_two.unique_key + end + + test "different urls produce different unique_keys" do + trigger_one = R3x::Triggers::Rss.new(url: "https://example.com/feed1.xml", cron: "every 15 minutes") + trigger_two = R3x::Triggers::Rss.new(url: "https://example.com/feed2.xml", cron: "every 15 minutes") + + refute_equal trigger_one.unique_key, trigger_two.unique_key + end + + # detect_changes tests + + test "first check with empty state detects all items as new" do + stub_request(:get, "https://example.com/feed.xml").to_return(status: 200, body: SAMPLE_RSS_FEED) + + trigger = R3x::Triggers::Rss.new(url: "https://example.com/feed.xml", cron: "every 15 minutes") + result = trigger.detect_changes(workflow_key: "test", state: {}) + + assert result[:changed] + assert_equal "Test Feed", result[:payload][:feed_title] + assert_equal "https://example.com/feed.xml", result[:payload][:feed_url] + assert_equal 2, result[:payload][:new_items].size + + first_item = result[:payload][:new_items].first + assert_equal "First Post", first_item[:title] + assert_equal "https://example.com/first", first_item[:link] + assert_equal "Description of first post", first_item[:description] + assert first_item[:published_at] + + assert_equal [ "https://example.com/first", "https://example.com/second" ], result[:state][:seen_links] + end + + test "second check with same feed reports no changes" do + stub_request(:get, "https://example.com/feed.xml").to_return(status: 200, body: 
SAMPLE_RSS_FEED) + + trigger = R3x::Triggers::Rss.new(url: "https://example.com/feed.xml", cron: "every 15 minutes") + + state = { seen_links: [ "https://example.com/first", "https://example.com/second" ] } + result = trigger.detect_changes(workflow_key: "test", state: state) + + refute result[:changed] + assert_nil result[:payload] + end + + test "detects new items added to feed" do + stub_request(:get, "https://example.com/feed.xml").to_return(status: 200, body: SAMPLE_RSS_FEED_UPDATED) + + trigger = R3x::Triggers::Rss.new(url: "https://example.com/feed.xml", cron: "every 15 minutes") + + state = { seen_links: [ "https://example.com/first", "https://example.com/second" ] } + result = trigger.detect_changes(workflow_key: "test", state: state) + + assert result[:changed] + assert_equal 1, result[:payload][:new_items].size + assert_equal "Third Post", result[:payload][:new_items].first[:title] + assert_equal "https://example.com/third", result[:payload][:new_items].first[:link] + + assert_equal [ "https://example.com/third", "https://example.com/first" ], result[:state][:seen_links] + end + + test "handles Atom feeds" do + stub_request(:get, "https://example.com/atom.xml").to_return(status: 200, body: SAMPLE_ATOM_FEED) + + trigger = R3x::Triggers::Rss.new(url: "https://example.com/atom.xml", cron: "every 15 minutes") + result = trigger.detect_changes(workflow_key: "test", state: {}) + + assert result[:changed] + assert_equal "Test Atom Feed", result[:payload][:feed_title] + assert_equal 1, result[:payload][:new_items].size + assert_equal "Atom Entry", result[:payload][:new_items].first[:title] + end + + test "raises on HTTP error" do + stub_request(:get, "https://example.com/feed.xml").to_return(status: 500, body: "Internal Server Error") + + trigger = R3x::Triggers::Rss.new(url: "https://example.com/feed.xml", cron: "every 15 minutes") + + assert_raises(Faraday::Error) do + trigger.detect_changes(workflow_key: "test", state: {}) + end + end + + test "handles empty 
feed gracefully" do + empty_feed = <<~XML + + + + Empty Feed + + + XML + + stub_request(:get, "https://example.com/empty.xml").to_return(status: 200, body: empty_feed) + + trigger = R3x::Triggers::Rss.new(url: "https://example.com/empty.xml", cron: "every 15 minutes") + result = trigger.detect_changes(workflow_key: "test", state: {}) + + refute result[:changed] + assert_nil result[:payload] + end + + # Integration with workflow DSL + + test "rss trigger integrates with workflow dsl" do + klass = Class.new(R3x::Workflow::Base) do + def self.name + "Workflows::RssWatcher" + end + trigger :rss, url: "https://example.com/feed.xml", cron: "every 15 minutes" + end + + trigger = klass.schedulable_triggers.first + assert_equal :rss, trigger.type + assert trigger.cron_schedulable? + assert trigger.change_detecting? + end + + test "rss trigger appears in supported types" do + assert_includes R3x::Triggers.supported_types, :rss + end + end +end