-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprimary.py
More file actions
111 lines (94 loc) · 3.77 KB
/
primary.py
File metadata and controls
111 lines (94 loc) · 3.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
This DAG is the primary DAG. It will:
- pick out images from the stream from detectors.
- ensure that the attributes coming in have all needed keys
If they don't, it won'process them
- This spawns the one_image_dag DAG for events
with the images expected and attributes
"""
from lightflow.models import Dag
from lightflow.tasks import PythonTask
from lightflow.models.task_data import TaskData, MultiTaskData
# TODO : make callback something else callback
#
from databroker import Broker
import matplotlib.pyplot as plt
import numpy as np
from SciStreams.workflows.one_image import one_image_dag
import numbers
# TODO : put in config files in this repo
required_attributes = {'main': {}}
typesdict = {'float': float, 'int': int, 'number': numbers.Number, 'str': str}
# filter a streamdoc with certain attributes (set in the yml file)
# required_attributes, typesdict globals needed
def filter_attributes(attr, type='main'):
'''
Filter attributes.
Note that this ultimately checks that attributes match what is seen in
yml file.
'''
#print("filterting attributes")
# get the sub required attributes
reqattr = required_attributes['main']
for key, val in reqattr.items():
if key not in attr:
print("bad attributes")
print("{} not in attributes".format(key))
return False
elif not isinstance(attr[key], typesdict[val]):
print("bad attributes")
print("key {} not an instance of {}".format(key, val))
return False
#print("good attributes")
return True
# this splits images into one image to send to tasks
def primary_func(data, store, signal, context):
run_start = data['run_start']
# md is also 'start'
md = data['md']
dbname = data['dbname']
descriptor = data['descriptor']
event = data['event']
db = Broker.named(dbname)
hdr = db[run_start]
# pull the strema name
stream_name = descriptor['name']
fields = hdr.fields(stream_name)
print("looking for an image with keys {}".format(fields))
dag_names = list()
# eventually iterate and spawn new tasks
detector_keys = ['pilatus2M_image']
for detector_key in detector_keys:
if detector_key in fields:
print("Found , saving an image")
try:
imgs = hdr.data(detector_key, fill=True)
for img in imgs:
# give it some provenance and data
new_data = dict(img=img)
new_data['run_start'] = run_start
new_data['md'] = md.copy()
new_data['md']['detector_key'] = detector_key
new_data['descriptor'] = descriptor
new_data = TaskData(data=new_data)
new_data = MultiTaskData(dataset=new_data)
good_attr = filter_attributes(new_data['md'])
if good_attr:
print("got a good image")
# one image dags should go here
dag_name = signal.start_dag(one_image_dag, data=new_data)
print("primary node, dag name: {}".format(dag_name))
dag_names.append(dag_name)
except FileNotFoundError as exc:
print("Error, could not find file, ignoring...")
print(exc)
signal.join_dags(dag_names)
# create the main DAG that spawns others
#img_dag = Dag('img_dag')
primary_task = PythonTask(name="primary_task",
callback=primary_func, queue='cms-primary-task')
primary_dag_dict = {
primary_task: None,
}
primary_dag = Dag("primary_dag", autostart=False, queue='cms-primary')
primary_dag.define(primary_dag_dict)