-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdirectLine.py
More file actions
74 lines (60 loc) · 2.97 KB
/
directLine.py
File metadata and controls
74 lines (60 loc) · 2.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""Python sample to call Bot Framework using DirectLine v3 API"""
# Start here for documentation: https://docs.microsoft.com/en-us/bot-framework/rest-api/bot-framework-rest-direct-line-3-0-concepts"""
# For Generating a token vs start conversation: https://docs.microsoft.com/en-us/bot-framework/rest-api/bot-framework-rest-direct-line-3-0-authentication
# Here we use start conversation as we'll be calling the bot immediately
import json
import requests
class DirectLineAPI(object):
"""Shared methods for the parsed result objects."""
def __init__(self, direct_line_secret):
self._direct_line_secret = direct_line_secret
self._base_url = 'https://directline.botframework.com/v3/directline'
self._set_headers()
self._start_conversation()
def _set_headers(self):
headers = {'Content-Type': 'application/json'}
value = ' '.join(['Bearer', self._direct_line_secret])
headers.update({'Authorization': value})
self._headers = headers
def _start_conversation(self):
# For Generating a token use
url = '/'.join([self._base_url, 'tokens/generate'])
botresponse = requests.post(url, headers=self._headers)
jsonresponse = botresponse.json()
self._token = jsonresponse['token']
# Start conversation and get us a conversationId to use
url = '/'.join([self._base_url, 'conversations'])
botresponse = requests.post(url, headers=self._headers)
# Extract the conversationID for sending messages to bot
jsonresponse = botresponse.json()
self._conversationid = jsonresponse['conversationId']
def send_message(self, text):
"""Send raw text to bot framework using directline api"""
url = '/'.join([self._base_url, 'conversations', self._conversationid, 'activities'])
jsonpayload = {
'conversationId': self._conversationid,
'type': 'message',
'from': {'id': 'user1'},
'text': text
}
botresponse = requests.post(url, headers=self._headers, json=jsonpayload)
if botresponse.status_code == 200:
return "message sent"
return "error contacting bot"
def get_message(self, i):
"""Get a response message back from the botframework using directline api"""
url = '/'.join([self._base_url, 'conversations', self._conversationid, 'activities'])
botresponse = requests.get(url, headers=self._headers,
json={'conversationId': self._conversationid})
if botresponse.status_code == 200:
jsonresponse = botresponse.json()
return jsonresponse['activities'][i]['text']
return "error contacting bot for response"
if __name__ == "__main__":
bot = DirectLineAPI('SECRET_HERE')
bot.send_message("Hi")
botresponse = bot.get_message(2)
print (botresponse)
bot.send_message("Who are you")
botresponse = bot.get_message(4)
print (botresponse)