Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.maxcompute;
package org.apache.doris.maxcompute;

import org.apache.doris.common.maxcompute.MCProperties;

import com.aliyun.auth.credentials.Credential;
import com.aliyun.auth.credentials.provider.EcsRamRoleCredentialProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.maxcompute.MCUtils;

import com.aliyun.odps.Odps;
import com.aliyun.odps.table.configuration.CompressionCodec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.common.jni.vec.VectorColumn;
import org.apache.doris.common.jni.vec.VectorTable;
import org.apache.doris.common.maxcompute.MCProperties;
import org.apache.doris.common.maxcompute.MCUtils;

import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsType;
Expand Down
12 changes: 12 additions & 0 deletions fe/be-java-extensions/preload-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ under the License.
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<!-- Runtime-only: Hive-using JNI scanners (paimon-hive-connector / hudi / HiveConf)
need org.apache.commons.lang.StringUtils (commons-lang 2.x) at runtime. It used to
arrive transitively via fe-common's odps-sdk-core; after P4-T09 (a53f2b17b8d) made
fe-common odps-free it was evicted from this shared preload classpath, breaking every
scanner with NoClassDefFoundError. Restore it here (version managed by fe/pom.xml) so
it lands in the preload-extensions runtime classpath, the shared parent of all JNI
scanners, while staying out of java-udf's own jar. -->
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-unsafe</artifactId>
Expand Down
24 changes: 8 additions & 16 deletions fe/fe-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,23 +134,15 @@ under the License.
<artifactId>antlr4-runtime</artifactId>
<version>${antlr4.version}</version>
</dependency>
<!-- Used by DorisHttpException (netty) and GsonUtilsBase (protobuf); previously pulled in
     transitively via odps-sdk-core, now declared directly so fe-common is odps-free. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
</dependencies>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.connector.api;

import org.apache.doris.connector.api.scan.ConnectorScanPlanProvider;
import org.apache.doris.connector.api.write.ConnectorWritePlanProvider;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -41,6 +42,14 @@ default ConnectorScanPlanProvider getScanPlanProvider() {
return null;
}

/**
* Returns the write plan provider for sink ({@code TDataSink}) generation,
* or {@code null} if this connector does not support writes.
*
* <p>This default implementation always returns {@code null}; a connector
* that supports writes overrides it.</p>
*
* @return the write plan provider, or {@code null} when writes are unsupported
*/
default ConnectorWritePlanProvider getWritePlanProvider() {
return null;
}

