-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathclient_image.py
More file actions
112 lines (91 loc) · 3.73 KB
/
client_image.py
File metadata and controls
112 lines (91 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/python
import socket
import cv2
import numpy as np
import base64
import time
from datetime import datetime
from tkinter import *
from util import Util
import json
from model import ModelYolo
import ctypes
class Client:
    """Webcam client that reports detections to a TCP server.

    Constructing an instance loads the detection model, connects to the
    server and then enters the blocking capture loop (see ``openCV``), so
    ``Client(...)`` does not return until the camera stops delivering frames
    or an error is raised.

    Wire format used for every payload: a 64-byte header holding the payload
    length as ASCII digits padded with spaces, followed by the payload bytes.
    """

    # Minimum detection confidence that triggers a report to the server.
    CONFIDENCE_THRESHOLD = 0.6
    # Width and height (pixels) requested from the camera and used for resize.
    FRAME_SIZE = 640
    # Fixed size in bytes of the length header preceding each payload.
    HEADER_SIZE = 64

    def __init__(self, TCP_IP, TCP_PORT, saborn):
        """Store the connection settings, build helpers and start the client.

        Args:
            TCP_IP: Host name or IP address of the server.
            TCP_PORT: TCP port of the server.
            saborn: Identifier passed to ``Util.create_infomation`` when a
                report is built (presumably an employee/user id -- confirm
                against ``Util``).
        """
        self.__saborn = saborn
        self.__TCP_IP = TCP_IP
        self.__TCP_PORT = TCP_PORT
        self.__utility = Util()
        self.model = ModelYolo()
        self.connect()

    def connect(self) -> None:
        """Open the keep-alive TCP connection and start the capture loop.

        Raises:
            OSError: If the connection cannot be established; the socket is
                closed before the exception propagates.
        """
        self.client_socket = socket.socket()
        self.client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # The TCP_KEEP* constants are not defined on every platform/Python
        # build, so apply each one only when the socket module exposes it.
        keepalive_options = (
            ("TCP_KEEPIDLE", 1),
            ("TCP_KEEPINTVL", 3),
            ("TCP_KEEPCNT", 1),
        )
        for option_name, option_value in keepalive_options:
            option = getattr(socket, option_name, None)
            if option is not None:
                self.client_socket.setsockopt(
                    socket.IPPROTO_TCP, option, option_value
                )
        try:
            self.client_socket.connect((self.get_host(), self.get_port()))
        except OSError:
            # Do not leak the descriptor when the server is unreachable.
            self.client_socket.close()
            raise
        self.openCV()

    def openCV(self) -> None:
        """Run the capture/detect/report loop until the camera stops.

        Each iteration reads one frame from camera 0, resizes it, shows it in
        the ``PC_cam`` window and runs the model on it. When any detection
        reaches ``CONFIDENCE_THRESHOLD`` the camera frame, a screenshot and
        the JSON information record are sent to the server and the Windows
        workstation is locked. The socket, the camera and the preview window
        are always released, including when an exception interrupts the loop.
        """
        self.capture = cv2.VideoCapture(0)
        try:
            self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, self.FRAME_SIZE)
            self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, self.FRAME_SIZE)
            while self.capture.isOpened():
                ret, self.frame = self.capture.read()
                if not ret:
                    break
                self.resize_frame = cv2.resize(
                    self.frame,
                    dsize=(self.FRAME_SIZE, self.FRAME_SIZE),
                    interpolation=cv2.INTER_LINEAR,
                )
                cv2.imshow('PC_cam', self.resize_frame)
                # Run inference once per frame (it used to run twice, with
                # the first result thrown away).
                results = self.model.model(self.resize_frame)
                confidences = results.pandas().xyxy[0].confidence.values
                if any(
                    value >= self.CONFIDENCE_THRESHOLD for value in confidences
                ):
                    self._send_report()
                    # Windows-only API: locks the current desktop session.
                    ctypes.windll.user32.LockWorkStation()
                    print('send...')
                # waitKey lets the HighGUI window process its events.
                cv2.waitKey(1)
                time.sleep(1)
        finally:
            self.client_socket.close()
            self.capture.release()
            cv2.destroyAllWindows()

    def _send_report(self) -> None:
        """Send the camera frame, a screenshot and the info record, in order."""
        cam_img, cam_length = self.img_encoding(self.resize_frame)
        screen_shot, screen_shot_length = self.img_encoding(
            self.get_util().screen_shot()
        )
        data, data_length = self.get_infomation()
        self.sendall(cam_img, cam_length)
        self.sendall(screen_shot, screen_shot_length)
        self.sendall(data, data_length)

    def sendall(self, data, length) -> None:
        """Send one length-prefixed payload.

        Args:
            data: Payload bytes.
            length: Payload length as a decimal string; it is sent first,
                space-padded to ``HEADER_SIZE`` bytes.
        """
        header = length.encode('utf-8').ljust(self.HEADER_SIZE)
        self.client_socket.sendall(header)
        # socket.send() may transmit only part of the buffer; sendall()
        # keeps writing until every byte is out, so the stream stays in step
        # with the length announced in the header.
        self.client_socket.sendall(data)

    def img_encoding(self, image_frame) -> tuple:
        """Encode an image as base64-wrapped JPEG (quality 60).

        Args:
            image_frame: Image array accepted by ``cv2.imencode``.

        Returns:
            ``(payload, length)`` where ``payload`` is the base64 bytes and
            ``length`` is ``str(len(payload))``.

        Raises:
            ValueError: If OpenCV reports that JPEG encoding failed.
        """
        encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 60]
        succeeded, jpeg_buffer = cv2.imencode('.jpg', image_frame, encode_param)
        if not succeeded:
            raise ValueError("cv2.imencode failed to encode the frame as JPEG")
        img_to_byte = base64.b64encode(np.ascontiguousarray(jpeg_buffer))
        return (img_to_byte, str(len(img_to_byte)))

    def get_infomation(self) -> tuple:
        """Build the JSON information record for a report.

        Returns:
            ``(payload, length)`` where ``payload`` is the UTF-8 encoded JSON
            of the dict returned by ``Util.create_infomation`` with an added
            ``'datetime'`` entry, and ``length`` is ``str(len(payload))``.
        """
        data = self.get_util().create_infomation(self.get_saborn())
        data['datetime'] = self.get_time()
        to_json_data = json.dumps(data).encode('utf-8')
        return (to_json_data, str(len(to_json_data)))

    def get_time(self) -> str:
        """Return the current UTC time as ``YYYY/MM/DD HH:MM:SS``."""
        # %d is the zero-padded day of the month. The previous %D is a
        # platform-dependent alias for %m/%d/%y and corrupted the timestamp.
        return datetime.utcnow().strftime("%Y/%m/%d %H:%M:%S")

    def get_util(self):
        """Return the ``Util`` helper instance."""
        return self.__utility

    def get_host(self):
        """Return the server host passed to the constructor."""
        return self.__TCP_IP

    def get_port(self):
        """Return the server port passed to the constructor."""
        return self.__TCP_PORT

    def get_saborn(self):
        """Return the ``saborn`` identifier passed to the constructor."""
        return self.__saborn