A comprehensive Apache Camel component for integrating with Snowflake Data Warehouse, featuring enterprise-grade connection pooling, extensive tests, and production-ready JDBC operations.
Production users should depend on the latest released version; local source builds track the current main branch state.
| Channel | Version | Maven Coordinates | Notes |
|---|---|---|---|
| Latest Release | 1.4.0 | `io.dscope.camel:camel-snowflake:1.4.0` | Stable, recommended for production |
| Previous Release | 1.3.0 | `io.dscope.camel:camel-snowflake:1.3.0` | Prior feature release |
| Development Build | 1.4.0 | `io.dscope.camel:camel-snowflake:1.4.0` | Build from source (`mvn install`) to stay current with main |
```xml
<dependency>
    <groupId>io.dscope.camel</groupId>
    <artifactId>camel-snowflake</artifactId>
    <version>1.4.0</version>
</dependency>
```

Build and install the project locally first (`mvn clean install` at the repo root):

```xml
<dependency>
    <groupId>io.dscope.camel</groupId>
    <artifactId>camel-snowflake</artifactId>
    <version>1.4.0</version>
</dependency>
```

- Complete Camel Integration: Full producer and consumer support
- Enterprise Connection Pooling: HikariCP-based connection management
- Java 21 LTS: Latest long-term support Java version with modern language features
- Comprehensive Testing: Extensive unit and integration tests
- Production Ready: JDBC operations with transaction support and error handling
- JSON Support: Built-in JSON data processing capabilities
- Auto-Discovery: Automatic component registration with Apache Camel
- 🤖 MCP Tooling: Processors and samples that expose Snowflake queries via the Model Context Protocol
- Java 21 LTS or higher
- Apache Camel 4.15.0+
- Maven 3.9.1+
- Snowflake account (for production use)
Current tested versions in this repository:
| Component | Version |
|---|---|
| Java | 21 |
| Apache Camel | 4.15.0 |
| Snowflake JDBC | 3.26.1 |
| HikariCP | 5.1.0 |
| Jackson | 2.17.2 |
| SLF4J | 2.0.x |
| Spring Boot (sample) | 3.5.x |
- Java 21+
- Apache Camel 4.15.x (component built and tested with 4.15.0; compatible with Camel 4.x)
- Snowflake JDBC 3.26.x
- OSGi: packaged as a bundle (Felix Maven Bundle Plugin)
- Spring Boot: sample app on Spring Boot 3.5.x using Camel Spring Boot
```xml
<dependency>
    <groupId>io.dscope.camel</groupId>
    <artifactId>camel-snowflake</artifactId>
    <version>1.4.0</version>
</dependency>
```

```shell
git clone https://github.com/dscope-io/dscope-camel-snowflake.git
cd camel-snowflake
mvn clean install
```

For key-pair authentication (recommended for headless/service workloads), you can either:

- Provide the private key contents via `privateKey` (supports PKCS#8 or PKCS#1, PEM or Base64, headers optional), or
- Point to a key file via `privateKeyFile` (PEM or DER).

When a private key is provided and no explicit authenticator is set, the component defaults to `snowflake_jwt`.
```java
// Key-pair (JWT) configuration using a file path
SnowflakeConfiguration config = new SnowflakeConfiguration();
config.setAccount(System.getProperty("snowflake.account"));
config.setUsername(System.getProperty("snowflake.username"));
config.setPrivateKeyFile(System.getProperty("snowflake.privateKeyFile")); // PEM or DER file
config.setDatabase(System.getProperty("snowflake.database"));
config.setWarehouse(System.getProperty("snowflake.warehouse"));

// Optional – explicitly set (auto-applied when privateKey/privateKeyFile present):
config.setAuthenticator("snowflake_jwt");
```

You may also pass the key contents directly:

```java
// Key-pair (JWT) configuration using key contents
config.setPrivateKey(System.getProperty("snowflake.privateKey")); // Accepts PEM (with or without headers) or Base64
```

Supported private key formats:
- PKCS#8 (`BEGIN PRIVATE KEY`) – PEM or Base64
- PKCS#1 (`BEGIN RSA PRIVATE KEY`) – automatically wrapped into PKCS#8
- DER (binary) via `privateKeyFile`

Encryption support:

- Using `privateKeyFile`: encrypted PEM keys are supported when you also set `privateKeyFilePassword` (formerly `privateKeyPassword`). Unencrypted PEM/DER is also supported.
- Using `privateKey` (inline contents): only unencrypted PKCS#8 or PKCS#1 is supported (decryption is not performed on inline contents). Use `privateKeyFile` + `privateKeyFilePassword` for encrypted keys.
- Generate an RSA key (2048 or 3072 bits recommended):

  ```shell
  openssl genrsa -out rsa_key.pem 2048
  ```

- Convert to unencrypted PKCS#8 DER:

  ```shell
  openssl pkcs8 -topk8 -inform PEM -outform DER -in rsa_key.pem -out rsa_key.der -nocrypt
  ```

- Produce single-line Base64 (no newlines) for your `.env` / secret store:

  ```shell
  base64 -in rsa_key.der | tr -d '\n' > private_key_base64.txt
  ```

- Place the contents of `private_key_base64.txt` into `SNOWFLAKE_PRIVATE_KEY` (or a secrets manager) – do not commit it to source control.

Alternative one-liner (PEM to raw Base64 without headers):

```shell
openssl pkcs8 -topk8 -inform PEM -outform PEM -in rsa_key.pem -nocrypt \
  | sed '/-----/d' | tr -d '\n'
```

Security notes:

- Never commit private keys or `.env` files containing them.
- Prefer a secrets manager (AWS Secrets Manager, Azure Key Vault, HashiCorp Vault, etc.).
- Rotate keys periodically and immediately on suspected compromise.
- Use least-privilege Snowflake roles bound to the JWT user.
```
snowflake:endpointName?account=myaccount&database=mydb&schema=myschema[&options]
```
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `account` | String | Yes | - | Snowflake account identifier |
| `username` | String | Yes | - | Snowflake username |
| `password` | String | Yes | - | Snowflake password (sensitive) |
| `database` | String | No | - | Database name |
| `schema` | String | No | - | Schema name |
| `warehouse` | String | No | - | Warehouse name |
| `role` | String | No | - | Role name |
| `table` | String | No | - | Table name for operations |
| `query` | String | No | - | SQL query to execute |
| `operation` | String | No | `select` | Operation type: select, insert, update, delete |
| `jdbcUrl` | String | No | - | Custom JDBC URL override (for testing) |
| `privateKey` | String | No | - | Private key contents for key-pair (JWT) auth. Accepts PKCS#8 or PKCS#1, PEM (with/without headers) or Base64. Mutually exclusive with `password`. |
| `privateKeyFile` | String | No | - | Path to private key file (PEM or DER). Mutually exclusive with `password`. |
| `privateKeyFilePassword` | String | No | - | Password for encrypted private key files. Only used with `privateKeyFile`; ignored when `privateKey` contents are provided. (Legacy alias: `privateKeyPassword`) |
| `authenticator` | String | No | `snowflake` | Authenticator. Supported: snowflake, snowflake_jwt, externalbrowser, oauth. Auto-set to `snowflake_jwt` when a private key is provided. |
| `token` | String | No | - | OAuth bearer token (required when `authenticator=oauth`) |
| `outputFormat` | String | No | `rows` | Serialization of the exchange body: `rows` (`List<Map<String, Object>>`), `json` (string), or `xml` (string). Also controls the driver result format via JDBC: `arrow` uses Apache Arrow; any other value uses JSON. |
| `enableParameterBinding` | boolean | No | `true` | Enable Camel-style named parameter binding in SQL (`:#name`) using headers. |
| `parameterPrefix` | String | No | `snowflake.` | Header prefix used when resolving parameters for binding. |
| `jdbc.*` | Map | No | - | Pass-through Snowflake JDBC parameters appended to the JDBC URL (e.g., `jdbc.CLIENT_SESSION_KEEP_ALIVE=true`). |
```java
from("direct:start")
    .to("snowflake:producer?account=myaccount&database=mydb&username=user&password=pass")
    .to("mock:result");
```

```java
from("snowflake:consumer?account=myaccount&database=mydb&table=mytable&username=user&password=pass")
    .process(exchange -> {
        String data = exchange.getIn().getBody(String.class);
        log.info("Received from Snowflake: {}", data);
    })
    .to("mock:processed");
```

```java
// Direct JDBC operations
SnowflakeConfiguration config = new SnowflakeConfiguration();
config.setAccount("myaccount");
config.setDatabase("mydb");
config.setUsername("user");
config.setPassword("password");

// Execute query
List<Map<String, Object>> results = SnowflakeJdbcOperations.executeQuery(
    config, "SELECT * FROM users WHERE active = ?", true);

// Execute update
int rowsAffected = SnowflakeJdbcOperations.executeUpdate(
    config, "UPDATE users SET last_login = ? WHERE id = ?", new Date(), 123);

// Batch operations
List<Object[]> batchParams = Arrays.asList(
    new Object[]{"Alice", 30},
    new Object[]{"Bob", 25}
);
int[] batchResults = SnowflakeJdbcOperations.executeBatch(
    config, "INSERT INTO users (name, age) VALUES (?, ?)", batchParams);
```

You can invoke stored procedures by supplying a `CALL` statement either as the static endpoint query, as an overriding header, or directly in the message body. Parameter binding works exactly the same as for SELECT/DML:
```java
from("direct:invokeProc")
    // binding headers: snowflake.id, snowflake.amount, snowflake.details_json
    .setBody(constant("CALL insert_new_sample_row(:#id,:#amount,:#details_json)"))
    .to("snowflake:procDemo?account=...&database=...&username=...&privateKeyFile=...&warehouse=...");
```

Execution logic:

- Detects `CALL` (case-insensitive) → uses `CallableStatement#execute`.
- On specific Snowflake driver API limitation errors, falls back transparently to `Statement#execute`.
- The result set (if any) is serialized via `outputFormat`; otherwise an empty collection (`rows`) or a minimal document (`json`/`xml`) is returned.
Binding header resolution order: exact name (`id`), then prefixed header (`snowflake.id`), then Camel style (`CamelSnowflakeId`).
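The lookup order above can be sketched as a simple chained check. This is illustrative only; the class and method names here are hypothetical, not the component's internal API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of the documented lookup order for a binding
// parameter "id": exact name, then prefixed, then Camel-style header.
public class BindingResolution {

    static Object resolve(Map<String, Object> headers, String name, String prefix) {
        if (headers.containsKey(name)) {                 // 1. exact: id
            return headers.get(name);
        }
        if (headers.containsKey(prefix + name)) {        // 2. prefixed: snowflake.id
            return headers.get(prefix + name);
        }
        String camel = "CamelSnowflake"                  // 3. Camel style: CamelSnowflakeId
                + Character.toUpperCase(name.charAt(0)) + name.substring(1);
        return headers.get(camel);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("CamelSnowflakeId", 7);
        System.out.println(resolve(headers, "id", "snowflake.")); // falls through to Camel style

        headers.put("snowflake.id", 42);
        System.out.println(resolve(headers, "id", "snowflake.")); // prefixed wins over Camel style
    }
}
```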
When multiple potential SQL sources exist, the producer chooses in this priority (highest first):

- Header `CamelSnowflakeQuery` (explicit override)
- Message body, if it looks like SQL (starts with one of the verbs SELECT, WITH, INSERT, UPDATE, DELETE, CALL, CREATE, ALTER, MERGE)
- The endpoint's configured `query`

This lets you keep a safe default while allowing ad-hoc overrides. To send a non-SQL body (e.g. a JSON payload), either make sure it does not start with a SQL verb or force a different query via the header.
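The selection logic can be sketched as follows (an illustrative model of the documented behaviour, with hypothetical class and method names, not the component's actual code):

```java
import java.util.Locale;
import java.util.Set;

// Sketch of the "body looks like SQL" check and the documented
// source priority: header override > SQL-looking body > endpoint query.
public class SqlSourceSelector {

    private static final Set<String> SQL_VERBS = Set.of(
            "SELECT", "WITH", "INSERT", "UPDATE", "DELETE",
            "CALL", "CREATE", "ALTER", "MERGE");

    static boolean looksLikeSql(String body) {
        if (body == null || body.isBlank()) {
            return false;
        }
        String firstWord = body.strip().split("\\s+", 2)[0].toUpperCase(Locale.ROOT);
        return SQL_VERBS.contains(firstWord);
    }

    static String chooseQuery(String headerQuery, String body, String endpointQuery) {
        if (headerQuery != null) {
            return headerQuery;          // explicit header override always wins
        }
        if (looksLikeSql(body)) {
            return body;                 // body is treated as the statement
        }
        return endpointQuery;            // safe configured default
    }

    public static void main(String[] args) {
        System.out.println(chooseQuery(null, "{\"name\":\"Alice\"}", "SELECT 1")); // JSON body ignored
        System.out.println(chooseQuery(null, "select * from t", "SELECT 1"));      // body wins
    }
}
```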
```java
// Insert JSON data
String jsonData = "{\"name\": \"Alice\", \"age\": 30, \"city\": \"New York\"}";
SnowflakeJdbcOperations.insertJsonData(config, "user_profiles", jsonData);

// Query JSON data
List<JsonNode> jsonResults = SnowflakeJdbcOperations.queryJsonData(
    config, "SELECT profile_data FROM user_profiles WHERE id = ?", 123);
```

The component ships with registry beans in `io.dscope.camel.snowflake.mcp` that let you expose Snowflake tooling to MCP-compatible clients (GitHub Copilot, VS Code, etc.) with minimal wiring. They rely on the shared camel-mcp catalog utilities for metadata loading.
| Bean name | Purpose |
|---|---|
| `mcpSnowflakeRequest` | Bridges `tools/call` payloads into the `snowflake:` endpoint, applies connection overrides, and injects configured SQL templates |
| `mcpSnowflakeResponse` | Normalizes Snowflake results into MCP JSON-RPC responses |
The remaining MCP processors (JSON-RPC envelope parsing, rate limiting, streaming, etc.) are provided directly by the camel-mcp dependency. Define them in your route configuration (see sample below) and combine them with the Snowflake-specific beans:
```yaml
# snippet from samples/mcp-snowflake-yaml
- beans:
    - name: mcpRequestSizeGuard
      type: io.dscope.camel.mcp.processor.McpRequestSizeGuardProcessor
    - name: mcpRateLimit
      type: io.dscope.camel.mcp.processor.McpRateLimitProcessor
    - name: mcpHttpValidator
      type: io.dscope.camel.mcp.processor.McpHttpValidatorProcessor
    - name: mcpJsonRpcEnvelope
      type: io.dscope.camel.mcp.processor.McpJsonRpcEnvelopeProcessor
    - name: mcpInitialize
      type: io.dscope.camel.mcp.processor.McpInitializeProcessor
    - name: mcpPing
      type: io.dscope.camel.mcp.processor.McpPingProcessor
    - name: mcpToolsList
      type: io.dscope.camel.mcp.processor.McpToolsListProcessor
    - name: mcpStream
      type: io.dscope.camel.mcp.processor.McpStreamProcessor
    - name: mcpHealthStatus
      type: io.dscope.camel.mcp.processor.McpHealthStatusProcessor
- from:
    uri: rest:post:/mcp?consumerComponentName=netty-http
    steps:
      - process: ref:mcpRequestSizeGuard
      - process: ref:mcpRateLimit
      - process: ref:mcpHttpValidator
      - process: ref:mcpJsonRpcEnvelope
      - choice:
          when:
            - simple: "${exchangeProperty.mcp.jsonrpc.method} == 'tools/list'"
              steps:
                - process: ref:mcpToolsList
                - to: direct:mcp-complete
            - simple: "${exchangeProperty.mcp.jsonrpc.method} == 'tools/call'"
              steps:
                - process: ref:mcpSnowflakeRequest
                - to: direct:mcp-snowflake-exec
```

- Install the component so the sample resolves the latest snapshot:

  ```shell
  mvn -q -DskipTests install
  ```

- Build the MCP YAML sample:

  ```shell
  mvn -q -pl samples/mcp-snowflake-yaml -am -DskipTests package
  ```

- Provide Snowflake credentials via environment variables or JVM `-Dsnowflake.*` properties (account, username, private key or OAuth token, database, schema, warehouse, role).
- Launch the shaded sample with debug logging:

  ```shell
  java \
    -Dmcp.server.port=8080 \
    -Dsnowflake.username=$SNOWFLAKE_USERNAME \
    -Dsnowflake.privateKeyFile=$SNOWFLAKE_PRIVATE_KEY_FILE \
    -Dsnowflake.account=$SNOWFLAKE_ACCOUNT \
    -Dsnowflake.database=$SNOWFLAKE_DATABASE \
    -Dsnowflake.schema=$SNOWFLAKE_SCHEMA \
    -Dsnowflake.warehouse=$SNOWFLAKE_WAREHOUSE \
    -Dsnowflake.role=$SNOWFLAKE_ROLE \
    -Dorg.slf4j.simpleLogger.defaultLogLevel=debug \
    -jar samples/mcp-snowflake-yaml/target/mcp-snowflake-yaml-1.4.0-shaded.jar
  ```

- Issue MCP-compliant JSON-RPC requests (aliases in `samples/mcp-snowflake-yaml/shell-aliases.zsh`):

  ```shell
  curl -s -X POST http://localhost:8080/mcp \
    -H 'Content-Type: application/json' \
    -H 'Accept: application/json, text/event-stream' \
    -H 'MCP-Protocol-Version: 2025-06-18' \
    -d '{"jsonrpc":"2.0","id":"req-1","method":"tools/list"}' | jq .
  ```
Add or modify tools in `samples/mcp-snowflake-yaml/src/main/resources/mcp/methods.yaml`; the catalog and processors pick them up automatically. SQL templates are configured separately via `mcp.snowflake.queries.*` and injected into `mcpSnowflakeRequest` from route bean properties.
The catalog loader expects a YAML document shaped like:

```yaml
methods:
  - name: selectSample
    title: Select Sample Rows
    description: Human-friendly summary shown to MCP clients
    requiredArguments:
      - user_id
      - min_date
    inputSchema:
      type: object
      properties:
        user_id:
          type: integer
          description: User identifier to filter rows
        min_date:
          type: string
          description: Earliest created_at date (YYYY-MM-DD)
      required: [user_id, min_date]
    outputSchema:
      type: object
      properties:
        status:
          type: string
        result:
          description: Result rows from Snowflake
      required: [status]
    annotations:
      category: snowflake
```

Field reference:

- `methods` → top-level array; each entry must have a unique, non-blank `name`.
- `title`, `description` → surfaced to clients when listing tools.
- `requiredArguments` → list of argument names the MCP client must supply; enforced before executing the query.
- `inputSchema`, `outputSchema` → JSON Schema fragments returned to clients so they know how to shape arguments/results.
- `annotations` → arbitrary metadata (category, tags, etc.) passed through in the MCP responses.
Define SQL templates per tool on the `mcpSnowflakeRequest` bean via route bean property injection and externalised properties. The sample stores them in `application.properties` using keys such as:

```properties
mcp.snowflake.queries.selectSample=SELECT * FROM SOME_TABLE WHERE USER_ID = :#user_id
mcp.snowflake.queries.insertSample=INSERT INTO SOME_TABLE (USER_ID, AMOUNT, DETAILS) VALUES (:#user_id, :#amount, :#details)
```

The route then injects them via a map bean so the YAML DSL stays portable:

```yaml
- name: mcpSnowflakeQueries
  type: java.util.LinkedHashMap
  properties:
    selectSample: "{{mcp.snowflake.queries.selectSample}}"
    insertSample: "{{mcp.snowflake.queries.insertSample}}"
- name: mcpSnowflakeRequest
  type: io.dscope.camel.snowflake.mcp.McpSnowflakeRequestProcessor
  properties:
    queries: '#bean:mcpSnowflakeQueries'
    enableParameterBinding: "{{mcp.snowflake.enableParameterBinding}}"
```

Place your custom `methods.yaml` on the application classpath (default: `classpath:mcp/methods.yaml`). For the sample this file is `samples/mcp-snowflake-yaml/src/main/resources/mcp/methods.yaml`.
```java
SnowflakeConfiguration config = new SnowflakeConfiguration();
config.setAccount(System.getenv("SNOWFLAKE_ACCOUNT"));
config.setUsername(System.getenv("SNOWFLAKE_USERNAME"));
config.setAuthenticator("oauth");
config.setToken(System.getenv("SNOWFLAKE_OAUTH_TOKEN")); // supply a valid access token from your IdP
config.setDatabase(System.getenv("SNOWFLAKE_DATABASE"));
config.setWarehouse(System.getenv("SNOWFLAKE_WAREHOUSE"));
```

Endpoint URI example (Camel):

```
snowflake://default?account={{snowflake.account}}&username={{snowflake.username}}&authenticator=oauth&token={{snowflake.token}}
```

- When `authenticator=oauth`, provide `token` and do not set `password` or `privateKey`.
- Token refresh/rotation is managed externally; pass updated tokens when needed.
```shell
java \
  -Dsnowflake.account=your_account \
  -Dsnowflake.username=oauth_user \
  -Dsnowflake.authenticator=oauth \
  -Dsnowflake.token="$(cat /path/to/access_token.txt)" \
  -Dsnowflake.database=YOUR_DB \
  -Dsnowflake.warehouse=COMPUTE_WH \
  -jar your-app.jar
```

The component automatically reads configuration from Java system properties using the `snowflake.*` prefix. This lets you keep secrets out of URIs and pass them via `-D` flags or your runtime launcher:
```shell
# Example: pass username and key file without embedding them in the URI
java \
  -Dsnowflake.username=sample_user \
  -Dsnowflake.privateKeyFile=/abs/path/private_key_pkcs8.pem \
  -Dsnowflake.privateKeyFilePassword=changeit \
  -jar your-app.jar
```

Additional Snowflake JDBC parameters can be provided as either endpoint `jdbc.*` parameters or as system properties using the `snowflake.jdbc.` prefix. They'll be URL-encoded and appended to the JDBC URL:
```shell
# Keep the session alive and set a client timeout
java \
  -Dsnowflake.jdbc.CLIENT_SESSION_KEEP_ALIVE=true \
  -Dsnowflake.jdbc.CLIENT_SESSION_KEEP_ALIVE_HEARTBEAT_FREQUENCY=900 \
  -jar your-app.jar
```

The driver result format is controlled automatically:

- If `outputFormat=arrow`, the component appends `JDBC_QUERY_RESULT_FORMAT=ARROW` to the JDBC URL.
- Otherwise, it appends `JDBC_QUERY_RESULT_FORMAT=JSON` to avoid the additional JVM `--add-opens` flags required by Arrow.
Important: avoid duplicating location properties. Database (`DB`/`database`), schema (`SCHEMA`/`schema`), warehouse (`WAREHOUSE`/`warehouse`), and role (`ROLE`/`role`) are embedded into the JDBC URL by the component. Do not also set them as DataSource properties or via `snowflake.jdbc.*`, otherwise the Snowflake driver may reject the connection with an error like:

```
Connection property specified more than once: DB
```

The component keeps these location properties only in the URL to prevent duplication.
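The behaviour can be illustrated with a small sketch: pass-through entries that would duplicate a location property already embedded in the URL are skipped. The class, method, and URL shape here are illustrative assumptions, not the component's actual builder:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Illustrative sketch: location properties live in the URL exactly once,
// and jdbc.* pass-through entries that would duplicate them are dropped.
public class JdbcUrlSketch {

    private static final Set<String> RESERVED = Set.of("db", "database", "schema", "warehouse", "role");

    static String build(String account, String db, String schema, Map<String, String> jdbcParams) {
        StringBuilder url = new StringBuilder("jdbc:snowflake://" + account
                + ".snowflakecomputing.com/?db=" + db + "&schema=" + schema);
        for (Map.Entry<String, String> e : jdbcParams.entrySet()) {
            if (RESERVED.contains(e.getKey().toLowerCase(Locale.ROOT))) {
                continue; // would trigger "Connection property specified more than once"
            }
            url.append('&').append(e.getKey()).append('=')
               .append(URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return url.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("CLIENT_SESSION_KEEP_ALIVE", "true");
        params.put("DB", "OTHER_DB"); // duplicate location property: skipped
        System.out.println(build("myaccount", "mydb", "public", params));
    }
}
```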
```yaml
# application.yml
camel:
  component:
    snowflake:
      account: ${SNOWFLAKE_ACCOUNT}
      username: ${SNOWFLAKE_USERNAME}
      password: ${SNOWFLAKE_PASSWORD}
      database: ${SNOWFLAKE_DATABASE}
      warehouse: ${SNOWFLAKE_WAREHOUSE}
```

```
io.dscope.camel.snowflake/
├── SnowflakeComponent        # Main component class
├── SnowflakeEndpoint         # Endpoint implementation
├── SnowflakeProducer         # Producer implementation
├── SnowflakeConsumer         # Consumer implementation
├── SnowflakeConfiguration    # Configuration class
└── jdbc/
    ├── SnowflakeJdbcConnectionManager  # Connection pooling
    └── SnowflakeJdbcOperations         # JDBC utilities
```
The component uses HikariCP for enterprise-grade connection pooling:
- Connection Reuse: Efficient connection pooling and reuse
- Performance Monitoring: Built-in pool statistics and monitoring
- Resource Management: Automatic connection lifecycle management
- Configuration: Optimized default settings for Snowflake
Comprehensive error handling includes:
- Connection Failures: Automatic retry and failover
- SQL Exceptions: Detailed error reporting and logging
- Resource Leaks: Automatic resource cleanup with try-with-resources
- Transaction Management: Proper transaction handling and rollback
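The resource-cleanup guarantee can be illustrated with a self-contained sketch: try-with-resources closes resources in reverse declaration order even when the body throws, which is the pattern the component relies on for its JDBC handles. The `Resource` record below is a stand-in for `java.sql.Connection`/`java.sql.Statement`:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: try-with-resources closes statement then connection,
// even when the body throws mid-operation.
public class CleanupSketch {

    static final List<String> CLOSED = new ArrayList<>();

    record Resource(String name) implements AutoCloseable {
        @Override
        public void close() {
            CLOSED.add(name);
        }
    }

    public static void main(String[] args) {
        try (Resource connection = new Resource("connection");
             Resource statement = new Resource("statement")) {
            throw new IllegalStateException("simulated SQL failure");
        } catch (IllegalStateException e) {
            // Both resources are already closed here, statement first.
            System.out.println(CLOSED);
        }
    }
}
```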
```shell
# Run all tests
mvn test

# Run specific test class
mvn test -Dtest=SnowflakeComponentTest

# Run with coverage
mvn clean test jacoco:report
```

- Unit Tests: component logic and configuration (31 tests)
- Integration Tests: end-to-end component integration (4 tests)
- JDBC Tests: database operations and connection pooling (9 tests)
- Mock Tests: isolated component testing with mocks
The project includes a broad suite of unit and integration tests. Use your preferred coverage tooling (e.g., Jacoco) to generate reports locally.
- Password parameters marked as `secret = true`
- Connection strings properly escaped
- No sensitive data in logs
- Secure connection pooling

```java
// Use environment variables for credentials
config.setUsername(System.getenv("SNOWFLAKE_USERNAME"));
config.setPassword(System.getenv("SNOWFLAKE_PASSWORD"));

// Enable SSL/TLS
config.buildJdbcUrl(); // Automatically includes SSL parameters
```

Representative figures (workload- and environment-dependent):

- Connection Pool: up to 10x faster than direct connections
- Batch Operations: up to 5x faster than individual inserts
- Memory Usage: roughly 50% reduction with proper pooling
- Throughput: 1000+ operations/second sustained
- Use Connection Pooling: Always use the built-in connection manager
- Batch Operations: Use batch methods for multiple operations
- Prepared Statements: Parameters are automatically prepared
- Resource Cleanup: Use try-with-resources or component lifecycle
```java
// Production configuration
SnowflakeConfiguration config = new SnowflakeConfiguration();
config.setAccount(System.getenv("SNOWFLAKE_ACCOUNT"));
config.setUsername(System.getenv("SNOWFLAKE_USERNAME"));
config.setPassword(System.getenv("SNOWFLAKE_PASSWORD"));
config.setDatabase(System.getenv("SNOWFLAKE_DATABASE"));
config.setWarehouse(System.getenv("SNOWFLAKE_WAREHOUSE"));
```

```java
// Monitor connection pool
String poolStats = SnowflakeJdbcConnectionManager.getPoolStats(config);
log.info("Connection pool status: {}", poolStats);

// Test connectivity
boolean isConnected = SnowflakeJdbcOperations.testConnection(config);
log.info("Snowflake connection status: {}", isConnected);
```

You can override most endpoint/configuration properties per exchange using headers. The producer copies the base `SnowflakeConfiguration` and applies any override headers before executing the query.
Supported override headers:
| Header | Purpose |
|---|---|
| `CamelSnowflakeAccount` | Override account identifier |
| `CamelSnowflakeUsername` | Override username |
| `CamelSnowflakePassword` | Override password (ignored if `CamelSnowflakePrivateKey` is provided) |
| `CamelSnowflakePrivateKey` | Raw Base64 PKCS#8 private key (switches to key-pair auth) |
| `CamelSnowflakeDatabase` | Override database |
| `CamelSnowflakeSchema` | Override schema |
| `CamelSnowflakeWarehouse` | Override warehouse |
| `CamelSnowflakeRole` | Override role |
| `CamelSnowflakeQuery` | Override query (if body not used) |
| `CamelSnowflakeOperation` | Hint operation (select/insert/update/delete); select detection is automatic |
| `CamelSnowflakeJdbcUrl` | Full JDBC URL override (e.g. for tests using H2) |
| `CamelSnowflakeAuthenticator` | Override authenticator (e.g. `snowflake_jwt`) |
| `CamelSnowflakeToken` | OAuth bearer token when using `authenticator=oauth` |
| `CamelSnowflakeParameterPrefix` | Override parameter binding header prefix |
| `CamelSnowflakeEnableParameterBinding` | Enable/disable parameter binding (boolean) |
After application, the effective values are exposed for tracing: `CamelSnowflakeEffectiveAccount`, `...EffectiveDatabase`, `...EffectiveSchema`, `...EffectiveWarehouse`, `...EffectiveRole`, `...EffectiveOperation`, `...EffectiveQuery`, `...EffectiveAuthenticator`.
Example route with overrides:

```java
from("direct:dynamic")
    .setHeader("CamelSnowflakeQuery", constant("SELECT * FROM DEMO_TABLE WHERE ID = :#id"))
    .setHeader("CamelSnowflakeWarehouse", simple("${header.RequestedWarehouse}"))
    .setHeader("CamelSnowflakeEnableParameterBinding", constant(true))
    .to("snowflake:demo");
```

Then set headers per message:

```java
exchange.getIn().setHeader("id", 42); // bound parameter
exchange.getIn().setHeader("RequestedWarehouse", "WH_XL");
exchange.getIn().setHeader("CamelSnowflakePrivateKey", System.getenv("SNOWFLAKE_PRIVATE_KEY"));
```

Testing with in-memory H2 using a JDBC URL override:

```java
exchange.getIn().setHeader("CamelSnowflakeJdbcUrl", "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1");
```

This allows fast unit tests without hitting Snowflake, while keeping the same producer logic.
Precedence order (lowest to highest):

- Base component/endpoint configuration (Java DSL / URI)
- Endpoint URI parameters set directly on the `SnowflakeEndpoint` instance after creation
- Per-message headers (listed above)

Headers always win; endpoint parameters override the original base configuration, allowing route-level tuning without code changes.
- Component defaults (code) and application properties
- Endpoint URI parameters (route-level)
- System properties (`-Dsnowflake.*`, `-Dsnowflake.jdbc.*`)
- Per-message headers (e.g., `CamelSnowflake*`, parameter binding headers)

- System properties are read lazily by getters, allowing you to inject secrets at launch time without changing routes.
- For `exec:java`, pass `-Dsnowflake.*` at the Maven CLI level rather than in `-Dexec.jvmArgs`.
- Core: `-Dsnowflake.account=... -Dsnowflake.username=... [-Dsnowflake.database=...] [-Dsnowflake.schema=...] [-Dsnowflake.warehouse=...] [-Dsnowflake.role=...]`
- Auth (password): `-Dsnowflake.password=...`
- Auth (key-pair contents): `-Dsnowflake.privateKey="$(cat private_key_pkcs8.pem | tr '\n' '\\n')"`
- Auth (key-pair file): `-Dsnowflake.privateKeyFile=/abs/path/private_key_pkcs8.pem`
- Auth (key-pair file + encrypted PEM): `-Dsnowflake.privateKeyFile=/abs/path/encrypted_key.pem -Dsnowflake.privateKeyFilePassword=changeit` (legacy: `-Dsnowflake.privateKeyPassword=`)
- Output: `-Dsnowflake.outputFormat=rows|json|xml|arrow`
- Param binding: `-Dsnowflake.enableParameterBinding=true -Dsnowflake.parameterPrefix=snowflake.`
- JDBC pass-through: `-Dsnowflake.jdbc.CLIENT_SESSION_KEEP_ALIVE=true`
If your endpoint query uses `:#param` placeholders, you can place parameter values in headers using a configurable prefix. The default prefix is `snowflake.`, so a header `snowflake.id` binds to `:#id`.
Example:

```java
// Endpoint-level query with :#id
from("direct:lookupUser")
    // Set header using default prefix so it binds to :#id
    .setHeader("snowflake.id", simple("${header.userId}"))
    .to("snowflake:lookup?query=SELECT%20NAME%20FROM%20USERS%20WHERE%20ID%20=%20:%23id");
```

- Default `parameterPrefix` is `snowflake.` and can be changed globally via configuration or per message using `CamelSnowflakeParameterPrefix`.
- Binding also accepts plain `id` or Camel-style `CamelSnowflakeId` if you prefer not to use a prefix.

Alternative header name (Camel-style):

```java
// No prefix required; Camel-style header binds to :#id
from("direct:lookupUserCamelStyle")
    .setHeader("CamelSnowflakeId", constant(42))
    .to("snowflake:lookup?query=SELECT%20NAME%20FROM%20USERS%20WHERE%20ID%20=%20:%23id");
```

Override parameter prefix per message:

```java
// Change prefix to "q." for this message and provide q.id
from("direct:lookupUserWithPrefixOverride")
    .setHeader("CamelSnowflakeParameterPrefix", constant("q."))
    .setHeader("q.id", simple("${header.userId}"))
    .to("snowflake:lookup?query=SELECT%20NAME%20FROM%20USERS%20WHERE%20ID%20=%20:%23id");
```

```java
@Component
public class SnowflakeHealthIndicator implements HealthIndicator {

    @Override
    public Health health() {
        try {
            boolean connected = SnowflakeJdbcOperations.testConnection(config);
            return connected ? Health.up().build() : Health.down().build();
        } catch (Exception e) {
            return Health.down(e).build();
        }
    }
}
```

- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
```shell
# Clone the repository
git clone https://github.com/dscope/camel-snowflake.git
cd camel-snowflake

# Build and test
mvn clean compile
mvn test

# Generate documentation
mvn javadoc:javadoc
```

Use the helper script for local dev cycles so the sample always picks up the latest snapshot:

```shell
./dev-install.sh                        # installs & builds dynamic-query-yaml sample
./dev-install.sh dynamic-query-kotlin   # pick a different sample
./dev-install.sh -U                     # force snapshot/plugin updates
```

The script performs a root `mvn -DskipTests install`, then builds the selected sample via the samples aggregator (`samples/pom.xml`).
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
- Apache Camel
- Snowflake Documentation
- HikariCP
- Java 21 Documentation
- Javadoc (generated in target/apidocs)
For support and questions:
- Create an Issue
- Check the Wiki
- Contact: support@dscope.io
If you opt into Arrow by setting `-Dsnowflake.outputFormat=arrow`, some JVMs require additional `--add-opens` flags for the Apache Arrow modules. If you encounter `IllegalAccessError`s, try adding:

```shell
JAVA_TOOL_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED"
```

Alternatively, stay on the default JSON driver result format, which avoids these flags and is sufficient for most workloads.
Built with ❤️ by the DScope team
- VS Code tasks
  - Build: run task "Build: Maven"
  - Test: run task "Test: Maven"
- Terminal

  ```shell
  mvn -q -DskipTests package
  mvn -q test
  ```
- Launch configuration: "Debug: Snowflake Connection Tester"
- The tester reads credentials from environment variables and supports `.env`/`.env.local` if present.
- Recommended variables:
  - `SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_USERNAME`
  - Either `SNOWFLAKE_PASSWORD` or `SNOWFLAKE_PRIVATE_KEY` (raw Base64 PKCS#8)
  - Optional: `SNOWFLAKE_DATABASE`, `SNOWFLAKE_SCHEMA`, `SNOWFLAKE_WAREHOUSE`, `SNOWFLAKE_ROLE`
Tests use an H2 in-memory DB for hermetic runs. Snowflake-specific `DATE(column)` expressions are adjusted to `CAST(column AS DATE)`, and date parameters are also cast as needed for H2.
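The kind of rewrite described above can be sketched with a small regex pass that translates Snowflake's `DATE(col)` into the ANSI `CAST(col AS DATE)` form H2 accepts. This is an illustrative sketch, not the project's actual test helper, and it only covers this narrow pattern:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch: rewrite Snowflake's DATE(col) to CAST(col AS DATE)
// so the same statement also runs against H2 in tests.
public class H2DateRewrite {

    private static final Pattern DATE_FN = Pattern.compile("(?i)\\bDATE\\(([^)]+)\\)");

    static String rewrite(String sql) {
        Matcher m = DATE_FN.matcher(sql);
        return m.replaceAll("CAST($1 AS DATE)");
    }

    public static void main(String[] args) {
        String sql = "SELECT * FROM ORDERS WHERE DATE(created_at) = DATE(?)";
        System.out.println(rewrite(sql));
        // SELECT * FROM ORDERS WHERE CAST(created_at AS DATE) = CAST(? AS DATE)
    }
}
```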