-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathcontext.py
More file actions
110 lines (82 loc) · 3.41 KB
/
context.py
File metadata and controls
110 lines (82 loc) · 3.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""
FSContext module provides a context object that manages configuration access for FunctionStream SDK.
This module defines the FSContext class which serves as a wrapper around the Config object,
providing a clean interface for accessing configuration values and handling any potential
errors during access. It also provides methods for metadata access and data production.
"""
import logging
from typing import Any, Dict
from datetime import datetime
from .config import Config
# Configure logging
logger = logging.getLogger(__name__)
class FSContext:
"""
Context class that provides access to configuration values and runtime context.
This class serves as a wrapper around the Config object, providing a clean interface
for accessing configuration values and handling any potential errors during access.
It also provides methods for metadata access and data production capabilities.
Attributes:
config (Config): The configuration object containing all settings.
function (FSFunction, optional): Reference to the parent FSFunction instance.
"""
def __init__(self, config: Config):
"""
Initialize the FSContext with a configuration object.
Args:
config (Config): The configuration object to be used by this context.
"""
self.config = config
def get_config(self, config_name: str) -> Any:
"""
Get a configuration value by name.
This method safely retrieves a configuration value, handling any potential
errors during the retrieval process. If an error occurs, it logs the error
and returns an empty string.
Args:
config_name (str): The name of the configuration to retrieve.
Returns:
Any: The configuration value if found, empty string if not found or error occurs.
"""
try:
return self.config.get_config_value(config_name)
except Exception as e:
logger.error(f"Error getting config {config_name}: {str(e)}")
return ""
def get_metadata(self, key: str) -> Any:
"""
Get metadata value by key.
This method retrieves metadata associated with the current message.
Args:
key (str): The metadata key to retrieve.
Returns:
Any: The metadata value, currently always None.
"""
return None
def produce(self, data: Dict[str, Any], event_time: datetime = None) -> None:
"""
Produce data to the output stream.
This method is intended to send processed data to the output stream.
Args:
data (Dict[str, Any]): The data to produce.
event_time (datetime, optional): The timestamp for the event. Defaults to None.
Returns:
None: Currently always returns None.
"""
return None
def get_configs(self) -> Dict[str, Any]:
"""
Get all configuration values.
Returns a dictionary containing all configuration key-value pairs.
Returns:
Dict[str, Any]: A dictionary containing all configuration values.
"""
return self.config.config
def get_module(self) -> str:
"""
Get the current module name.
Returns the name of the module currently being executed.
Returns:
str: The name of the current module.
"""
return self.config.module