Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 271 additions & 6 deletions tests/analog_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,23 @@
import libm2k
import time
from multiprocessing.pool import ThreadPool
import threading
import os
from pandas import DataFrame
from pathlib import Path
import pandas
import random
import sys
import reset_def_values as reset
from helpers import get_result_files, get_sample_rate_display_format, get_time_format, save_data_to_csv, plot_to_file, plot_to_file_multiline
from helpers import (
get_result_files,
get_sample_rate_display_format,
get_time_format,
save_data_to_csv,
plot_to_file,
plot_to_file_multiline,
)
from open_context import ctx_timeout, ctx
from create_files import results_file, results_dir, csv

from shapefile import shape_gen, Shape
import logging

# dicts that will be saved to csv files
shape_csv_vals = {}
Expand Down Expand Up @@ -1685,4 +1690,264 @@ def configure_trigger(trig: libm2k.M2kHardwareTrigger,
ylim=(-6, 6),
)
aout.stop()
return result
return result


def test_dual_channel_sync(
ain: libm2k.M2kAnalogIn,
aout: libm2k.M2kAnalogOut,
trig: libm2k.M2kHardwareTrigger,
ctx: libm2k.M2k,
) -> tuple[bool, str]:
"""Test dual-channel waveform synchronization with hardware oversampling.

Validation Strategy:
1. Detects falling edges in the ramp signal (CH1)
2. Verifies that triangle wave (CH0) minima align with these falling edges
within a specified tolerance (200 samples)
3. Checks monotonicity of the triangle wave before and after each minimum
using filtered data with relaxed constraints (70% threshold) to account
for noise and low ADC resolution sampling (1 MHz vs 75 MHz DAC rate)
4. Ensures both channels have sufficient signal amplitude (> 1.0V range)
"""

ctx.reset()
ctx.calibrateADC()
ctx.calibrateDAC()
ctx.setTimeout(10_000) # [ms]

file_name, dir_name, csv_path = get_result_files(gen_reports)

reset.analog_in(ain)
reset.analog_out(aout)
reset.trigger(trig)

# Generate waveform data programmatically
# CH0: Triangle wave - 750,000 samples, 0-5V amplitude
n_samples = 750_000
n_rising = 375_042
amplitude = 5.0

n_falling = n_samples - n_rising
step = amplitude / (n_rising - 1)

rising = np.linspace(0, amplitude, n_rising)
falling_start = amplitude - step
falling_end = falling_start - step * (n_falling - 1)
falling = np.linspace(falling_start, falling_end, n_falling)
ch0_buffer = np.concatenate([rising, falling])

# CH1: Ramp/stair pattern - 20 samples, values 0-4 repeated
ch1_buffer = np.tile(np.arange(5), 4).astype(float)

logger = logging.getLogger(__name__)
logger.info(f"Generated waveform data: CH0={len(ch0_buffer)} samples, CH1={len(ch1_buffer)} samples")

# Hardware configuration
dac_sr = 75_000_000 # 75 MHz base sample rate
adc_sr = 1_000_000 # 1 MHz ADC sample rate

# Oversampling configuration
ch0_oversampling = 1 # native rate
ch1_oversampling = 750_000 # repeat each sample 750000 times

# Trigger configuration
trig_delay = 8_000 # Pre-trigger delay in samples
trig_level = 1.5 # [V]
trig_hysteresis = 0.25 # [V]

# Capture configuration
in_samples = 75_000 # Capture 75ms worth of data at 1MHz (fewer periods for easier visual inspection)

# Configure analog input
ain.setSampleRate(adc_sr)
actual_adc_sr = ain.getSampleRate()
assert abs(actual_adc_sr - adc_sr) / adc_sr < 0.001, (
f"ADC sample rate mismatch: expected {adc_sr}, got {actual_adc_sr}"
)
ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_1, True)
ain.enableChannel(libm2k.ANALOG_IN_CHANNEL_2, True)
assert ain.isChannelEnabled(libm2k.ANALOG_IN_CHANNEL_1), "ADC CH1 not enabled"
assert ain.isChannelEnabled(libm2k.ANALOG_IN_CHANNEL_2), "ADC CH2 not enabled"
ain.setRange(libm2k.ANALOG_IN_CHANNEL_1, libm2k.PLUS_MINUS_25V)
ain.setRange(libm2k.ANALOG_IN_CHANNEL_2, libm2k.PLUS_MINUS_25V)
assert ain.getRange(libm2k.ANALOG_IN_CHANNEL_1) == libm2k.PLUS_MINUS_25V, (
"ADC CH1 range mismatch"
)
assert ain.getRange(libm2k.ANALOG_IN_CHANNEL_2) == libm2k.PLUS_MINUS_25V, (
"ADC CH2 range mismatch"
)

