Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,8 @@ export default {
}

this.playbackTimer = setInterval(() => {
if (this.playbackDateTime) {
// Don't advance time while previous playback requests are still loading
if (this.playbackDateTime && !this.playbackLoading) {
this.playbackDateTime = new Date(
this.playbackDateTime.getTime() + 1000 * this.playbackStep,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,15 +671,21 @@ export default {
}
}
}
// Decrement store counter now that playback data has been processed
this.dataChanged = true
this.updateGraphData()
})
.catch((error) => {
// eslint-disable-next-line no-console
console.error('Error fetching playback telemetry:', error)
})
.finally(() => {
// Decrement store counter now that playback request has completed
this.store.updatePlayback({
playbackLoading: Math.max(
0,
this.store.playback.playbackLoading - 1,
),
})
this.dataChanged = true
this.updateGraphData()
})
}
this.lastPlaybackDateTime = this.playbackDateTime
Expand Down
34 changes: 18 additions & 16 deletions openc3/lib/openc3/utilities/questdb_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,43 @@
require 'base64'
require 'bigdecimal'
require 'pg'
require 'concurrent'

module OpenC3
# Utility class for QuestDB data encoding and decoding.
# This provides a common interface for serializing/deserializing COSMOS data types
# when writing to and reading from QuestDB.
class QuestDBClient
@@conn = nil
@@conn_mutex = Mutex.new
# Thread-local PG connection storage using Concurrent::ThreadLocalVar.
# Each thread gets its own connection to avoid thread-safety issues with PG::Connection.
# Connections are automatically garbage collected when threads terminate.
@thread_conn = Concurrent::ThreadLocalVar.new(nil)

