Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions app/controllers/admin/streams_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@ def index

persist_sort_preferences if persist_sort_preferences?

@pagy, @streams = pagy(
Stream.includes(:streamer)
.filtered(filter_params)
.sorted(@sort_column, @sort_direction),
items: 20,
)
streams_scope = Stream.filtered(filter_params)
streams_scope = streams_scope.includes(:streamer) if streams_scope.where.not(streamer_id: nil).exists?
streams_scope = streams_scope.sorted(@sort_column, @sort_direction)

@pagy, @streams = pagy(streams_scope, items: 20)

respond_to do |format|
format.html
Expand Down
12 changes: 12 additions & 0 deletions app/models/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
# ended_at :datetime
# is_archived :boolean default(false)
#
# rubocop:disable Metrics/ClassLength
class Stream < ApplicationRecord
include StreamLocationHandling
include StreamStatusTracking
Expand All @@ -35,6 +36,7 @@ class Stream < ApplicationRecord
belongs_to :location, optional: true, inverse_of: :streams
has_many :timestamp_streams, dependent: :destroy
has_many :timestamps, through: :timestamp_streams
attr_accessor :streamer_name

# Enums
enum :status, {
Expand Down Expand Up @@ -171,8 +173,17 @@ class Stream < ApplicationRecord
# Callbacks
before_validation :normalize_link
before_validation :set_posted_by
before_create :assign_streamer_from_source

# Instance methods
def assign_streamer_from_source(candidate_name: nil)
return if streamer_id.present?

candidate_name ||= streamer_name
streamer = Streamer.resolve_for_stream(self, candidate_name: candidate_name)
self.streamer = streamer if streamer.present?
end

def pin!
update!(is_pinned: true)
end
Expand Down Expand Up @@ -213,3 +224,4 @@ def broadcast_streamer_updates
})
end
end
# rubocop:enable Metrics/ClassLength
62 changes: 62 additions & 0 deletions app/models/streamer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,54 @@ class Streamer < ApplicationRecord
# Validations
validates :name, presence: true, uniqueness: { case_sensitive: false }

# Class methods
def self.resolve_for_stream(stream, candidate_name: nil)
return nil if stream.nil?
return nil if stream.user.blank?

name = candidate_name.presence || stream.source
return nil if name.blank?

streamer = find_by_platform_source(stream.platform, stream.source)
streamer ||= lookup_by_normalized_name(name)
streamer ||= create_from_stream(stream, name)
return nil if streamer.nil?

streamer.ensure_account(platform: stream.platform, source: stream.source)
streamer
end

def self.find_by_platform_source(platform, source)
return nil if platform.blank? || source.blank?
return nil unless StreamerAccount.platforms.key?(platform)

normalized = source.to_s.strip.downcase
account = StreamerAccount
.includes(:streamer)
.where(platform: platform)
.where("LOWER(username) = ?", normalized)
.first
account&.streamer
end

def self.lookup_by_normalized_name(name)
return nil if name.blank?

Streamer.where("LOWER(name) = ?", name.to_s.strip.downcase).first
end

def self.create_from_stream(stream, name)
return nil if name.blank?
return nil if stream.user.blank?

Streamer.create!(
name: name,
user: stream.user,
posted_by: stream.posted_by.presence || stream.user.email,
notes: stream.notes.presence,
)
end

