Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions solr/core/src/java/org/apache/solr/cli/CLIUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public static SolrClient getSolrClient(CommandLine cli) throws Exception {
* is used, and warns those users. In the future we'll have urls ending with /api as well.
*
* @param solrUrl The user supplied url to Solr.
* @return the solrUrl in the format that Solr expects to see internally.
* @return a URL without any path, e.g. {@code http://localhost:8983}
*/
public static String normalizeSolrUrl(String solrUrl) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO these "normalize" methods are non-obvious since the URL normalization I'm used to seeing within Solr produce Solr URLs ending with "/solr". The javadoc here refers to what I'm used to as "legacy". Wow... am I a dinosaur or has v2 taken over? SolrJ didn't get the memo, I can tell you that.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a little bit dinosaur... But not a lot dinosaur... We've been trying to simpliy users lives so they don't have to think about /solr or /v2 or /api all the time, unless is critical. So, we normalize to the root version, and then whatever tool can append what it needs...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO URL utility methods should be centralized to URLUtil. And naming reconsidered. If this method produces URLs without any path, then this is an opportunity to very simply state that in the method name rather than ambiguous "normalization", (it once wasn't ambiguous but in our transition period, is ambiguous).

return normalizeSolrUrl(solrUrl, true);
Expand All @@ -192,7 +192,7 @@ public static String normalizeSolrUrl(String solrUrl) {
*
* @param solrUrl The user supplied url to Solr.
* @param logUrlFormatWarning If a warning message should be logged about the url format
* @return the solrUrl in the format that Solr expects to see internally.
* @return a URL without any path, e.g. {@code http://localhost:8983}
*/
public static String normalizeSolrUrl(String solrUrl, boolean logUrlFormatWarning) {
if (solrUrl != null) {
Expand Down
96 changes: 45 additions & 51 deletions solr/core/src/java/org/apache/solr/cli/StreamTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.solr.client.solrj.io.Lang;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
Expand All @@ -49,8 +48,8 @@
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.CatStream;
Expand All @@ -62,8 +61,6 @@ public StreamTool(ToolRuntime runtime) {
super(runtime);
}

private final SolrClientCache solrClientCache = new SolrClientCache();

@Override
public String getName() {
return "stream";
Expand Down Expand Up @@ -166,14 +163,17 @@ public void runImpl(CommandLine cli) throws Exception {
}
}

// a stream needs a context
StreamContext streamContext = createStreamContext(cli);
// create the stream
PushBackStream pushBackStream;
if (execution.equalsIgnoreCase("local")) {
pushBackStream = doLocalMode(cli, expr);
pushBackStream = doLocalMode(expr, streamContext.getStreamFactory());
} else {
pushBackStream = doRemoteMode(cli, expr);
pushBackStream = doRemoteMode(expr, cli);
}

try {
pushBackStream.setStreamContext(streamContext);
pushBackStream.open();

if (outputHeaders == null) {
Expand Down Expand Up @@ -227,35 +227,52 @@ public void runImpl(CommandLine cli) throws Exception {
}
} finally {
pushBackStream.close();
solrClientCache.close();
streamContext.getSolrClientCache().close();
}

echoIfVerbose("StreamTool -- Done.");
}

private StreamContext createStreamContext(CommandLine cli) throws Exception {
var jettyClientBuilder = new HttpJettySolrClient.Builder();
String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION);
if (credentials != null) {
String[] userPass = credentials.split(":");
jettyClientBuilder.withBasicAuthCredentials(userPass[0], userPass[1]);
}
HttpJettySolrClient client = jettyClientBuilder.build();

// subclass so we can ensure our client is closed when the cache is closed

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is some magic! If this isn't there, do we see errors?

var solrClientCache =
new SolrClientCache(client) {
@Override
public synchronized void close() {
super.close();
client.close();
}
};

var solrConnection = CLIUtils.getSolrConnection(cli);
echoIfVerbose("Connecting to Solr at " + solrConnection);

StreamContext streamContext = new StreamContext();
streamContext.setSolrClientCache(solrClientCache);
streamContext.getStreamFactory().withDefaultSolrConnection(solrConnection);
return streamContext;
}

/**
* Runs a streaming expression in the local process of the CLI.
*
* <p>Running locally means that parallelization support or those expressions requiring access to
* internal Solr capabilities will not function.
*
* @param cli The CLI invoking the call
* @param expr The streaming expression to be parsed and in the context of the CLI process
* @param expr The streaming expression to be parsed and run in the context of the CLI process
* @param streamFactory The factory used to construct the streaming expression
* @return A connection to the streaming expression that receives Tuples as they are emitted
* locally.
*/
private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exception {
var solrConnection = CLIUtils.getSolrConnection(cli);
echoIfVerbose("Connecting to Solr at " + solrConnection.toString());
solrClientCache.setBasicAuthCredentials(
cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION));
solrClientCache.getCloudSolrClient(solrConnection);

TupleStream stream;
PushBackStream pushBackStream;

StreamExpression streamExpression = StreamExpressionParser.parse(expr);
StreamFactory streamFactory = new StreamFactory();
private PushBackStream doLocalMode(String expr, StreamFactory streamFactory) throws Exception {

// stdin is ONLY available in the local mode, not in the remote mode as it
// requires access to System.in
Expand All @@ -265,23 +282,7 @@ private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exceptio
// logic about where to read data from.
streamFactory.withFunctionName("cat", LocalCatStream.class);

streamFactory.withDefaultSolrConnection(solrConnection);

Lang.register(streamFactory);

assert streamExpression != null;
stream = streamFactory.constructStream(streamExpression);

pushBackStream = new PushBackStream(stream);

// Now we can run the stream and return the results.
StreamContext streamContext = new StreamContext();
streamContext.setSolrClientCache(solrClientCache);

// Output the headers
pushBackStream.setStreamContext(streamContext);

return pushBackStream;
return new PushBackStream(streamFactory.constructStream(expr));
}

/**
Expand All @@ -291,14 +292,15 @@ private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exceptio
* <p>Running remotely allows you to use all the standard Streaming Expression capabilities as the
* expression is running in a Solr environment.
*
* @param cli The CLI invoking the call
* @param expr The streaming expression to be parsed and run remotely
* @param cli The CLI invoking the call
* @return A connection to the streaming expression that receives Tuples as they are emitted from
* Solr /stream.
*/
private PushBackStream doRemoteMode(CommandLine cli, String expr) throws Exception {
private PushBackStream doRemoteMode(String expr, CommandLine cli) throws Exception {

String solrUrl = CLIUtils.normalizeSolrUrl(cli);

if (!cli.hasOption(COLLECTION_OPTION)) {
throw new IllegalStateException(
"You must provide --name COLLECTION with --execution remote parameter.");
Expand All @@ -310,16 +312,8 @@ private PushBackStream doRemoteMode(CommandLine cli, String expr) throws Excepti
"The stdin() expression is only usable with --worker local set up.");
}

final SolrStream solrStream =
new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr));

String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION);
if (credentials != null) {
String username = credentials.split(":")[0];
String password = credentials.split(":")[1];
solrStream.setCredentials(username, password);
}
return new PushBackStream(solrStream);
return new PushBackStream(
new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr)));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i feel like this should be "/solr/" + collection + "/stream", params("expr",expr)... but that is a bigger change.

}

private static ModifiableSolrParams params(String... params) {
Expand Down
Loading