@@ -450,7 +450,7 @@ def rsync_result(update: RSyncerUpdate):
450450 force_mdoc_metadata = self .force_mdoc_metadata ,
451451 limited = limited ,
452452 )
453- self .analysers [source ].subscribe (partial ( self ._start_dc , from_form = True ) )
453+ self .analysers [source ].subscribe (self ._start_dc )
454454 self .analysers [source ].start ()
455455 if transfer :
456456 self .rsync_processes [source ].subscribe (self .analysers [source ].enqueue )
@@ -492,17 +492,13 @@ def _rsync_update_converter(p: Path) -> None:
492492 )
493493 self ._environment .watchers [source ].start ()
494494
495- def _start_dc (self , metadata_json , from_form : bool = False ):
495+ def _start_dc (self , metadata_json ):
496496 if self .dummy_dc :
497497 return
498- # for multigrid the analyser sends the message straight to _start_dc by-passing user input
499- # it is then necessary to extract the data from the message
500- if from_form :
501- metadata_json = metadata_json .get ("form" , {})
502- # Safely convert all entries into strings, but leave None as-is
503- metadata_json = {
504- k : str (v ) if v is not None else None for k , v in metadata_json .items ()
505- }
498+ # Safely convert all entries into strings, but leave None as-is
499+ metadata_json = {
500+ k : str (v ) if v is not None else None for k , v in metadata_json .items ()
501+ }
506502 self ._environment .dose_per_frame = metadata_json .get ("dose_per_frame" )
507503 self ._environment .gain_ref = metadata_json .get ("gain_ref" )
508504 self ._environment .symmetry = metadata_json .get ("symmetry" )
@@ -576,82 +572,76 @@ def _start_dc(self, metadata_json, from_form: bool = False):
576572 environment = self ._environment ,
577573 token = self .token ,
578574 )
579- if from_form :
580- data = {
581- "voltage" : metadata_json ["voltage" ],
582- "pixel_size_on_image" : metadata_json ["pixel_size_on_image" ],
583- "experiment_type" : metadata_json ["experiment_type" ],
584- "image_size_x" : metadata_json ["image_size_x" ],
585- "image_size_y" : metadata_json ["image_size_y" ],
586- "file_extension" : metadata_json ["file_extension" ],
587- "acquisition_software" : metadata_json ["acquisition_software" ],
588- "image_directory" : str (
589- self ._environment .default_destinations [source ]
590- ),
591- "tag" : str (source ),
592- "source" : str (source ),
593- "magnification" : metadata_json ["magnification" ],
594- "total_exposed_dose" : metadata_json .get ("total_exposed_dose" ),
595- "c2aperture" : metadata_json .get ("c2aperture" ),
596- "exposure_time" : metadata_json .get ("exposure_time" ),
597- "slit_width" : metadata_json .get ("slit_width" ),
598- "phase_plate" : metadata_json .get ("phase_plate" , False ),
599- }
575+ data = {
576+ "voltage" : metadata_json ["voltage" ],
577+ "pixel_size_on_image" : metadata_json ["pixel_size_on_image" ],
578+ "experiment_type" : metadata_json ["experiment_type" ],
579+ "image_size_x" : metadata_json ["image_size_x" ],
580+ "image_size_y" : metadata_json ["image_size_y" ],
581+ "file_extension" : metadata_json ["file_extension" ],
582+ "acquisition_software" : metadata_json ["acquisition_software" ],
583+ "image_directory" : str (self ._environment .default_destinations [source ]),
584+ "tag" : str (source ),
585+ "source" : str (source ),
586+ "magnification" : metadata_json ["magnification" ],
587+ "total_exposed_dose" : metadata_json .get ("total_exposed_dose" ),
588+ "c2aperture" : metadata_json .get ("c2aperture" ),
589+ "exposure_time" : metadata_json .get ("exposure_time" ),
590+ "slit_width" : metadata_json .get ("slit_width" ),
591+ "phase_plate" : metadata_json .get ("phase_plate" , False ),
592+ }
593+ capture_post (
594+ base_url = str (self ._environment .url .geturl ()),
595+ router_name = "workflow.router" ,
596+ function_name = "start_dc" ,
597+ token = self .token ,
598+ visit_name = self ._environment .visit ,
599+ session_id = self .session_id ,
600+ data = data ,
601+ )
602+ for recipe in (
603+ "em-spa-preprocess" ,
604+ "em-spa-extract" ,
605+ "em-spa-class2d" ,
606+ "em-spa-class3d" ,
607+ "em-spa-refine" ,
608+ ):
600609 capture_post (
601610 base_url = str (self ._environment .url .geturl ()),
602611 router_name = "workflow.router" ,
603- function_name = "start_dc " ,
612+ function_name = "register_proc " ,
604613 token = self .token ,
605614 visit_name = self ._environment .visit ,
606615 session_id = self .session_id ,
607- data = data ,
608- )
609- for recipe in (
610- "em-spa-preprocess" ,
611- "em-spa-extract" ,
612- "em-spa-class2d" ,
613- "em-spa-class3d" ,
614- "em-spa-refine" ,
615- ):
616- capture_post (
617- base_url = str (self ._environment .url .geturl ()),
618- router_name = "workflow.router" ,
619- function_name = "register_proc" ,
620- token = self .token ,
621- visit_name = self ._environment .visit ,
622- session_id = self .session_id ,
623- data = {
624- "tag" : str (source ),
625- "source" : str (source ),
626- "recipe" : recipe ,
627- },
628- )
629- log .info (f"Posting SPA processing parameters: { metadata_json } " )
630- response = capture_post (
631- base_url = str (self ._environment .url .geturl ()),
632- router_name = "workflow.spa_router" ,
633- function_name = "register_spa_proc_params" ,
634- token = self .token ,
635- session_id = self .session_id ,
636616 data = {
637- ** {
638- k : None if v == "None" else v
639- for k , v in metadata_json .items ()
640- },
641617 "tag" : str (source ),
618+ "source" : str (source ),
619+ "recipe" : recipe ,
642620 },
643621 )
644- if response and not str (response .status_code ).startswith ("2" ):
645- log .warning (f"{ response .reason } " )
646- capture_post (
647- base_url = str (self ._environment .url .geturl ()),
648- router_name = "workflow.spa_router" ,
649- function_name = "flush_spa_processing" ,
650- token = self .token ,
651- visit_name = self ._environment .visit ,
652- session_id = self .session_id ,
653- data = {"tag" : str (source )},
654- )
622+ log .info (f"Posting SPA processing parameters: { metadata_json } " )
623+ response = capture_post (
624+ base_url = str (self ._environment .url .geturl ()),
625+ router_name = "workflow.spa_router" ,
626+ function_name = "register_spa_proc_params" ,
627+ token = self .token ,
628+ session_id = self .session_id ,
629+ data = {
630+ ** {k : None if v == "None" else v for k , v in metadata_json .items ()},
631+ "tag" : str (source ),
632+ },
633+ )
634+ if response and not str (response .status_code ).startswith ("2" ):
635+ log .warning (f"{ response .reason } " )
636+ capture_post (
637+ base_url = str (self ._environment .url .geturl ()),
638+ router_name = "workflow.spa_router" ,
639+ function_name = "flush_spa_processing" ,
640+ token = self .token ,
641+ visit_name = self ._environment .visit ,
642+ session_id = self .session_id ,
643+ data = {"tag" : str (source )},
644+ )
655645
656646 def _increment_file_count (
657647 self , observed_files : List [Path ], source : str , destination : str
0 commit comments