LangChain MLRun Integration with Kafka Support (CE Mode) #1

tomerbv wants to merge 21 commits into guy1992l:langchain_mlrun
Conversation
- Implement _KafkaMLRunEndPointClient with KafkaProducer
- Add kafka_broker and kafka_topic to MLRunTracerClientSettings
- Add Kafka parameters to setup_langchain_monitoring()
- Update notebook to auto-detect CE/Enterprise mode
- Add kafka-python, orjson, uuid-utils to requirements.txt
… and tsdb profiles
remove "raises:" docstring added kafka flush added s3fs to requirements.txt
- AWS_ENDPOINT_URL_S3 env variable in deployment
- port forwarding scripts
- Update _KafkaMLRunEndPointClient to use DatastoreProfileKafkaStream
- Fetch Kafka config (broker, topic, SASL, SSL) from the registered profile
- Auto-retrieve stream_profile_name from model monitoring credentials
- Update MLRunTracerClientSettings with new stream_profile_name field
- Update setup_langchain_monitoring() to use profile-based config
- Update notebook to use the simplified API
REMOVE USER & PASSWORD from all.
In the notebook, when you run deploy you run it twice: you deploy and then redeploy instead of deploying once. Do the CE setup before deploying. In addition, when you create the stream and tsdb profiles, create them under the same variable and use it in project.set_model_monitoring_credentials; a sketch of that flow follows this comment. The rest of the notebook I'll sort out.

Also: I like the usage of a datastore profile; do the same for v3io as I suggested in the comments. :)
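A minimal sketch of that flow, assuming MLRun's datastore-profile API and the stream_profile_name/tsdb_profile_name keywords this PR builds on; the DatastoreProfileKafkaStream constructor arguments and all names/addresses below are placeholders, not the PR's actual code:

```python
import mlrun
from mlrun.datastore.datastore_profile import (
    DatastoreProfileKafkaStream,
    DatastoreProfileV3io,
)

project = mlrun.get_or_create_project("langchain-monitoring")  # illustrative name

# Create both profiles in one place so the same variables feed both the
# registration calls and the monitoring-credentials call.
stream_profile = DatastoreProfileKafkaStream(
    name="kafka-stream",              # illustrative profile name
    brokers=["kafka-broker:9092"],    # illustrative broker address
    topics=["monitoring-topic"],      # illustrative topic
)
tsdb_profile = DatastoreProfileV3io(name="v3io-tsdb")  # illustrative TSDB profile

project.register_datastore_profile(stream_profile)
project.register_datastore_profile(tsdb_profile)

project.set_model_monitoring_credentials(
    stream_profile_name=stream_profile.name,
    tsdb_profile_name=tsdb_profile.name,
)
```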
```python
self._monitoring_broker = profile_attrs.get("brokers")
# Use profile's topic if available, otherwise use MLRun's standard naming:
topics = profile_attrs.get("topics", [])
self._monitoring_topic = topics[0] if topics else get_kafka_topic(project_obj.name)
```
The thing is, while DatastoreProfileKafkaStream supports both a single topic and a list of topics, it has a get_topic method that returns a single topic, and MLRun's helper function get_kafka_topic also returns a single topic for a project.

We can of course implement support for more than one topic, but it requires saving them and then iterating over them where monitor() calls self._kafka_producer.send (a sketch of that follows below).

As I see it, for MLRun's model monitoring a single topic is sufficient.
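For completeness, a minimal sketch of what that multi-topic iteration could look like, assuming kafka-python's KafkaProducer; the class and attribute names below are illustrative, not the PR's actual code:

```python
import orjson
from kafka import KafkaProducer  # kafka-python


class _MultiTopicClientSketch:
    """Hypothetical variant of _KafkaMLRunEndPointClient with multi-topic fan-out."""

    def __init__(self, brokers: list[str], topics: list[str]) -> None:
        self._kafka_producer = KafkaProducer(bootstrap_servers=brokers)
        self._monitoring_topics = topics  # keep the full list instead of topics[0]

    def monitor(self, record: dict) -> None:
        payload = orjson.dumps(record)
        # Fan the same record out to every configured topic; send() is
        # asynchronous, and delivery is finalized by a later flush().
        for topic in self._monitoring_topics:
            self._kafka_producer.send(topic, value=payload)
```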
```
kafka-python
orjson
uuid-utils
s3fs<=2025.7.0
```
This is the requirements file for the tests, so these shouldn't be here.
Removed them and moved them to the item.yaml requirements.
Unify profile variable naming for CE and Enterprise modes
…le with parent class handling in
enforce usage of stream_profile_name
revert ValueError message
Added mechanism to flush stream upon root run (instead of each monitor call)
Adds kafka_linger_ms parameter to control message delivery timing:
- Explicit flush mode (linger_ms=0, default): flush after each root run
- Kafka-managed mode (linger_ms>0): Kafka controls delivery timing

The flush() method now handles the mode internally; it's a no-op when Kafka-managed mode is enabled, keeping the tracer code simple.
- Always flush at end of root run (removed conditional linger_ms check)
- Set default kafka_linger_ms to 500ms for message batching
- Simplify KafkaProducer initialization (pop bootstrap_servers instead of lambda)
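Taken together, the commits above land on: batch with linger_ms, then force delivery once per root run. A minimal sketch of that shape, assuming kafka-python's KafkaProducer (the class and method names here are illustrative, not the PR's actual code):

```python
from kafka import KafkaProducer  # kafka-python


class _TracerClientSketch:
    """Illustrative shape of the tracer client after these commits."""

    def __init__(self, brokers: list[str], kafka_linger_ms: int = 500) -> None:
        # With linger_ms=500, Kafka batches messages for up to 500ms before
        # sending, so per-call latency in monitor() stays low.
        self._kafka_producer = KafkaProducer(
            bootstrap_servers=brokers,
            linger_ms=kafka_linger_ms,
        )

    def monitor(self, topic: str, payload: bytes) -> None:
        # send() is asynchronous; records sit in the linger_ms batch buffer.
        self._kafka_producer.send(topic, value=payload)

    def flush(self) -> None:
        # Always called once at the end of a root run (not per monitor()
        # call): blocks until everything buffered by batching is delivered.
        self._kafka_producer.flush()
```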
guy1992l left a comment
In the notebook, use os.putenv("MY_VARIABLE", "my_value") instead of export. Also, put all dependencies at the beginning of the notebook, right after the pip installs. No need to split them into AWS, then TSDB, and the rest.
```yaml
- orjson
- uuid-utils
```
Both orjson and uuid-utils come from langchain. No need to add them here.
Summary
Replaces the direct kafka_broker/kafka_topic parameters with a stream_profile_name that references a registered DatastoreProfileKafkaStream. This reuses the same profile configured for set_model_monitoring_credentials().
Changes
Environment Variables
Before: MLRUN_TRACER_CLIENT_KAFKA_BROKER, MLRUN_TRACER_CLIENT_KAFKA_TOPIC
After: MLRUN_TRACER_CLIENT_STREAM_PROFILE_NAME
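For example, from a notebook cell, per the earlier review note preferring Python over shell export (the profile name is a placeholder):

```python
import os

# Point the tracer client at the registered Kafka stream profile.
os.environ["MLRUN_TRACER_CLIENT_STREAM_PROFILE_NAME"] = "my-kafka-profile"
```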
Usage
```python
# Simplest: auto-detects from registered profiles
env_vars = setup_langchain_monitoring()

# Explicit
env_vars = setup_langchain_monitoring(stream_profile_name="my-kafka-profile")
```
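A sketch of applying the result, assuming setup_langchain_monitoring() returns a plain dict mapping environment-variable names to values, as the names above suggest (local-process usage shown; a deployed function would set these on its runtime instead):

```python
import os

env_vars = setup_langchain_monitoring(stream_profile_name="my-kafka-profile")

# Export the returned variables into the current process so the tracer
# client's settings (MLRunTracerClientSettings) can pick them up.
os.environ.update(env_vars)
```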