-
Notifications
You must be signed in to change notification settings - Fork 833
Refactor CLI StreamTool SolrClientCache usage #4513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,7 +38,6 @@ | |
| import org.apache.commons.cli.CommandLine; | ||
| import org.apache.commons.cli.Option; | ||
| import org.apache.commons.cli.Options; | ||
| import org.apache.solr.client.solrj.io.Lang; | ||
| import org.apache.solr.client.solrj.io.SolrClientCache; | ||
| import org.apache.solr.client.solrj.io.Tuple; | ||
| import org.apache.solr.client.solrj.io.comp.StreamComparator; | ||
|
|
@@ -49,8 +48,8 @@ | |
| import org.apache.solr.client.solrj.io.stream.expr.Explanation; | ||
| import org.apache.solr.client.solrj.io.stream.expr.Expressible; | ||
| import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; | ||
| import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; | ||
| import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; | ||
| import org.apache.solr.client.solrj.jetty.HttpJettySolrClient; | ||
| import org.apache.solr.common.SolrException; | ||
| import org.apache.solr.common.params.ModifiableSolrParams; | ||
| import org.apache.solr.handler.CatStream; | ||
|
|
@@ -62,8 +61,6 @@ public StreamTool(ToolRuntime runtime) { | |
| super(runtime); | ||
| } | ||
|
|
||
| private final SolrClientCache solrClientCache = new SolrClientCache(); | ||
|
|
||
| @Override | ||
| public String getName() { | ||
| return "stream"; | ||
|
|
@@ -166,14 +163,17 @@ public void runImpl(CommandLine cli) throws Exception { | |
| } | ||
| } | ||
|
|
||
| // a stream needs a context | ||
| StreamContext streamContext = createStreamContext(cli); | ||
| // create the stream | ||
| PushBackStream pushBackStream; | ||
| if (execution.equalsIgnoreCase("local")) { | ||
| pushBackStream = doLocalMode(cli, expr); | ||
| pushBackStream = doLocalMode(expr, streamContext.getStreamFactory()); | ||
| } else { | ||
| pushBackStream = doRemoteMode(cli, expr); | ||
| pushBackStream = doRemoteMode(expr, cli); | ||
| } | ||
|
|
||
| try { | ||
| pushBackStream.setStreamContext(streamContext); | ||
| pushBackStream.open(); | ||
|
|
||
| if (outputHeaders == null) { | ||
|
|
@@ -227,35 +227,52 @@ public void runImpl(CommandLine cli) throws Exception { | |
| } | ||
| } finally { | ||
| pushBackStream.close(); | ||
| solrClientCache.close(); | ||
| streamContext.getSolrClientCache().close(); | ||
| } | ||
|
|
||
| echoIfVerbose("StreamTool -- Done."); | ||
| } | ||
|
|
||
| private StreamContext createStreamContext(CommandLine cli) throws Exception { | ||
| var jettyClientBuilder = new HttpJettySolrClient.Builder(); | ||
| String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION); | ||
| if (credentials != null) { | ||
| String[] userPass = credentials.split(":"); | ||
| jettyClientBuilder.withBasicAuthCredentials(userPass[0], userPass[1]); | ||
| } | ||
| HttpJettySolrClient client = jettyClientBuilder.build(); | ||
|
|
||
| // subclass so we can ensure our client is closed when the cache is closed | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is some magic! If this isn't there, do we see errors? |
||
| var solrClientCache = | ||
| new SolrClientCache(client) { | ||
| @Override | ||
| public synchronized void close() { | ||
| super.close(); | ||
| client.close(); | ||
| } | ||
| }; | ||
|
|
||
| var solrConnection = CLIUtils.getSolrConnection(cli); | ||
| echoIfVerbose("Connecting to Solr at " + solrConnection); | ||
|
|
||
| StreamContext streamContext = new StreamContext(); | ||
| streamContext.setSolrClientCache(solrClientCache); | ||
| streamContext.getStreamFactory().withDefaultSolrConnection(solrConnection); | ||
| return streamContext; | ||
| } | ||
|
|
||
| /** | ||
| * Runs a streaming expression in the local process of the CLI. | ||
| * | ||
| * <p>Running locally means that parallelization support or those expressions requiring access to | ||
| * internal Solr capabilities will not function. | ||
| * | ||
| * @param cli The CLI invoking the call | ||
| * @param expr The streaming expression to be parsed and in the context of the CLI process | ||
| * @param expr The streaming expression to be parsed and run in the context of the CLI process | ||
| * @param streamFactory The factory used to construct the streaming expression | ||
| * @return A connection to the streaming expression that receives Tuples as they are emitted | ||
| * locally. | ||
| */ | ||
| private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exception { | ||
| var solrConnection = CLIUtils.getSolrConnection(cli); | ||
| echoIfVerbose("Connecting to Solr at " + solrConnection.toString()); | ||
| solrClientCache.setBasicAuthCredentials( | ||
| cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION)); | ||
| solrClientCache.getCloudSolrClient(solrConnection); | ||
|
|
||
| TupleStream stream; | ||
| PushBackStream pushBackStream; | ||
|
|
||
| StreamExpression streamExpression = StreamExpressionParser.parse(expr); | ||
| StreamFactory streamFactory = new StreamFactory(); | ||
| private PushBackStream doLocalMode(String expr, StreamFactory streamFactory) throws Exception { | ||
|
|
||
| // stdin is ONLY available in the local mode, not in the remote mode as it | ||
| // requires access to System.in | ||
|
|
@@ -265,23 +282,7 @@ private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exceptio | |
| // logic about where to read data from. | ||
| streamFactory.withFunctionName("cat", LocalCatStream.class); | ||
|
|
||
| streamFactory.withDefaultSolrConnection(solrConnection); | ||
|
|
||
| Lang.register(streamFactory); | ||
|
|
||
| assert streamExpression != null; | ||
| stream = streamFactory.constructStream(streamExpression); | ||
|
|
||
| pushBackStream = new PushBackStream(stream); | ||
|
|
||
| // Now we can run the stream and return the results. | ||
| StreamContext streamContext = new StreamContext(); | ||
| streamContext.setSolrClientCache(solrClientCache); | ||
|
|
||
| // Output the headers | ||
| pushBackStream.setStreamContext(streamContext); | ||
|
|
||
| return pushBackStream; | ||
| return new PushBackStream(streamFactory.constructStream(expr)); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -291,14 +292,15 @@ private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exceptio | |
| * <p>Running remotely allows you to use all the standard Streaming Expression capabilities as the | ||
| * expression is running in a Solr environment. | ||
| * | ||
| * @param cli The CLI invoking the call | ||
| * @param expr The streaming expression to be parsed and run remotely | ||
| * @param cli The CLI invoking the call | ||
| * @return A connection to the streaming expression that receives Tuples as they are emitted from | ||
| * Solr /stream. | ||
| */ | ||
| private PushBackStream doRemoteMode(CommandLine cli, String expr) throws Exception { | ||
| private PushBackStream doRemoteMode(String expr, CommandLine cli) throws Exception { | ||
|
|
||
| String solrUrl = CLIUtils.normalizeSolrUrl(cli); | ||
|
|
||
| if (!cli.hasOption(COLLECTION_OPTION)) { | ||
| throw new IllegalStateException( | ||
| "You must provide --name COLLECTION with --execution remote parameter."); | ||
|
|
@@ -310,16 +312,8 @@ private PushBackStream doRemoteMode(CommandLine cli, String expr) throws Excepti | |
| "The stdin() expression is only usable with --worker local set up."); | ||
| } | ||
|
|
||
| final SolrStream solrStream = | ||
| new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr)); | ||
|
|
||
| String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION); | ||
| if (credentials != null) { | ||
| String username = credentials.split(":")[0]; | ||
| String password = credentials.split(":")[1]; | ||
| solrStream.setCredentials(username, password); | ||
| } | ||
| return new PushBackStream(solrStream); | ||
| return new PushBackStream( | ||
| new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr))); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I feel like this should be |
||
| } | ||
|
|
||
| private static ModifiableSolrParams params(String... params) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO these "normalize" methods are non-obvious since the URL normalization I'm used to seeing within Solr produces Solr URLs ending with "/solr". The javadoc here refers to what I'm used to as "legacy". Wow... am I a dinosaur or has v2 taken over? SolrJ didn't get the memo, I can tell you that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a little bit dinosaur... But not a lot dinosaur... We've been trying to simplify users' lives so they don't have to think about /solr or /v2 or /api all the time, unless it's critical. So, we normalize to the root version, and then whatever tool can append what it needs...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO URL utility methods should be centralized to URLUtil. And naming reconsidered. If this method produces URLs without any path, then this is an opportunity to very simply state that in the method name rather than ambiguous "normalization" (it once wasn't ambiguous, but in our transition period it is).