LangChain MLRun Integration with Kafka Support (CE Mode) #1

tomerbv wants to merge 21 commits into guy1992l:langchain_mlrun from tomerbv:kafka-stream-profile-config

Conversation


@tomerbv tomerbv commented Jan 27, 2026

Summary

Replaces the direct kafka_broker/kafka_topic parameters with a stream_profile_name that references a registered DatastoreProfileKafkaStream. This reuses the same profile configured via set_model_monitoring_credentials().
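For context, a minimal sketch of the intended flow, assuming MLRun's datastore-profile API (project name, broker address, and topic are placeholders):

import mlrun
from mlrun.datastore.datastore_profile import DatastoreProfileKafkaStream

project = mlrun.get_or_create_project("my-project")

# Register the Kafka stream profile once; both model monitoring and the
# tracer client resolve broker/topic/auth settings from it by name:
kafka_profile = DatastoreProfileKafkaStream(
    name="my-kafka-profile",
    brokers="broker-host:9092",
    topics=["monitoring-topic"],
)
project.register_datastore_profile(kafka_profile)

env_vars = setup_langchain_monitoring(stream_profile_name=kafka_profile.name)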

Changes

  • _KafkaMLRunEndPointClient: Now fetches broker, topic, and auth config (SASL/SSL) from the datastore profile
  • MLRunTracerClientSettings: Replaced kafka_broker/kafka_topic with stream_profile_name
  • setup_langchain_monitoring(): Auto-detects Kafka profile if not specified
  • get_kafka_stream_profile_name(): New utility to find registered Kafka profiles (sketched below)
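A hypothetical sketch of what the new utility might do; list_datastore_profiles() is MLRun's project API, while the "kafka_stream" type check is an assumption:

import mlrun

def get_kafka_stream_profile_name(project: "mlrun.projects.MlrunProject"):
    # Return the name of the first registered Kafka stream profile, if any:
    for profile in project.list_datastore_profiles():
        if profile.type == "kafka_stream":
            return profile.name
    return None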

Environment Variables

Before: MLRUN_TRACER_CLIENT_KAFKA_BROKER, MLRUN_TRACER_CLIENT_KAFKA_TOPIC
After: MLRUN_TRACER_CLIENT_STREAM_PROFILE_NAME
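For example, the new variable can be set from Python before deployment:

import os
os.environ["MLRUN_TRACER_CLIENT_STREAM_PROFILE_NAME"] = "my-kafka-profile"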

Usage

Simplest (auto-detects from registered profiles):

env_vars = setup_langchain_monitoring()

Explicit:

env_vars = setup_langchain_monitoring(stream_profile_name="my-kafka-profile")

Implement _KafkaMLRunEndPointClient with KafkaProducer
Add kafka_broker and kafka_topic to MLRunTracerClientSettings
Add Kafka parameters to setup_langchain_monitoring()
Update notebook to auto-detect CE/Enterprise mode
Add kafka-python, orjson, uuid-utils to requirements.txt
Remove "raises:" docstring
Add Kafka flush
Add s3fs to requirements.txt
Add AWS_ENDPOINT_URL_S3 env variable in deployment
Add port-forwarding scripts
Update _KafkaMLRunEndPointClient to use DatastoreProfileKafkaStream
 - Fetch Kafka config (broker, topic, SASL, SSL) from the registered profile
 - Auto-retrieve stream_profile_name from model monitoring credentials
 - Update MLRunTracerClientSettings with the new stream_profile_name field
 - Update setup_langchain_monitoring() to use profile-based config
 - Update the notebook to use the simplified API
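Taken together, the producing path these commits describe might look roughly like this sketch (placeholder values stand in for settings resolved from the datastore profile, as in the diff quoted later):

import orjson
from kafka import KafkaProducer

monitoring_broker = "broker-host:9092"  # resolved from the datastore profile
monitoring_topic = "monitoring-topic"   # profile topic, or MLRun's default naming
trace_event = {"run_id": "...", "outputs": {}}

producer = KafkaProducer(
    bootstrap_servers=monitoring_broker,
    value_serializer=orjson.dumps,  # orjson encodes dicts straight to bytes
)
producer.send(monitoring_topic, value=trace_event)
producer.flush()  # the "Add Kafka flush" commit above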

@guy1992l guy1992l left a comment

REMOVE USER & PASSWORD from all.

In the notebook, the deploy cell runs twice: you deploy and then redeploy instead of deploying once. Do the CE setup before that. In addition, when you create the stream and tsdb profiles, create them under the same variable and use it in project.set_model_monitoring_credentials (sketched below). The rest of the notebook I'll sort out.

Also: I like the usage of a datastore profile; do the same for v3io as I suggested in the comments. :)
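A sketch of the suggested pattern, assuming MLRun's TDEngine profile class for the CE TSDB and the ds:// profile URL form for set_model_monitoring_credentials; credentials are read from the environment rather than hard-coded, per the note above:

import os
import mlrun
from mlrun.datastore.datastore_profile import (
    DatastoreProfileKafkaStream,
    DatastoreProfileTDEngine,
)

project = mlrun.get_or_create_project("my-project")

# Create both profiles up front and reuse the same variables below:
stream_profile = DatastoreProfileKafkaStream(
    name="monitoring-stream", brokers="broker-host:9092", topics=["monitoring"]
)
tsdb_profile = DatastoreProfileTDEngine(
    name="monitoring-tsdb",
    user=os.environ["TDENGINE_USER"],  # no hard-coded credentials
    password=os.environ["TDENGINE_PASSWORD"],
    host="tdengine-host",
    port=6041,
)
for profile in (stream_profile, tsdb_profile):
    project.register_datastore_profile(profile)

project.set_model_monitoring_credentials(
    stream_path=f"ds://{stream_profile.name}",
    tsdb_connection=f"ds://{tsdb_profile.name}",
)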

self._monitoring_broker = profile_attrs.get("brokers")
# Use profile's topic if available, otherwise use MLRun's standard naming:
topics = profile_attrs.get("topics", [])
self._monitoring_topic = topics[0] if topics else get_kafka_topic(project_obj.name)
Owner

Can there only be one topic, for sure?


@tomerbv tomerbv Feb 1, 2026

The thing is, while DatastoreProfileKafkaStream supports both a single topic and a list of topics, it has a get_topic method that returns a single topic, and MLRun's helper function get_kafka_topic also returns a single topic for a project.

We can of course implement support for more than one topic, but it requires saving them and then iterating over them in def monitor -> self._kafka_producer.send (see the sketch below).

As I see it, for MLRun's model monitoring a single topic is sufficient.
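A hypothetical sketch of the multi-topic variant described above (not implemented in this PR; the _monitoring_topics field is assumed):

def monitor(self, event: dict) -> None:
    # Fan the event out to every configured topic:
    for topic in self._monitoring_topics:
        self._kafka_producer.send(topic, value=event)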

Comment on lines 4 to 7
kafka-python
orjson
uuid-utils
s3fs<=2025.7.0
Owner

These are the requirements for the tests, so they shouldn't be here.

Author

Removed them and moved them to the item.yaml requirements.

Unify profile variable naming for CE and Enterprise modes
Enforce usage of stream_profile_name
Add mechanism to flush stream upon root run (instead of on each monitor call)
Add kafka_linger_ms parameter to control message delivery timing:
 - Explicit flush mode (linger_ms=0, default): flush after each root run
 - Kafka-managed mode (linger_ms>0): Kafka controls delivery timing

The flush() method now handles the mode internally - it's a no-op when Kafka-managed mode is enabled, keeping the tracer code simple.
- Always flush at end of root run (removed conditional linger_ms check)
- Set default kafka_linger_ms to 500ms for message batching
- Simplify KafkaProducer initialization (pop bootstrap_servers instead of lambda)
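Putting these latest commits together, the producer setup and flush might look roughly like this sketch (field and parameter names are assumptions based on the commit messages):

from kafka import KafkaProducer

class _KafkaMLRunEndPointClient:
    def __init__(self, kafka_config: dict, kafka_linger_ms: int = 500):
        # Pop bootstrap_servers so the rest of the config passes through as-is:
        bootstrap_servers = kafka_config.pop("bootstrap_servers")
        self._kafka_producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            linger_ms=kafka_linger_ms,  # batch messages for up to 500 ms
            **kafka_config,
        )

    def flush(self) -> None:
        # Always deliver buffered messages at the end of a root run:
        self._kafka_producer.flush()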

@guy1992l guy1992l left a comment

In the notebook, use os.putenv("MY_VARIABLE", "my_value") instead of export. Also, put all dependencies at the beginning of the notebook, right after the pip installs. No need to split them into AWS, then TSDB, and the rest.

Comment on lines +24 to +25
- orjson
- uuid-utils
Owner

Both orjson and uuid-utils come from langchain. No need to add them here.
