-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathparse_workflow.py
More file actions
32 lines (24 loc) · 846 Bytes
/
parse_workflow.py
File metadata and controls
32 lines (24 loc) · 846 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import luigi
import glob
import os
from tasks.parse_tasks import ParseTask
from tasks.task_helpers import parse_yaml
from tasks.task_helpers import run_init
class ParseWorkflow(luigi.Task):
    """Luigi task that fans out one ParseTask per JSON file in a directory.

    Parameters:
        doc_dir: directory searched (non-recursively) for ``*.json`` files.
        yaml_file: path to the YAML configuration; it is forwarded to every
            ParseTask and also loaded by this task in ``run``.
        start_index: index of the first file (in sorted order) to include.
        end_index: index one past the last file (in sorted order) to include.
    """

    doc_dir = luigi.Parameter()
    yaml_file = luigi.Parameter()
    # IntParameter so that values given on the command line are parsed to int;
    # a plain Parameter would hand strings to the slice in _iterator().
    start_index = luigi.IntParameter(default=0)
    end_index = luigi.IntParameter(default=1000)

    def requires(self):
        """Return one ParseTask per selected input file."""
        return [
            ParseTask(input_file=path, yaml_file=self.yaml_file)
            for path in self._iterator()
        ]

    def output(self):
        """Return the local target ``log.txt`` declared as this task's output."""
        # NOTE(review): run() never writes this target, so the target will not
        # exist after a run unless something else creates it -- confirm whether
        # that is intended.
        return luigi.LocalTarget('log.txt')

    def run(self):
        """Load the configuration, initialise from it, and print a marker."""
        self._configure()
        # Call form prints the same text on Python 2 and Python 3.
        print('running')

    def _configure(self):
        """Parse ``yaml_file`` and pass the resulting config to ``run_init``."""
        config = parse_yaml(self.yaml_file)
        run_init(config)

    def _iterator(self):
        """Yield the ``*.json`` paths in ``doc_dir`` within the index window.

        Paths are sorted before slicing because glob returns them in arbitrary
        order; without sorting the window could pick different files from one
        run to the next.
        """
        pattern = os.path.join(self.doc_dir, '*.json')
        selected = sorted(glob.glob(pattern))[self.start_index:self.end_index]
        for path in selected:
            yield path