# Scopes
scope :with_active_accounts, -> { joins(:streamer_accounts).where(streamer_accounts: { is_active: true }).distinct }
scope :by_platform, lambda { |platform|
Expand All @@ -35,6 +83,20 @@ class Streamer < ApplicationRecord
before_save :set_posted_by

# Instance methods
def ensure_account(platform:, source:)
return if platform.blank? || source.blank?
return unless StreamerAccount.platforms.key?(platform)

normalized = source.to_s.strip.downcase
existing = streamer_accounts
.where(platform: platform)
.where("LOWER(username) = ?", normalized)
.first
return if existing.present?

streamer_accounts.create!(platform: platform, username: normalized)
end

def active_stream
streams.live.not_archived.order(started_at: :desc).first
end
Expand Down
36 changes: 35 additions & 1 deletion db/seeds.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,30 @@
Rails.logger.debug "========================================\n"
end

def normalize_seed_platform(value)
key = value.to_s.strip.downcase
StreamerAccount.platforms.key?(key) ? key : nil
end

def find_or_create_seed_streamer(stream_data, user)
name = stream_data[:streamer_name].presence || stream_data[:source]
return nil if name.blank?

streamer = Streamer.find_or_create_by!(name: name) do |record|
record.user = user
record.notes = stream_data[:notes]
record.posted_by = stream_data[:posted_by]
end

platform = normalize_seed_platform(stream_data[:platform])
username = stream_data[:source].to_s.strip
if platform.present? && username.present?
StreamerAccount.find_or_create_by!(streamer: streamer, platform: platform, username: username)
end

streamer
end

# Create sample streams
if Stream.none? && Rails.env.local?
# Get the admin and editor users
Expand Down Expand Up @@ -146,9 +170,11 @@
]

sample_streams.each do |stream_data|
streamer = find_or_create_seed_streamer(stream_data, admin)
Stream.create!(
**stream_data,
user: admin,
streamer: streamer,
last_checked_at: rand(1..24).hours.ago,
last_live_at: stream_data[:status] == "live" ? rand(1..6).hours.ago : rand(1..30).days.ago,
)
Expand Down Expand Up @@ -200,9 +226,11 @@
]

editor_streams.each do |stream_data|
streamer = find_or_create_seed_streamer(stream_data, editor)
Stream.create!(
**stream_data,
user: editor,
streamer: streamer,
last_checked_at: rand(1..48).hours.ago,
last_live_at: stream_data[:status] == "live" ? rand(1..12).hours.ago : nil,
)
Expand All @@ -211,7 +239,7 @@
Rails.logger.debug { "Created #{editor_streams.count} sample streams for editor user" }