# Configure analog output
aout.setSampleRate(libm2k.ANALOG_IN_CHANNEL_1, dac_sr)
aout.setSampleRate(libm2k.ANALOG_IN_CHANNEL_2, dac_sr)
assert aout.getSampleRate(libm2k.ANALOG_IN_CHANNEL_1) == dac_sr, (
f"DAC CH1 sample rate mismatch: expected {dac_sr}, got {aout.getSampleRate(libm2k.ANALOG_IN_CHANNEL_1)}"
)
assert aout.getSampleRate(libm2k.ANALOG_IN_CHANNEL_2) == dac_sr, (
f"DAC CH2 sample rate mismatch: expected {dac_sr}, got {aout.getSampleRate(libm2k.ANALOG_IN_CHANNEL_2)}"
)
aout.setOversamplingRatio(libm2k.ANALOG_IN_CHANNEL_1, ch0_oversampling)
aout.setOversamplingRatio(libm2k.ANALOG_IN_CHANNEL_2, ch1_oversampling)
assert aout.getOversamplingRatio(libm2k.ANALOG_IN_CHANNEL_1) == ch0_oversampling, (
f"DAC CH1 oversampling mismatch: expected {ch0_oversampling}, got {aout.getOversamplingRatio(libm2k.ANALOG_IN_CHANNEL_1)}"
)
assert aout.getOversamplingRatio(libm2k.ANALOG_IN_CHANNEL_2) == ch1_oversampling, (
f"DAC CH2 oversampling mismatch: expected {ch1_oversampling}, got {aout.getOversamplingRatio(libm2k.ANALOG_IN_CHANNEL_2)}"
)
aout.enableChannel(libm2k.ANALOG_IN_CHANNEL_1, True)
aout.enableChannel(libm2k.ANALOG_IN_CHANNEL_2, True)
assert aout.isChannelEnabled(libm2k.ANALOG_IN_CHANNEL_1), "DAC CH1 not enabled"
assert aout.isChannelEnabled(libm2k.ANALOG_IN_CHANNEL_2), "DAC CH2 not enabled"
aout.setCyclic(True)

# Configure trigger on CH0
trig.setAnalogSource(libm2k.ANALOG_IN_CHANNEL_1)
assert trig.getAnalogSource() == libm2k.ANALOG_IN_CHANNEL_1, (
"Trigger source mismatch"
)
trig.setAnalogSourceChannel(libm2k.ANALOG_IN_CHANNEL_1)
trig.setAnalogMode(libm2k.ANALOG_IN_CHANNEL_1, libm2k.ANALOG)
assert trig.getAnalogMode(libm2k.ANALOG_IN_CHANNEL_1) == libm2k.ANALOG, (
"Trigger mode mismatch"
)
trig.setAnalogCondition(libm2k.ANALOG_IN_CHANNEL_1, libm2k.RISING_EDGE_ANALOG)
assert (
trig.getAnalogCondition(libm2k.ANALOG_IN_CHANNEL_1) == libm2k.RISING_EDGE_ANALOG
), "Trigger condition mismatch"
trig.setAnalogLevel(libm2k.ANALOG_IN_CHANNEL_1, trig_level)
assert abs(trig.getAnalogLevel(libm2k.ANALOG_IN_CHANNEL_1) - trig_level) < 0.05, (
f"Trigger level mismatch: expected {trig_level}, got {trig.getAnalogLevel(libm2k.ANALOG_IN_CHANNEL_1)}"
)
trig.setAnalogHysteresis(libm2k.ANALOG_IN_CHANNEL_1, trig_hysteresis)
assert (
abs(trig.getAnalogHysteresis(libm2k.ANALOG_IN_CHANNEL_1) - trig_hysteresis)
< 0.05
), (
f"Trigger hysteresis mismatch: expected {trig_hysteresis}, got {trig.getAnalogHysteresis(libm2k.ANALOG_IN_CHANNEL_1)}"
)
trig.setAnalogDelay(-trig_delay) # Negative delay to see pre-trigger data
assert trig.getAnalogDelay() == -trig_delay, (
f"Trigger delay mismatch: expected {-trig_delay}, got {trig.getAnalogDelay()}"
)

# Start acquisition before pushing to ensure we capture the signal
ain.startAcquisition(in_samples)

# Synchronized push - both channels simultaneously
aout.push([ch0_buffer, ch1_buffer])

try:
input_data = ain.getSamples(in_samples)
except Exception as e:
ain.stopAcquisition()
aout.stop()
return False, f"Timeout during acquisition: {e}"

ain.stopAcquisition()
aout.stop()

ch0_data = np.array(input_data[libm2k.ANALOG_IN_CHANNEL_1]) # Triangle
ch1_data = np.array(input_data[libm2k.ANALOG_IN_CHANNEL_2]) # Ramp

if gen_reports:
subdir_name = f"{dir_name}/dual_channel_sync"
os.makedirs(subdir_name, exist_ok=True)
x_time, x_label = get_time_format(in_samples, adc_sr)
plot_to_file(
"Dual Channel Sync Test (HW Oversampling)",
ch0_data,
subdir_name,
"dual_channel_sync_captured.png",
data1=ch1_data,
x_data=x_time,
xlabel=x_label,
)