/** Returns the set of capabilities this connector supports. */
default Set<ConnectorCapability> getCapabilities() {
return Collections.emptySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,37 @@ public enum ConnectorCapability {
* parallel writers should declare this capability.</p>
*/
SUPPORTS_PARALLEL_WRITE,
/**
* Indicates the connector requires dynamic-partition writes to be hash-distributed by
* partition columns and locally sorted by them before reaching the sink.
*
* <p>Streaming partition writers (e.g. the MaxCompute Storage API) close the previous
* partition writer as soon as a new partition value appears; un-grouped (unsorted)
* multi-partition rows therefore cause "writer has been closed" errors. The planner uses
* this capability to require a hash-by-partition distribution plus a mandatory local sort
* on the partition columns for dynamic-partition writes.</p>
*
* <p>A connector declaring this is expected to also declare
* {@link #SUPPORTS_PARALLEL_WRITE} (hash distribution is inherently parallel) and
* {@link #SINK_REQUIRE_FULL_SCHEMA_ORDER}: the sink distribution locates partition columns by their
* <b>full-schema</b> position in the child output, which only holds when the bind layer projects the
* write to full-schema order (the projection gated by {@code SINK_REQUIRE_FULL_SCHEMA_ORDER}). A
* connector declaring this without {@code SINK_REQUIRE_FULL_SCHEMA_ORDER} would shuffle/sort by the
* wrong column whenever cols order diverges from the full schema.</p>
*/
SINK_REQUIRE_PARTITION_LOCAL_SORT,
/**
* Indicates the connector's write path maps data columns <b>positionally</b> against the full
* table schema (e.g. MaxCompute's columnar Storage API / JNI writer), rather than by column name.
*
* <p>For such connectors the sink's output rows must be projected to <b>full table schema order</b>
* with any unmentioned columns filled (NULL / default) — exactly like the legacy MaxCompute bind
* path — so that a reordered or partial explicit column list does not land values in the wrong
* remote columns. Name-mapped connectors (e.g. JDBC, which builds an {@code INSERT INTO t (cols)}
* statement) must NOT declare this capability: their data stays in user/cols order to match the
* generated column list.</p>
*/
SINK_REQUIRE_FULL_SCHEMA_ORDER,
/**
* Indicates the connector supports passthrough query via the {@code query()} TVF.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public final class ConnectorColumn {
private final boolean nullable;
private final String defaultValue;
private final boolean isKey;
private final boolean isAutoInc;
private final boolean isAggregated;

public ConnectorColumn(String name, ConnectorType type, String comment,
boolean nullable, String defaultValue) {
Expand All @@ -38,12 +40,25 @@ public ConnectorColumn(String name, ConnectorType type, String comment,

/**
* Creates a column with {@code isAutoInc} set to {@code false}; all other
* arguments are forwarded unchanged to the seven-argument constructor.
*/
public ConnectorColumn(String name, ConnectorType type, String comment,
boolean nullable, String defaultValue, boolean isKey) {
this(name, type, comment, nullable, defaultValue, isKey, false);
}

/**
* Creates a column with {@code isAggregated} set to {@code false}; all other
* arguments are forwarded unchanged to the eight-argument constructor.
*/
public ConnectorColumn(String name, ConnectorType type, String comment,
boolean nullable, String defaultValue, boolean isKey, boolean isAutoInc) {
this(name, type, comment, nullable, defaultValue, isKey, isAutoInc, false);
}

/**
 * Full constructor; the shorter overloads delegate here.
 *
 * @param name         column name, must not be {@code null}
 * @param type         column type, must not be {@code null}
 * @param comment      column comment, stored as given (may be {@code null})
 * @param nullable     nullable flag, stored as given
 * @param defaultValue default value, stored as given (may be {@code null})
 * @param isKey        key flag, stored as given
 * @param isAutoInc    auto-increment flag, stored as given
 * @param isAggregated aggregated flag, stored as given
 * @throws NullPointerException if {@code name} or {@code type} is {@code null}
 */
public ConnectorColumn(String name, ConnectorType type, String comment,
        boolean nullable, String defaultValue, boolean isKey, boolean isAutoInc,
        boolean isAggregated) {
    // Mandatory fields first; name is validated before type.
    this.name = Objects.requireNonNull(name, "name");
    this.type = Objects.requireNonNull(type, "type");

    // Boolean flags.
    this.nullable = nullable;
    this.isKey = isKey;
    this.isAutoInc = isAutoInc;
    this.isAggregated = isAggregated;

    // Optional descriptive fields.
    this.comment = comment;
    this.defaultValue = defaultValue;
}

public String getName() {
Expand All @@ -70,6 +85,14 @@ public boolean isKey() {
return isKey;
}

/** Returns the {@code isAutoInc} flag this column was constructed with. */
public boolean isAutoInc() {
    return this.isAutoInc;
}

/** Returns the {@code isAggregated} flag this column was constructed with. */
public boolean isAggregated() {
    return this.isAggregated;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -81,6 +104,8 @@ public boolean equals(Object o) {
ConnectorColumn that = (ConnectorColumn) o;
return nullable == that.nullable
&& isKey == that.isKey
&& isAutoInc == that.isAutoInc
&& isAggregated == that.isAggregated
&& name.equals(that.name)
&& type.equals(that.type)
&& Objects.equals(comment, that.comment)
Expand All @@ -89,7 +114,7 @@ public boolean equals(Object o) {

/**
 * Computes the hash from all column attributes, including the
 * {@code isAutoInc} and {@code isAggregated} flags that
 * {@link #equals(Object)} compares.
 */
@Override
public int hashCode() {
    // Single return: the stale six-field hash is removed because it left this statement unreachable.
    return Objects.hash(name, type, comment, nullable, defaultValue, isKey, isAutoInc, isAggregated);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.doris.connector.api;

import org.apache.doris.connector.api.handle.ConnectorTableHandle;
import org.apache.doris.connector.api.mvcc.ConnectorMvccSnapshot;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

/**
* Central metadata interface that a connector must implement.
Expand All @@ -44,6 +48,34 @@ default Map<String, String> getProperties() {
return Collections.emptyMap();
}

// ──────────────────── MVCC Snapshots ────────────────────

/**
* Returns the current snapshot at query begin time, used as the MVCC pin
* for all subsequent reads of {@code handle}.
*
* <p>Returning {@link Optional#empty()} means the connector does not
* support MVCC and reads see whatever is current. This default
* implementation always returns {@link Optional#empty()}.</p>
*
* @param session the session issuing the query
* @param handle the table whose snapshot is requested
* @return the snapshot to pin, or empty when MVCC is not supported
*/
default Optional<ConnectorMvccSnapshot> beginQuerySnapshot(
ConnectorSession session, ConnectorTableHandle handle) {
return Optional.empty();
}

/**
* Returns the snapshot at the given wall-clock time, or empty if none.
*
* <p>This default implementation always returns {@link Optional#empty()}.</p>
*
* @param session the session issuing the request
* @param handle the table whose snapshot is requested
* @param timestampMillis the wall-clock time, in milliseconds
* @return the matching snapshot, or empty if none
*/
default Optional<ConnectorMvccSnapshot> getSnapshotAt(
ConnectorSession session, ConnectorTableHandle handle,
long timestampMillis) {
return Optional.empty();
}

/**
* Returns the snapshot with the given id, or empty if none.
*
* <p>This default implementation always returns {@link Optional#empty()}.</p>
*
* @param session the session issuing the request
* @param handle the table whose snapshot is requested
* @param snapshotId the id of the snapshot to look up
* @return the matching snapshot, or empty if none
*/
default Optional<ConnectorMvccSnapshot> getSnapshotById(
ConnectorSession session, ConnectorTableHandle handle,
long snapshotId) {
return Optional.empty();
}

/**
* Does nothing by default; implementations that hold resources override this.
*
* @throws IOException declared for overriding implementations; this default never throws
*/
@Override
default void close() throws IOException {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,31 @@
*/
public final class ConnectorPartitionInfo {

/** Sentinel for "unknown" on the numeric stats fields. */
public static final long UNKNOWN = -1L;

private final String partitionName;
private final Map<String, String> partitionValues;
private final Map<String, String> properties;
private final long rowCount;
private final long sizeBytes;
private final long lastModifiedMillis;

/**
* Backward-compatible constructor. Numeric stats fields are set to
* {@link #UNKNOWN}.
*
* <p>Delegates to the six-argument constructor, passing {@link #UNKNOWN}
* for {@code rowCount}, {@code sizeBytes} and {@code lastModifiedMillis}.</p>
*
* @param partitionName the partition name
* @param partitionValues the partition values
* @param properties the partition properties
*/
public ConnectorPartitionInfo(String partitionName,
Map<String, String> partitionValues,
Map<String, String> properties) {
this(partitionName, partitionValues, properties,
UNKNOWN, UNKNOWN, UNKNOWN);
}

public ConnectorPartitionInfo(String partitionName,
Map<String, String> partitionValues,
Map<String, String> properties,
long rowCount, long sizeBytes, long lastModifiedMillis) {
this.partitionName = Objects.requireNonNull(
partitionName, "partitionName");
this.partitionValues = partitionValues == null
Expand All @@ -41,6 +59,9 @@ public ConnectorPartitionInfo(String partitionName,
this.properties = properties == null
? Collections.emptyMap()
: Collections.unmodifiableMap(properties);
this.rowCount = rowCount;
this.sizeBytes = sizeBytes;
this.lastModifiedMillis = lastModifiedMillis;
}

public String getPartitionName() {
Expand All @@ -55,6 +76,21 @@ public Map<String, String> getProperties() {
return properties;
}

/**
 * Returns the row count supplied at construction.
 *
 * @return row count, or {@link #UNKNOWN} when not collected
 */
public long getRowCount() {
    return this.rowCount;
}

/**
 * Returns the size supplied at construction.
 *
 * @return on-disk size in bytes, or {@link #UNKNOWN}
 */
public long getSizeBytes() {
    return this.sizeBytes;
}

/**
 * Returns the last-modified time supplied at construction.
 *
 * @return last-modified epoch millis, or {@link #UNKNOWN}
 */
public long getLastModifiedMillis() {
    return this.lastModifiedMillis;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -64,19 +100,25 @@ public boolean equals(Object o) {
return false;
}
ConnectorPartitionInfo that = (ConnectorPartitionInfo) o;
return partitionName.equals(that.partitionName)
return rowCount == that.rowCount
&& sizeBytes == that.sizeBytes
&& lastModifiedMillis == that.lastModifiedMillis
&& partitionName.equals(that.partitionName)
&& partitionValues.equals(that.partitionValues)
&& properties.equals(that.properties);
}

/**
 * Computes the hash from the name, values, properties and the three
 * numeric stats fields, matching the fields compared by
 * {@link #equals(Object)}.
 */
@Override
public int hashCode() {
    // Single return: the stale three-field hash is removed because it left this statement unreachable.
    return Objects.hash(partitionName, partitionValues, properties,
            rowCount, sizeBytes, lastModifiedMillis);
}

@Override
public String toString() {
return "ConnectorPartitionInfo{name='" + partitionName
+ "', values=" + partitionValues + "}";
+ "', values=" + partitionValues
+ ", rowCount=" + rowCount
+ ", sizeBytes=" + sizeBytes + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ default ConnectorDatabaseMetadata getDatabase(
"getDatabase not implemented");
}

/**
* Whether this connector supports CREATE DATABASE. Defaults to false so the FE
* {@code CREATE DATABASE IF NOT EXISTS} remote existence precheck applies only to
* connectors that can actually create databases; connectors that cannot keep their
* existing "CREATE DATABASE not supported" behavior unchanged.
*
* @return {@code true} if CREATE DATABASE is supported; this default returns {@code false}
*/
default boolean supportsCreateDatabase() {
return false;
}

/** Creates a new database with the given name and properties. */
default void createDatabase(ConnectorSession session,
String dbName, Map<String, String> properties) {
Expand All @@ -57,4 +67,15 @@ default void dropDatabase(ConnectorSession session,
throw new DorisConnectorException(
"DROP DATABASE not supported");
}

/**
* Drops the specified database, cascading to its tables when {@code force} is
* true. The default delegates to the non-cascading 3-arg form, so connectors
* that do not support cascade keep their current behavior with zero change;
* a connector that supports FORCE overrides this overload.
*
* <p>Note that this default does not forward {@code force}: the value is
* ignored and the 3-arg form is called regardless.</p>
*
* @param session the session issuing the drop
* @param dbName the database to drop
* @param ifExists forwarded unchanged to the 3-arg form
* @param force whether to cascade to tables; ignored by this default
*/
default void dropDatabase(ConnectorSession session,
String dbName, boolean ifExists, boolean force) {
// force is intentionally not forwarded; see the Javadoc above.
dropDatabase(session, dbName, ifExists);
}
}
Loading
Loading