# Create a stream for the default user
Stream.create!(
default_stream = {
source: "user_stream",
link: "https://www.tiktok.com/@user_stream/live",
platform: "TikTok",
Expand All @@ -223,7 +251,13 @@
posted_by: "user@example.com",
orientation: "vertical",
kind: "video",
}

default_streamer = find_or_create_seed_streamer(default_stream, default_user)
Stream.create!(
**default_stream,
user: default_user,
streamer: default_streamer,
last_checked_at: 1.week.ago,
)

Expand Down
86 changes: 21 additions & 65 deletions lib/tasks/streamwall_import.rake
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class StreamwallImporter
puts "Streamwall import from #{@url}"
puts "User: #{@user.email} (ID: #{@user.id})"
puts "Mode: #{@mode} | Dry run: #{@dry_run}"
puts "Attach streamers to existing streams: #{@create_streamers}"
puts "Limit: #{@limit || 'none'} | Offset: #{@offset}"

payload = fetch_json
Expand Down Expand Up @@ -171,8 +172,11 @@ class StreamwallImporter
return
end

streamer_name = attrs[:streamer_name]
updates = build_updates(existing, attrs)
if updates.empty?
should_attach_streamer = @create_streamers && existing.streamer_id.nil?

if updates.empty? && !should_attach_streamer
results[:skipped] += 1
return
end
Expand All @@ -182,8 +186,18 @@ class StreamwallImporter
return
end

existing.update!(updates)
results[:updated] += 1
existing.assign_attributes(updates) if updates.present?
if should_attach_streamer
existing.streamer_name = streamer_name
existing.assign_streamer_from_source(candidate_name: streamer_name)
end

if existing.changed?
existing.save!
results[:updated] += 1
else
results[:skipped] += 1
end
end

def handle_new(attrs, results)
Expand All @@ -194,70 +208,11 @@ class StreamwallImporter

streamer_name = attrs.delete(:streamer_name)
stream = @user.streams.build(attrs)
attach_streamer(stream, streamer_name)
stream.streamer_name = streamer_name
stream.save!
results[:created] += 1
end

def attach_streamer(stream, streamer_name)
return unless @create_streamers

platform = stream.platform
source = stream.source
candidate_name = streamer_name.presence || source

streamer = find_streamer_by_platform_source(platform, source)
streamer ||= find_streamer_by_name(candidate_name)
streamer ||= create_streamer_from_stream(stream, candidate_name)

return if streamer.nil?

ensure_streamer_account(streamer, platform, source)
stream.streamer = streamer
end

def find_streamer_by_platform_source(platform, source)
return nil if platform.blank? || source.blank?

normalized = source.to_s.strip.downcase
account = StreamerAccount
.includes(:streamer)
.where(platform: platform)
.where("LOWER(username) = ?", normalized)
.first
account&.streamer
end

def find_streamer_by_name(name)
return nil if name.blank?

Streamer.where("LOWER(name) = ?", name.to_s.strip.downcase).first
end

def create_streamer_from_stream(stream, name)
return nil if name.blank?

Streamer.create!(
name: name,
user: @user,
posted_by: stream.posted_by.presence,
notes: stream.notes.presence,
)
end

def ensure_streamer_account(streamer, platform, source)
return if streamer.nil? || platform.blank? || source.blank?

normalized = source.to_s.strip.downcase
existing = streamer.streamer_accounts
.where(platform: platform)
.where("LOWER(username) = ?", normalized)
.first
return if existing.present?

streamer.streamer_accounts.create!(platform: platform, username: source)
end

def build_updates(existing, attrs)
updates = {}
attrs.each do |key, value|
Expand Down Expand Up @@ -514,14 +469,15 @@ end

# rubocop:disable Metrics/BlockLength
namespace :streams do
desc "Import streams from a Streamwall JSON endpoint (URL=... USER_EMAIL=... MODE=upsert|skip DRY_RUN=true)"
desc "Import streams from a Streamwall JSON endpoint " \
"(URL=... USER_EMAIL=... MODE=upsert|skip DRY_RUN=true CREATE_STREAMERS=true)"
task import_streamwall: :environment do
url = ENV["URL"] || ENV["STREAMWALL_URL"] || StreamwallImporter::DEFAULT_URL
mode = (ENV["MODE"] || "upsert").to_s.downcase
dry_run = ENV.fetch("DRY_RUN", "false").to_s.downcase == "true"
limit = ENV["LIMIT"]&.to_i
offset = ENV["OFFSET"].to_i
create_streamers = ENV.fetch("CREATE_STREAMERS", "false").to_s.downcase == "true"
create_streamers = ENV.fetch("CREATE_STREAMERS", "true").to_s.downcase == "true"

user = if ENV["USER_ID"].present?
User.find(ENV["USER_ID"])
Expand Down
47 changes: 47 additions & 0 deletions spec/models/stream_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,53 @@
end
end

describe "streamer assignment" do
it "creates a streamer and account when none exists" do
stream = create(:stream, user: user, streamer: nil, source: "TestStreamer", platform: "tiktok")

expect(stream.streamer).to have_attributes(name: "TestStreamer", user: user)

account = stream.streamer.streamer_accounts.find_by(platform: "tiktok")
expect(account).to be_present
expect(account.username).to eq("teststreamer")
end

it "reuses an existing streamer by platform account match" do
streamer = create(:streamer, user: user, name: "Existing")
create(:streamer_account, streamer: streamer, platform: "tiktok", username: "sameuser")

stream = create(:stream, user: user, streamer: nil, source: "SameUser", platform: "tiktok")

expect(stream.streamer).to eq(streamer)
end

it "reuses an existing streamer by name match" do
streamer = create(:streamer, user: user, name: "NameMatch")

stream = create(:stream, user: user, streamer: nil, source: "NameMatch", platform: "tiktok")

expect(stream.streamer).to eq(streamer)
end

it "uses streamer_name when provided" do
stream = build(:stream, user: user, streamer: nil, source: "sourceuser", platform: "tiktok")
stream.streamer_name = "Display Name"

stream.save!

expect(stream.streamer).to be_present
expect(stream.streamer.name).to eq("Display Name")
end

it "does not assign when stream has no user" do
stream = build(:stream, user: nil, streamer: nil, source: "TestStreamer", platform: "tiktok")

stream.assign_streamer_from_source

expect(stream.streamer).to be_nil
end
end

describe ".sorted" do
it "orders by title when provided" do
stream_a = create(:stream, title: "Alpha")
Expand Down
Loading