# Validation: Check synchronization between channels

# Detect falling edges in stair step signal using derivative
ch1_diff = np.diff(ch1_data)

# Find indices where there are large negative jumps (falling edges)
falling_threshold = -0.5 # Significant drop
falling_edges = np.where(ch1_diff < falling_threshold)[0]

if len(falling_edges) == 0:
return (
False,
"No falling edges detected in ramp signal - signal may not be outputting correctly",
)

# Verify signal ranges before running sync validation
ch0_range = ch0_data.max() - ch0_data.min()
ch1_range = ch1_data.max() - ch1_data.min()
if ch0_range < 1.0:
return False, f"CH0 (triangle) signal too weak: range = {ch0_range:.2f}V"
if ch1_range < 1.0:
return False, f"CH1 (ramp) signal too weak: range = {ch1_range:.2f}V"

# Validate synchronization: Check that triangle minima align with stair step falling edges
window_size = 250
tolerance = 10

sync_errors = []

for i, edge_idx in enumerate(falling_edges):
# Define window around the falling edge
window_start = max(0, edge_idx - window_size)
window_end = min(len(ch0_data), edge_idx + window_size)

# Extract triangle data (CH0) in this window
triangle_window = ch0_data[window_start:window_end]

if len(triangle_window) < 10:
logger.warning(
f"Edge {i}: insufficient data in window for analysis, skipping"
)
continue

# Find where triangle transitions from decreasing to increasing (local minimum)
min_idx = np.argmin(triangle_window)
min_position = window_start + min_idx

# Check if minimum is close to falling edge
distance = abs(min_position - edge_idx)
if distance > tolerance:
sync_errors.append(
f"Edge {i}: triangle minimum at {min_position} is {distance} samples "
f"from falling edge at {edge_idx} (tolerance: {tolerance})"
)

if min_idx > 0:
filtered_before = np.convolve(
triangle_window[: min_idx + 1], np.ones(25) / 25, mode="valid"
)
if len(filtered_before) > 1:
decreasing_count = np.sum(np.diff(filtered_before) <= 0)
total_count = len(np.diff(filtered_before))
if decreasing_count < total_count * 0.7:
sync_errors.append(
f"Edge {i}: triangle not decreasing before minimum at {min_position}. "
f"Decreasing: {decreasing_count}/{total_count} samples"
)
if min_idx < len(triangle_window) - 1:
filtered_after = np.convolve(
triangle_window[min_idx:], np.ones(25) / 25, mode="valid"
)
if len(filtered_after) > 1:
increasing_count = np.sum(np.diff(filtered_after) >= 0)
total_count = len(np.diff(filtered_after))
if increasing_count < total_count * 0.7:
sync_errors.append(
f"Edge {i}: triangle not increasing after minimum at {min_position}. "
f"Increasing: {increasing_count}/{total_count} samples"
)

if sync_errors:
return False, f"Sync errors ({len(sync_errors)}): " + "; ".join(sync_errors[:3])

return (
True,
f"Sync validated: {len(falling_edges)} stair step falling edges detected, all triangle minima within {tolerance} samples tolerance",
)
59 changes: 59 additions & 0 deletions tests/doc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Test Design Documentation

This directory contains design documentation for tests in the libm2k test suite. Each document explains the rationale behind test design decisions, making it easier to understand, maintain, and extend the test suite.

## Purpose

- **Document design decisions** - Explain why tests are structured the way they are
- **Capture domain knowledge** - Record hardware-specific considerations
- **Enable maintenance** - Help future developers understand test intent
- **Track issue references** - Link tests to the bugs/features they validate

## Document Structure

Each test document should follow this template:

```markdown
# Test: [Test Name]

## Overview
| Field | Value |
| -------------------- | ------------------------------------ |
| **Test Name** | `test_function_name` |
| **Test File** | `tests/file.py` |
| **Related Issue** | Link to GitHub issue (if applicable) |
| **Minimum Firmware** | Version requirement (if applicable) |

## Problem Statement
What problem or feature does this test validate?

## Test Configuration
What parameters are used and why?

## Design Rationale
Why was this approach chosen? What alternatives were considered?

## Expected Results
What indicates pass/fail? What error messages mean what?

## Running the Test
Command-line examples for running the test.

## Running the Test
Instructions for running the test.

## References
Links to issues, documentation, datasheets, etc.
```

## Naming Convention

Documentation files should be named after the test method they document:
- `test_dual_channel_waveform_sync.md`

## Index

| Document | Test | Description |
| ------------------------------------------------------------------------ | --------------------------------- | -------------------------------------------- |
| [test_dual_channel_waveform_sync.md](test_dual_channel_waveform_sync.md) | `test_dual_channel_waveform_sync` | Validates dual-channel phase synchronization |

Loading
Loading