Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.DeserializationException;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.regex.Pattern;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -55,6 +59,14 @@ public class RawFormatDeserializationSchema implements DeserializationSchema<Row

private final boolean isBigEndian;

@Nullable private final String lineDelimiter;

/**
* Pre-compiled pattern for splitting by {@link #lineDelimiter}, or {@code null} if no
* delimiter. Note: this field and {@link #lineDelimiter} are either both null or both non-null.
*/
@Nullable private final Pattern lineDelimiterPattern;

private final DeserializationRuntimeConverter converter;

private final DataLengthValidator validator;
Expand All @@ -66,12 +78,24 @@ public RawFormatDeserializationSchema(
TypeInformation<RowData> producedTypeInfo,
String charsetName,
boolean isBigEndian) {
this(deserializedType, producedTypeInfo, charsetName, isBigEndian, null);
}

/**
 * Creates a raw format deserialization schema.
 *
 * @param deserializedType the single physical field type the raw value is converted to
 * @param producedTypeInfo type information of the produced {@link RowData}
 * @param charsetName charset used for character payloads and for encoding the delimiter
 * @param isBigEndian whether multi-byte values are interpreted as big-endian
 * @param lineDelimiter optional delimiter; when non-null, each message is split into
 *     multiple records by {@code deserialize(byte[], Collector)}
 */
public RawFormatDeserializationSchema(
        LogicalType deserializedType,
        TypeInformation<RowData> producedTypeInfo,
        String charsetName,
        boolean isBigEndian,
        @Nullable String lineDelimiter) {
    this.deserializedType = checkNotNull(deserializedType);
    this.producedTypeInfo = checkNotNull(producedTypeInfo);
    this.charsetName = charsetName;
    this.isBigEndian = isBigEndian;
    this.converter = createConverter(deserializedType, charsetName, isBigEndian);
    this.validator = createDataLengthValidator(deserializedType);
    this.lineDelimiter = lineDelimiter;
    if (lineDelimiter == null) {
        this.lineDelimiterPattern = null;
    } else {
        // Quote the delimiter so it is matched literally, never as a regular expression.
        this.lineDelimiterPattern = Pattern.compile(Pattern.quote(lineDelimiter));
    }
}

@Override
Expand All @@ -93,6 +117,41 @@ public RowData deserialize(byte[] message) throws IOException {
return reuse;
}

/**
 * Deserializes one message into zero or more rows. Without a configured {@link
 * #lineDelimiter} this behaves exactly like {@link #deserialize(byte[])}; with a
 * delimiter, the message is split on it and every segment becomes one single-field row.
 */
@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
    if (lineDelimiter == null) {
        // No delimiter configured: default single-record behavior.
        RowData row = deserialize(message);
        if (row != null) {
            out.collect(row);
        }
        return;
    }

    if (message == null) {
        return;
    }

    // Split on the encoded delimiter bytes directly instead of decoding the whole
    // message into a String and re-encoding each part. The String round-trip is lossy
    // for payloads that are not valid text in the configured charset (e.g.
    // BINARY/VARBINARY values): undecodable sequences are replaced by U+FFFD and the
    // original bytes are silently corrupted before reaching the converter.
    // NOTE(review): byte-level matching assumes the encoded delimiter cannot appear as
    // a sub-sequence spanning two encoded characters. This holds for single-byte
    // charsets and for UTF-8 (self-synchronizing); confirm whether UTF-16/UTF-32 must
    // be supported (their getBytes() output also carries a BOM).
    final byte[] delimiter = lineDelimiter.getBytes(Charset.forName(charsetName));
    if (delimiter.length == 0) {
        // Defensive: an empty delimiter would make no scan progress. Treat the message
        // as a single record; the factory is expected to reject this configuration.
        RowData row = deserialize(message);
        if (row != null) {
            out.collect(row);
        }
        return;
    }

    int start = 0;
    int pos = 0;
    final int lastMatchStart = message.length - delimiter.length;
    while (pos <= lastMatchStart) {
        if (matchesAt(message, pos, delimiter)) {
            // Empty segments in the middle are intentional records and are emitted.
            collectRecord(Arrays.copyOfRange(message, start, pos), out);
            pos += delimiter.length;
            start = pos;
        } else {
            pos++;
        }
    }
    // Emit the tail unless the message ends with the delimiter: a serializer that
    // appends one delimiter per row produces exactly one trailing empty segment,
    // which must not become a record.
    if (start < message.length) {
        collectRecord(Arrays.copyOfRange(message, start, message.length), out);
    }
}

