Skip to content

Commit 0589e7d

Browse files
committed
add create time to storage
1 parent 3b5b378 commit 0589e7d

11 files changed

Lines changed: 101 additions & 55 deletions

File tree

src/runtime/processor/wasm/wasm_host.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,7 @@ pub fn create_wasm_host_with_component(
454454
output_sinks: Vec<Box<dyn OutputSink>>,
455455
init_context: &crate::runtime::taskexecutor::InitContext,
456456
task_name: String,
457+
create_time: u64,
457458
) -> anyhow::Result<(Processor, Store<HostState>)> {
458459
let mut linker = Linker::new(engine);
459460

@@ -463,21 +464,9 @@ pub fn create_wasm_host_with_component(
463464
Processor::add_to_linker::<HostState, HostStateData>(&mut linker, |s| s)
464465
.map_err(|e| anyhow::anyhow!("Failed to add interfaces to linker: {}", e))?;
465466

466-
let created_at = init_context
467-
.task_storage
468-
.load_task(&task_name)
469-
.ok()
470-
.map(|info| info.created_at)
471-
.unwrap_or_else(|| {
472-
std::time::SystemTime::now()
473-
.duration_since(std::time::UNIX_EPOCH)
474-
.unwrap()
475-
.as_secs()
476-
});
477-
478467
let factory = init_context
479468
.state_storage_server
480-
.create_factory(task_name.clone(), created_at)
469+
.create_factory(task_name.clone(), create_time)
481470
.map_err(|e| anyhow::anyhow!("Failed to create state store factory: {}", e))?;
482471

483472
let mut store = Store::new(
@@ -511,11 +500,19 @@ pub fn create_wasm_host(
511500
output_sinks: Vec<Box<dyn OutputSink>>,
512501
init_context: &crate::runtime::taskexecutor::InitContext,
513502
task_name: String,
503+
create_time: u64,
514504
) -> anyhow::Result<(Processor, Store<HostState>)> {
515505
let engine = get_global_engine(wasm_bytes.len())?;
516506

517507
let component = Component::from_binary(&engine, wasm_bytes)
518508
.map_err(|e| anyhow::anyhow!("Failed to parse WebAssembly component: {}", e))?;
519509

520-
create_wasm_host_with_component(&engine, &component, output_sinks, init_context, task_name)
510+
create_wasm_host_with_component(
511+
&engine,
512+
&component,
513+
output_sinks,
514+
init_context,
515+
task_name,
516+
create_time,
517+
)
521518
}

src/runtime/processor/wasm/wasm_processor.rs

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,7 @@ impl WasmProcessor for WasmProcessorImpl {
509509
output_sinks: Vec<Box<dyn OutputSink>>,
510510
init_context: &crate::runtime::taskexecutor::InitContext,
511511
task_name: String,
512+
create_time: u64,
512513
) -> Result<(), Box<dyn Error + Send>> {
513514
use super::wasm_host::create_wasm_host;
514515

@@ -536,6 +537,7 @@ impl WasmProcessor for WasmProcessorImpl {
536537
output_sinks,
537538
init_context,
538539
task_name,
540+
create_time,
539541
)
540542
.map_err(|e| -> Box<dyn Error + Send> {
541543
let error_msg = format!(
@@ -564,26 +566,31 @@ impl WasmProcessor for WasmProcessorImpl {
564566
.first()
565567
.map(|(_, b)| b.as_slice())
566568
.unwrap_or(&[]);
567-
create_wasm_host(first_bytes, output_sinks, init_context, task_name).map_err(
568-
|e| -> Box<dyn Error + Send> {
569-
let error_msg = format!("Failed to create WasmHost: {}", e);
570-
log::error!("{}", error_msg);
571-
let mut full_error = error_msg.clone();
572-
let mut source = e.source();
573-
let mut depth = 0;
574-
while let Some(err) = source {
575-
depth += 1;
576-
full_error.push_str(&format!("\n Caused by ({}): {}", depth, err));
577-
source = err.source();
578-
if depth > 10 {
579-
full_error.push_str("\n ... (error chain too long, truncated)");
580-
break;
581-
}
569+
create_wasm_host(
570+
first_bytes,
571+
output_sinks,
572+
init_context,
573+
task_name,
574+
create_time,
575+
)
576+
.map_err(|e| -> Box<dyn Error + Send> {
577+
let error_msg = format!("Failed to create WasmHost: {}", e);
578+
log::error!("{}", error_msg);
579+
let mut full_error = error_msg.clone();
580+
let mut source = e.source();
581+
let mut depth = 0;
582+
while let Some(err) = source {
583+
depth += 1;
584+
full_error.push_str(&format!("\n Caused by ({}): {}", depth, err));
585+
source = err.source();
586+
if depth > 10 {
587+
full_error.push_str("\n ... (error chain too long, truncated)");
588+
break;
582589
}
583-
log::error!("Full error chain:\n{}", full_error);
584-
Box::new(WasmProcessorError::InitError(full_error))
585-
},
586-
)?
590+
}
591+
log::error!("Full error chain:\n{}", full_error);
592+
Box::new(WasmProcessorError::InitError(full_error))
593+
})?
587594
};
588595

589596
*self.processor.borrow_mut() = Some(processor);

src/runtime/processor/wasm/wasm_processor_trait.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ pub trait WasmProcessor: Send + Sync {
8484
/// - `output_sinks`: Output sink list
8585
/// - `init_context`: Initialization context
8686
/// - `task_name`: Task name
87+
/// - `create_time`: Creation timestamp for state storage
8788
///
8889
/// # Returns
8990
/// Ok(()) if initialization succeeds, or an error if it fails
@@ -92,6 +93,7 @@ pub trait WasmProcessor: Send + Sync {
9293
_output_sinks: Vec<Box<dyn OutputSink>>,
9394
_init_context: &InitContext,
9495
_task_name: String,
96+
_create_time: u64,
9597
) -> Result<(), Box<dyn std::error::Error + Send>> {
9698
Ok(())
9799
}

src/runtime/processor/wasm/wasm_task.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ pub struct WasmTask {
118118
failure_cause: Arc<Mutex<Option<String>>>,
119119
thread_running: Arc<AtomicBool>,
120120
termination_future: Arc<Mutex<Option<mpsc::Receiver<ExecutionState>>>>,
121+
create_time: u64,
121122
}
122123

123124
impl WasmTask {
@@ -127,6 +128,7 @@ impl WasmTask {
127128
inputs: Vec<Box<dyn InputSource>>,
128129
processor: Box<dyn WasmProcessor>,
129130
sinks: Vec<Box<dyn OutputSink>>,
131+
create_time: u64,
130132
) -> Self {
131133
let (_tx, rx) = mpsc::channel();
132134
Self {
@@ -143,6 +145,7 @@ impl WasmTask {
143145
failure_cause: Arc::new(Mutex::new(None)),
144146
thread_running: Arc::new(AtomicBool::new(false)),
145147
termination_future: Arc::new(Mutex::new(Some(rx))),
148+
create_time,
146149
}
147150
}
148151

@@ -183,7 +186,10 @@ impl WasmTask {
183186
))));
184187
}
185188

186-
if let Err(e) = processor.init_wasm_host(sinks, &init_context, self.task_name.clone()) {
189+
let create_time = self.get_create_time();
190+
if let Err(e) =
191+
processor.init_wasm_host(sinks, &init_context, self.task_name.clone(), create_time)
192+
{
187193
log::error!("Failed to init WasmHost: {}", e);
188194
return Err(Box::new(std::io::Error::other(format!(
189195
"Failed to init WasmHost: {}",
@@ -789,6 +795,10 @@ impl WasmTask {
789795
&self.task_name
790796
}
791797

798+
pub fn get_create_time(&self) -> u64 {
799+
self.create_time
800+
}
801+
792802
pub fn take_thread_groups(&mut self) -> Option<Vec<ThreadGroup>> {
793803
self.thread_groups.take()
794804
}
@@ -899,6 +909,7 @@ impl TaskLifecycle for WasmTask {
899909
name: self.task_name.clone(),
900910
task_type: self.task_type.clone(),
901911
status: format!("{:?}", self.get_state()),
912+
create_time: self.get_create_time(),
902913
}
903914
}
904915

src/runtime/task/builder/processor/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ impl ProcessorBuilder {
4242
task_name: String,
4343
yaml_value: &Value,
4444
module_bytes: Vec<u8>,
45+
create_time: u64,
4546
) -> Result<Arc<WasmTask>, Box<dyn std::error::Error + Send>> {
4647
let config_type = yaml_value
4748
.get(TYPE)
@@ -118,6 +119,7 @@ impl ProcessorBuilder {
118119
all_inputs,
119120
processor,
120121
outputs,
122+
create_time,
121123
);
122124
let task = Arc::new(task);
123125

src/runtime/task/builder/python/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ impl PythonBuilder {
3232
task_name: String,
3333
yaml_value: &Value,
3434
modules: &[(String, Vec<u8>)],
35+
create_time: u64,
3536
) -> Result<Box<dyn crate::runtime::task::TaskLifecycle>, Box<dyn std::error::Error + Send>>
3637
{
3738
let config_type = yaml_value
@@ -109,6 +110,7 @@ impl PythonBuilder {
109110
all_inputs,
110111
processor,
111112
outputs,
113+
create_time,
112114
);
113115
let task = Arc::new(task);
114116

src/runtime/task/builder/sink/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ impl SinkBuilder {
3737
_task_name: String,
3838
yaml_value: &Value,
3939
_module_bytes: Vec<u8>,
40+
_create_time: u64,
4041
) -> Result<Arc<WasmTask>, Box<dyn std::error::Error + Send>> {
4142
// Validate configuration type
4243
let config_type = yaml_value

src/runtime/task/builder/source/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ impl SourceBuilder {
3737
_task_name: String,
3838
yaml_value: &Value,
3939
_module_bytes: Vec<u8>,
40+
_create_time: u64,
4041
) -> Result<Arc<WasmTask>, Box<dyn std::error::Error + Send>> {
4142
// Validate configuration type
4243
let config_type = yaml_value

src/runtime/task/builder/task_builder.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,26 @@ impl TaskBuilder {
4545
/// # Arguments
4646
/// * `config_bytes` - YAML configuration as bytes
4747
/// * `module_bytes` - WASM/Python module bytes
48+
/// * `create_time` - Creation timestamp for the task
4849
///
4950
/// # Returns
5051
/// A boxed TaskLifecycle implementation based on the task type in config
51-
pub fn from_yaml_config(config_bytes: &[u8], module_bytes: &[u8]) -> BuildResult {
52+
pub fn from_yaml_config(
53+
config_bytes: &[u8],
54+
module_bytes: &[u8],
55+
create_time: u64,
56+
) -> BuildResult {
5257
let yaml_value = Self::parse_yaml(config_bytes)?;
5358
let task_name = Self::extract_task_name(&yaml_value)?;
5459
let task_type = Self::extract_task_type(&yaml_value, &task_name)?;
5560

56-
Self::build_task(&task_type, task_name, &yaml_value, module_bytes.to_vec())
61+
Self::build_task(
62+
&task_type,
63+
task_name,
64+
&yaml_value,
65+
module_bytes.to_vec(),
66+
create_time,
67+
)
5768
}
5869

5970
/// Create a Python task from YAML configuration (for fs-exec)
@@ -64,13 +75,18 @@ impl TaskBuilder {
6475
/// # Arguments
6576
/// * `config_bytes` - YAML configuration as bytes
6677
/// * `modules` - Python modules as (name, bytes) pairs
78+
/// * `create_time` - Creation timestamp for the task
6779
#[cfg(feature = "python")]
68-
pub fn from_python_config(config_bytes: &[u8], modules: &[(String, Vec<u8>)]) -> BuildResult {
80+
pub fn from_python_config(
81+
config_bytes: &[u8],
82+
modules: &[(String, Vec<u8>)],
83+
create_time: u64,
84+
) -> BuildResult {
6985
let yaml_value = Self::parse_yaml(config_bytes)?;
7086
let task_name = Self::extract_task_name(&yaml_value)?;
7187

7288
log::debug!("Creating Python task '{}' via fs-exec", task_name);
73-
PythonBuilder::build(task_name, &yaml_value, modules)
89+
PythonBuilder::build(task_name, &yaml_value, modules, create_time)
7490
}
7591

7692
/// Parse YAML configuration
@@ -128,18 +144,19 @@ impl TaskBuilder {
128144
task_name: String,
129145
yaml: &Value,
130146
module_bytes: Vec<u8>,
147+
create_time: u64,
131148
) -> BuildResult {
132149
match task_type {
133150
type_values::PROCESSOR => Self::build_wasm_task(
134-
ProcessorBuilder::build(task_name.clone(), yaml, module_bytes),
151+
ProcessorBuilder::build(task_name.clone(), yaml, module_bytes, create_time),
135152
&task_name,
136153
),
137154
type_values::SOURCE => Self::build_wasm_task(
138-
SourceBuilder::build(task_name.clone(), yaml, module_bytes),
155+
SourceBuilder::build(task_name.clone(), yaml, module_bytes, create_time),
139156
&task_name,
140157
),
141158
type_values::SINK => Self::build_wasm_task(
142-
SinkBuilder::build(task_name.clone(), yaml, module_bytes),
159+
SinkBuilder::build(task_name.clone(), yaml, module_bytes, create_time),
143160
&task_name,
144161
),
145162
_ => {

src/runtime/taskexecutor/task_manager.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use anyhow::{Context, Result, anyhow};
2424
use parking_lot::RwLock;
2525
use std::collections::HashMap;
2626
use std::sync::{Arc, OnceLock};
27-
use std::time::{SystemTime, UNIX_EPOCH};
2827

2928
type SharedTask = Arc<RwLock<Box<dyn TaskLifecycle>>>;
3029
type TaskMap = Arc<RwLock<HashMap<String, SharedTask>>>;
@@ -87,7 +86,12 @@ impl TaskManager {
8786

8887
impl TaskManager {
8988
pub fn register_task(&self, config_bytes: &[u8], module_bytes: &[u8]) -> Result<()> {
90-
let task = TaskBuilder::from_yaml_config(config_bytes, module_bytes)
89+
use std::time::{SystemTime, UNIX_EPOCH};
90+
let create_time = SystemTime::now()
91+
.duration_since(UNIX_EPOCH)
92+
.unwrap()
93+
.as_secs();
94+
let task = TaskBuilder::from_yaml_config(config_bytes, module_bytes, create_time)
9195
.map_err(|e| anyhow!("Failed to build task: {}", e))?;
9296
let info = task.get_function_info();
9397
let task_info = StoredTaskInfo {
@@ -96,10 +100,7 @@ impl TaskManager {
96100
module_bytes: Some(TaskModuleBytes::Wasm(module_bytes.to_vec())),
97101
config_bytes: config_bytes.to_vec(),
98102
state: ComponentState::Initialized,
99-
created_at: SystemTime::now()
100-
.duration_since(UNIX_EPOCH)
101-
.unwrap()
102-
.as_secs(),
103+
created_at: info.create_time,
103104
checkpoint_id: None,
104105
};
105106
self.register_task_internal(task, Some(task_info))
@@ -112,7 +113,12 @@ impl TaskManager {
112113
) -> Result<()> {
113114
#[cfg(feature = "python")]
114115
{
115-
let task = TaskBuilder::from_python_config(config_bytes, modules)
116+
use std::time::{SystemTime, UNIX_EPOCH};
117+
let create_time = SystemTime::now()
118+
.duration_since(UNIX_EPOCH)
119+
.unwrap()
120+
.as_secs();
121+
let task = TaskBuilder::from_python_config(config_bytes, modules, create_time)
116122
.map_err(|e| anyhow!("Failed to build Python task: {}", e))?;
117123
let (class_name, module_name, module_bytes) = match modules.first() {
118124
Some((name, bytes)) => (name.clone(), name.clone(), Some(bytes.clone())),
@@ -129,10 +135,7 @@ impl TaskManager {
129135
}),
130136
config_bytes: config_bytes.to_vec(),
131137
state: ComponentState::Initialized,
132-
created_at: SystemTime::now()
133-
.duration_since(UNIX_EPOCH)
134-
.unwrap()
135-
.as_secs(),
138+
created_at: info.create_time,
136139
checkpoint_id: None,
137140
};
138141
self.register_task_internal(task, Some(task_info))
@@ -288,10 +291,12 @@ impl TaskManager {
288291
return Ok(());
289292
}
290293

294+
let create_time = stored.created_at;
295+
291296
let task = match &stored.module_bytes {
292-
None => TaskBuilder::from_yaml_config(&stored.config_bytes, &[]),
297+
None => TaskBuilder::from_yaml_config(&stored.config_bytes, &[], create_time),
293298
Some(TaskModuleBytes::Wasm(bytes)) => {
294-
TaskBuilder::from_yaml_config(&stored.config_bytes, bytes)
299+
TaskBuilder::from_yaml_config(&stored.config_bytes, bytes, create_time)
295300
}
296301
Some(TaskModuleBytes::Python {
297302
class_name: _,
@@ -301,7 +306,7 @@ impl TaskManager {
301306
#[cfg(feature = "python")]
302307
{
303308
let modules = [(module.clone(), py_bytes.clone().unwrap_or_default())];
304-
TaskBuilder::from_python_config(&stored.config_bytes, &modules)
309+
TaskBuilder::from_python_config(&stored.config_bytes, &modules, create_time)
305310
}
306311
#[cfg(not(feature = "python"))]
307312
{

0 commit comments

Comments
 (0)