-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnew_rfid_reader_allaccess.py
More file actions
196 lines (162 loc) · 7.42 KB
/
new_rfid_reader_allaccess.py
File metadata and controls
196 lines (162 loc) · 7.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from os.path import exists
import wiringpi as wiringpi2
import numpy as np
import time
import sys
from datetime import datetime
from datetime import timedelta
import csv
from time import sleep
import subprocess
import Device #this is needed for communication with servo controller
import threading #this is a built in library needed to run csv_writer and motor activation codes simultaneously
#dir_setup("/home/quailrfid/")
servo_board = Device.BoardDevice(verbose=True) # setting which board to communicate with
warn = 0
header = ['year', 'month', 'day', 'timestamp', 'PIT_tag_ID']
rfid_data = "/home/quailrfid/Data_quailforage/RFID"
#set GPIO pin
GPIO_PIN = 1
motor_data = "/home/quailrfid/Data_motor/motor"
file_extension = '.csv'
def csv_writer(data_path, datestring, header, value):
try:
if data_path: #if path to rfid_data exists, do this:
filename = datestring + '.csv'
full_path = data_path + "/" + filename
if exists(full_path):
file = open(full_path, 'a+')
tmp_writer = csv.writer(file)
tmp_writer.writerow(value)
file.close()
else:
file = open(full_path, 'w+')
tmp_writer = csv.writer(file)
tmp_writer.writerow(header)
tmp_writer.writerow(value)
file.close()
except Exception as E:
pass #pass does nothing
def csv_writer_motor(data_path, datestring, header, value):
try:
if data_path: #if path to rfid_data exists, do this:
filename = 'motor_' + datestring + '.csv'
full_path = data_path + "/" + filename
if exists(full_path):
file = open(full_path, 'a+')
tmp_writer = csv.writer(file)
tmp_writer.writerow(value)
file.close()
else:
file = open(full_path, 'w+')
tmp_writer = csv.writer(file)
tmp_writer.writerow(header)
tmp_writer.writerow(value)
file.close()
except Exception as E:
pass #pass does nothing
def activate_motor(tagid):
try:
time_now = datetime.now()
global time_detected #tell python you are referring to the global time_detected variable
if "time_detected" not in globals(): #if time_detected has no value, set it to 61 seconds ago. This is needed when restarting script
time_detected = time_now + timedelta(seconds=-301)
#print('good')
#print(time_detected)
if time_now >= (time_detected + timedelta(seconds=300)): #if current time is at least 60 seconds after previous detection, activate motor
time_detected = time_now #if condition is met, set new time_detected
#print(time_detected)
servo_board.device.set_target(5, 1300)
time.sleep(1) #length of time for motor to move before rotating back to starting position
servo_board.device.set_target(5, 2550)
global motor_activation
motor_activation = 1
except Exception as E:
pass
#wait for input from pin
def WaitForCTS():
while wiringpi2.digitalRead(GPIO_PIN):
time.sleep(0.001)
return
#initiate and open communication with RFID reader
def RFIDSetup():
response = wiringpi2.wiringPiSetup()
wiringpi2.pinMode(GPIO_PIN, 0)
fd = wiringpi2.serialOpen('/dev/serial0', 9600)
wiringpi2.serialFlush(fd)
if response != 0 and fd <= 0:
print ("Unable to Setup communications")
sys.exit()
return fd
#set reader mode to EM4102 compatible
def SetReaderMode(fd):
WaitForCTS()
wiringpi2.serialPutchar(fd, 0x76)
wiringpi2.serialPutchar(fd, 0x03) #EM/MC2000
#set polling delay to 20 ms
def SetPollingDelay(fd):
WaitForCTS()
wiringpi2.serialPutchar(fd, 0x50)
wiringpi2.serialPutchar(fd, 0x60) #262 ms
#read text from RFID reader
def ReadText(fd):
response = ""
i = 1
while i <= 5:
temp = format(wiringpi2.serialGetchar(fd), 'x').upper()
if len(temp) == 1:
temp = '0' + temp
if i <= 4:
temp = temp + "-"
response = response + temp
i = i + 1
return response
#read int from RFID reader
def ReadInt(fd):
qtydata = wiringpi2.serialDataAvail(fd)
response = 0
if qtydata > 0:
response = wiringpi2.serialGetchar(fd)
return response
def ReadTagPageZero(fd):
try:
while True:
WaitForCTS()
wiringpi2.serialPutchar(fd, 0x52)
wiringpi2.serialPutchar(fd, 0x00)
ans = ReadInt(fd)
time.sleep(0.1)
if ans == int("0xD6", 16):
ans = ReadText(fd)
dt = datetime.now()
tag = ans
if __name__ == '__main__':
fname = f"{dt.year}_{dt.month}_{dt.day}.csv"
f_path = rfid_data + "/" + fname
if exists(f_path):
pass
#time_detected = datetime.now() + timedelta(seconds=-61)
else:
global time_detected #tell python you are defining a global time_detected variable
time_detected = datetime.now() + timedelta(seconds=-61) #if csv is not present, set time_detected to 61 seconds ago
global motor_activation #using a global variable to control recording of motor activations bc csv_writer_motor did not work inside of activate_motor
motor_activation = 0
t1 = threading.Thread(target=csv_writer, args=(rfid_data, f"{dt.year}_{dt.month}_{dt.day}", header, [f"{dt.year}", f"{dt.month}", f"{dt.day}", f"{dt:%H:%M:%S.%f}", ans]))
t2 = threading.Thread(target=activate_motor, args=(ans,))
t1.start()
t2.start()
t1.join() # .join makes main code wait for thread to finish
t2.join() #motor does not finish one cycle unless I have this here
if motor_activation == 1:
csv_writer_motor(motor_data, f"{dt.year}_{dt.month}_{dt.day}", header, [f"{dt.year}", f"{dt.month}", f"{dt.day}", f"{dt:%H:%M:%S.%f}", ans])
motor_activation = 0
time.sleep(1)
except Exception as E:
pass
comms = RFIDSetup()
SetReaderMode(comms)
SetPollingDelay(comms)
ReadTagPageZero(comms)
#subprocess.run(["/home/quailrfid/maestro-set-target.sh", "/dev/ttyACM0 5 10000"], shell=True)