-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvector_vision_sdk.py
More file actions
executable file
·182 lines (155 loc) · 6.42 KB
/
vector_vision_sdk.py
File metadata and controls
executable file
·182 lines (155 loc) · 6.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#!/usr/bin/env python3
# Copyright (c) 2018 Anki, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License in the file LICENSE.txt or at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Wait for Vector to hear "Hey Vector!" and then play an animation.
The wake_word event only is dispatched when the SDK program has
not requested behavior control. After the robot hears "Hey Vector!"
and the event is received, you can then request behavior control
and control the robot. See the 'requires_behavior_control' method in
connection.py for more information.
"""
import sys
import time
import io
import os
import json
import requests
import threading
import logging
import anki_vector
from anki_vector.events import Events
from anki_vector.util import degrees
try:
from PIL import Image
except ImportError:
sys.exit("Cannot import from PIL: Do `pip3 install --user Pillow` to install")
# Index used to name the next camera snapshot written to disk; incremented
# after every snapshot.
pic_number = 0
# Not referenced by any active code in this file (only by disabled code).
reboot_counter = 0
# Number of text-recognition attempts made while handling the current wake
# word; reset to 0 once handling finishes.
retry_counter = 0
# Not referenced by any active code in this file (only by a disabled handler).
wake_word_heard = False
# True while a wake-word request is being handled, so that further wake-word
# events are ignored until it completes.
wake_word_processing = False
# Base URL of the text-recognition service, taken from the CSURL environment
# variable. The endpoint name is appended directly, so the value is expected
# to end with a path separator. Raises KeyError at import time when CSURL is
# not set.
url = os.environ['CSURL']
def main():
    """Connect to Vector and read printed text aloud whenever "Hey Vector!" is heard.

    The robot is opened without behavior control so that wake_word events are
    delivered. Each wake word triggers this sequence:

    1. ``on_wake_word`` raises the head, lowers the lift and schedules
       ``my_coroutine`` on the connection thread.
    2. ``my_coroutine`` requests behavior control, sends the latest camera
       image to the text-recognition service and speaks every recognized
       line. When nothing is recognized it reschedules itself until the
       recognition attempt limit is reached, then apologizes.
    3. Behavior control is released and the module-level state is reset so
       the next wake word is accepted.

    The function blocks until the wait on ``evt`` times out or the user
    presses Ctrl+C.
    """
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
    args = anki_vector.util.parse_command_args()

    # Tunables for the closures below.
    max_control_attempts = 2     # tries at acquiring behavior control
    max_recognition_tries = 3    # text-recognition attempts per wake word
    recognition_timeout_s = 15   # HTTP timeout; the call runs on the connection thread

    with anki_vector.Robot(args.serial,
                           enable_camera_feed=True,
                           requires_behavior_control=False,
                           cache_animation_list=False) as robot:
        evt = threading.Event()

        def post_to_cognitive_services():
            """Send the latest camera frame to the text-recognition service.

            A JPEG copy of the frame is also written under ``/sdk/`` using a
            running counter as the file name.

            Returns:
                The decoded JSON response (a dict) when it contains a
                non-empty ``'lines'`` list; otherwise ``None`` (no image,
                request failure, undecodable response, or no text found).
            """
            global pic_number

            image = robot.camera.latest_image
            if image is None:
                print("Image acquisition error!")
                return None

            # Keep a numbered snapshot on disk for later inspection.
            image.save('/sdk/' + str(pic_number) + '.jpg', 'JPEG')
            pic_number += 1

            jpeg_buffer = io.BytesIO()
            image.save(jpeg_buffer, format='JPEG')
            jpeg_buffer.seek(0)

            text_recognition_url = url + "recognizeTextDirect"
            params = {'mode': 'printed'}
            files = {'form': ('jpegfile', jpeg_buffer, 'image/jpeg')}
            try:
                # Without a timeout a stalled service would block this thread
                # indefinitely.
                response = requests.post(text_recognition_url,
                                         params=params,
                                         files=files,
                                         timeout=recognition_timeout_s)
                response.raise_for_status()
                analysis = json.loads(response.text)
            except (requests.RequestException, ValueError) as exc:
                # Report the failure as "nothing recognized" so the caller's
                # retry/apology path handles it instead of crashing.
                logging.warning("Text recognition request failed: %s", exc)
                return None

            print(json.dumps(analysis, indent=4, sort_keys=True))
            # A response without a 'lines' entry is treated like an empty one.
            if isinstance(analysis, dict) and analysis.get('lines'):
                return analysis
            return None

        async def my_coroutine():
            """Acquire control, recognize text in view and speak it.

            Reschedules itself while nothing is recognized and fewer than
            ``max_recognition_tries`` attempts have been made. On every other
            exit path the retry counter and the ``wake_word_processing`` flag
            are reset so the next wake word is handled.
            """
            global retry_counter
            global wake_word_processing
            print("Running my_coroutine on the connection thread")

            # NOTE(review): presumably meant to influence the priority of the
            # control request; confirm that the SDK reads this attribute.
            robot.conn.CONTROL_PRIORITY_LEVEL = 20

            # Connection request sometimes fails, so try more than once.
            for attempt in range(max_control_attempts):
                try:
                    await robot.conn.request_control(timeout=1.0)
                except Exception as exc:
                    logging.warning("Control request failed on attempt #%d: %s",
                                    attempt, exc)
                else:
                    print("Connected on attempt #", attempt)
                    break
            else:
                # Every attempt failed: re-arm the wake-word subscription and
                # give up on this request.
                robot.events.unsubscribe(on_wake_word, Events.wake_word)
                robot.events.subscribe(on_wake_word, Events.wake_word)
                retry_counter = 0
                wake_word_processing = False
                return

            retry_counter += 1
            rescheduled = False
            try:
                analysis = post_to_cognitive_services()
                lines = analysis['lines'] if analysis else []

                if not lines and retry_counter < max_recognition_tries:
                    # Nothing readable yet: hand control back and try again.
                    await robot.conn.release_control()
                    robot.conn.run_soon(my_coroutine())
                    rescheduled = True
                    return

                if lines:
                    print("Number of lines: ", len(lines))
                    for line in lines:
                        await robot.say_text(line['text'])
                else:
                    await robot.say_text("sorry, I didn't get that!")

                await robot.conn.release_control()
            except Exception:
                # This coroutine is the top of its task; log the failure and
                # make a best-effort attempt to give control back.
                logging.exception("Wake-word handling failed")
                try:
                    await robot.conn.release_control()
                except Exception:
                    logging.exception("Could not release behavior control")
            finally:
                # Unless a retry is pending, accept wake words again even if
                # something above failed.
                if not rescheduled:
                    retry_counter = 0
                    wake_word_processing = False

        async def on_wake_word(event_type, event):
            """Handle a wake_word event unless a previous one is still in progress."""
            global wake_word_processing
            if wake_word_processing:
                return
            wake_word_processing = True
            print("Callback invoked...")
            robot.conn.request_control()
            # Raise the head and lower the lift before the picture is taken.
            robot.behavior.set_head_angle(degrees(25.0))
            robot.behavior.set_lift_height(0.0)
            # NOTE(review): time.sleep() inside an async handler looks like it
            # blocks the thread running it rather than letting the movement
            # finish; confirm and consider an awaitable delay.
            time.sleep(1)
            robot.conn.run_soon(my_coroutine())

        robot.events.subscribe(on_wake_word, Events.wake_word)
        print('------ Vector is waiting to hear "Hey Vector!" Press ctrl+c to exit early ------', file=sys.stderr)
        try:
            # Nothing in this function sets evt, so this wait keeps the
            # program alive until the timeout elapses or Ctrl+C is pressed.
            if not evt.wait(timeout=1000000):
                print('------ Vector never heard "Hey Vector!" ------')
        except KeyboardInterrupt:
            pass
# Run the wake-word loop only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    main()