-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
141 lines (113 loc) · 4.91 KB
/
main.py
File metadata and controls
141 lines (113 loc) · 4.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# -*- coding: UTF-8 -*-
import datetime
import gc
import os
import sys
import time
import traceback
from apscheduler.schedulers.background import BackgroundScheduler
import mainFunc
from algorithm.common import configManage, dbtools, tools
from algorithm.common_tools_pdf import subject_match_tools
# Passed to mainFunc.exeAlgorithm as its thread-count argument on every
# data-driven dispatch (presumably the worker pool size -- confirm in mainFunc).
THREAD_NUM = 10
def loadPeriodJob():
    """Register and start the time-driven (cron-triggered) algorithm jobs.

    Reads the ``'time_driven'`` entries from ``configManage.algorithmMap``.
    For each entry it resolves the entry point with ``mainFunc.getMethod`` and
    converts the entry's ``'period'`` setting with ``mainFunc.getTimePara``
    into the cron fields handed to APScheduler.

    Returns:
        The started ``BackgroundScheduler``, so a caller can inspect or shut
        it down. Returns ``None`` when no time-driven algorithms are
        configured.
    """
    als = configManage.algorithmMap.get('time_driven')
    if not als:
        # Nothing to schedule: don't start a scheduler thread with no jobs.
        return None
    # Use APScheduler's public ``logger`` option rather than overwriting the
    # private ``_logger`` attribute after construction.
    scheduler = BackgroundScheduler(logger=configManage.logger)
    for al in als:
        method = mainFunc.getMethod(al)
        timePara = mainFunc.getTimePara(al['period'])
        scheduler.add_job(func=method, trigger='cron',
                          year=timePara['year'],
                          month=timePara['month'],
                          day=timePara['day'],
                          day_of_week=timePara['day_of_week'],
                          hour=timePara['hour'],
                          minute=timePara['minute'],
                          second=timePara['second'])
    scheduler.start()
    return scheduler
def _timestamp():
    """Return the current local time formatted as ``YYYY-mm-dd HH:MM:SS``."""
    return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def _runDataDrivenRound(roundtimes):
    """Dispatch one round of data-driven algorithms; return True if any ran.

    Walks ``configManage.algorithmMap['data_driven']`` in its configured
    order, skipping algorithms for which ``mainFunc.getProcessFileInfo``
    returns nothing.

    The first algorithm with pending data fixes the round's priority, and
    every later one with pending data and the same priority is dispatched
    too. Once something has run, the walk stops at the first algorithm with
    pending data and a different priority, so that work waits for a later
    round.
    """
    curPriority = -1
    executed = False
    for al in configManage.algorithmMap['data_driven']:
        # Resolve the algorithm entry point and fetch its pending data.
        method = mainFunc.getMethod(al)
        paraBox = mainFunc.getProcessFileInfo(al)
        if not paraBox:
            continue
        if executed and al['priority'] != curPriority:
            break
        curPriority = al['priority']
        executed = True
        mainFunc.exeAlgorithm(al, method, paraBox, roundtimes, THREAD_NUM)
    return executed


def monitor():
    """Run the data-driven processing loop forever.

    Each round does the following:

    * drops the kernel page cache;
    * dispatches pending work via ``_runDataDrivenRound`` with automatic
      garbage collection disabled;
    * re-enables GC and forces a collection.

    If no algorithm had pending data, the loop sleeps 150 s. Any exception is
    printed and logged, then followed by a 600 s back-off. Never returns
    normally.
    """
    roundtimes = 1
    while True:
        try:
            # Drop the page cache once per round: the processing logic never
            # needs to read/write the same file more than once. Writing
            # /proc/sys/vm/drop_caches normally requires root; the command's
            # exit status is ignored. TODO: could become a time-driven task.
            os.system('echo 1 > /proc/sys/vm/drop_caches')
            # TODO (planned time-driven tasks, currently disabled):
            #   - capture-pattern processing, then re-running missing
            #     table-extraction states (periodic vs data-driven is open);
            #   - uploading processed PDFs to HDFS once ~30000 accumulate.
            gc.disable()
            try:
                executed = _runDataDrivenRound(roundtimes)
            finally:
                # Re-enable GC on every path (including no data and errors)
                # so it is never left off during the sleeps below.
                gc.enable()
                gc.collect()
            roundtimes += 1
            # Sleep outside the per-algorithm loop when nothing had data.
            if not executed:
                msg = 'sleep[NO_DATA] ' + _timestamp()
                configManage.logger.info(msg)
                print(msg)
                time.sleep(150)
        except Exception:
            excepttext = traceback.format_exc()
            print(excepttext)
            configManage.logger.error(excepttext)
            msg = 'sleep[Error] ' + _timestamp()
            print(msg)
            configManage.logger.error(msg)
            time.sleep(600)
def temp():
    """Write the distinct normalised subject texts from the DB to a file.

    Steps:

    1. Query ``subject_statistics`` for every distinct ``original_subject``.
    2. Normalise each value with ``subject_match_tools.getPureSbuectText``.
    3. De-duplicate the results and sort them.
    4. Pass them to ``tools.linkStr`` with a newline separator.
    5. Write the result to ``./subjects_from_db.txt``.
    """
    sql = 'select distinct original_subject from subject_statistics'
    result = dbtools.query_pdfparse(sql)
    # A set drops duplicates that only appear after normalisation.
    box = set(subject_match_tools.getPureSbuectText(row[0]) for row in result)
    # sorted() returns the sorted list; list.sort() sorts in place and
    # returns None, which would be passed to linkStr instead of the subjects.
    text = tools.linkStr(sorted(box), '\n')
    # The context manager guarantees the file is flushed and closed.
    with open('./subjects_from_db.txt', 'w+') as f:
        f.write(text)
if __name__ == '__main__':
    # Python 2 only: reload(sys) restores sys.setdefaultencoding (which site.py
    # removes at start-up) so the implicit str<->unicode codec becomes UTF-8.
    reload(sys)
    sys.setdefaultencoding('utf-8')
    # Initialise configuration (the meaning of the False flag is defined in
    # configManage.initConfig).
    configManage.initConfig(False)
    # System initialisation (currently disabled):
    # mainFunc.projectInit()
    # Register time-driven tasks (currently disabled). TODO: add a periodic
    # job that re-reads algrithm.json and reloads the project for hot reload.
    # loadPeriodJob()
    # Start the monitor loop that dispatches data-driven tasks; monitor()
    # loops forever and does not return normally.
    monitor()