# Get or create a thread-safe PG connection with type mapping configured.
# Returns a shared singleton connection — callers should not close it.
# Get or create a thread-local PG connection with type mapping configured.
# Returns the thread-local connection — callers should not close it.
def self.connection
@@conn_mutex.synchronize do
@@conn ||= PG::Connection.new(
conn = @thread_conn.value
if conn.nil? || conn.finished?
conn = PG::Connection.new(
host: ENV['OPENC3_TSDB_HOSTNAME'],
port: ENV['OPENC3_TSDB_QUERY_PORT'],
user: ENV['OPENC3_TSDB_USERNAME'],
password: ENV['OPENC3_TSDB_PASSWORD'],
dbname: 'qdb'
)
if @@conn.type_map_for_results.is_a? PG::TypeMapAllStrings
@@conn.type_map_for_results = PG::BasicTypeMapForResults.new @@conn
end
@@conn
conn.type_map_for_results = PG::BasicTypeMapForResults.new(conn)
@thread_conn.value = conn
end
conn
end

# Reset the connection (close if open, set to nil). Used after errors.
# Reset the connection for the current thread. Used after errors.
def self.disconnect
@@conn_mutex.synchronize do
if @@conn && !@@conn.finished?
@@conn.finish
end
@@conn = nil
conn = @thread_conn.value
if conn && !conn.finished?
conn.finish
end
@thread_conn.value = nil
end

# Health check — attempt to connect and immediately close.
Expand Down
8 changes: 2 additions & 6 deletions openc3/test/integration/tsdb/ruby/cvt_model_tsdb_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@
RSpec.describe OpenC3::CvtModel, :questdb do
# Reset connection before each test to ensure clean state
before(:each) do
OpenC3::CvtModel.class_variable_set(:@@conn, nil)
OpenC3::QuestDBClient.disconnect
end

after(:each) do
# Clean up connection
conn = OpenC3::CvtModel.class_variable_get(:@@conn)
if conn && !conn.finished?
conn.finish rescue nil
end
OpenC3::CvtModel.class_variable_set(:@@conn, nil)
OpenC3::QuestDBClient.disconnect
end

# Helper method to run a roundtrip test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def includes_realtime
def create_thread(objects, max_batch_size: 100)
collection = MockCollection.new(objects)
thread = LoggedStreamingThread.new(streaming_api, collection, max_batch_size, scope: scope, token: token)
thread.class.class_variable_set(:@@conn, nil) if thread.class.class_variable_defined?(:@@conn)
OpenC3::QuestDBClient.disconnect
thread
end

Expand Down Expand Up @@ -173,20 +173,12 @@ def run_streaming_test(write_options, value_type: :RAW, expected_key: 'expected_
end

before(:each) do
if LoggedStreamingThread.class_variable_defined?(:@@conn)
conn = LoggedStreamingThread.class_variable_get(:@@conn)
conn&.finish rescue nil
LoggedStreamingThread.class_variable_set(:@@conn, nil)
end
OpenC3::QuestDBClient.disconnect
streaming_api.clear
end

after(:each) do
if LoggedStreamingThread.class_variable_defined?(:@@conn)
conn = LoggedStreamingThread.class_variable_get(:@@conn)
conn&.finish rescue nil
LoggedStreamingThread.class_variable_set(:@@conn, nil)
end
OpenC3::QuestDBClient.disconnect
end

describe 'INT (signed integer) streaming' do
Expand Down
197 changes: 99 additions & 98 deletions playwright/tests/tlm-viewer.p.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ async function showScreen(
utils,
target,
screen,
close = true,
callback = async () => {},
) {
await page.locator('[data-test="select-target"]').click()
Expand All @@ -52,10 +53,12 @@ async function showScreen(
).toBeVisible()
}
await callback()
await page.locator('[data-test=close-screen-icon]').click()
await expect(
page.locator(`.v-toolbar:has-text("${target} ${screen}")`),
).not.toBeVisible()
if (close) {
await page.locator('[data-test=close-screen-icon]').click()
await expect(
page.locator(`.v-toolbar:has-text("${target} ${screen}")`),
).not.toBeVisible()
}
}

async function deleteScreen(page, utils, target, screen) {
Expand Down Expand Up @@ -92,7 +95,7 @@ test('displays INST BLOCK', async ({ page, utils }) => {
})

test('displays INST COMMANDING', async ({ page, utils }) => {
await showScreen(page, utils, 'INST', 'COMMANDING', async function () {
await showScreen(page, utils, 'INST', 'COMMANDING', true, async function () {
// This is the expansion panel button
await page.locator('#innerapp button').first().click()

Expand Down Expand Up @@ -131,7 +134,7 @@ test('displays INST GROUND', async ({ page, utils }) => {
})

test('displays INST HS', async ({ page, utils }) => {
await showScreen(page, utils, 'INST', 'HS', async function () {
await showScreen(page, utils, 'INST', 'HS', true, async function () {
await expect(page.locator('text=Health and Status')).toBeVisible()
await page.locator('[data-test=minimize-screen-icon]').click()
await expect(page.locator('text=Health and Status')).not.toBeVisible()
Expand Down Expand Up @@ -197,7 +200,7 @@ test('displays INST ROLLUP', async ({ page, utils }) => {

test('displays INST SIMPLE', async ({ page, utils }) => {
const text = 'TEST' + Math.floor(Math.random() * 10000)
await showScreen(page, utils, 'INST', 'SIMPLE', async function () {
await showScreen(page, utils, 'INST', 'SIMPLE', true, async function () {
await expect(page.locator(`text=${text}`)).not.toBeVisible()
await page.locator('[data-test=edit-screen-icon]').click()
await page.locator('textarea').fill(`SCREEN AUTO AUTO 0.5
Expand Down Expand Up @@ -276,7 +279,7 @@ test('opens SCREEN documentation from context menu', async ({
page,
utils,
}) => {
await showScreen(page, utils, 'INST', 'ADCS', async function () {
await showScreen(page, utils, 'INST', 'ADCS', true, async function () {
await page.locator('[data-test=edit-screen-icon]').click()
await expect(
page.locator(`.v-toolbar:has-text("Edit Screen")`),
Expand All @@ -296,97 +299,95 @@ test('opens SCREEN documentation from context menu', async ({
})

test('plays back to a screen', async ({ page, utils }) => {
await showScreen(page, utils, 'INST', 'ADCS', async function () {
// Helper to parse timestamp string to epoch seconds
const parseTime = (timeStr: string): number => {
// Format: "2026-01-26 15:16:13.116"
return Math.floor(new Date(timeStr).getTime() / 1000)
}

// Define the locator for the PACKET_TIMEFORMATTED value
const packetTimeInput = page
.locator('[data-test="label"]:has-text("PACKET_TIMEFORMATTED")')
.locator('..')
.locator('[data-test="value"] input')

// Wait for it to have a valid timestamp value before entering playback mode
await expect(packetTimeInput).toHaveValue(
/\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}/,
)
await showScreen(page, utils, 'INST', 'ADCS', false)
await showScreen(page, utils, 'INST', 'GRAPHS', false)

await page.locator('[data-test="telemetry-viewer-file"]').click()
await page.getByRole('checkbox', { name: 'Playback Mode' }).check()
await page.keyboard.press('Escape') // Close the file menu
// Helper to parse timestamp string to epoch seconds
const parseTime = (timeStr: string): number => {
// Format: "2026-01-26 15:16:13.116"
return Math.floor(new Date(timeStr).getTime() / 1000)
}

// Open and close the time dialog
await page
.locator('[data-test="playback-time"]')
.getByRole('button')
.click()
await utils.sleep(500)
await page.keyboard.press('Escape')
// Define the locator for the PACKET_TIMEFORMATTED value
const packetTimeInput = page
.locator('[data-test="label"]:has-text("PACKET_TIMEFORMATTED")')
.locator('..')
.locator('[data-test="value"] input')

const start = sub(new Date(), { minutes: 2 })
await page
.locator('[data-test=playback-date] input')
.fill(format(start, 'yyyy-MM-dd'))
await page
.locator('[data-test=playback-time] input')
.fill(format(start, 'HH:mm:ss'))

// Click play, wait for time to increment, then pause
await page.getByRole('button', { name: 'Play / Pause' }).click()
await utils.sleep(1100)
// Get time after starting playback
let previousTime = parseTime(await packetTimeInput.inputValue())
await expect
.poll(async () => parseTime(await packetTimeInput.inputValue()))
.toBeGreaterThan(previousTime)
await page.getByRole('button', { name: 'Play / Pause' }).click()
await utils.sleep(500)

// Verify step forward increments by 1s
previousTime = parseTime(await packetTimeInput.inputValue())
await page.locator('[data-test="playback-step-forward"]').click()
await expect
.poll(async () => parseTime(await packetTimeInput.inputValue()))
.toBe(previousTime + 1)

// Verify step backward decrements by 1s
previousTime = parseTime(await packetTimeInput.inputValue())
await page.locator('[data-test="playback-step-backward"]').click()
await expect
.poll(async () => parseTime(await packetTimeInput.inputValue()))
.toBe(previousTime - 1)

// Verify skip forward increments by 15s
previousTime = parseTime(await packetTimeInput.inputValue())
await page.locator('[data-test="playback-skip-forward"]').click()
await expect
.poll(async () => parseTime(await packetTimeInput.inputValue()))
.toBe(previousTime + 10)

// Verify skip backward decrements by 15s
previousTime = parseTime(await packetTimeInput.inputValue())
await page.locator('[data-test="playback-skip-backward"]').click()
await expect
.poll(async () => parseTime(await packetTimeInput.inputValue()))
.toBe(previousTime - 10)

// Change step value to 2 and verify step forward increments by 2s
await page.getByRole('spinbutton', { name: 'Step (Speed)' }).fill('2')
previousTime = parseTime(await packetTimeInput.inputValue())
await page.locator('[data-test="playback-step-forward"]').click()
await expect
.poll(async () => parseTime(await packetTimeInput.inputValue()))
.toBe(previousTime + 2)

// Change skip value to 15 and verify skip forward increments by 15s
await page.getByRole('spinbutton', { name: 'Skip' }).fill('15')
previousTime = parseTime(await packetTimeInput.inputValue())
await page.locator('[data-test="playback-skip-forward"]').click()
await expect
.poll(async () => parseTime(await packetTimeInput.inputValue()))
.toBe(previousTime + 15)
})
// Wait for it to have a valid timestamp value before entering playback mode
await expect(packetTimeInput).toHaveValue(
/\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}/,
)

await page.locator('[data-test="telemetry-viewer-file"]').click()
await page.getByRole('checkbox', { name: 'Playback Mode' }).check()
await page.keyboard.press('Escape') // Close the file menu

// Open and close the time dialog
await page.locator('[data-test="playback-time"]').getByRole('button').click()
await utils.sleep(500)
await page.keyboard.press('Escape')

const start = sub(new Date(), { minutes: 2 })
await page
.locator('[data-test=playback-date] input')
.fill(format(start, 'yyyy-MM-dd'))
await page
.locator('[data-test=playback-time] input')
.fill(format(start, 'HH:mm:ss'))

// Click play, wait for time to increment, then pause
await page.getByRole('button', { name: 'Play / Pause' }).click()
await utils.sleep(1100)
// Get time after starting playback
let previousTime = parseTime(await packetTimeInput.inputValue())
await expect
.poll(async () => parseTime(await packetTimeInput.inputValue()))
.toBeGreaterThan(previousTime)
await page.getByRole('button', { name: 'Play / Pause' }).click()
await utils.sleep(500)

// Verify step forward increments by 1s
previousTime = parseTime(await packetTimeInput.inputValue())
await page.locator('[data-test="playback-step-forward"]').click()
await expect
.poll(async () => parseTime(await packetTimeInput.inputValue()))
.toBe(previousTime + 1)

// Verify step backward decrements by 1s
previousTime = parseTime(await packetTimeInput.inputValue())
await page.locator('[data-test="playback-step-backward"]').click()
await expect
.poll(async () => parseTime(await packetTimeInput.inputValue()))
.toBe(previousTime - 1)

// Verify skip forward increments by 15s
previousTime = parseTime(await packetTimeInput.inputValue())
await page.locator('[data-test="playback-skip-forward"]').click()
await expect
.poll(async () => parseTime(await packetTimeInput.inputValue()))
.toBe(previousTime + 10)

// Verify skip backward decrements by 15s
previousTime = parseTime(await packetTimeInput.inputValue())
await page.locator('[data-test="playback-skip-backward"]').click()
await expect
.poll(async () => parseTime(await packetTimeInput.inputValue()))
.toBe(previousTime - 10)

// Change step value to 2 and verify step forward increments by 2s
await page.getByRole('spinbutton', { name: 'Step (Speed)' }).fill('2')
previousTime = parseTime(await packetTimeInput.inputValue())
await page.locator('[data-test="playback-step-forward"]').click()
await expect
.poll(async () => parseTime(await packetTimeInput.inputValue()))
.toBe(previousTime + 2)

// Change skip value to 15 and verify skip forward increments by 15s
await page.getByRole('spinbutton', { name: 'Skip' }).fill('15')
previousTime = parseTime(await packetTimeInput.inputValue())
await page.locator('[data-test="playback-skip-forward"]').click()
await expect
.poll(async () => parseTime(await packetTimeInput.inputValue()))
.toBe(previousTime + 15)
})
Loading