-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(jdbc): implement PreparedStatement #17027
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Young-Leo
commented
Jan 14, 2026
- Add Thrift RPC interfaces: TSPrepareReq/Resp, TSExecutePreparedReq, TSDeallocatePreparedReq
- Implement prepareStatement/executePreparedStatement/deallocatePreparedStatement in ClientRPCServiceImpl
- Refactor IoTDBPreparedStatement to use binary parameter serialization
- Extend Coordinator.executeForTableModel to support external parameters
- Eliminate SQL injection risk by separating parameters from SQL structure
… serialization logic to shared service-rpc module
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request implements JDBC PreparedStatement support for IoTDB's table model, enabling parameterized query execution with proper SQL injection prevention. The implementation adds Thrift RPC interfaces for prepare/execute/deallocate operations, binary parameter serialization, and a new client-side PreparedStatement implementation.
Changes:
- Added Thrift RPC interfaces (TSPrepareReq/Resp, TSExecutePreparedReq, TSDeallocatePreparedReq) for prepared statement lifecycle management
- Implemented server-side RPC handlers in ClientRPCServiceImpl that parse SQL, cache AST, and execute with bound parameters
- Created IoTDBTablePreparedStatement for table dialect that uses binary parameter serialization instead of SQL string concatenation
- Refactored prepared statement management into PreparedStatementHelper for code reuse between RPC and ConfigTask paths
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| client.thrift | Defines Thrift RPC structures and service methods for prepared statement operations |
| PreparedStatementHelper.java | New helper class extracting common registration/unregistration logic for prepared statements |
| PrepareTask.java | Refactored to use PreparedStatementHelper |
| DeallocateTask.java | Refactored to use PreparedStatementHelper |
| ClientRPCServiceImpl.java | Implements three RPC methods: prepareStatement, executePreparedStatement, deallocatePreparedStatement |
| PreparedParameterSerializer.java | New serializer for binary encoding/decoding of prepared statement parameters |
| IoTDBTablePreparedStatementTest.java | Unit tests demonstrating SQL injection prevention |
| IoTDBPreparedStatementTest.java | Removed table model tests (moved to new test file) |
| IoTDBTablePreparedStatement.java | New JDBC PreparedStatement implementation for table model using RPC-based preparation |
| IoTDBStatement.java | Changed visibility of fields to protected for subclass access |
| IoTDBConnection.java | Routes prepareStatement() to IoTDBTablePreparedStatement for table dialect |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } catch (Exception e) { | ||
| return new TSPrepareResp( | ||
| onQueryException( | ||
| e, OperationType.EXECUTE_STATEMENT.getName(), TSStatusCode.INTERNAL_SERVER_ERROR)); |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message uses OperationType.EXECUTE_STATEMENT for a PREPARE operation. This is misleading and could confuse users debugging errors. Consider using a more appropriate operation type like OperationType.PREPARE_STATEMENT or creating a new operation type if it doesn't exist.
| e, OperationType.EXECUTE_STATEMENT.getName(), TSStatusCode.INTERNAL_SERVER_ERROR)); | |
| e, OperationType.PREPARE_STATEMENT.getName(), TSStatusCode.INTERNAL_SERVER_ERROR)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); | ||
| } catch (Exception e) { | ||
| return onQueryException( | ||
| e, OperationType.EXECUTE_STATEMENT.getName(), TSStatusCode.INTERNAL_SERVER_ERROR); |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message uses OperationType.EXECUTE_STATEMENT for a DEALLOCATE operation. This is misleading and could confuse users debugging errors. Consider using a more appropriate operation type like OperationType.DEALLOCATE_STATEMENT or creating a new operation type if it doesn't exist.
| e, OperationType.EXECUTE_STATEMENT.getName(), TSStatusCode.INTERNAL_SERVER_ERROR); | |
| e, OperationType.DEALLOCATE_STATEMENT.getName(), TSStatusCode.INTERNAL_SERVER_ERROR); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
|
||
| // Create and store PreparedStatementInfo | ||
| PreparedStatementInfo info = new PreparedStatementInfo(statementName, sql, memorySizeInBytes); | ||
| session.addPreparedStatement(statementName, info); |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If session.addPreparedStatement throws an exception after PreparedStatementMemoryManager.allocate succeeds, the allocated memory will not be released, causing a memory leak. Consider wrapping the addPreparedStatement call in a try-catch block and releasing the allocated memory if it fails.
| session.addPreparedStatement(statementName, info); | |
| try { | |
| session.addPreparedStatement(statementName, info); | |
| } catch (RuntimeException e) { | |
| // Roll back memory allocation if registration fails | |
| PreparedStatementMemoryManager.getInstance().release(memorySizeInBytes); | |
| throw e; | |
| } |
| buffer.rewind(); | ||
| int count = buffer.getInt(); | ||
| List<DeserializedParam> result = new ArrayList<>(count); |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The deserialize method does not validate the count parameter read from the buffer. A malicious or corrupted buffer with a very large count value could cause excessive memory allocation or out-of-memory errors. Consider adding a reasonable upper bound validation on count before allocating the result list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
| String tsStr = zonedDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME); | ||
| parameterValues[parameterIndex - 1] = tsStr; | ||
| parameterTypes[parameterIndex - 1] = Types.VARCHAR; | ||
| this.parameters.put(parameterIndex, tsStr); |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The timestamp string is stored in the parameters map without quotes (line 448), while strings are stored with quotes (line 381). This inconsistency in the parameters map could cause issues if this map is used for SQL string construction or backward compatibility. Consider adding quotes around the timestamp string for consistency, or document why timestamps don't need quotes.
| this.parameters.put(parameterIndex, tsStr); | |
| this.parameters.put(parameterIndex, "\"" + tsStr + "\""); |
| if (finished) { | ||
| COORDINATOR.cleanupQueryExecution(queryId, null, t); | ||
| } | ||
| COORDINATOR.recordExecutionTime(queryId, currentOperationCost); |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If an exception occurs before requestQueryId is called (line 1573), the queryId will remain Long.MIN_VALUE and cleanupQueryExecution will be called with this invalid ID in the finally block. While this might be handled gracefully by the Coordinator, it would be cleaner to only call cleanupQueryExecution if a valid queryId was obtained. Consider checking if queryId != Long.MIN_VALUE before calling cleanup.
| if (finished) { | |
| COORDINATOR.cleanupQueryExecution(queryId, null, t); | |
| } | |
| COORDINATOR.recordExecutionTime(queryId, currentOperationCost); | |
| if (queryId != Long.MIN_VALUE) { | |
| if (finished) { | |
| COORDINATOR.cleanupQueryExecution(queryId, null, t); | |
| } | |
| COORDINATOR.recordExecutionTime(queryId, currentOperationCost); | |
| } |
| public class IoTDBTablePreparedStatement extends IoTDBStatement implements PreparedStatement { | ||
|
|
||
| private static final Logger logger = LoggerFactory.getLogger(IoTDBTablePreparedStatement.class); | ||
| private static final String METHOD_NOT_SUPPORTED_STRING = "Method not supported"; | ||
|
|
||
| private final String sql; | ||
| private final String preparedStatementName; | ||
| private final int parameterCount; | ||
|
|
||
| // Parameter values stored as objects for binary serialization | ||
| private final Object[] parameterValues; | ||
| private final int[] parameterTypes; | ||
|
|
||
| /** save the SQL parameters as (paramLoc,paramValue) pairs for backward compatibility. */ | ||
| private final Map<Integer, String> parameters = new HashMap<>(); | ||
|
|
||
| IoTDBTablePreparedStatement( | ||
| IoTDBConnection connection, | ||
| Iface client, | ||
| Long sessionId, | ||
| String sql, | ||
| ZoneId zoneId, | ||
| Charset charset) | ||
| throws SQLException { | ||
| super(connection, client, sessionId, zoneId, charset); | ||
| this.sql = sql; | ||
| this.preparedStatementName = generateStatementName(); | ||
|
|
||
| // Send PREPARE request to server | ||
| TSPrepareReq prepareReq = new TSPrepareReq(); | ||
| prepareReq.setSessionId(sessionId); | ||
| prepareReq.setSql(sql); | ||
| prepareReq.setStatementName(preparedStatementName); | ||
|
|
||
| try { | ||
| TSPrepareResp resp = client.prepareStatement(prepareReq); | ||
| RpcUtils.verifySuccess(resp.getStatus()); | ||
|
|
||
| this.parameterCount = resp.isSetParameterCount() ? resp.getParameterCount() : 0; | ||
| this.parameterValues = new Object[parameterCount]; | ||
| this.parameterTypes = new int[parameterCount]; | ||
|
|
||
| // Initialize all parameter types to NULL | ||
| for (int i = 0; i < parameterCount; i++) { | ||
| parameterTypes[i] = Types.NULL; | ||
| } | ||
| } catch (TException | StatementExecutionException e) { | ||
| throw new SQLException("Failed to prepare statement: " + e.getMessage(), e); | ||
| } | ||
| } | ||
|
|
||
| // Only for tests | ||
| IoTDBTablePreparedStatement( | ||
| IoTDBConnection connection, Iface client, Long sessionId, String sql, ZoneId zoneId) | ||
| throws SQLException { | ||
| this(connection, client, sessionId, sql, zoneId, TSFileConfig.STRING_CHARSET); | ||
| } | ||
|
|
||
| private String generateStatementName() { | ||
| // StatementId is unique across all sessions in one IoTDB instance | ||
| return "jdbc_ps_" + getStmtId(); | ||
| } | ||
|
|
||
| @Override | ||
| public void addBatch() throws SQLException { | ||
| super.addBatch(createCompleteSql(sql, parameters)); | ||
| } | ||
|
|
||
| @Override | ||
| public void clearParameters() { | ||
| this.parameters.clear(); | ||
| for (int i = 0; i < parameterCount; i++) { | ||
| parameterValues[i] = null; | ||
| parameterTypes[i] = Types.NULL; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public boolean execute() throws SQLException { | ||
| TSExecuteStatementResp resp = executeInternal(); | ||
| return resp.isSetQueryDataSet() || resp.isSetQueryResult(); | ||
| } | ||
|
|
||
| @Override | ||
| public ResultSet executeQuery() throws SQLException { | ||
| TSExecuteStatementResp resp = executeInternal(); | ||
| return processQueryResult(resp); | ||
| } | ||
|
|
||
| @Override | ||
| public int executeUpdate() throws SQLException { | ||
| executeInternal(); | ||
| return 0; // IoTDB doesn't return affected row count | ||
| } | ||
|
|
||
| private TSExecuteStatementResp executeInternal() throws SQLException { | ||
| // Validate all parameters are set | ||
| for (int i = 0; i < parameterCount; i++) { | ||
| if (parameterTypes[i] == Types.NULL | ||
| && parameterValues[i] == null | ||
| && !parameters.containsKey(i + 1)) { | ||
| throw new SQLException("Parameter #" + (i + 1) + " is unset"); | ||
| } | ||
| } | ||
|
|
||
| TSExecutePreparedReq req = new TSExecutePreparedReq(); | ||
| req.setSessionId(sessionId); | ||
| req.setStatementName(preparedStatementName); | ||
| req.setParameters( | ||
| PreparedParameterSerializer.serialize(parameterValues, parameterTypes, parameterCount)); | ||
| req.setStatementId(getStmtId()); | ||
|
|
||
| if (queryTimeout > 0) { | ||
| req.setTimeout(queryTimeout * 1000L); | ||
| } | ||
|
|
||
| try { | ||
| TSExecuteStatementResp resp = client.executePreparedStatement(req); | ||
| RpcUtils.verifySuccess(resp.getStatus()); | ||
| return resp; | ||
| } catch (TException e) { | ||
| throw new SQLException("Failed to execute prepared statement: " + e.getMessage(), e); | ||
| } catch (StatementExecutionException e) { | ||
| throw new SQLException("Failed to execute prepared statement: " + e.getMessage(), e); | ||
| } | ||
| } | ||
|
|
||
| private ResultSet processQueryResult(TSExecuteStatementResp resp) throws SQLException { | ||
| if (resp.isSetQueryDataSet() || resp.isSetQueryResult()) { | ||
| // Create ResultSet from response | ||
| this.resultSet = | ||
| new IoTDBJDBCResultSet( | ||
| this, | ||
| resp.getColumns(), | ||
| resp.getDataTypeList(), | ||
| resp.columnNameIndexMap, | ||
| resp.ignoreTimeStamp, | ||
| client, | ||
| sql, | ||
| resp.queryId, | ||
| sessionId, | ||
| resp.queryResult, | ||
| resp.tracingInfo, | ||
| (long) queryTimeout * 1000, | ||
| resp.isSetMoreData() && resp.isMoreData(), | ||
| zoneId); | ||
| return resultSet; | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() throws SQLException { | ||
| if (!isClosed()) { | ||
| // Deallocate prepared statement on server | ||
| TSDeallocatePreparedReq req = new TSDeallocatePreparedReq(); | ||
| req.setSessionId(sessionId); | ||
| req.setStatementName(preparedStatementName); | ||
|
|
||
| try { | ||
| TSStatus status = client.deallocatePreparedStatement(req); | ||
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { | ||
| logger.warn("Failed to deallocate prepared statement: {}", status.getMessage()); | ||
| } | ||
| } catch (TException e) { | ||
| logger.warn("Error deallocating prepared statement", e); | ||
| } | ||
| } | ||
| super.close(); | ||
| } | ||
|
|
||
| @Override | ||
| public ResultSetMetaData getMetaData() throws SQLException { | ||
| if (resultSet != null) { | ||
| return resultSet.getMetaData(); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public ParameterMetaData getParameterMetaData() { | ||
| return new ParameterMetaData() { | ||
| @Override | ||
| public int getParameterCount() { | ||
| return parameterCount; | ||
| } | ||
|
|
||
| @Override | ||
| public int isNullable(int param) { | ||
| return ParameterMetaData.parameterNullableUnknown; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isSigned(int param) { | ||
| int type = parameterTypes[param - 1]; | ||
| return type == Types.INTEGER | ||
| || type == Types.BIGINT | ||
| || type == Types.FLOAT | ||
| || type == Types.DOUBLE; | ||
| } | ||
|
|
||
| @Override | ||
| public int getPrecision(int param) { | ||
| return 0; | ||
| } | ||
|
|
||
| @Override | ||
| public int getScale(int param) { | ||
| return 0; | ||
| } | ||
|
|
||
| @Override | ||
| public int getParameterType(int param) { | ||
| return parameterTypes[param - 1]; | ||
| } | ||
|
|
||
| @Override | ||
| public String getParameterTypeName(int param) { | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public String getParameterClassName(int param) { | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public int getParameterMode(int param) { | ||
| return ParameterMetaData.parameterModeIn; | ||
| } | ||
|
|
||
| @Override | ||
| public <T> T unwrap(Class<T> iface) { | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isWrapperFor(Class<?> iface) { | ||
| return false; | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| // ================== Parameter Setters ================== | ||
|
|
||
| @Override | ||
| public void setNull(int parameterIndex, int sqlType) throws SQLException { | ||
| checkParameterIndex(parameterIndex); | ||
| parameterValues[parameterIndex - 1] = null; | ||
| parameterTypes[parameterIndex - 1] = Types.NULL; | ||
| this.parameters.put(parameterIndex, "NULL"); | ||
| } | ||
|
|
||
| @Override | ||
| public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException { | ||
| setNull(parameterIndex, sqlType); | ||
| } | ||
|
|
||
| @Override | ||
| public void setBoolean(int parameterIndex, boolean x) throws SQLException { | ||
| checkParameterIndex(parameterIndex); | ||
| parameterValues[parameterIndex - 1] = x; | ||
| parameterTypes[parameterIndex - 1] = Types.BOOLEAN; | ||
| this.parameters.put(parameterIndex, Boolean.toString(x)); | ||
| } | ||
|
|
||
| @Override | ||
| public void setInt(int parameterIndex, int x) throws SQLException { | ||
| checkParameterIndex(parameterIndex); | ||
| parameterValues[parameterIndex - 1] = x; | ||
| parameterTypes[parameterIndex - 1] = Types.INTEGER; | ||
| this.parameters.put(parameterIndex, Integer.toString(x)); | ||
| } | ||
|
|
||
| @Override | ||
| public void setLong(int parameterIndex, long x) throws SQLException { | ||
| checkParameterIndex(parameterIndex); | ||
| parameterValues[parameterIndex - 1] = x; | ||
| parameterTypes[parameterIndex - 1] = Types.BIGINT; | ||
| this.parameters.put(parameterIndex, Long.toString(x)); | ||
| } | ||
|
|
||
| @Override | ||
| public void setFloat(int parameterIndex, float x) throws SQLException { | ||
| checkParameterIndex(parameterIndex); | ||
| parameterValues[parameterIndex - 1] = x; | ||
| parameterTypes[parameterIndex - 1] = Types.FLOAT; | ||
| this.parameters.put(parameterIndex, Float.toString(x)); | ||
| } | ||
|
|
||
| @Override | ||
| public void setDouble(int parameterIndex, double x) throws SQLException { | ||
| checkParameterIndex(parameterIndex); | ||
| parameterValues[parameterIndex - 1] = x; | ||
| parameterTypes[parameterIndex - 1] = Types.DOUBLE; | ||
| this.parameters.put(parameterIndex, Double.toString(x)); | ||
| } | ||
|
|
||
| @Override | ||
| public void setString(int parameterIndex, String x) throws SQLException { | ||
| checkParameterIndex(parameterIndex); | ||
| parameterValues[parameterIndex - 1] = x; | ||
| parameterTypes[parameterIndex - 1] = Types.VARCHAR; | ||
| if (x == null) { | ||
| this.parameters.put(parameterIndex, null); | ||
| } else { | ||
| this.parameters.put(parameterIndex, "'" + escapeSingleQuotes(x) + "'"); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void setBytes(int parameterIndex, byte[] x) throws SQLException { | ||
| checkParameterIndex(parameterIndex); | ||
| parameterValues[parameterIndex - 1] = x; | ||
| parameterTypes[parameterIndex - 1] = Types.BINARY; | ||
| Binary binary = new Binary(x); | ||
| this.parameters.put(parameterIndex, binary.getStringValue(TSFileConfig.STRING_CHARSET)); | ||
| } | ||
|
|
||
| @Override | ||
| public void setDate(int parameterIndex, Date x) throws SQLException { | ||
| checkParameterIndex(parameterIndex); | ||
| DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); | ||
| String dateStr = dateFormat.format(x); | ||
| parameterValues[parameterIndex - 1] = dateStr; | ||
| parameterTypes[parameterIndex - 1] = Types.VARCHAR; | ||
| this.parameters.put(parameterIndex, "'" + dateStr + "'"); | ||
| } | ||
|
|
||
| @Override | ||
| public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException { | ||
| setDate(parameterIndex, x); | ||
| } | ||
|
|
||
| @Override | ||
| public void setTime(int parameterIndex, Time x) throws SQLException { | ||
| checkParameterIndex(parameterIndex); | ||
| try { | ||
| long time = x.getTime(); | ||
| String timeprecision = client.getProperties().getTimestampPrecision(); | ||
| switch (timeprecision.toLowerCase()) { | ||
| case "ms": | ||
| break; | ||
| case "us": | ||
| time = time * 1000; | ||
| break; | ||
| case "ns": | ||
| time = time * 1000000; | ||
| break; | ||
| default: | ||
| break; | ||
| } | ||
| parameterValues[parameterIndex - 1] = time; | ||
| parameterTypes[parameterIndex - 1] = Types.BIGINT; | ||
| this.parameters.put(parameterIndex, Long.toString(time)); | ||
| } catch (TException e) { | ||
| throw new SQLException("Failed to get time precision: " + e.getMessage(), e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException { | ||
| setTime(parameterIndex, x); | ||
| } | ||
|
|
||
| @Override | ||
| public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException { | ||
| checkParameterIndex(parameterIndex); | ||
| ZonedDateTime zonedDateTime = | ||
| ZonedDateTime.ofInstant(Instant.ofEpochMilli(x.getTime()), super.zoneId); | ||
| String tsStr = zonedDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME); | ||
| parameterValues[parameterIndex - 1] = tsStr; | ||
| parameterTypes[parameterIndex - 1] = Types.VARCHAR; | ||
| this.parameters.put(parameterIndex, tsStr); | ||
| } | ||
|
|
||
| @Override | ||
| public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException { | ||
| setTimestamp(parameterIndex, x); | ||
| } | ||
|
|
||
| @Override | ||
| public void setObject(int parameterIndex, Object x) throws SQLException { | ||
| if (x == null) { | ||
| setNull(parameterIndex, Types.NULL); | ||
| } else if (x instanceof String) { | ||
| setString(parameterIndex, (String) x); | ||
| } else if (x instanceof Integer) { | ||
| setInt(parameterIndex, (Integer) x); | ||
| } else if (x instanceof Long) { | ||
| setLong(parameterIndex, (Long) x); | ||
| } else if (x instanceof Float) { | ||
| setFloat(parameterIndex, (Float) x); | ||
| } else if (x instanceof Double) { | ||
| setDouble(parameterIndex, (Double) x); | ||
| } else if (x instanceof Boolean) { | ||
| setBoolean(parameterIndex, (Boolean) x); | ||
| } else if (x instanceof Timestamp) { | ||
| setTimestamp(parameterIndex, (Timestamp) x); | ||
| } else if (x instanceof Date) { | ||
| setDate(parameterIndex, (Date) x); | ||
| } else if (x instanceof Time) { | ||
| setTime(parameterIndex, (Time) x); | ||
| } else if (x instanceof byte[]) { | ||
| setBytes(parameterIndex, (byte[]) x); | ||
| } else { | ||
| throw new SQLException( | ||
| String.format( | ||
| "Can't infer the SQL type for an instance of %s. Use setObject() with explicit type.", | ||
| x.getClass().getName())); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException { | ||
| setObject(parameterIndex, x); | ||
| } | ||
|
|
||
| @Override | ||
| public void setObject(int parameterIndex, Object parameterObj, int targetSqlType, int scale) | ||
| throws SQLException { | ||
| setObject(parameterIndex, parameterObj); | ||
| } | ||
|
|
||
| private void checkParameterIndex(int index) throws SQLException { | ||
| if (index < 1 || index > parameterCount) { | ||
| throw new SQLException( | ||
| "Parameter index out of range: " + index + " (expected 1-" + parameterCount + ")"); | ||
| } | ||
| } | ||
|
|
||
| private String escapeSingleQuotes(String value) { | ||
| return value.replace("'", "''"); | ||
| } | ||
|
|
||
| // ================== Unsupported Methods ================== | ||
|
|
||
| @Override | ||
| public void setArray(int parameterIndex, Array x) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException { | ||
| try { | ||
| byte[] bytes = ReadWriteIOUtils.readBytes(x, length); | ||
| setBytes(parameterIndex, bytes); | ||
| } catch (IOException e) { | ||
| throw new SQLException("Failed to read binary stream: " + e.getMessage(), e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setBlob(int parameterIndex, Blob x) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setBlob(int parameterIndex, InputStream inputStream, long length) | ||
| throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setByte(int parameterIndex, byte x) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setCharacterStream(int parameterIndex, Reader reader, int length) | ||
| throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setCharacterStream(int parameterIndex, Reader reader, long length) | ||
| throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setClob(int parameterIndex, Clob x) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setClob(int parameterIndex, Reader reader) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setClob(int parameterIndex, Reader reader, long length) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setNCharacterStream(int parameterIndex, Reader value, long length) | ||
| throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setNClob(int parameterIndex, NClob value) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setNClob(int parameterIndex, Reader reader) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setNString(int parameterIndex, String value) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setRef(int parameterIndex, Ref x) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setRowId(int parameterIndex, RowId x) throws SQLException { | ||
| throw new SQLException(METHOD_NOT_SUPPORTED_STRING); | ||
| } | ||
|
|
||
| @Override | ||
| public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException { | ||
| throw new SQLException(METHOD_NOT_SUPPORTED_STRING); | ||
| } | ||
|
|
||
| @Override | ||
| public void setShort(int parameterIndex, short x) throws SQLException { | ||
| setInt(parameterIndex, x); | ||
| } | ||
|
|
||
| @Override | ||
| public void setURL(int parameterIndex, URL x) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| @Override | ||
| public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException { | ||
| throw new SQLException(Constant.PARAMETER_SUPPORTED); | ||
| } | ||
|
|
||
| // ================== Helper Methods for Backward Compatibility ================== | ||
|
|
||
| private String createCompleteSql(final String sql, Map<Integer, String> parameters) | ||
| throws SQLException { | ||
| List<String> parts = splitSqlStatement(sql); | ||
|
|
||
| StringBuilder newSql = new StringBuilder(parts.get(0)); | ||
| for (int i = 1; i < parts.size(); i++) { | ||
| if (!parameters.containsKey(i)) { | ||
| throw new SQLException("Parameter #" + i + " is unset"); | ||
| } | ||
| newSql.append(parameters.get(i)); | ||
| newSql.append(parts.get(i)); | ||
| } | ||
| return newSql.toString(); | ||
| } | ||
|
|
||
| private List<String> splitSqlStatement(final String sql) { | ||
| List<String> parts = new ArrayList<>(); | ||
| int apCount = 0; | ||
| int off = 0; | ||
| boolean skip = false; | ||
|
|
||
| for (int i = 0; i < sql.length(); i++) { | ||
| char c = sql.charAt(i); | ||
| if (skip) { | ||
| skip = false; | ||
| continue; | ||
| } | ||
| switch (c) { | ||
| case '\'': | ||
| apCount++; | ||
| break; | ||
| case '\\': | ||
| skip = true; | ||
| break; | ||
| case '?': | ||
| if ((apCount & 1) == 0) { | ||
| parts.add(sql.substring(off, i)); | ||
| off = i + 1; | ||
| } | ||
| break; | ||
| default: | ||
| break; | ||
| } | ||
| } | ||
| parts.add(sql.substring(off)); | ||
| return parts; | ||
| } | ||
| } |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new IoTDBTablePreparedStatement class and the RPC methods (prepareStatement, executePreparedStatement, deallocatePreparedStatement) lack integration tests. While unit tests exist for the client-side implementation and integration tests exist for the SQL PREPARE/EXECUTE syntax, there are no tests that exercise the JDBC PreparedStatement API end-to-end. Consider adding integration tests that use the JDBC API to prepare and execute statements with parameters.
| public class PreparedParameterSerializer { | ||
|
|
||
| /** Deserialized parameter holding type and value. */ | ||
| public static class DeserializedParam { | ||
| public final TSDataType type; | ||
| public final Object value; | ||
|
|
||
| DeserializedParam(TSDataType type, Object value) { | ||
| this.type = type; | ||
| this.value = value; | ||
| } | ||
|
|
||
| public boolean isNull() { | ||
| return type == TSDataType.UNKNOWN || value == null; | ||
| } | ||
| } | ||
|
|
||
| private PreparedParameterSerializer() {} | ||
|
|
||
| // ================== Serialize (Client Side) ================== | ||
|
|
||
| /** | ||
| * Serialize parameters to binary format. | ||
| * | ||
| * @param values parameter values | ||
| * @param jdbcTypes JDBC type codes (java.sql.Types) | ||
| * @param count number of parameters | ||
| * @return ByteBuffer containing serialized parameters | ||
| */ | ||
| public static ByteBuffer serialize(Object[] values, int[] jdbcTypes, int count) { | ||
| try { | ||
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
| DataOutputStream dos = new DataOutputStream(baos); | ||
|
|
||
| dos.writeInt(count); | ||
| for (int i = 0; i < count; i++) { | ||
| serializeParameter(dos, values[i], jdbcTypes[i]); | ||
| } | ||
|
|
||
| dos.flush(); | ||
| return ByteBuffer.wrap(baos.toByteArray()); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException("Failed to serialize parameters", e); | ||
| } | ||
| } | ||
|
|
||
| private static void serializeParameter(DataOutputStream dos, Object value, int jdbcType) | ||
| throws IOException { | ||
| if (value == null || jdbcType == Types.NULL) { | ||
| dos.writeByte(TSDataType.UNKNOWN.serialize()); | ||
| return; | ||
| } | ||
|
|
||
| switch (jdbcType) { | ||
| case Types.BOOLEAN: | ||
| dos.writeByte(TSDataType.BOOLEAN.serialize()); | ||
| dos.writeByte((Boolean) value ? 1 : 0); | ||
| break; | ||
|
|
||
| case Types.INTEGER: | ||
| dos.writeByte(TSDataType.INT32.serialize()); | ||
| dos.writeInt(((Number) value).intValue()); | ||
| break; | ||
|
|
||
| case Types.BIGINT: | ||
| dos.writeByte(TSDataType.INT64.serialize()); | ||
| dos.writeLong(((Number) value).longValue()); | ||
| break; | ||
|
|
||
| case Types.FLOAT: | ||
| dos.writeByte(TSDataType.FLOAT.serialize()); | ||
| dos.writeFloat(((Number) value).floatValue()); | ||
| break; | ||
|
|
||
| case Types.DOUBLE: | ||
| dos.writeByte(TSDataType.DOUBLE.serialize()); | ||
| dos.writeDouble(((Number) value).doubleValue()); | ||
| break; | ||
|
|
||
| case Types.VARCHAR: | ||
| case Types.CHAR: | ||
| byte[] strBytes = ((String) value).getBytes(StandardCharsets.UTF_8); | ||
| dos.writeByte(TSDataType.STRING.serialize()); | ||
| dos.writeInt(strBytes.length); | ||
| dos.write(strBytes); | ||
| break; | ||
|
|
||
| case Types.BINARY: | ||
| case Types.VARBINARY: | ||
| byte[] binBytes = (byte[]) value; | ||
| dos.writeByte(TSDataType.BLOB.serialize()); | ||
| dos.writeInt(binBytes.length); | ||
| dos.write(binBytes); | ||
| break; | ||
|
|
||
| default: | ||
| byte[] defaultBytes = String.valueOf(value).getBytes(StandardCharsets.UTF_8); | ||
| dos.writeByte(TSDataType.STRING.serialize()); | ||
| dos.writeInt(defaultBytes.length); | ||
| dos.write(defaultBytes); | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| // ================== Deserialize (Server Side) ================== | ||
|
|
||
| /** | ||
| * Deserialize parameters from binary format. | ||
| * | ||
| * @param buffer ByteBuffer containing serialized parameters | ||
| * @return list of deserialized parameters with type and value | ||
| */ | ||
| public static List<DeserializedParam> deserialize(ByteBuffer buffer) { | ||
| if (buffer == null || buffer.remaining() == 0) { | ||
| return new ArrayList<>(); | ||
| } | ||
|
|
||
| buffer.rewind(); | ||
| int count = buffer.getInt(); | ||
| List<DeserializedParam> result = new ArrayList<>(count); | ||
|
|
||
| for (int i = 0; i < count; i++) { | ||
| byte typeCode = buffer.get(); | ||
| TSDataType type = TSDataType.deserialize(typeCode); | ||
| Object value = deserializeValue(buffer, type); | ||
| result.add(new DeserializedParam(type, value)); | ||
| } | ||
|
|
||
| return result; | ||
| } | ||
|
|
||
| private static Object deserializeValue(ByteBuffer buffer, TSDataType type) { | ||
| switch (type) { | ||
| case UNKNOWN: | ||
| return null; | ||
| case BOOLEAN: | ||
| return buffer.get() != 0; | ||
| case INT32: | ||
| return buffer.getInt(); | ||
| case INT64: | ||
| return buffer.getLong(); | ||
| case FLOAT: | ||
| return buffer.getFloat(); | ||
| case DOUBLE: | ||
| return buffer.getDouble(); | ||
| case TEXT: | ||
| case STRING: | ||
| int strLen = buffer.getInt(); | ||
| byte[] strBytes = new byte[strLen]; | ||
| buffer.get(strBytes); | ||
| return new String(strBytes, StandardCharsets.UTF_8); | ||
| case BLOB: | ||
| int binLen = buffer.getInt(); | ||
| byte[] binBytes = new byte[binLen]; | ||
| buffer.get(binBytes); | ||
| return binBytes; | ||
| default: | ||
| throw new IllegalArgumentException("Unsupported type: " + type); | ||
| } | ||
| } | ||
| } |
Copilot
AI
Jan 19, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PreparedParameterSerializer lacks unit tests for its serialization and deserialization logic. Consider adding unit tests to verify correct handling of all data types, null values, empty parameter lists, and edge cases like very large strings or binary data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added