Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "collection": target.setCollection(property(camelContext, java.lang.String.class, value)); return true;
case "connecttimeout":
case "connectTimeout": target.setConnectTimeout(property(camelContext, java.time.Duration.class, value).toMillis()); return true;
case "connectionstring":
case "connectionString": target.setConnectionString(property(camelContext, java.lang.String.class, value)); return true;
case "consumerprocessedstrategy":
case "consumerProcessedStrategy": target.setConsumerProcessedStrategy(property(camelContext, java.lang.String.class, value)); return true;
case "consumerretrypause":
Expand Down Expand Up @@ -128,6 +130,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "collection": return java.lang.String.class;
case "connecttimeout":
case "connectTimeout": return long.class;
case "connectionstring":
case "connectionString": return java.lang.String.class;
case "consumerprocessedstrategy":
case "consumerProcessedStrategy": return java.lang.String.class;
case "consumerretrypause":
Expand Down Expand Up @@ -218,6 +222,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "collection": return target.getCollection();
case "connecttimeout":
case "connectTimeout": return target.getConnectTimeout();
case "connectionstring":
case "connectionString": return target.getConnectionString();
case "consumerprocessedstrategy":
case "consumerProcessedStrategy": return target.getConsumerProcessedStrategy();
case "consumerretrypause":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class CouchbaseEndpointUriFactory extends org.apache.camel.support.compon
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Map<String, String> MULTI_VALUE_PREFIXES;
static {
Set<String> props = new HashSet<>(51);
Set<String> props = new HashSet<>(52);
props.add("additionalHosts");
props.add("autoStartIdForInserts");
props.add("backoffErrorThreshold");
Expand All @@ -33,6 +33,7 @@ public class CouchbaseEndpointUriFactory extends org.apache.camel.support.compon
props.add("bucket");
props.add("collection");
props.add("connectTimeout");
props.add("connectionString");
props.add("consumerProcessedStrategy");
props.add("consumerRetryPause");
props.add("delay");
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public class CouchbaseEndpoint extends ScheduledPollEndpoint implements Endpoint
@UriParam(label = "advanced")
private String additionalHosts;

// Connection string
@UriParam(label = "advanced")
private String connectionString;

// Persistence and replication parameters
@UriParam(label = "producer", defaultValue = "0")
private int persistTo;
Expand Down Expand Up @@ -325,6 +329,19 @@ public void setAdditionalHosts(String additionalHosts) {
this.additionalHosts = additionalHosts;
}

public String getConnectionString() {
return connectionString;
}

/**
* The Couchbase SDK connection string to use (e.g., couchbase://hostname:11210). When set, this takes precedence
* over the hostname and port options for the SDK connection. This is useful when the KV port is not the default
* 11210, for example when connecting to a container with dynamic port mappings.
*/
public void setConnectionString(String connectionString) {
this.connectionString = connectionString;
}

public int getPersistTo() {
return persistTo;
}
Expand Down Expand Up @@ -653,26 +670,24 @@ private URI[] getAllUris() throws URISyntaxException {

//create from couchbase-client
private Bucket createClient() throws Exception {
List<URI> hosts = Arrays.asList(makeBootstrapURI());
String connectionString;

if (bucket == null || bucket.isEmpty()) {
throw new CamelException(COUCHBASE_URI_ERROR);
}

ClusterEnvironment env = createClusterEnvironment();

String addHosts = hosts.stream()
.map(URI::getHost)
.collect(Collectors.joining(","));

if (!addHosts.isEmpty()) {
connectionString = addHosts;
String connStr;
if (connectionString != null && !connectionString.isEmpty()) {
connStr = connectionString;
} else {
connectionString = hostname;
List<URI> hosts = Arrays.asList(makeBootstrapURI());
String addHosts = hosts.stream()
.map(URI::getHost)
.collect(Collectors.joining(","));
connStr = !addHosts.isEmpty() ? addHosts : hostname;
}

Cluster cluster = Cluster.connect(connectionString, ClusterOptions
Cluster cluster = Cluster.connect(connStr, ClusterOptions
.clusterOptions(username, password)
.environment(env));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,11 @@ public static void tearDownCouchbase() {
}

public String getConnectionUri() {
return String.format("couchbase:http://%s:%d?bucket=%s&username=%s&password=%s", service.getHostname(),
service.getPort(), bucketName, service.getUsername(), service.getPassword());
return String.format(
"couchbase:http://%s:%d?bucket=%s&username=%s&password=%s&connectionString=%s",
service.getHostname(),
service.getPort(), bucketName, service.getUsername(), service.getPassword(),
service.getConnectionString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ public static void tearDownCouchbase() {
}

public String getConnectionUri() {
return String.format("couchbase:http://%s:%d?bucket=%s&username=%s&password=%s", service.getHostname(),
service.getPort(), bucketName, service.getUsername(), service.getPassword());
return String.format(
"couchbase:http://%s:%d?bucket=%s&username=%s&password=%s&connectionString=%s",
service.getHostname(),
service.getPort(), bucketName, service.getUsername(), service.getPassword(),
service.getConnectionString());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,24 @@ default AdvancedCouchbaseEndpointConsumerBuilder additionalHosts(String addition
doSetProperty("additionalHosts", additionalHosts);
return this;
}
/**
* The Couchbase SDK connection string to use (e.g.,
* couchbase://hostname:11210). When set, this takes precedence over the
* hostname and port options for the SDK connection. This is useful when
* the KV port is not the default 11210, for example when connecting to
* a container with dynamic port mappings.
*
* The option is a: <code>java.lang.String</code> type.
*
* Group: advanced
*
* @param connectionString the value to set
* @return the dsl builder
*/
default AdvancedCouchbaseEndpointConsumerBuilder connectionString(String connectionString) {
doSetProperty("connectionString", connectionString);
return this;
}
/**
* Define the timeoutconnect in milliseconds.
*
Expand Down Expand Up @@ -1501,6 +1519,24 @@ default AdvancedCouchbaseEndpointProducerBuilder additionalHosts(String addition
doSetProperty("additionalHosts", additionalHosts);
return this;
}
/**
* The Couchbase SDK connection string to use (e.g.,
* couchbase://hostname:11210). When set, this takes precedence over the
* hostname and port options for the SDK connection. This is useful when
* the KV port is not the default 11210, for example when connecting to
* a container with dynamic port mappings.
*
* The option is a: <code>java.lang.String</code> type.
*
* Group: advanced
*
* @param connectionString the value to set
* @return the dsl builder
*/
default AdvancedCouchbaseEndpointProducerBuilder connectionString(String connectionString) {
doSetProperty("connectionString", connectionString);
return this;
}
/**
* Define the timeoutconnect in milliseconds.
*
Expand Down Expand Up @@ -1686,6 +1722,24 @@ default AdvancedCouchbaseEndpointBuilder additionalHosts(String additionalHosts)
doSetProperty("additionalHosts", additionalHosts);
return this;
}
/**
* The Couchbase SDK connection string to use (e.g.,
* couchbase://hostname:11210). When set, this takes precedence over the
* hostname and port options for the SDK connection. This is useful when
* the KV port is not the default 11210, for example when connecting to
* a container with dynamic port mappings.
*
* The option is a: <code>java.lang.String</code> type.
*
* Group: advanced
*
* @param connectionString the value to set
* @return the dsl builder
*/
default AdvancedCouchbaseEndpointBuilder connectionString(String connectionString) {
doSetProperty("connectionString", connectionString);
return this;
}
/**
* Define the timeoutconnect in milliseconds.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,17 @@
serviceAlias = { "couchbase" })
public class CouchbaseLocalContainerInfraService implements CouchbaseInfraService, ContainerService<CouchbaseContainer> {

/*
* Couchbase container uses a dynamic port for the KV service. The configuration
* used in the Camel component tries to use that port by default, and it seems
* we cannot configure it. Therefore, we override the default container and
* force the default KV port to be used.
*/
private class CustomCouchbaseContainer extends CouchbaseContainer {
public CustomCouchbaseContainer(String imageName) {
super(DockerImageName.parse(imageName).asCompatibleSubstituteFor("couchbase/server"));

boolean fixedPort = ContainerEnvironmentUtil.isFixedPort(CouchbaseLocalContainerInfraService.class);
final int kvPort = 11210;
final int managementPort = 8091;
final int viewPort = 8092;
final int queryPort = 8093;
final int searchPort = 8094;
ContainerEnvironmentUtil.configurePorts(this, true,
ContainerEnvironmentUtil.configurePorts(this, fixedPort,
ContainerEnvironmentUtil.PortConfig.primary(kvPort),
ContainerEnvironmentUtil.PortConfig.secondary(managementPort),
ContainerEnvironmentUtil.PortConfig.secondary(viewPort),
Expand Down
Loading