From 9f81f60968142bdabdc94e573684f83809b2c3d2 Mon Sep 17 00:00:00 2001 From: Julius Kammerl Date: Sat, 26 Aug 2017 22:29:20 -0700 Subject: [PATCH] Adds end-to-end audio latency test to walt.py --- pywalt/pywalt/walt.py | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/pywalt/pywalt/walt.py b/pywalt/pywalt/walt.py index cf58b3b..f492f54 100644 --- a/pywalt/pywalt/walt.py +++ b/pywalt/pywalt/walt.py @@ -92,6 +92,9 @@ class Walt(object): CMD_VERSION = 'V' CMD_SAMPLE_ALL = 'Q' CMD_BRIGHTNESS_CURVE = 'U' + CMD_BEEP = 'B' + CMD_STOP_BEEP = 'S' + CMD_AUDIO = 'A' def __init__(self, serial_dev, timeout=None, encoding='utf-8'): @@ -298,7 +301,7 @@ def parse_args(argv): parser.add_argument('-s', '--serial', default=serial, help='WALT serial port') parser.add_argument('-t', '--type', - help='Test type: drag|tap|screen|sanity|curve|bridge') + help='Test type: drag|tap|screen|sanity|curve|bridge|audio') parser.add_argument('-l', '--logdir', default=temp_dir, help='where to store logs') parser.add_argument('-n', default=40, type=int, @@ -538,6 +541,37 @@ def run_walt_sanity_test(args): print(s.strip() + '\tmin-max: ' + minmax) time.sleep(0.1) +def run_audio_latency(args): + print('Starting end-to-end audio latency test') + with Walt(args.serial) as walt: + walt.sndrcv(Walt.CMD_RESET) + tstart = time.time() + t_zero = walt.zero_clock() + if t_zero < 0: + print('Error: Couldn\'t zero clock, exitting') + sys.exit(1) + + # Sleep to ensure no previous beeps interfere the measurement + walt.sndrcv(Walt.CMD_STOP_BEEP) + time.sleep(1) + + # Trigger beep & execute latency test + walt.sndrcv(Walt.CMD_BEEP) + walt.sndrcv(Walt.CMD_AUDIO) + + print('waiting for response') + # The following line blocks until a message from WALT arrives + trigger_line = walt.readline() + walt.sndrcv(Walt.CMD_STOP_BEEP) + + parts = trigger_line.strip().split() + if len(parts) != 5: + raise Exception('Malformed trigger line: "%s"\n' % trigger_line) + # Convert microseconds to milliseconds + latency_ms = float(parts[2]) * 0.001 + + print('Measured end-to-end latency: %f ms' % latency_ms) + class TcpServer: """ @@ -714,6 +748,8 @@ def main(argv=sys.argv[1:]): run_screen_curve(args) elif args.type == 'bridge': run_tcp_bridge(args) + elif args.type == 'e2e_audio': + run_audio_latency(args) else: print('Unknown test type: "%s"' % args.type)