-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathWbeSerialPort.py
More file actions
154 lines (127 loc) · 5.33 KB
/
WbeSerialPort.py
File metadata and controls
154 lines (127 loc) · 5.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
########################################################################
#
# @File: WbeSerialPort.py
#
# @Author: İbrahim Alan
# Onur Güzel
#
# @Mail: ibrahimalan996@gmail.com
# onurguzel4@gmail.com
#
# @Description: This module encapsulates the access for the serial
# port. It provides backends for Python running on
# Windows, OSX, Linux, BSD etc.
#
# For more details, see:
# https://pyserial.readthedocs.io/en/latest/pyserial.html
#
# @Note: Version 1.0.0 (Library version) - 16 APR 2020
#
# Created on 16 APR 2020 - THU, 16.14
#
# @Version 1.0.0 - Modified Date : 16 APR 2020
#
########################################################################
#!/usr/bin/env python
# Import Libraries
try:
import os
import time
import binascii
import struct
import multiprocessing
import serial
import WbeGlobalVariables
except ImportError:
print("Some required files could not be found for program in Serial Port...",
"\nPlease contact with the manufacturer!")
# Serial Port Class
class SerialPortClass(multiprocessing.Process):
    """Worker process that shuttles data between two queues and a serial port.

    Items taken from ``input_queue_serial`` are written to the serial port;
    bytes read from the serial port are put on ``output_queue_serial``.

    NOTE(review): the serial port object is created (and, per the pyserial
    documentation, opened) inside ``__init__``, i.e. in the process that
    constructs this object, not in the child process started by ``start()``.
    Confirm this works with the multiprocessing start method in use.
    """

    # Pause taken by run() when there was nothing to send or receive, so the
    # polling loop does not spin a CPU core at full load.
    IDLE_SLEEP_SECONDS = 0.001
    # Pause taken by run() after an unexpected error, so a persistent fault
    # (e.g. an unplugged device) does not flood stdout.
    ERROR_BACKOFF_SECONDS = 0.1
    # Maximum number of bytes requested from the port per read call.
    READ_CHUNK_SIZE = 100

    def __init__(self, input_queue_serial, output_queue_serial,
                 port_name='COM7', baudrate=115200):
        """Store the queues, reset the counters and create the serial port.

        Args:
            input_queue_serial: Queue of outgoing data to write to the port.
            output_queue_serial: Queue that receives data read from the port.
            port_name: Serial device name; change to match local settings.
            baudrate: Serial baud rate.
        """
        multiprocessing.Process.__init__(self)
        self.input_queue_serial = input_queue_serial
        self.output_queue_serial = output_queue_serial
        self.number_of_messages_sent = 0
        self.number_of_messages_received = 0
        self.serialPortName = port_name
        self.serialPortBaudrate = baudrate
        # The first argument of SerialPortConfiguration is ignored, so no
        # throw-away serial.Serial() instance is created for it.
        self.serialPort = self.SerialPortConfiguration(
            None, self.serialPortName, self.serialPortBaudrate)

    def _Info(self, *messages):
        """Print each message when informational printing is enabled."""
        if WbeGlobalVariables.info_print_allowed == 1:
            for message in messages:
                print(message)

    # Serial port configuration
    def SerialPortConfiguration(self, serialPort, portName, baud):
        """Create and return a configured ``serial.Serial`` instance.

        Args:
            serialPort: Unused; kept only for interface compatibility. A new
                ``serial.Serial`` object is always created.
            portName: Device name or None.
            baud: Baud rate.

        Returns:
            The new ``serial.Serial`` object (8 data bits, no parity, one
            stop bit, 10 ms read/write timeouts, no flow control). According
            to the pyserial documentation the port is opened immediately
            when ``portName`` is not None.
        """
        return serial.Serial(
            port=portName,                  # Device name or None
            baudrate=baud,                  # Baud rate
            bytesize=serial.EIGHTBITS,      # Number of data bits
            parity=serial.PARITY_NONE,      # Disable parity checking
            stopbits=serial.STOPBITS_ONE,   # Number of stop bits
            timeout=0.01,                   # Set a read timeout value
            writeTimeout=0.01,              # Set a write timeout value
            xonxoff=False,                  # Disable software flow control
            rtscts=False,                   # Disable hardware flow control (RTS/CTS)
            dsrdtr=False,                   # Disable hardware flow control (DSR/DTR)
        )

    # Open serial port
    def SerialPortOpen(self):
        """Make sure the serial port is open.

        Returns:
            True if the port is open when the call returns (including the
            case where it was already open), False if opening failed.
        """
        try:
            # Opening an already-open port raises, which previously made
            # this method report failure for a perfectly usable port.
            if not self.serialPort.isOpen():
                self.serialPort.open()
            return True
        except Exception as e:
            self._Info("Error open serial port : " + str(e))
            return False

    # Check open/close status of serial port
    def SerialPortCheckStatus(self):
        """Return True if the serial port is open, False otherwise."""
        if self.serialPort.isOpen():
            self._Info("Port is Available!")
            return True
        self._Info("Port is Not Available!")
        return False

    # Close serial port
    def SerialPortClose(self):
        """Close the serial port; errors are reported but not raised."""
        try:
            self.serialPort.close()
        except Exception as e:
            self._Info("Error Close Serial Port : " + str(e))

    # Write to serial port
    def SerialWrite(self, data):
        """Write ``data`` to the serial port and count it as a sent message.

        The input buffer is flushed first, so any bytes received but not yet
        read are discarded before the write.
        """
        self.serialPort.flushInput()
        self.serialPort.write(data)
        self.number_of_messages_sent += 1
        self._Info("Number Of Messages Sent :" + str(self.number_of_messages_sent))

    # Read from serial port
    def SerialRead(self):
        """Read up to ``READ_CHUNK_SIZE`` bytes from the serial port.

        Returns:
            The bytes read, or None if the read raised an exception.
        """
        try:
            return self.serialPort.read(self.READ_CHUNK_SIZE)
        except Exception:
            self._Info("No message")
            return None

    def _PollOnce(self):
        """Run one send/receive pass; return True if any data was moved."""
        did_work = False

        # Look for an outgoing request and send it to the serial device.
        if not self.input_queue_serial.empty():
            outgoing = self.input_queue_serial.get()
            self.SerialWrite(outgoing)
            did_work = True

        # Look for incoming serial data.
        if self.serialPort.inWaiting() > 0:
            incoming = self.SerialRead()
            # SerialRead returns None on failure; never forward that (or an
            # empty read) to the consumer of the output queue.
            if incoming:
                self.output_queue_serial.put(incoming)
                self.number_of_messages_received += 1
                self._Info(
                    "Number Of Messages Received :" + str(self.number_of_messages_received),
                    "Coming Message From Serial Port:",
                    binascii.hexlify(incoming),
                )
                did_work = True

        return did_work

    # Scheduler
    def run(self):
        """Process entry point: poll the input queue and the port forever.

        Unexpected errors are printed and the loop continues after a short
        back-off; the loop sleeps briefly whenever a pass moved no data.
        """
        while True:
            try:
                did_work = self._PollOnce()
            except Exception as err:
                print("Serial Run Error : " + str(err))
                time.sleep(self.ERROR_BACKOFF_SECONDS)
                continue
            if not did_work:
                time.sleep(self.IDLE_SLEEP_SECONDS)