diff --git a/chunjun-connectors/chunjun-connector-gbase8s/pom.xml b/chunjun-connectors/chunjun-connector-gbase8s/pom.xml
index bf317b4ee3..3d01871766 100644
--- a/chunjun-connectors/chunjun-connector-gbase8s/pom.xml
+++ b/chunjun-connectors/chunjun-connector-gbase8s/pom.xml
@@ -41,6 +41,8 @@
chunjun-connector-jdbc-base
${project.version}
+
+
com.gbasedbt.jdbc.Driver
gbasedbt
diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java
index 6adafddaad..75a8b53902 100644
--- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java
+++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java
@@ -149,7 +149,7 @@ protected void openSource() {
@Override
// todo the deviation needs to be calculated accurately
protected long getCurrentFileSize() {
- return (long) (bytesWriteCounter.getLocalValue() * getDeviation());
+ return (long) ((bytesWriteCounter.getLocalValue() - lastWriteSize) * getDeviation());
}
@Override
diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsParquetOutputFormat.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsParquetOutputFormat.java
index 445c69d227..a182a4ca4d 100644
--- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsParquetOutputFormat.java
+++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsParquetOutputFormat.java
@@ -156,7 +156,7 @@ protected void nextBlock() {
public void flushDataInternal() {
log.info(
"Close current parquet record writer, write data size:[{}]",
- SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue()));
+ SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue() - lastWriteSize));
try {
if (writer != null) {
writer.close();
diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsTextOutputFormat.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsTextOutputFormat.java
index a35a733a33..2b54cf2912 100644
--- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsTextOutputFormat.java
+++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsTextOutputFormat.java
@@ -77,7 +77,7 @@ protected void nextBlock() {
public void flushDataInternal() {
log.info(
"Close current text stream, write data size:[{}]",
- SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue()));
+ SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue() - lastWriteSize));
try {
if (stream != null) {
diff --git a/chunjun-connectors/chunjun-connector-opengauss/pom.xml b/chunjun-connectors/chunjun-connector-opengauss/pom.xml
index d110e479bd..1f38710d82 100644
--- a/chunjun-connectors/chunjun-connector-opengauss/pom.xml
+++ b/chunjun-connectors/chunjun-connector-opengauss/pom.xml
@@ -47,7 +47,7 @@
org.opengauss
opengauss-jdbc
- 5.0.1
+ 5.0.1-og
diff --git a/chunjun-connectors/pom.xml b/chunjun-connectors/pom.xml
index 4e14d2d0bc..2d43e9eaa1 100755
--- a/chunjun-connectors/pom.xml
+++ b/chunjun-connectors/pom.xml
@@ -58,7 +58,7 @@
chunjun-connector-greenplum
chunjun-connector-dm
chunjun-connector-gbase
- chunjun-connector-gbase8s
+
chunjun-connector-clickhouse
chunjun-connector-saphana
diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseFileOutputFormat.java b/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseFileOutputFormat.java
index 3dba41f14b..7c5998f3ad 100644
--- a/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseFileOutputFormat.java
+++ b/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseFileOutputFormat.java
@@ -58,6 +58,7 @@ public abstract class BaseFileOutputFormat extends BaseRichOutputFormat {
protected List preCommitFilePathList = new ArrayList<>();
protected long nextNumForCheckDataSize;
protected long lastWriteTime = System.currentTimeMillis();
+ protected long lastWriteSize;
@Override
public void initializeGlobal(int parallelism) {
@@ -155,6 +156,7 @@ public void flushData() {
if (rowsOfCurrentBlock != 0) {
flushDataInternal();
sumRowsOfBlock += rowsOfCurrentBlock;
+ lastWriteSize = bytesWriteCounter.getLocalValue();
log.info(
"flush file:{}, rowsOfCurrentBlock = {}, sumRowsOfBlock = {}",
currentFileName,
diff --git a/chunjun-local-test/pom.xml b/chunjun-local-test/pom.xml
index e476e2142d..22f564d18b 100644
--- a/chunjun-local-test/pom.xml
+++ b/chunjun-local-test/pom.xml
@@ -51,58 +51,71 @@
chunjun-connector-stream
${project.version}
-
- com.dtstack.chunjun
- chunjun-connector-binlog
- ${project.version}
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
- com.dtstack.chunjun
- chunjun-connector-iceberg
- ${project.version}
-
-
- com.dtstack.chunjun
- chunjun-connector-mongodb
- ${project.version}
-
-
-
- com.dtstack.chunjun
- chunjun-connector-emqx
- ${project.version}
-
-
- com.dtstack.chunjun
- chunjun-connector-jdbc-base
- ${project.version}
-
-
-
- com.dtstack.chunjun
- chunjun-connector-mysql
- ${project.version}
-
-
-
- com.dtstack.chunjun
- chunjun-connector-kafka
- ${project.version}
-
-
-
- com.dtstack.chunjun
- chunjun-connector-kingbase
- ${project.version}
-
-
-
-
- com.dtstack.chunjun
- chunjun-connector-ftp
- ${project.version}
-
org.apache.flink
@@ -110,12 +123,6 @@
${flink.version}
-
- com.dtstack.chunjun
- chunjun-connector-oceanbase
- master
-
-
org.apache.avro
avro