Skip to content

Commit bb33242

Browse files
committed
[FLINK-39401][formats] Extend raw format to support line-delimiter option
1 parent c0265db commit bb33242

8 files changed

Lines changed: 429 additions & 7 deletions

File tree

docs/content.zh/docs/connectors/table/formats/raw.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,17 @@ Format 参数
105105
<td>指定字节序来编码数字值的字节。有效值为'big-endian'和'little-endian'。
106106
更多细节可查阅 <a href="https://zh.wikipedia.org/wiki/字节序">字节序</a>。</td>
107107
</tr>
108+
<tr>
109+
<td><h5>raw.line-delimiter</h5></td>
110+
<td>可选</td>
111+
<td style="word-wrap: break-word;">(无)</td>
112+
<td>String</td>
113+
<td>指定行分隔符,用于在反序列化时将一条消息拆分为多行。设置后,每条消息将使用
114+
'raw.charset' 解码,并按此分隔符切分,每段输出一条数据行。在序列化时,分隔符字节会被
115+
追加到每条序列化值的末尾。常用值为换行符或 '||'。分隔符按字面值匹配,格式本身不会解析 '\n' 之类的转义序列,因此需要直接提供实际字符。
116+
<br><strong>注意:</strong>当序列化与反序列化使用相同的分隔符配置时,两者具有
117+
round-trip 兼容性:序列化器追加的末尾分隔符会在反序列化时自动被去除。</td>
118+
</tr>
108119
</tbody>
109120
</table>
110121

docs/content/docs/connectors/table/formats/raw.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,19 @@ Format Options
105105
<td>Specify the endianness to encode the bytes of numeric value. Valid values are 'big-endian' and 'little-endian'.
106106
See more details of <a href="https://en.wikipedia.org/wiki/Endianness">endianness</a>.</td>
107107
</tr>
108+
<tr>
109+
<td><h5>raw.line-delimiter</h5></td>
110+
<td>optional</td>
111+
<td style="word-wrap: break-word;">(none)</td>
112+
<td>String</td>
113+
<td>Specify the line delimiter for splitting incoming messages into multiple rows during
114+
deserialization. When set, each incoming message is decoded using 'raw.charset' and then split
115+
by this delimiter; one row is emitted per segment. During serialization, the delimiter bytes
116+
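are appended to each serialized value. Common values are a newline character or '||'. The delimiter is matched literally: escape sequences such as '\n' are not interpreted by the format, so the actual character must be supplied.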
117+
<br><strong>Note:</strong> When the same delimiter is configured for both serialization and
118+
deserialization, the two are round-trip compatible: a trailing delimiter appended by the
119+
serializer is automatically stripped during deserialization.</td>
120+
</tr>
108121
</tbody>
109122
</table>
110123

flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,16 @@
2929
import org.apache.flink.table.data.StringData;
3030
import org.apache.flink.table.types.logical.LogicalType;
3131
import org.apache.flink.types.DeserializationException;
32+
import org.apache.flink.util.Collector;
33+
34+
import javax.annotation.Nullable;
3235

3336
import java.io.IOException;
3437
import java.io.Serializable;
3538
import java.nio.charset.Charset;
3639
import java.nio.charset.StandardCharsets;
3740
import java.util.Objects;
41+
import java.util.regex.Pattern;
3842

3943
import static org.apache.flink.util.Preconditions.checkNotNull;
4044

@@ -55,6 +59,14 @@ public class RawFormatDeserializationSchema implements DeserializationSchema<Row
5559

5660
private final boolean isBigEndian;
5761

62+
@Nullable private final String lineDelimiter;
63+
64+
/**
65+
* Pre-compiled pattern for splitting by {@link #lineDelimiter}, or {@code null} if no
66+
* delimiter.
67+
*/
68+
@Nullable private final Pattern lineDelimiterPattern;
69+
5870
private final DeserializationRuntimeConverter converter;
5971

6072
private final DataLengthValidator validator;
@@ -64,12 +76,24 @@ public RawFormatDeserializationSchema(
6476
TypeInformation<RowData> producedTypeInfo,
6577
String charsetName,
6678
boolean isBigEndian) {
79+
this(deserializedType, producedTypeInfo, charsetName, isBigEndian, null);
80+
}
81+
82+
/**
 * Creates a deserialization schema that can optionally split one message into several rows.
 *
 * @param deserializedType logical type of the single produced field; must not be {@code null}
 * @param producedTypeInfo type information of the produced rows; must not be {@code null}
 * @param charsetName name of the charset handed to the converter; it is also used to decode a
 *     message before it is split by the delimiter
 * @param isBigEndian endianness flag handed to the converter
 * @param lineDelimiter literal delimiter used to split a decoded message, or {@code null} to
 *     keep the one-row-per-message behavior
 * @throws IllegalArgumentException if {@code lineDelimiter} is an empty string
 */
public RawFormatDeserializationSchema(
        LogicalType deserializedType,
        TypeInformation<RowData> producedTypeInfo,
        String charsetName,
        boolean isBigEndian,
        @Nullable String lineDelimiter) {
    // An empty delimiter would compile to a pattern that matches between every character, so
    // each message would silently be emitted as one row per character. Fail fast instead.
    if (lineDelimiter != null && lineDelimiter.isEmpty()) {
        throw new IllegalArgumentException(
                "The line delimiter of the 'raw' format must not be an empty string.");
    }
    this.deserializedType = checkNotNull(deserializedType);
    this.producedTypeInfo = checkNotNull(producedTypeInfo);
    this.converter = createConverter(deserializedType, charsetName, isBigEndian);
    this.validator = createDataLengthValidator(deserializedType);
    this.charsetName = charsetName;
    this.isBigEndian = isBigEndian;
    this.lineDelimiter = lineDelimiter;
    // The delimiter is quoted so that regex metacharacters (e.g. "||") are matched literally.
    this.lineDelimiterPattern =
            lineDelimiter != null ? Pattern.compile(Pattern.quote(lineDelimiter)) : null;
}
7498

7599
@Override
@@ -92,6 +116,41 @@ public RowData deserialize(byte[] message) throws IOException {
92116
return rowData;
93117
}
94118

119+
@Override
120+
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
121+
if (lineDelimiter == null) {
122+
// no delimiter: default single-record behavior
123+
RowData row = deserialize(message);
124+
if (row != null) {
125+
out.collect(row);
126+
}
127+
return;
128+
}
129+
130+
if (message == null) {
131+
return;
132+
}
133+
134+
Charset charset = Charset.forName(charsetName);
135+
String decoded = new String(message, charset);
136+
// Use pre-compiled pattern. Split with -1 to keep intentional empty middle segments,
137+
// but strip the single trailing empty string produced when the message ends with the
138+
// delimiter (e.g. a serializer that appends one delimiter per row).
139+
String[] parts = lineDelimiterPattern.split(decoded, -1);
140+
int count = parts.length;
141+
if (count > 0 && parts[count - 1].isEmpty()) {
142+
count--;
143+
}
144+
for (int i = 0; i < count; i++) {
145+
byte[] partBytes = parts[i].getBytes(charset);
146+
validator.validate(partBytes);
147+
Object field = converter.convert(partBytes);
148+
GenericRowData rowData = new GenericRowData(1);
149+
rowData.setField(0, field);
150+
out.collect(rowData);
151+
}
152+
}
153+
95154
@Override
96155
public boolean isEndOfStream(RowData nextElement) {
97156
return false;
@@ -114,12 +173,14 @@ public boolean equals(Object o) {
114173
return producedTypeInfo.equals(that.producedTypeInfo)
115174
&& deserializedType.equals(that.deserializedType)
116175
&& charsetName.equals(that.charsetName)
117-
&& isBigEndian == that.isBigEndian;
176+
&& isBigEndian == that.isBigEndian
177+
&& Objects.equals(lineDelimiter, that.lineDelimiter);
118178
}
119179

120180
@Override
121181
public int hashCode() {
122-
return Objects.hash(producedTypeInfo, deserializedType, charsetName, isBigEndian);
182+
return Objects.hash(
183+
producedTypeInfo, deserializedType, charsetName, isBigEndian, lineDelimiter);
123184
}
124185

125186
// ------------------------------------------------------------------------

flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.nio.charset.Charset;
4646
import java.util.Collections;
4747
import java.util.HashSet;
48+
import java.util.Optional;
4849
import java.util.Set;
4950
import java.util.stream.Collectors;
5051

@@ -72,6 +73,7 @@ public Set<ConfigOption<?>> optionalOptions() {
7273
Set<ConfigOption<?>> options = new HashSet<>();
7374
options.add(RawFormatOptions.ENDIANNESS);
7475
options.add(RawFormatOptions.CHARSET);
76+
options.add(RawFormatOptions.LINE_DELIMITER);
7577
return options;
7678
}
7779

@@ -81,6 +83,8 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
8183
FactoryUtil.validateFactoryOptions(this, formatOptions);
8284
final String charsetName = validateAndGetCharsetName(formatOptions);
8385
final boolean isBigEndian = isBigEndian(formatOptions);
86+
final Optional<String> lineDelimiter =
87+
formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER);
8488

8589
return new DecodingFormat<DeserializationSchema<RowData>>() {
8690
@Override
@@ -91,7 +95,11 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
9195
final TypeInformation<RowData> producedTypeInfo =
9296
context.createTypeInformation(producedDataType);
9397
return new RawFormatDeserializationSchema(
94-
fieldType, producedTypeInfo, charsetName, isBigEndian);
98+
fieldType,
99+
producedTypeInfo,
100+
charsetName,
101+
isBigEndian,
102+
lineDelimiter.orElse(null));
95103
}
96104

97105
@Override
@@ -107,14 +115,17 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
107115
FactoryUtil.validateFactoryOptions(this, formatOptions);
108116
final String charsetName = validateAndGetCharsetName(formatOptions);
109117
final boolean isBigEndian = isBigEndian(formatOptions);
118+
final Optional<String> lineDelimiter =
119+
formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER);
110120

111121
return new EncodingFormat<SerializationSchema<RowData>>() {
112122
@Override
113123
public SerializationSchema<RowData> createRuntimeEncoder(
114124
DynamicTableSink.Context context, DataType consumedDataType) {
115125
final RowType physicalRowType = (RowType) consumedDataType.getLogicalType();
116126
final LogicalType fieldType = validateAndExtractSingleField(physicalRowType);
117-
return new RawFormatSerializationSchema(fieldType, charsetName, isBigEndian);
127+
return new RawFormatSerializationSchema(
128+
fieldType, charsetName, isBigEndian, lineDelimiter.orElse(null));
118129
}
119130

120131
@Override

flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,14 @@ public class RawFormatOptions {
4343
.defaultValue(StandardCharsets.UTF_8.displayName())
4444
.withDescription("Defines the string charset.");
4545

46+
public static final ConfigOption<String> LINE_DELIMITER =
47+
ConfigOptions.key("line-delimiter")
48+
.stringType()
49+
.noDefaultValue()
50+
.withDescription(
51+
"Optional line delimiter. Supports Java escape sequences (e.g. '\\n', '\\r\\n'). "
52+
+ "When set, deserialization splits each message by this delimiter and emits "
53+
+ "one RowData per part. Serialization appends the delimiter after each row's value.");
54+
4655
private RawFormatOptions() {}
4756
}

flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@
2727
import org.apache.flink.table.types.logical.LogicalType;
2828
import org.apache.flink.table.types.logical.RawType;
2929

30+
import javax.annotation.Nullable;
31+
3032
import java.io.IOException;
3133
import java.io.Serializable;
3234
import java.nio.charset.Charset;
3335
import java.nio.charset.StandardCharsets;
36+
import java.util.Arrays;
3437
import java.util.Objects;
3538

3639
/** Serialization schema that serializes an {@link RowData} object into raw (byte based) value. */
@@ -47,12 +50,28 @@ public class RawFormatSerializationSchema implements SerializationSchema<RowData
4750

4851
private final boolean isBigEndian;
4952

53+
@Nullable private final String lineDelimiter;
54+
55+
/** Pre-computed delimiter bytes, or {@code null} if no delimiter is set. */
56+
@Nullable private final byte[] delimiterBytes;
57+
5058
/**
 * Creates a serialization schema without a line delimiter, i.e. the converted value bytes are
 * returned unchanged.
 *
 * @param serializedType logical type of the single serialized field
 * @param charsetName name of the charset handed to the converter
 * @param isBigEndian endianness flag handed to the converter
 */
public RawFormatSerializationSchema(
        LogicalType serializedType, String charsetName, boolean isBigEndian) {
    this(serializedType, charsetName, isBigEndian, null);
}

/**
 * Creates a serialization schema that optionally appends a delimiter to every serialized
 * value.
 *
 * @param serializedType logical type of the single serialized field
 * @param charsetName name of the charset handed to the converter; it is also used to encode
 *     the delimiter into bytes
 * @param isBigEndian endianness flag handed to the converter
 * @param lineDelimiter delimiter appended after each serialized value, or {@code null} to
 *     append nothing
 * @throws IllegalArgumentException if {@code lineDelimiter} is an empty string
 */
public RawFormatSerializationSchema(
        LogicalType serializedType,
        String charsetName,
        boolean isBigEndian,
        @Nullable String lineDelimiter) {
    // An empty delimiter would append zero bytes, making a misconfiguration indistinguishable
    // from "no delimiter". Reject it explicitly instead of silently ignoring it.
    if (lineDelimiter != null && lineDelimiter.isEmpty()) {
        throw new IllegalArgumentException(
                "The line delimiter of the 'raw' format must not be an empty string.");
    }
    this.serializedType = serializedType;
    this.converter = createConverter(serializedType, charsetName, isBigEndian);
    this.charsetName = charsetName;
    this.isBigEndian = isBigEndian;
    this.lineDelimiter = lineDelimiter;
    // Encode the delimiter once up front so serialize() only has to copy bytes.
    this.delimiterBytes =
            lineDelimiter != null ? lineDelimiter.getBytes(Charset.forName(charsetName)) : null;
}
5776

5877
@Override
@@ -63,7 +82,13 @@ public void open(InitializationContext context) throws Exception {
6382
@Override
6483
public byte[] serialize(RowData row) {
6584
try {
66-
return converter.convert(row);
85+
byte[] valueBytes = converter.convert(row);
86+
if (delimiterBytes == null || valueBytes == null) {
87+
return valueBytes;
88+
}
89+
byte[] result = Arrays.copyOf(valueBytes, valueBytes.length + delimiterBytes.length);
90+
System.arraycopy(delimiterBytes, 0, result, valueBytes.length, delimiterBytes.length);
91+
return result;
6792
} catch (IOException e) {
6893
throw new RuntimeException("Could not serialize row '" + row + "'. ", e);
6994
}
@@ -80,12 +105,13 @@ public boolean equals(Object o) {
80105
RawFormatSerializationSchema that = (RawFormatSerializationSchema) o;
81106
return serializedType.equals(that.serializedType)
82107
&& charsetName.equals(that.charsetName)
83-
&& isBigEndian == that.isBigEndian;
108+
&& isBigEndian == that.isBigEndian
109+
&& Objects.equals(lineDelimiter, that.lineDelimiter);
84110
}
85111

86112
@Override
87113
public int hashCode() {
88-
return Objects.hash(serializedType, charsetName, isBigEndian);
114+
return Objects.hash(serializedType, charsetName, isBigEndian, lineDelimiter);
89115
}
90116

91117
// ------------------------------------------------------------------------

flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatFactoryTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,29 @@ void testInvalidFieldTypes() {
175175
.hasMessage("The 'raw' format doesn't supports 'MAP<INT, STRING>' as column type.");
176176
}
177177

178+
@Test
179+
void testLineDelimiterOption() {
180+
final Map<String, String> tableOptions =
181+
getModifiedOptions(
182+
options -> {
183+
options.put("raw.line-delimiter", "\n");
184+
});
185+
186+
// test deserialization schema contains line delimiter
187+
final RawFormatDeserializationSchema expectedDeser =
188+
new RawFormatDeserializationSchema(
189+
ROW_TYPE.getTypeAt(0), InternalTypeInfo.of(ROW_TYPE), "UTF-8", true, "\n");
190+
DeserializationSchema<RowData> actualDeser =
191+
createDeserializationSchema(SCHEMA, tableOptions);
192+
assertThat(actualDeser).isEqualTo(expectedDeser);
193+
194+
// test serialization schema contains line delimiter
195+
final RawFormatSerializationSchema expectedSer =
196+
new RawFormatSerializationSchema(ROW_TYPE.getTypeAt(0), "UTF-8", true, "\n");
197+
SerializationSchema<RowData> actualSer = createSerializationSchema(SCHEMA, tableOptions);
198+
assertThat(actualSer).isEqualTo(expectedSer);
199+
}
200+
178201
// ------------------------------------------------------------------------
179202
// Utilities
180203
// ------------------------------------------------------------------------

0 commit comments

Comments
 (0)