/** Returns true if {@code data} contains {@code delimiter} starting at {@code offset}. */
private static boolean matchesAt(byte[] data, int offset, byte[] delimiter) {
    for (int i = 0; i < delimiter.length; i++) {
        if (data[offset + i] != delimiter[i]) {
            return false;
        }
    }
    return true;
}

/** Validates, converts, and emits one delimited segment as a single-field row. */
private void collectRecord(byte[] partBytes, Collector<RowData> out) throws IOException {
    validator.validate(partBytes);
    Object field = converter.convert(partBytes);
    GenericRowData rowData = new GenericRowData(1);
    rowData.setField(0, field);
    out.collect(rowData);
}

@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
Expand All @@ -115,12 +174,14 @@ public boolean equals(Object o) {
return producedTypeInfo.equals(that.producedTypeInfo)
&& deserializedType.equals(that.deserializedType)
&& charsetName.equals(that.charsetName)
&& isBigEndian == that.isBigEndian;
&& isBigEndian == that.isBigEndian
&& Objects.equals(lineDelimiter, that.lineDelimiter);
}

@Override
public int hashCode() {
    // Same fields, same order as equals(); replicates Objects.hash()'s
    // 31-based accumulation (null contributes 0, boolean hashes to 1231/1237).
    int result = 1;
    result = 31 * result + Objects.hashCode(producedTypeInfo);
    result = 31 * result + Objects.hashCode(deserializedType);
    result = 31 * result + Objects.hashCode(charsetName);
    result = 31 * result + Boolean.hashCode(isBigEndian);
    result = 31 * result + Objects.hashCode(lineDelimiter);
    return result;
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -72,6 +73,7 @@ public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(RawFormatOptions.ENDIANNESS);
options.add(RawFormatOptions.CHARSET);
options.add(RawFormatOptions.LINE_DELIMITER);
return options;
}

Expand All @@ -81,6 +83,8 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
FactoryUtil.validateFactoryOptions(this, formatOptions);
final String charsetName = validateAndGetCharsetName(formatOptions);
final boolean isBigEndian = isBigEndian(formatOptions);
final Optional<String> lineDelimiter =
formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER);

return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
Expand All @@ -91,7 +95,11 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
return new RawFormatDeserializationSchema(
fieldType, producedTypeInfo, charsetName, isBigEndian);
fieldType,
producedTypeInfo,
charsetName,
isBigEndian,
lineDelimiter.orElse(null));
}

@Override
Expand All @@ -107,14 +115,17 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
FactoryUtil.validateFactoryOptions(this, formatOptions);
final String charsetName = validateAndGetCharsetName(formatOptions);
final boolean isBigEndian = isBigEndian(formatOptions);
final Optional<String> lineDelimiter =
formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER);

return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
public SerializationSchema<RowData> createRuntimeEncoder(
DynamicTableSink.Context context, DataType consumedDataType) {
final RowType physicalRowType = (RowType) consumedDataType.getLogicalType();
final LogicalType fieldType = validateAndExtractSingleField(physicalRowType);
return new RawFormatSerializationSchema(fieldType, charsetName, isBigEndian);
return new RawFormatSerializationSchema(
fieldType, charsetName, isBigEndian, lineDelimiter.orElse(null));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,14 @@ public class RawFormatOptions {
.defaultValue(StandardCharsets.UTF_8.displayName())
.withDescription("Defines the string charset.");

/**
 * Optional delimiter used to split one message into multiple records on read and to
 * terminate each record on write. The configured value is used literally — the visible
 * factory code performs no unescaping, so the original description's claim of Java
 * escape-sequence support ('\\n' etc.) was misleading and has been corrected.
 */
public static final ConfigOption<String> LINE_DELIMITER =
        ConfigOptions.key("line-delimiter")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Optional line delimiter. The value is used literally; escape sequences such as "
                                + "'\\n' are not interpreted, so configure the actual delimiter characters. "
                                + "When set, deserialization splits each message by this delimiter and emits "
                                + "one row per part. Serialization appends the delimiter after each row's value.");

// Static options holder; private constructor prevents instantiation.
private RawFormatOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RawType;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

/** Serialization schema that serializes an {@link RowData} object into raw (byte based) value. */
Expand All @@ -47,12 +50,28 @@ public class RawFormatSerializationSchema implements SerializationSchema<RowData

private final boolean isBigEndian;

@Nullable private final String lineDelimiter;

/** Pre-computed delimiter bytes, or {@code null} if no delimiter is set. */
@Nullable private final byte[] delimiterBytes;

/**
 * Creates a serialization schema without a line delimiter (one raw value per record).
 * Delegates to the 4-arg constructor with {@code lineDelimiter = null}.
 */
public RawFormatSerializationSchema(
        LogicalType serializedType, String charsetName, boolean isBigEndian) {
    this(serializedType, charsetName, isBigEndian, null);
}

/**
 * Creates a raw format serialization schema.
 *
 * @param serializedType the single physical field type that is serialized
 * @param charsetName charset used for character payloads and for encoding the delimiter
 * @param isBigEndian whether multi-byte values are written big-endian
 * @param lineDelimiter optional delimiter appended after each serialized value
 */
public RawFormatSerializationSchema(
        LogicalType serializedType,
        String charsetName,
        boolean isBigEndian,
        @Nullable String lineDelimiter) {
    this.serializedType = serializedType;
    this.charsetName = charsetName;
    this.isBigEndian = isBigEndian;
    this.converter = createConverter(serializedType, charsetName, isBigEndian);
    this.lineDelimiter = lineDelimiter;
    if (lineDelimiter == null) {
        this.delimiterBytes = null;
    } else {
        // Encode once up front so serialize() only has to append raw bytes.
        this.delimiterBytes = lineDelimiter.getBytes(Charset.forName(charsetName));
    }
}

@Override
Expand All @@ -63,7 +82,13 @@ public void open(InitializationContext context) throws Exception {
@Override
public byte[] serialize(RowData row) {
try {
return converter.convert(row);
byte[] valueBytes = converter.convert(row);
if (delimiterBytes == null || valueBytes == null) {
return valueBytes;
}
byte[] result = Arrays.copyOf(valueBytes, valueBytes.length + delimiterBytes.length);
System.arraycopy(delimiterBytes, 0, result, valueBytes.length, delimiterBytes.length);
return result;
} catch (IOException e) {
throw new RuntimeException("Could not serialize row '" + row + "'. ", e);
}
Expand All @@ -80,12 +105,13 @@ public boolean equals(Object o) {
RawFormatSerializationSchema that = (RawFormatSerializationSchema) o;
return serializedType.equals(that.serializedType)
&& charsetName.equals(that.charsetName)
&& isBigEndian == that.isBigEndian;
&& isBigEndian == that.isBigEndian
&& Objects.equals(lineDelimiter, that.lineDelimiter);
}

@Override
public int hashCode() {
    // Same fields, same order as equals(); replicates Objects.hash()'s
    // 31-based accumulation (null contributes 0, boolean hashes to 1231/1237).
    int result = 1;
    result = 31 * result + Objects.hashCode(serializedType);
    result = 31 * result + Objects.hashCode(charsetName);
    result = 31 * result + Boolean.hashCode(isBigEndian);
    result = 31 * result + Objects.hashCode(lineDelimiter);
    return result;
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,29 @@ public void testInvalidEndianness() {
}
}

@Test
public void testLineDelimiterOption() {
    // Enable the optional 'line-delimiter' on top of the default raw format options.
    final Map<String, String> tableOptions =
            getModifiedOptions(options -> options.put("raw.line-delimiter", "\n"));

    // The factory must propagate the delimiter into the deserialization schema.
    final DeserializationSchema<RowData> actualDeser =
            createDeserializationSchema(SCHEMA, tableOptions);
    assertEquals(
            new RawFormatDeserializationSchema(
                    ROW_TYPE.getTypeAt(0), InternalTypeInfo.of(ROW_TYPE), "UTF-8", true, "\n"),
            actualDeser);

    // ... and into the serialization schema as well.
    final SerializationSchema<RowData> actualSer =
            createSerializationSchema(SCHEMA, tableOptions);
    assertEquals(
            new RawFormatSerializationSchema(ROW_TYPE.getTypeAt(0), "UTF-8", true, "\n"),
            actualSer);
}

@Test
public void testInvalidFieldTypes() {
try {
Expand Down
Loading