From 73984ad81de4a1db025ca8a68b57e1d8b75240a0 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 24 Apr 2026 14:45:40 +0000 Subject: [PATCH 1/5] feat(bigquery-jdbc): extend OpenTelemetry instrumentation for metadata and pagination --- .../bigquery/jdbc/BigQueryArrowResultSet.java | 109 +-- .../bigquery/jdbc/BigQueryBaseResultSet.java | 4 + .../jdbc/BigQueryDatabaseMetaData.java | 767 ++++++++++-------- .../jdbc/BigQueryJdbcOpenTelemetry.java | 8 + .../bigquery/jdbc/BigQueryJsonResultSet.java | 81 +- .../bigquery/jdbc/BigQueryStatement.java | 166 ++-- 6 files changed, 600 insertions(+), 535 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java index 1d7d89e3f1f1..d483fad18f75 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java @@ -27,6 +27,9 @@ import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException; import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch; import com.google.cloud.bigquery.storage.v1.ArrowSchema; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.math.BigDecimal; import java.sql.Date; @@ -213,66 +216,68 @@ public void close() { @Override public boolean next() throws SQLException { checkClosed(); - if (this.isNested) { - if (this.currentNestedBatch == null || this.currentNestedBatch.getNestedRecords() == null) { - throw new IllegalStateException( - "currentNestedBatch/JsonStringArrayList can not be null working with the nested record"); - } - if (this.nestedRowIndex < (this.toIndexExclusive - 1)) { - /* Check if there's a next record in the array which can be read */ - this.nestedRowIndex++; - return true; - } - this.afterLast = true; - return false; - } else { - /* Non nested */ - if (this.hasReachedEnd || this.isLast()) { + try (Scope scope = Context.current().with(Span.wrap(originalSpanContext)).makeCurrent()) { + if (this.isNested) { + if (this.currentNestedBatch == null || this.currentNestedBatch.getNestedRecords() == null) { + throw new IllegalStateException( + "currentNestedBatch/JsonStringArrayList can not be null working with the nested record"); + } + if (this.nestedRowIndex < (this.toIndexExclusive - 1)) { + /* Check if there's a next record in the array which can be read */ + this.nestedRowIndex++; + return true; + } this.afterLast = true; return false; - } - try { - if (this.currentBatchRowIndex == -1 - || this.currentBatchRowIndex == (this.vectorSchemaRoot.getRowCount() - 1)) { - /* Start of iteration or we have exhausted the current batch */ - // Advance the cursor. Potentially blocking operation. - BigQueryArrowBatchWrapper batchWrapper = this.buffer.take(); - if (batchWrapper.getException() != null) { - throw new BigQueryJdbcRuntimeException(batchWrapper.getException()); - } - if (batchWrapper.isLast()) { - /* Marks the end of the records */ - if (this.vectorSchemaRoot != null) { - // IMP: To avoid memory leak: clear vectorSchemaRoot as it still holds - // the last batch - this.vectorSchemaRoot.clear(); + } else { + /* Non nested */ + if (this.hasReachedEnd || this.isLast()) { + this.afterLast = true; + return false; + } + try { + if (this.currentBatchRowIndex == -1 + || this.currentBatchRowIndex == (this.vectorSchemaRoot.getRowCount() - 1)) { + /* Start of iteration or we have exhausted the current batch */ + // Advance the cursor. Potentially blocking operation. + BigQueryArrowBatchWrapper batchWrapper = this.buffer.take(); + if (batchWrapper.getException() != null) { + throw new BigQueryJdbcRuntimeException(batchWrapper.getException()); + } + if (batchWrapper.isLast()) { + /* Marks the end of the records */ + if (this.vectorSchemaRoot != null) { + // IMP: To avoid memory leak: clear vectorSchemaRoot as it still holds + // the last batch + this.vectorSchemaRoot.clear(); + } + this.hasReachedEnd = true; + this.rowCount++; + return false; } - this.hasReachedEnd = true; + // Valid batch, process it + ArrowRecordBatch arrowBatch = batchWrapper.getCurrentArrowBatch(); + // Populates vectorSchemaRoot + this.arrowDeserializer.deserializeArrowBatch(arrowBatch); + // Pointing to the first row in this fresh batch + this.currentBatchRowIndex = 0; this.rowCount++; - return false; + return true; } - // Valid batch, process it - ArrowRecordBatch arrowBatch = batchWrapper.getCurrentArrowBatch(); - // Populates vectorSchemaRoot - this.arrowDeserializer.deserializeArrowBatch(arrowBatch); - // Pointing to the first row in this fresh batch - this.currentBatchRowIndex = 0; - this.rowCount++; - return true; - } - // There are rows left in the current batch. - else if (this.currentBatchRowIndex < this.vectorSchemaRoot.getRowCount()) { - this.currentBatchRowIndex++; - this.rowCount++; - return true; + // There are rows left in the current batch. + else if (this.currentBatchRowIndex < this.vectorSchemaRoot.getRowCount()) { + this.currentBatchRowIndex++; + this.rowCount++; + return true; + } + } catch (InterruptedException | SQLException ex) { + throw new BigQueryJdbcException( + "Error occurred while advancing the cursor. This could happen when connection is closed while the next method is being called.", + ex); } - } catch (InterruptedException | SQLException ex) { - throw new BigQueryJdbcException( - "Error occurred while advancing the cursor. This could happen when connection is closed while the next method is being called.", - ex); } + return false; } - return false; } private Object getObjectInternal(int columnIndex) throws SQLException { diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java index 4ff4acad6b2a..9b20354ad014 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java @@ -27,6 +27,8 @@ import com.google.cloud.bigquery.exception.BigQueryConversionException; import com.google.cloud.bigquery.exception.BigQueryJdbcCoercionException; import com.google.cloud.bigquery.exception.BigQueryJdbcCoercionNotFoundException; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; import java.io.InputStream; import java.io.Reader; import java.io.StringReader; @@ -58,6 +60,7 @@ public abstract class BigQueryBaseResultSet extends BigQueryNoOpsResultSet protected boolean isClosed = false; protected boolean wasNull = false; protected final BigQueryTypeCoercer bigQueryTypeCoercer = BigQueryTypeCoercionUtility.INSTANCE; + protected final SpanContext originalSpanContext; protected BigQueryBaseResultSet( BigQuery bigQuery, BigQueryStatement statement, Schema schema, boolean isNested) { @@ -66,6 +69,7 @@ protected BigQueryBaseResultSet( this.schema = schema; this.schemaFieldList = schema != null ? schema.getFields() : null; this.isNested = isNested; + this.originalSpanContext = Span.current().getSpanContext(); } public QueryStatistics getQueryStatistics() { diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index f08e29215c1a..689e57622714 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -42,6 +42,10 @@ import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.exception.BigQueryJdbcException; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -1699,179 +1703,193 @@ Comparator defineGetProcedureColumnsComparator(FieldList resultS @Override public ResultSet getTables( String catalog, String schemaPattern, String tableNamePattern, String[] types) { + Tracer tracer = BigQueryJdbcOpenTelemetry.getSafeTracer(this.connection); + Span span = tracer.spanBuilder("BigQueryDatabaseMetaData.getTables").startSpan(); + try (Scope scope = span.makeCurrent()) { + Tuple effectiveIdentifiers = + determineEffectiveCatalogAndSchema(catalog, schemaPattern); + String effectiveCatalog = effectiveIdentifiers.x(); + String effectiveSchemaPattern = effectiveIdentifiers.y(); + + if ((effectiveCatalog == null || effectiveCatalog.isEmpty()) + || (effectiveSchemaPattern != null && effectiveSchemaPattern.isEmpty()) + || (tableNamePattern != null && tableNamePattern.isEmpty())) { + LOG.warning( + "Returning empty ResultSet as one or more patterns are empty or catalog is null."); + return new BigQueryJsonResultSet(); + } - Tuple effectiveIdentifiers = - determineEffectiveCatalogAndSchema(catalog, schemaPattern); - String effectiveCatalog = effectiveIdentifiers.x(); - String effectiveSchemaPattern = effectiveIdentifiers.y(); + LOG.info( + "getTables called for catalog: %s, schemaPattern: %s, tableNamePattern: %s, types: %s", + effectiveCatalog, effectiveSchemaPattern, tableNamePattern, Arrays.toString(types)); - if ((effectiveCatalog == null || effectiveCatalog.isEmpty()) - || (effectiveSchemaPattern != null && effectiveSchemaPattern.isEmpty()) - || (tableNamePattern != null && tableNamePattern.isEmpty())) { - LOG.warning( - "Returning empty ResultSet as one or more patterns are empty or catalog is null."); - return new BigQueryJsonResultSet(); - } + final Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern); + final Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern); + final Set requestedTypes = + (types == null || types.length == 0) ? null : new HashSet<>(Arrays.asList(types)); - LOG.info( - "getTables called for catalog: %s, schemaPattern: %s, tableNamePattern: %s, types: %s", - effectiveCatalog, effectiveSchemaPattern, tableNamePattern, Arrays.toString(types)); - - final Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern); - final Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern); - final Set requestedTypes = - (types == null || types.length == 0) ? null : new HashSet<>(Arrays.asList(types)); + final Schema resultSchema = defineGetTablesSchema(); + final FieldList resultSchemaFields = resultSchema.getFields(); - final Schema resultSchema = defineGetTablesSchema(); - final FieldList resultSchemaFields = resultSchema.getFields(); + final BlockingQueue queue = + new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); + final List collectedResults = Collections.synchronizedList(new ArrayList<>()); + final String catalogParam = effectiveCatalog; + final String schemaParam = effectiveSchemaPattern; - final BlockingQueue queue = - new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); - final List collectedResults = Collections.synchronizedList(new ArrayList<>()); - final String catalogParam = effectiveCatalog; - final String schemaParam = effectiveSchemaPattern; + Runnable tableFetcher = + () -> { + ExecutorService apiExecutor = null; + ExecutorService tableProcessorExecutor = null; + final FieldList localResultSchemaFields = resultSchemaFields; + final List>> apiFutures = new ArrayList<>(); + final List> processingFutures = new ArrayList<>(); - Runnable tableFetcher = - () -> { - ExecutorService apiExecutor = null; - ExecutorService tableProcessorExecutor = null; - final FieldList localResultSchemaFields = resultSchemaFields; - final List>> apiFutures = new ArrayList<>(); - final List> processingFutures = new ArrayList<>(); + try { + List datasetsToScan = + findMatchingBigQueryObjects( + "Dataset", + () -> + bigquery.listDatasets( + catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), + (ds) -> ds.getDatasetId().getDataset(), + schemaParam, + schemaRegex, + LOG); - try { - List datasetsToScan = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaParam, - schemaRegex, - LOG); + if (datasetsToScan.isEmpty()) { + LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); + return; + } - if (datasetsToScan.isEmpty()) { - LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); - return; - } + apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE); + tableProcessorExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); - apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE); - tableProcessorExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); + LOG.fine("Submitting parallel findMatchingTables tasks..."); + for (Dataset dataset : datasetsToScan) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning("Table fetcher interrupted during dataset iteration."); + break; + } - LOG.fine("Submitting parallel findMatchingTables tasks..."); - for (Dataset dataset : datasetsToScan) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning("Table fetcher interrupted during dataset iteration."); - break; + final DatasetId currentDatasetId = dataset.getDatasetId(); + Callable> apiCallable = + () -> + findMatchingBigQueryObjects( + "Table", + () -> + bigquery.listTables( + currentDatasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> + bigquery.getTable( + TableId.of( + currentDatasetId.getProject(), + currentDatasetId.getDataset(), + name)), + (tbl) -> tbl.getTableId().getTable(), + tableNamePattern, + tableNameRegex, + LOG); + + Callable> wrappedApiCallable = Context.current().wrap(apiCallable); + Future> apiFuture = apiExecutor.submit(wrappedApiCallable); + apiFutures.add(apiFuture); } + LOG.fine("Finished submitting " + apiFutures.size() + " findMatchingTables tasks."); + apiExecutor.shutdown(); - final DatasetId currentDatasetId = dataset.getDatasetId(); - Callable> apiCallable = - () -> - findMatchingBigQueryObjects( - "Table", + LOG.fine("Processing results from findMatchingTables tasks..."); + for (Future> apiFuture : apiFutures) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning("Table fetcher interrupted while processing API futures."); + break; + } + try { + List tablesResult = apiFuture.get(); + if (tablesResult != null) { + for (Table table : tablesResult) { + if (Thread.currentThread().isInterrupted()) break; + + final Table currentTable = table; + Runnable processRunnable = () -> - bigquery.listTables( - currentDatasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> - bigquery.getTable( - TableId.of( - currentDatasetId.getProject(), - currentDatasetId.getDataset(), - name)), - (tbl) -> tbl.getTableId().getTable(), - tableNamePattern, - tableNameRegex, - LOG); - Future> apiFuture = apiExecutor.submit(apiCallable); - apiFutures.add(apiFuture); - } - LOG.fine("Finished submitting " + apiFutures.size() + " findMatchingTables tasks."); - apiExecutor.shutdown(); + processTableInfo( + currentTable, + requestedTypes, + collectedResults, + localResultSchemaFields); + Runnable wrappedProcessRunnable = Context.current().wrap(processRunnable); + Future processFuture = + tableProcessorExecutor.submit(wrappedProcessRunnable); + processingFutures.add(processFuture); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warning("Fetcher thread interrupted while waiting for API future result."); + break; + } catch (ExecutionException e) { + LOG.warning( + "Error executing findMatchingTables task: " + + e.getMessage() + + ". Cause: " + + e.getCause()); + } catch (CancellationException e) { + LOG.warning("A findMatchingTables task was cancelled."); + } + } + + LOG.fine( + "Finished submitting " + processingFutures.size() + " processTableInfo tasks."); - LOG.fine("Processing results from findMatchingTables tasks..."); - for (Future> apiFuture : apiFutures) { if (Thread.currentThread().isInterrupted()) { - LOG.warning("Table fetcher interrupted while processing API futures."); - break; + LOG.warning( + "Fetcher interrupted before waiting for processing tasks; cancelling remaining."); + processingFutures.forEach(f -> f.cancel(true)); + } else { + LOG.fine("Waiting for processTableInfo tasks to complete..."); + waitForTasksCompletion(processingFutures); + LOG.fine("All processTableInfo tasks completed."); } - try { - List
tablesResult = apiFuture.get(); - if (tablesResult != null) { - for (Table table : tablesResult) { - if (Thread.currentThread().isInterrupted()) break; - final Table currentTable = table; - Future processFuture = - tableProcessorExecutor.submit( - () -> - processTableInfo( - currentTable, - requestedTypes, - collectedResults, - localResultSchemaFields)); - processingFutures.add(processFuture); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warning("Fetcher thread interrupted while waiting for API future result."); - break; - } catch (ExecutionException e) { - LOG.warning( - "Error executing findMatchingTables task: " - + e.getMessage() - + ". Cause: " - + e.getCause()); - } catch (CancellationException e) { - LOG.warning("A findMatchingTables task was cancelled."); + if (!Thread.currentThread().isInterrupted()) { + Comparator comparator = + defineGetTablesComparator(localResultSchemaFields); + sortResults(collectedResults, comparator, "getTables", LOG); } - } - LOG.fine( - "Finished submitting " + processingFutures.size() + " processTableInfo tasks."); + if (!Thread.currentThread().isInterrupted()) { + populateQueue(collectedResults, queue, localResultSchemaFields); + } - if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Fetcher interrupted before waiting for processing tasks; cancelling remaining."); + } catch (Throwable t) { + LOG.severe("Unexpected error in table fetcher runnable: " + t.getMessage()); + apiFutures.forEach(f -> f.cancel(true)); processingFutures.forEach(f -> f.cancel(true)); - } else { - LOG.fine("Waiting for processTableInfo tasks to complete..."); - waitForTasksCompletion(processingFutures); - LOG.fine("All processTableInfo tasks completed."); + } finally { + signalEndOfData(queue, localResultSchemaFields); + shutdownExecutor(apiExecutor); + shutdownExecutor(tableProcessorExecutor); + LOG.info("Table fetcher thread finished."); } + }; - if (!Thread.currentThread().isInterrupted()) { - Comparator comparator = - defineGetTablesComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getTables", LOG); - } - - if (!Thread.currentThread().isInterrupted()) { - populateQueue(collectedResults, queue, localResultSchemaFields); - } - - } catch (Throwable t) { - LOG.severe("Unexpected error in table fetcher runnable: " + t.getMessage()); - apiFutures.forEach(f -> f.cancel(true)); - processingFutures.forEach(f -> f.cancel(true)); - } finally { - signalEndOfData(queue, localResultSchemaFields); - shutdownExecutor(apiExecutor); - shutdownExecutor(tableProcessorExecutor); - LOG.info("Table fetcher thread finished."); - } - }; - - Thread fetcherThread = new Thread(tableFetcher, "getTables-fetcher-" + effectiveCatalog); - BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + Runnable wrappedTableFetcher = Context.current().wrap(tableFetcher); + Thread fetcherThread = + new Thread(wrappedTableFetcher, "getTables-fetcher-" + effectiveCatalog); + BigQueryJsonResultSet resultSet = + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); - fetcherThread.start(); - LOG.info("Started background thread for getTables"); - return resultSet; + fetcherThread.start(); + LOG.info("Started background thread for getTables"); + return resultSet; + } catch (Exception e) { + span.recordException(e); + throw e; + } finally { + span.end(); + } } Schema defineGetTablesSchema() { @@ -1994,20 +2012,29 @@ public ResultSet getSchemas() { @Override public ResultSet getCatalogs() { LOG.info("getCatalogs() called"); - - final List accessibleCatalogs = getAccessibleCatalogNames(); - final Schema catalogsSchema = defineGetCatalogsSchema(); - final FieldList schemaFields = catalogsSchema.getFields(); - final List catalogRows = - prepareGetCatalogsRows(schemaFields, accessibleCatalogs); - - final BlockingQueue queue = - new LinkedBlockingQueue<>(catalogRows.isEmpty() ? 1 : catalogRows.size() + 1); - - populateQueue(catalogRows, queue, schemaFields); - signalEndOfData(queue, schemaFields); - - return BigQueryJsonResultSet.of(catalogsSchema, catalogRows.size(), queue, null, new Thread[0]); + Tracer tracer = BigQueryJdbcOpenTelemetry.getSafeTracer(this.connection); + Span span = tracer.spanBuilder("BigQueryDatabaseMetaData.getCatalogs").startSpan(); + try (Scope scope = span.makeCurrent()) { + final List accessibleCatalogs = getAccessibleCatalogNames(); + final Schema catalogsSchema = defineGetCatalogsSchema(); + final FieldList schemaFields = catalogsSchema.getFields(); + final List catalogRows = + prepareGetCatalogsRows(schemaFields, accessibleCatalogs); + + final BlockingQueue queue = + new LinkedBlockingQueue<>(catalogRows.isEmpty() ? 1 : catalogRows.size() + 1); + + populateQueue(catalogRows, queue, schemaFields); + signalEndOfData(queue, schemaFields); + + return BigQueryJsonResultSet.of( + catalogsSchema, catalogRows.size(), queue, null, new Thread[0]); + } catch (Exception e) { + span.recordException(e); + throw e; + } finally { + span.end(); + } } Schema defineGetCatalogsSchema() { @@ -2064,140 +2091,152 @@ static List prepareGetTableTypesRows(Schema schema) { @Override public ResultSet getColumns( String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) { + Tracer tracer = BigQueryJdbcOpenTelemetry.getSafeTracer(this.connection); + Span span = tracer.spanBuilder("BigQueryDatabaseMetaData.getColumns").startSpan(); + try (Scope scope = span.makeCurrent()) { + Tuple effectiveIdentifiers = + determineEffectiveCatalogAndSchema(catalog, schemaPattern); + String effectiveCatalog = effectiveIdentifiers.x(); + String effectiveSchemaPattern = effectiveIdentifiers.y(); + + if ((effectiveCatalog == null || effectiveCatalog.isEmpty()) + || (effectiveSchemaPattern != null && effectiveSchemaPattern.isEmpty()) + || (tableNamePattern != null && tableNamePattern.isEmpty()) + || (columnNamePattern != null && columnNamePattern.isEmpty())) { + LOG.warning( + "Returning empty ResultSet as one or more patterns are empty or catalog is null."); + return new BigQueryJsonResultSet(); + } - Tuple effectiveIdentifiers = - determineEffectiveCatalogAndSchema(catalog, schemaPattern); - String effectiveCatalog = effectiveIdentifiers.x(); - String effectiveSchemaPattern = effectiveIdentifiers.y(); - - if ((effectiveCatalog == null || effectiveCatalog.isEmpty()) - || (effectiveSchemaPattern != null && effectiveSchemaPattern.isEmpty()) - || (tableNamePattern != null && tableNamePattern.isEmpty()) - || (columnNamePattern != null && columnNamePattern.isEmpty())) { - LOG.warning( - "Returning empty ResultSet as one or more patterns are empty or catalog is null."); - return new BigQueryJsonResultSet(); - } - - LOG.info( - "getColumns called for catalog: %s, schemaPattern: %s, tableNamePattern: %s," - + " columnNamePattern: %s", - effectiveCatalog, effectiveSchemaPattern, tableNamePattern, columnNamePattern); - - Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern); - Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern); - Pattern columnNameRegex = compileSqlLikePattern(columnNamePattern); - - final Schema resultSchema = defineGetColumnsSchema(); - final FieldList resultSchemaFields = resultSchema.getFields(); - final BlockingQueue queue = - new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); - final List collectedResults = Collections.synchronizedList(new ArrayList<>()); - final String catalogParam = effectiveCatalog; - final String schemaParam = effectiveSchemaPattern; - - Runnable columnFetcher = - () -> { - ExecutorService columnExecutor = null; - final List> taskFutures = new ArrayList<>(); - final FieldList localResultSchemaFields = resultSchemaFields; - - try { - List datasetsToScan = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaParam, - schemaRegex, - LOG); - - if (datasetsToScan.isEmpty()) { - LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); - return; - } + LOG.info( + "getColumns called for catalog: %s, schemaPattern: %s, tableNamePattern: %s," + + " columnNamePattern: %s", + effectiveCatalog, effectiveSchemaPattern, tableNamePattern, columnNamePattern); - columnExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); + Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern); + Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern); + Pattern columnNameRegex = compileSqlLikePattern(columnNamePattern); - for (Dataset dataset : datasetsToScan) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning("Fetcher interrupted during dataset iteration."); - break; - } + final Schema resultSchema = defineGetColumnsSchema(); + final FieldList resultSchemaFields = resultSchema.getFields(); + final BlockingQueue queue = + new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); + final List collectedResults = Collections.synchronizedList(new ArrayList<>()); + final String catalogParam = effectiveCatalog; + final String schemaParam = effectiveSchemaPattern; - DatasetId datasetId = dataset.getDatasetId(); - LOG.info("Processing dataset: " + datasetId.getDataset()); + Runnable columnFetcher = + () -> { + ExecutorService columnExecutor = null; + final List> taskFutures = new ArrayList<>(); + final FieldList localResultSchemaFields = resultSchemaFields; - List
tablesToScan = + try { + List datasetsToScan = findMatchingBigQueryObjects( - "Table", + "Dataset", () -> - bigquery.listTables( - datasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> - bigquery.getTable( - TableId.of(datasetId.getProject(), datasetId.getDataset(), name)), - (tbl) -> tbl.getTableId().getTable(), - tableNamePattern, - tableNameRegex, + bigquery.listDatasets( + catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), + (ds) -> ds.getDatasetId().getDataset(), + schemaParam, + schemaRegex, LOG); - for (Table table : tablesToScan) { + if (datasetsToScan.isEmpty()) { + LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); + return; + } + + columnExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); + + for (Dataset dataset : datasetsToScan) { if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Fetcher interrupted during table iteration for dataset " - + datasetId.getDataset()); + LOG.warning("Fetcher interrupted during dataset iteration."); break; } - TableId tableId = table.getTableId(); - LOG.fine("Submitting task for table: " + tableId); - final Table finalTable = table; - Future future = - columnExecutor.submit( + DatasetId datasetId = dataset.getDatasetId(); + LOG.info("Processing dataset: " + datasetId.getDataset()); + + List
tablesToScan = + findMatchingBigQueryObjects( + "Table", () -> - processTableColumns( - finalTable, - columnNameRegex, - collectedResults, - localResultSchemaFields)); - taskFutures.add(future); + bigquery.listTables( + datasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> + bigquery.getTable( + TableId.of(datasetId.getProject(), datasetId.getDataset(), name)), + (tbl) -> tbl.getTableId().getTable(), + tableNamePattern, + tableNameRegex, + LOG); + + for (Table table : tablesToScan) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning( + "Fetcher interrupted during table iteration for dataset " + + datasetId.getDataset()); + break; + } + + TableId tableId = table.getTableId(); + LOG.fine("Submitting task for table: " + tableId); + final Table finalTable = table; + + Runnable columnTask = + () -> + processTableColumns( + finalTable, + columnNameRegex, + collectedResults, + localResultSchemaFields); + Runnable wrappedColumnTask = Context.current().wrap(columnTask); + Future future = columnExecutor.submit(wrappedColumnTask); + taskFutures.add(future); + } + if (Thread.currentThread().isInterrupted()) break; } - if (Thread.currentThread().isInterrupted()) break; - } - waitForTasksCompletion(taskFutures); + waitForTasksCompletion(taskFutures); - if (!Thread.currentThread().isInterrupted()) { - Comparator comparator = - defineGetColumnsComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getColumns", LOG); - } + if (!Thread.currentThread().isInterrupted()) { + Comparator comparator = + defineGetColumnsComparator(localResultSchemaFields); + sortResults(collectedResults, comparator, "getColumns", LOG); + } - if (!Thread.currentThread().isInterrupted()) { - populateQueue(collectedResults, queue, localResultSchemaFields); - } + if (!Thread.currentThread().isInterrupted()) { + populateQueue(collectedResults, queue, localResultSchemaFields); + } - } catch (Throwable t) { - LOG.severe("Unexpected error in column fetcher runnable: " + t.getMessage()); - taskFutures.forEach(f -> f.cancel(true)); - } finally { - signalEndOfData(queue, localResultSchemaFields); - shutdownExecutor(columnExecutor); - LOG.info("Column fetcher thread finished."); - } - }; + } catch (Throwable t) { + LOG.severe("Unexpected error in column fetcher runnable: " + t.getMessage()); + taskFutures.forEach(f -> f.cancel(true)); + } finally { + signalEndOfData(queue, localResultSchemaFields); + shutdownExecutor(columnExecutor); + LOG.info("Column fetcher thread finished."); + } + }; - Thread fetcherThread = new Thread(columnFetcher, "getColumns-fetcher-" + effectiveCatalog); - BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + Runnable wrappedColumnFetcher = Context.current().wrap(columnFetcher); + Thread fetcherThread = + new Thread(wrappedColumnFetcher, "getColumns-fetcher-" + effectiveCatalog); + BigQueryJsonResultSet resultSet = + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); - fetcherThread.start(); - LOG.info("Started background thread for getColumns"); - return resultSet; + fetcherThread.start(); + LOG.info("Started background thread for getColumns"); + return resultSet; + } catch (Exception e) { + span.recordException(e); + throw e; + } finally { + span.end(); + } } private void processTableColumns( @@ -3617,107 +3656,117 @@ public RowIdLifetime getRowIdLifetime() { @Override public ResultSet getSchemas(String catalog, String schemaPattern) { - if ((catalog != null && catalog.isEmpty()) - || (schemaPattern != null && schemaPattern.isEmpty())) { - LOG.warning("Returning empty ResultSet as catalog or schemaPattern is an empty string."); - return new BigQueryJsonResultSet(); - } - - LOG.info("getSchemas called for catalog: %s, schemaPattern: %s", catalog, schemaPattern); - - final Pattern schemaRegex = compileSqlLikePattern(schemaPattern); - final Schema resultSchema = defineGetSchemasSchema(); - final FieldList resultSchemaFields = resultSchema.getFields(); + Tracer tracer = BigQueryJdbcOpenTelemetry.getSafeTracer(this.connection); + Span span = tracer.spanBuilder("BigQueryDatabaseMetaData.getSchemas").startSpan(); + try (Scope scope = span.makeCurrent()) { + if ((catalog != null && catalog.isEmpty()) + || (schemaPattern != null && schemaPattern.isEmpty())) { + LOG.warning("Returning empty ResultSet as catalog or schemaPattern is an empty string."); + return new BigQueryJsonResultSet(); + } - final BlockingQueue queue = - new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); - final List collectedResults = Collections.synchronizedList(new ArrayList<>()); - final String catalogParam = catalog; + LOG.info("getSchemas called for catalog: %s, schemaPattern: %s", catalog, schemaPattern); - Runnable schemaFetcher = - () -> { - final FieldList localResultSchemaFields = resultSchemaFields; - List projectsToScanList = new ArrayList<>(); + final Pattern schemaRegex = compileSqlLikePattern(schemaPattern); + final Schema resultSchema = defineGetSchemasSchema(); + final FieldList resultSchemaFields = resultSchema.getFields(); - if (catalogParam != null) { - projectsToScanList.add(catalogParam); - } else { - projectsToScanList.addAll(getAccessibleCatalogNames()); - } + final BlockingQueue queue = + new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); + final List collectedResults = Collections.synchronizedList(new ArrayList<>()); + final String catalogParam = catalog; - if (projectsToScanList.isEmpty()) { - LOG.info( - "No valid projects to scan (primary, specified, or additional). Returning empty" - + " resultset."); - return; - } + Runnable schemaFetcher = + () -> { + final FieldList localResultSchemaFields = resultSchemaFields; + List projectsToScanList = new ArrayList<>(); - try { - for (String currentProjectToScan : projectsToScanList) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Schema fetcher interrupted during project iteration for project: " - + currentProjectToScan); - break; - } - LOG.info("Fetching schemas for project: " + currentProjectToScan); - List datasetsInProject = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - currentProjectToScan, - BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaPattern, - schemaRegex, - LOG); + if (catalogParam != null) { + projectsToScanList.add(catalogParam); + } else { + projectsToScanList.addAll(getAccessibleCatalogNames()); + } - if (datasetsInProject.isEmpty() || Thread.currentThread().isInterrupted()) { - LOG.info( - "Fetcher thread found no matching datasets in project: " - + currentProjectToScan); - continue; - } + if (projectsToScanList.isEmpty()) { + LOG.info( + "No valid projects to scan (primary, specified, or additional). Returning empty" + + " resultset."); + return; + } - LOG.fine("Processing found datasets for project: " + currentProjectToScan); - for (Dataset dataset : datasetsInProject) { + try { + for (String currentProjectToScan : projectsToScanList) { if (Thread.currentThread().isInterrupted()) { LOG.warning( - "Schema fetcher interrupted during dataset iteration for project: " + "Schema fetcher interrupted during project iteration for project: " + currentProjectToScan); break; } - processSchemaInfo(dataset, collectedResults, localResultSchemaFields); + LOG.info("Fetching schemas for project: " + currentProjectToScan); + List datasetsInProject = + findMatchingBigQueryObjects( + "Dataset", + () -> + bigquery.listDatasets( + currentProjectToScan, + BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)), + (ds) -> ds.getDatasetId().getDataset(), + schemaPattern, + schemaRegex, + LOG); + + if (datasetsInProject.isEmpty() || Thread.currentThread().isInterrupted()) { + LOG.info( + "Fetcher thread found no matching datasets in project: " + + currentProjectToScan); + continue; + } + + LOG.fine("Processing found datasets for project: " + currentProjectToScan); + for (Dataset dataset : datasetsInProject) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning( + "Schema fetcher interrupted during dataset iteration for project: " + + currentProjectToScan); + break; + } + processSchemaInfo(dataset, collectedResults, localResultSchemaFields); + } } - } - if (!Thread.currentThread().isInterrupted()) { - Comparator comparator = - defineGetSchemasComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getSchemas", LOG); - } + if (!Thread.currentThread().isInterrupted()) { + Comparator comparator = + defineGetSchemasComparator(localResultSchemaFields); + sortResults(collectedResults, comparator, "getSchemas", LOG); + } - if (!Thread.currentThread().isInterrupted()) { - populateQueue(collectedResults, queue, localResultSchemaFields); - } + if (!Thread.currentThread().isInterrupted()) { + populateQueue(collectedResults, queue, localResultSchemaFields); + } - } catch (Throwable t) { - LOG.severe("Unexpected error in schema fetcher runnable: " + t.getMessage()); - } finally { - signalEndOfData(queue, localResultSchemaFields); - LOG.info("Schema fetcher thread finished."); - } - }; + } catch (Throwable t) { + LOG.severe("Unexpected error in schema fetcher runnable: " + t.getMessage()); + } finally { + signalEndOfData(queue, localResultSchemaFields); + LOG.info("Schema fetcher thread finished."); + } + }; - Thread fetcherThread = new Thread(schemaFetcher, "getSchemas-fetcher-" + catalog); - BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + Runnable wrappedFetcher = Context.current().wrap(schemaFetcher); + Thread fetcherThread = new Thread(wrappedFetcher, "getSchemas-fetcher-" + catalog); + BigQueryJsonResultSet resultSet = + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); - fetcherThread.start(); - LOG.info("Started background thread for getSchemas"); - return resultSet; + fetcherThread.start(); + LOG.info("Started background thread for getSchemas"); + return resultSet; + } catch (Exception e) { + span.recordException(e); + throw e; + } finally { + span.end(); + } } Schema defineGetSchemasSchema() { diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java index 181e15629c5b..592569bde70c 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java @@ -49,4 +49,12 @@ public static OpenTelemetry getOpenTelemetry( public static Tracer getTracer(OpenTelemetry openTelemetry) { return openTelemetry.getTracer(INSTRUMENTATION_SCOPE_NAME); } + + /** Gets a Tracer for the JDBC driver, fallback to noop if connection has no tracer. */ + public static Tracer getSafeTracer(BigQueryConnection connection) { + if (connection != null && connection.getTracer() != null) { + return connection.getTracer(); + } + return getOpenTelemetry(false, false, null).getTracer(INSTRUMENTATION_SCOPE_NAME); + } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java index da2ade028e9f..6553bee82344 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java @@ -25,6 +25,9 @@ import com.google.cloud.bigquery.FieldValue.Attribute; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.sql.ResultSet; import java.sql.SQLException; import java.util.concurrent.BlockingQueue; @@ -133,49 +136,51 @@ static BigQueryJsonResultSet getNestedResultSet( /* Advances the result set to the next row, returning false if no such row exists. Potentially blocking operation */ public boolean next() throws SQLException { checkClosed(); - if (this.isNested) { - // We are working with the nested record, the cursor would have been - // populated. - if (this.cursor == null || this.cursor.getArrayFieldValueList() == null) { - throw new IllegalStateException( - "Cursor/ArrayFieldValueList can not be null working with the nested record"); - } - // Check if there's a next record in the array which can be read - if (this.nestedRowIndex < (this.toIndexExclusive - 1)) { - this.nestedRowIndex++; - return true; - } - this.afterLast = true; - return false; - - } else { - // If end of stream is reached or we are past the last row i.e - // rowcnt == totalRows (rowcnt starts at 0) - // then we can simply return false - if (this.hasReachedEnd || this.isLast()) { + try (Scope scope = Context.current().with(Span.wrap(originalSpanContext)).makeCurrent()) { + if (this.isNested) { + // We are working with the nested record, the cursor would have been + // populated. + if (this.cursor == null || this.cursor.getArrayFieldValueList() == null) { + throw new IllegalStateException( + "Cursor/ArrayFieldValueList can not be null working with the nested record"); + } + // Check if there's a next record in the array which can be read + if (this.nestedRowIndex < (this.toIndexExclusive - 1)) { + this.nestedRowIndex++; + return true; + } this.afterLast = true; return false; - } - try { - // Advance the cursor,Potentially blocking operation - this.cursor = this.buffer.take(); - if (this.cursor.getException() != null) { - throw new BigQueryJdbcRuntimeException(this.cursor.getException()); - } - this.rowCnt++; - // Check for end of stream - if (this.cursor.isLast()) { - this.cursor = null; - this.hasReachedEnd = true; + + } else { + // If end of stream is reached or we are past the last row i.e + // rowcnt == totalRows (rowcnt starts at 0) + // then we can simply return false + if (this.hasReachedEnd || this.isLast()) { + this.afterLast = true; return false; } - // Cursor has been advanced - return true; + try { + // Advance the cursor,Potentially blocking operation + this.cursor = this.buffer.take(); + if (this.cursor.getException() != null) { + throw new BigQueryJdbcRuntimeException(this.cursor.getException()); + } + this.rowCnt++; + // Check for end of stream + if (this.cursor.isLast()) { + this.cursor = null; + this.hasReachedEnd = true; + return false; + } + // Cursor has been advanced + return true; - } catch (InterruptedException ex) { - throw new BigQueryJdbcRuntimeException( - "Error occurred while advancing the cursor. This could happen when connection is closed while we call the next method", - ex); + } catch (InterruptedException ex) { + throw new BigQueryJdbcRuntimeException( + "Error occurred while advancing the cursor. This could happen when connection is closed while we call the next method", + ex); + } } } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index 4740e55aa33c..ddc4ebbcc6da 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -60,6 +60,7 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; @@ -848,88 +849,85 @@ Thread populateArrowBufferedQueue( Runnable arrowStreamProcessor = Context.current() .wrap( - () -> { - long rowsRead = 0; - int retryCount = 0; - try { - // Use the first stream to perform reading. - String streamName = readSession.getStreams(0).getName(); - - while (true) { - try { - ReadRowsRequest readRowsRequest = - ReadRowsRequest.newBuilder() - .setReadStream(streamName) - .setOffset(rowsRead) - .build(); - - // Process each block of rows as they arrive and decode using our simple row - // reader. - com.google.api.gax.rpc.ServerStream stream = - bqReadClient.readRowsCallable().call(readRowsRequest); - for (ReadRowsResponse response : stream) { - if (Thread.currentThread().isInterrupted() - || queryTaskExecutor.isShutdown()) { - break; - } - - ArrowRecordBatch currentBatch = response.getArrowRecordBatch(); - Uninterruptibles.putUninterruptibly( - arrowBatchWrapperBlockingQueue, - BigQueryArrowBatchWrapper.of(currentBatch)); - rowsRead += response.getRowCount(); - } - break; - } catch (com.google.api.gax.rpc.ApiException e) { - if (e.getStatusCode().getCode() - == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) { - LOG.warning("Read session expired or not found: %s", e.getMessage()); - enqueueError(arrowBatchWrapperBlockingQueue, e); - break; - } - if (retryCount >= MAX_RETRY_COUNT) { - LOG.log( - Level.SEVERE, - "\n" - + Thread.currentThread().getName() - + " Interrupted @ arrowStreamProcessor, max retries exceeded", - e); - enqueueError(arrowBatchWrapperBlockingQueue, e); - break; - } - retryCount++; - LOG.warning( - "Connection interrupted during arrow stream read, retrying. attempt: %d", - retryCount); - Thread.sleep(RETRY_DELAY_MS); - } - } - - } catch (InterruptedException e) { - LOG.log( - Level.WARNING, - "\n" - + Thread.currentThread().getName() - + " Interrupted @ arrowStreamProcessor", - e); - enqueueError(arrowBatchWrapperBlockingQueue, e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Error @ arrowStreamProcessor", - e); - enqueueError(arrowBatchWrapperBlockingQueue, e); - } finally { // logic needed for graceful shutdown - enqueueEndOfStream(arrowBatchWrapperBlockingQueue); - } - }); + () -> + processArrowStream(readSession, arrowBatchWrapperBlockingQueue, bqReadClient)); Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(arrowStreamProcessor); populateBufferWorker.start(); return populateBufferWorker; } + private void processArrowStream( + ReadSession readSession, + BlockingQueue arrowBatchWrapperBlockingQueue, + BigQueryReadClient bqReadClient) { + long rowsRead = 0; + int retryCount = 0; + try { + // Use the first stream to perform reading. + String streamName = readSession.getStreams(0).getName(); + + while (true) { + try { + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(streamName).setOffset(rowsRead).build(); + + // Process each block of rows as they arrive and decode using our simple row + // reader. + com.google.api.gax.rpc.ServerStream stream = + bqReadClient.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + break; + } + + ArrowRecordBatch currentBatch = response.getArrowRecordBatch(); + Uninterruptibles.putUninterruptibly( + arrowBatchWrapperBlockingQueue, BigQueryArrowBatchWrapper.of(currentBatch)); + rowsRead += response.getRowCount(); + } + break; + } catch (com.google.api.gax.rpc.ApiException e) { + if (e.getStatusCode().getCode() == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) { + LOG.warning("Read session expired or not found: %s", e.getMessage()); + enqueueError(arrowBatchWrapperBlockingQueue, e); + break; + } + if (retryCount >= MAX_RETRY_COUNT) { + LOG.log( + Level.SEVERE, + "\n" + + Thread.currentThread().getName() + + " Interrupted @ arrowStreamProcessor, max retries exceeded", + e); + enqueueError(arrowBatchWrapperBlockingQueue, e); + break; + } + retryCount++; + LOG.warning( + "Connection interrupted during arrow stream read, retrying. attempt: %d", retryCount); + Thread.sleep(RETRY_DELAY_MS); + } + } + + } catch (InterruptedException e) { + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor", + e); + enqueueError(arrowBatchWrapperBlockingQueue, e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Error @ arrowStreamProcessor", + e); + enqueueError(arrowBatchWrapperBlockingQueue, e); + } finally { // logic needed for graceful shutdown + enqueueEndOfStream(arrowBatchWrapperBlockingQueue); + } + } + /** Executes SQL query using either fast query path or read API */ void processQueryResponse(String query, TableResult results) throws SQLException { LOG.finest( @@ -1544,7 +1542,7 @@ private interface TracedOperation { } private T withTracing(String spanName, TracedOperation operation) throws SQLException { - Tracer tracer = getSafeTracer(); + Tracer tracer = BigQueryJdbcOpenTelemetry.getSafeTracer(this.connection); Span span = tracer.spanBuilder(spanName).startSpan(); try (Scope scope = span.makeCurrent()) { return operation.run(span); @@ -1563,7 +1561,8 @@ private void fetchNextPages( BlockingQueue> rpcResponseQueue, BlockingQueue bigQueryFieldValueListWrapperBlockingQueue, TableResult result) { - Tracer tracer = getSafeTracer(); + Tracer tracer = BigQueryJdbcOpenTelemetry.getSafeTracer(this.connection); + SpanContext parentSpanContext = Span.current().getSpanContext(); String currentPageToken = firstPageToken; TableResult currentResults = result; TableId destinationTable = null; @@ -1579,6 +1578,9 @@ private void fetchNextPages( } SpanBuilder spanBuilder = tracer.spanBuilder("BigQueryStatement.pagination"); + if (parentSpanContext != null) { + spanBuilder.addLink(parentSpanContext); + } Span paginationSpan = spanBuilder.startSpan(); try (Scope scope = paginationSpan.makeCurrent()) { paginationSpan.setAttribute("db.pagination.page_token", currentPageToken); @@ -1672,12 +1674,4 @@ private void parseAndPopulateRpcData( enqueueBufferEndOfStream(bigQueryFieldValueListWrapperBlockingQueue); } } - - private Tracer getSafeTracer() { - if (connection != null && connection.getTracer() != null) { - return connection.getTracer(); - } - return BigQueryJdbcOpenTelemetry.getOpenTelemetry(false, false, null) - .getTracer(BigQueryJdbcOpenTelemetry.INSTRUMENTATION_SCOPE_NAME); - } } From 3228f45c7b72ac4399ccbc3e223af9baec99eeea Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 24 Apr 2026 15:26:47 +0000 Subject: [PATCH 2/5] chore: narrowed the scope of the OTel context in `next()` to only cover the blocking operation `buffer.take()` --- .../bigquery/jdbc/BigQueryArrowResultSet.java | 109 +++++++++--------- .../bigquery/jdbc/BigQueryJsonResultSet.java | 80 ++++++------- 2 files changed, 95 insertions(+), 94 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java index d483fad18f75..e5cf2df9dbd5 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java @@ -216,68 +216,69 @@ public void close() { @Override public boolean next() throws SQLException { checkClosed(); - try (Scope scope = Context.current().with(Span.wrap(originalSpanContext)).makeCurrent()) { - if (this.isNested) { - if (this.currentNestedBatch == null || this.currentNestedBatch.getNestedRecords() == null) { - throw new IllegalStateException( - "currentNestedBatch/JsonStringArrayList can not be null working with the nested record"); - } - if (this.nestedRowIndex < (this.toIndexExclusive - 1)) { - /* Check if there's a next record in the array which can be read */ - this.nestedRowIndex++; - return true; - } + if (this.isNested) { + if (this.currentNestedBatch == null || this.currentNestedBatch.getNestedRecords() == null) { + throw new IllegalStateException( + "currentNestedBatch/JsonStringArrayList can not be null working with the nested record"); + } + if (this.nestedRowIndex < (this.toIndexExclusive - 1)) { + /* Check if there's a next record in the array which can be read */ + this.nestedRowIndex++; + return true; + } + this.afterLast = true; + return false; + } else { + /* Non nested */ + if (this.hasReachedEnd || this.isLast()) { this.afterLast = true; return false; - } else { - /* Non nested */ - if (this.hasReachedEnd || this.isLast()) { - this.afterLast = true; - return false; - } - try { - if (this.currentBatchRowIndex == -1 - || this.currentBatchRowIndex == (this.vectorSchemaRoot.getRowCount() - 1)) { - /* Start of iteration or we have exhausted the current batch */ - // Advance the cursor. Potentially blocking operation. - BigQueryArrowBatchWrapper batchWrapper = this.buffer.take(); - if (batchWrapper.getException() != null) { - throw new BigQueryJdbcRuntimeException(batchWrapper.getException()); - } - if (batchWrapper.isLast()) { - /* Marks the end of the records */ - if (this.vectorSchemaRoot != null) { - // IMP: To avoid memory leak: clear vectorSchemaRoot as it still holds - // the last batch - this.vectorSchemaRoot.clear(); - } - this.hasReachedEnd = true; - this.rowCount++; - return false; - } - // Valid batch, process it - ArrowRecordBatch arrowBatch = batchWrapper.getCurrentArrowBatch(); - // Populates vectorSchemaRoot - this.arrowDeserializer.deserializeArrowBatch(arrowBatch); - // Pointing to the first row in this fresh batch - this.currentBatchRowIndex = 0; - this.rowCount++; - return true; + } + try { + if (this.currentBatchRowIndex == -1 + || this.currentBatchRowIndex == (this.vectorSchemaRoot.getRowCount() - 1)) { + /* Start of iteration or we have exhausted the current batch */ + // Advance the cursor. Potentially blocking operation. + BigQueryArrowBatchWrapper batchWrapper; + try (Scope scope = Context.current().with(Span.wrap(originalSpanContext)).makeCurrent()) { + batchWrapper = this.buffer.take(); + } + if (batchWrapper.getException() != null) { + throw new BigQueryJdbcRuntimeException(batchWrapper.getException()); } - // There are rows left in the current batch. - else if (this.currentBatchRowIndex < this.vectorSchemaRoot.getRowCount()) { - this.currentBatchRowIndex++; + if (batchWrapper.isLast()) { + /* Marks the end of the records */ + if (this.vectorSchemaRoot != null) { + // IMP: To avoid memory leak: clear vectorSchemaRoot as it still holds + // the last batch + this.vectorSchemaRoot.clear(); + } + this.hasReachedEnd = true; this.rowCount++; - return true; + return false; } - } catch (InterruptedException | SQLException ex) { - throw new BigQueryJdbcException( - "Error occurred while advancing the cursor. This could happen when connection is closed while the next method is being called.", - ex); + // Valid batch, process it + ArrowRecordBatch arrowBatch = batchWrapper.getCurrentArrowBatch(); + // Populates vectorSchemaRoot + this.arrowDeserializer.deserializeArrowBatch(arrowBatch); + // Pointing to the first row in this fresh batch + this.currentBatchRowIndex = 0; + this.rowCount++; + return true; } + // There are rows left in the current batch. + else if (this.currentBatchRowIndex < this.vectorSchemaRoot.getRowCount()) { + this.currentBatchRowIndex++; + this.rowCount++; + return true; + } + } catch (InterruptedException | SQLException ex) { + throw new BigQueryJdbcException( + "Error occurred while advancing the cursor. This could happen when connection is closed while the next method is being called.", + ex); } - return false; } + return false; } private Object getObjectInternal(int columnIndex) throws SQLException { diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java index 6553bee82344..fe613120b91f 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java @@ -136,51 +136,51 @@ static BigQueryJsonResultSet getNestedResultSet( /* Advances the result set to the next row, returning false if no such row exists. Potentially blocking operation */ public boolean next() throws SQLException { checkClosed(); - try (Scope scope = Context.current().with(Span.wrap(originalSpanContext)).makeCurrent()) { - if (this.isNested) { - // We are working with the nested record, the cursor would have been - // populated. - if (this.cursor == null || this.cursor.getArrayFieldValueList() == null) { - throw new IllegalStateException( - "Cursor/ArrayFieldValueList can not be null working with the nested record"); - } - // Check if there's a next record in the array which can be read - if (this.nestedRowIndex < (this.toIndexExclusive - 1)) { - this.nestedRowIndex++; - return true; - } + if (this.isNested) { + // We are working with the nested record, the cursor would have been + // populated. + if (this.cursor == null || this.cursor.getArrayFieldValueList() == null) { + throw new IllegalStateException( + "Cursor/ArrayFieldValueList can not be null working with the nested record"); + } + // Check if there's a next record in the array which can be read + if (this.nestedRowIndex < (this.toIndexExclusive - 1)) { + this.nestedRowIndex++; + return true; + } + this.afterLast = true; + return false; + + } else { + // If end of stream is reached or we are past the last row i.e + // rowcnt == totalRows (rowcnt starts at 0) + // then we can simply return false + if (this.hasReachedEnd || this.isLast()) { this.afterLast = true; return false; - - } else { - // If end of stream is reached or we are past the last row i.e - // rowcnt == totalRows (rowcnt starts at 0) - // then we can simply return false - if (this.hasReachedEnd || this.isLast()) { - this.afterLast = true; + } + try { + // Advance the cursor,Potentially blocking operation + try (Scope scope = Context.current().with(Span.wrap(originalSpanContext)).makeCurrent()) { + this.cursor = this.buffer.take(); + } + if (this.cursor.getException() != null) { + throw new BigQueryJdbcRuntimeException(this.cursor.getException()); + } + this.rowCnt++; + // Check for end of stream + if (this.cursor.isLast()) { + this.cursor = null; + this.hasReachedEnd = true; return false; } - try { - // Advance the cursor,Potentially blocking operation - this.cursor = this.buffer.take(); - if (this.cursor.getException() != null) { - throw new BigQueryJdbcRuntimeException(this.cursor.getException()); - } - this.rowCnt++; - // Check for end of stream - if (this.cursor.isLast()) { - this.cursor = null; - this.hasReachedEnd = true; - return false; - } - // Cursor has been advanced - return true; + // Cursor has been advanced + return true; - } catch (InterruptedException ex) { - throw new BigQueryJdbcRuntimeException( - "Error occurred while advancing the cursor. This could happen when connection is closed while we call the next method", - ex); - } + } catch (InterruptedException ex) { + throw new BigQueryJdbcRuntimeException( + "Error occurred while advancing the cursor. This could happen when connection is closed while we call the next method", + ex); } } } From fd30068ea5cfa5b9fe773962f6f8fb6e7be99d1b Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 24 Apr 2026 15:46:44 +0000 Subject: [PATCH 3/5] chore: add span links to db metadata methods --- .../jdbc/BigQueryDatabaseMetaData.java | 542 ++++++++++-------- 1 file changed, 289 insertions(+), 253 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 689e57622714..18181ddf414e 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -43,6 +43,7 @@ import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.exception.BigQueryJdbcException; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; @@ -1737,141 +1738,154 @@ public ResultSet getTables( final String catalogParam = effectiveCatalog; final String schemaParam = effectiveSchemaPattern; + SpanContext parentSpanContext = span.getSpanContext(); Runnable tableFetcher = () -> { - ExecutorService apiExecutor = null; - ExecutorService tableProcessorExecutor = null; - final FieldList localResultSchemaFields = resultSchemaFields; - final List>> apiFutures = new ArrayList<>(); - final List> processingFutures = new ArrayList<>(); + Span backgroundSpan = + tracer + .spanBuilder("BigQueryDatabaseMetaData.getTables.background") + .addLink(parentSpanContext) + .startSpan(); + + try (Scope backgroundScope = backgroundSpan.makeCurrent()) { + ExecutorService apiExecutor = null; + ExecutorService tableProcessorExecutor = null; + final FieldList localResultSchemaFields = resultSchemaFields; + final List>> apiFutures = new ArrayList<>(); + final List> processingFutures = new ArrayList<>(); - try { - List datasetsToScan = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaParam, - schemaRegex, - LOG); - - if (datasetsToScan.isEmpty()) { - LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); - return; - } - - apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE); - tableProcessorExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); + try { + List datasetsToScan = + findMatchingBigQueryObjects( + "Dataset", + () -> + bigquery.listDatasets( + catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), + (ds) -> ds.getDatasetId().getDataset(), + schemaParam, + schemaRegex, + LOG); - LOG.fine("Submitting parallel findMatchingTables tasks..."); - for (Dataset dataset : datasetsToScan) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning("Table fetcher interrupted during dataset iteration."); - break; + if (datasetsToScan.isEmpty()) { + LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); + return; } - final DatasetId currentDatasetId = dataset.getDatasetId(); - Callable> apiCallable = - () -> - findMatchingBigQueryObjects( - "Table", - () -> - bigquery.listTables( - currentDatasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> - bigquery.getTable( - TableId.of( - currentDatasetId.getProject(), - currentDatasetId.getDataset(), - name)), - (tbl) -> tbl.getTableId().getTable(), - tableNamePattern, - tableNameRegex, - LOG); - - Callable> wrappedApiCallable = Context.current().wrap(apiCallable); - Future> apiFuture = apiExecutor.submit(wrappedApiCallable); - apiFutures.add(apiFuture); - } - LOG.fine("Finished submitting " + apiFutures.size() + " findMatchingTables tasks."); - apiExecutor.shutdown(); + apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE); + tableProcessorExecutor = + Executors.newFixedThreadPool(this.metadataFetchThreadCount); - LOG.fine("Processing results from findMatchingTables tasks..."); - for (Future> apiFuture : apiFutures) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning("Table fetcher interrupted while processing API futures."); - break; + LOG.fine("Submitting parallel findMatchingTables tasks..."); + for (Dataset dataset : datasetsToScan) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning("Table fetcher interrupted during dataset iteration."); + break; + } + + final DatasetId currentDatasetId = dataset.getDatasetId(); + Callable> apiCallable = + () -> + findMatchingBigQueryObjects( + "Table", + () -> + bigquery.listTables( + currentDatasetId, + TableListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> + bigquery.getTable( + TableId.of( + currentDatasetId.getProject(), + currentDatasetId.getDataset(), + name)), + (tbl) -> tbl.getTableId().getTable(), + tableNamePattern, + tableNameRegex, + LOG); + + Callable> wrappedApiCallable = Context.current().wrap(apiCallable); + Future> apiFuture = apiExecutor.submit(wrappedApiCallable); + apiFutures.add(apiFuture); } - try { - List
tablesResult = apiFuture.get(); - if (tablesResult != null) { - for (Table table : tablesResult) { - if (Thread.currentThread().isInterrupted()) break; - - final Table currentTable = table; - Runnable processRunnable = - () -> - processTableInfo( - currentTable, - requestedTypes, - collectedResults, - localResultSchemaFields); - Runnable wrappedProcessRunnable = Context.current().wrap(processRunnable); - Future processFuture = - tableProcessorExecutor.submit(wrappedProcessRunnable); - processingFutures.add(processFuture); + LOG.fine("Finished submitting " + apiFutures.size() + " findMatchingTables tasks."); + apiExecutor.shutdown(); + + LOG.fine("Processing results from findMatchingTables tasks..."); + for (Future> apiFuture : apiFutures) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning("Table fetcher interrupted while processing API futures."); + break; + } + try { + List
tablesResult = apiFuture.get(); + if (tablesResult != null) { + for (Table table : tablesResult) { + if (Thread.currentThread().isInterrupted()) break; + + final Table currentTable = table; + Runnable processRunnable = + () -> + processTableInfo( + currentTable, + requestedTypes, + collectedResults, + localResultSchemaFields); + Runnable wrappedProcessRunnable = Context.current().wrap(processRunnable); + Future processFuture = + tableProcessorExecutor.submit(wrappedProcessRunnable); + processingFutures.add(processFuture); + } } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warning("Fetcher thread interrupted while waiting for API future result."); + break; + } catch (ExecutionException e) { + LOG.warning( + "Error executing findMatchingTables task: " + + e.getMessage() + + ". Cause: " + + e.getCause()); + } catch (CancellationException e) { + LOG.warning("A findMatchingTables task was cancelled."); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warning("Fetcher thread interrupted while waiting for API future result."); - break; - } catch (ExecutionException e) { - LOG.warning( - "Error executing findMatchingTables task: " - + e.getMessage() - + ". Cause: " - + e.getCause()); - } catch (CancellationException e) { - LOG.warning("A findMatchingTables task was cancelled."); } - } - LOG.fine( - "Finished submitting " + processingFutures.size() + " processTableInfo tasks."); + LOG.fine( + "Finished submitting " + processingFutures.size() + " processTableInfo tasks."); - if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Fetcher interrupted before waiting for processing tasks; cancelling remaining."); - processingFutures.forEach(f -> f.cancel(true)); - } else { - LOG.fine("Waiting for processTableInfo tasks to complete..."); - waitForTasksCompletion(processingFutures); - LOG.fine("All processTableInfo tasks completed."); - } + if (Thread.currentThread().isInterrupted()) { + LOG.warning( + "Fetcher interrupted before waiting for processing tasks; cancelling remaining."); + processingFutures.forEach(f -> f.cancel(true)); + } else { + LOG.fine("Waiting for processTableInfo tasks to complete..."); + waitForTasksCompletion(processingFutures); + LOG.fine("All processTableInfo tasks completed."); + } - if (!Thread.currentThread().isInterrupted()) { - Comparator comparator = - defineGetTablesComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getTables", LOG); - } + if (!Thread.currentThread().isInterrupted()) { + Comparator comparator = + defineGetTablesComparator(localResultSchemaFields); + sortResults(collectedResults, comparator, "getTables", LOG); + } - if (!Thread.currentThread().isInterrupted()) { - populateQueue(collectedResults, queue, localResultSchemaFields); - } + if (!Thread.currentThread().isInterrupted()) { + populateQueue(collectedResults, queue, localResultSchemaFields); + } - } catch (Throwable t) { - LOG.severe("Unexpected error in table fetcher runnable: " + t.getMessage()); - apiFutures.forEach(f -> f.cancel(true)); - processingFutures.forEach(f -> f.cancel(true)); + } catch (Throwable t) { + LOG.severe("Unexpected error in table fetcher runnable: " + t.getMessage()); + apiFutures.forEach(f -> f.cancel(true)); + processingFutures.forEach(f -> f.cancel(true)); + } finally { + signalEndOfData(queue, localResultSchemaFields); + shutdownExecutor(apiExecutor); + shutdownExecutor(tableProcessorExecutor); + LOG.info("Table fetcher thread finished."); + } } finally { - signalEndOfData(queue, localResultSchemaFields); - shutdownExecutor(apiExecutor); - shutdownExecutor(tableProcessorExecutor); - LOG.info("Table fetcher thread finished."); + backgroundSpan.end(); } }; @@ -2125,100 +2139,111 @@ public ResultSet getColumns( final String catalogParam = effectiveCatalog; final String schemaParam = effectiveSchemaPattern; + SpanContext parentSpanContext = span.getSpanContext(); Runnable columnFetcher = () -> { - ExecutorService columnExecutor = null; - final List> taskFutures = new ArrayList<>(); - final FieldList localResultSchemaFields = resultSchemaFields; - - try { - List datasetsToScan = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaParam, - schemaRegex, - LOG); - - if (datasetsToScan.isEmpty()) { - LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); - return; - } - - columnExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); - - for (Dataset dataset : datasetsToScan) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning("Fetcher interrupted during dataset iteration."); - break; - } + Span backgroundSpan = + tracer + .spanBuilder("BigQueryDatabaseMetaData.getColumns.background") + .addLink(parentSpanContext) + .startSpan(); - DatasetId datasetId = dataset.getDatasetId(); - LOG.info("Processing dataset: " + datasetId.getDataset()); + try (Scope backgroundScope = backgroundSpan.makeCurrent()) { + ExecutorService columnExecutor = null; + final List> taskFutures = new ArrayList<>(); + final FieldList localResultSchemaFields = resultSchemaFields; - List
tablesToScan = + try { + List datasetsToScan = findMatchingBigQueryObjects( - "Table", + "Dataset", () -> - bigquery.listTables( - datasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> - bigquery.getTable( - TableId.of(datasetId.getProject(), datasetId.getDataset(), name)), - (tbl) -> tbl.getTableId().getTable(), - tableNamePattern, - tableNameRegex, + bigquery.listDatasets( + catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), + (ds) -> ds.getDatasetId().getDataset(), + schemaParam, + schemaRegex, LOG); - for (Table table : tablesToScan) { + if (datasetsToScan.isEmpty()) { + LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); + return; + } + + columnExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); + + for (Dataset dataset : datasetsToScan) { if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Fetcher interrupted during table iteration for dataset " - + datasetId.getDataset()); + LOG.warning("Fetcher interrupted during dataset iteration."); break; } - TableId tableId = table.getTableId(); - LOG.fine("Submitting task for table: " + tableId); - final Table finalTable = table; + DatasetId datasetId = dataset.getDatasetId(); + LOG.info("Processing dataset: " + datasetId.getDataset()); - Runnable columnTask = - () -> - processTableColumns( - finalTable, - columnNameRegex, - collectedResults, - localResultSchemaFields); - Runnable wrappedColumnTask = Context.current().wrap(columnTask); - Future future = columnExecutor.submit(wrappedColumnTask); - taskFutures.add(future); + List
tablesToScan = + findMatchingBigQueryObjects( + "Table", + () -> + bigquery.listTables( + datasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> + bigquery.getTable( + TableId.of(datasetId.getProject(), datasetId.getDataset(), name)), + (tbl) -> tbl.getTableId().getTable(), + tableNamePattern, + tableNameRegex, + LOG); + + for (Table table : tablesToScan) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning( + "Fetcher interrupted during table iteration for dataset " + + datasetId.getDataset()); + break; + } + + TableId tableId = table.getTableId(); + LOG.fine("Submitting task for table: " + tableId); + final Table finalTable = table; + + Runnable columnTask = + () -> + processTableColumns( + finalTable, + columnNameRegex, + collectedResults, + localResultSchemaFields); + Runnable wrappedColumnTask = Context.current().wrap(columnTask); + Future future = columnExecutor.submit(wrappedColumnTask); + taskFutures.add(future); + } + if (Thread.currentThread().isInterrupted()) break; } - if (Thread.currentThread().isInterrupted()) break; - } - waitForTasksCompletion(taskFutures); + waitForTasksCompletion(taskFutures); - if (!Thread.currentThread().isInterrupted()) { - Comparator comparator = - defineGetColumnsComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getColumns", LOG); - } + if (!Thread.currentThread().isInterrupted()) { + Comparator comparator = + defineGetColumnsComparator(localResultSchemaFields); + sortResults(collectedResults, comparator, "getColumns", LOG); + } - if (!Thread.currentThread().isInterrupted()) { - populateQueue(collectedResults, queue, localResultSchemaFields); - } + if (!Thread.currentThread().isInterrupted()) { + populateQueue(collectedResults, queue, localResultSchemaFields); + } - } catch (Throwable t) { - LOG.severe("Unexpected error in column fetcher runnable: " + t.getMessage()); - taskFutures.forEach(f -> f.cancel(true)); + } catch (Throwable t) { + LOG.severe("Unexpected error in column fetcher runnable: " + t.getMessage()); + taskFutures.forEach(f -> f.cancel(true)); + } finally { + signalEndOfData(queue, localResultSchemaFields); + shutdownExecutor(columnExecutor); + LOG.info("Column fetcher thread finished."); + } } finally { - signalEndOfData(queue, localResultSchemaFields); - shutdownExecutor(columnExecutor); - LOG.info("Column fetcher thread finished."); + backgroundSpan.end(); } }; @@ -3676,80 +3701,91 @@ public ResultSet getSchemas(String catalog, String schemaPattern) { final List collectedResults = Collections.synchronizedList(new ArrayList<>()); final String catalogParam = catalog; + SpanContext parentSpanContext = span.getSpanContext(); Runnable schemaFetcher = () -> { - final FieldList localResultSchemaFields = resultSchemaFields; - List projectsToScanList = new ArrayList<>(); - - if (catalogParam != null) { - projectsToScanList.add(catalogParam); - } else { - projectsToScanList.addAll(getAccessibleCatalogNames()); - } - - if (projectsToScanList.isEmpty()) { - LOG.info( - "No valid projects to scan (primary, specified, or additional). Returning empty" - + " resultset."); - return; - } - - try { - for (String currentProjectToScan : projectsToScanList) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Schema fetcher interrupted during project iteration for project: " - + currentProjectToScan); - break; - } - LOG.info("Fetching schemas for project: " + currentProjectToScan); - List datasetsInProject = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - currentProjectToScan, - BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaPattern, - schemaRegex, - LOG); + Span backgroundSpan = + tracer + .spanBuilder("BigQueryDatabaseMetaData.getSchemas.background") + .addLink(parentSpanContext) + .startSpan(); + + try (Scope backgroundScope = backgroundSpan.makeCurrent()) { + final FieldList localResultSchemaFields = resultSchemaFields; + List projectsToScanList = new ArrayList<>(); + + if (catalogParam != null) { + projectsToScanList.add(catalogParam); + } else { + projectsToScanList.addAll(getAccessibleCatalogNames()); + } - if (datasetsInProject.isEmpty() || Thread.currentThread().isInterrupted()) { - LOG.info( - "Fetcher thread found no matching datasets in project: " - + currentProjectToScan); - continue; - } + if (projectsToScanList.isEmpty()) { + LOG.info( + "No valid projects to scan (primary, specified, or additional). Returning empty" + + " resultset."); + return; + } - LOG.fine("Processing found datasets for project: " + currentProjectToScan); - for (Dataset dataset : datasetsInProject) { + try { + for (String currentProjectToScan : projectsToScanList) { if (Thread.currentThread().isInterrupted()) { LOG.warning( - "Schema fetcher interrupted during dataset iteration for project: " + "Schema fetcher interrupted during project iteration for project: " + currentProjectToScan); break; } - processSchemaInfo(dataset, collectedResults, localResultSchemaFields); + LOG.info("Fetching schemas for project: " + currentProjectToScan); + List datasetsInProject = + findMatchingBigQueryObjects( + "Dataset", + () -> + bigquery.listDatasets( + currentProjectToScan, + BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)), + (ds) -> ds.getDatasetId().getDataset(), + schemaPattern, + schemaRegex, + LOG); + + if (datasetsInProject.isEmpty() || Thread.currentThread().isInterrupted()) { + LOG.info( + "Fetcher thread found no matching datasets in project: " + + currentProjectToScan); + continue; + } + + LOG.fine("Processing found datasets for project: " + currentProjectToScan); + for (Dataset dataset : datasetsInProject) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning( + "Schema fetcher interrupted during dataset iteration for project: " + + currentProjectToScan); + break; + } + processSchemaInfo(dataset, collectedResults, localResultSchemaFields); + } } - } - if (!Thread.currentThread().isInterrupted()) { - Comparator comparator = - defineGetSchemasComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getSchemas", LOG); - } + if (!Thread.currentThread().isInterrupted()) { + Comparator comparator = + defineGetSchemasComparator(localResultSchemaFields); + sortResults(collectedResults, comparator, "getSchemas", LOG); + } - if (!Thread.currentThread().isInterrupted()) { - populateQueue(collectedResults, queue, localResultSchemaFields); - } + if (!Thread.currentThread().isInterrupted()) { + populateQueue(collectedResults, queue, localResultSchemaFields); + } - } catch (Throwable t) { - LOG.severe("Unexpected error in schema fetcher runnable: " + t.getMessage()); + } catch (Throwable t) { + LOG.severe("Unexpected error in schema fetcher runnable: " + t.getMessage()); + } finally { + signalEndOfData(queue, localResultSchemaFields); + LOG.info("Schema fetcher thread finished."); + } } finally { - signalEndOfData(queue, localResultSchemaFields); - LOG.info("Schema fetcher thread finished."); + backgroundSpan.end(); } }; From 409a3ab8e1b30fcb71fdd28e3c2f2618a4ff8acb Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 24 Apr 2026 16:06:15 +0000 Subject: [PATCH 4/5] chore: minor improvements --- .../cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java | 8 ++++++++ .../com/google/cloud/bigquery/jdbc/BigQueryStatement.java | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 18181ddf414e..753bf4a995c4 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -44,6 +44,7 @@ import com.google.cloud.bigquery.exception.BigQueryJdbcException; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; @@ -1744,6 +1745,7 @@ public ResultSet getTables( Span backgroundSpan = tracer .spanBuilder("BigQueryDatabaseMetaData.getTables.background") + .setNoParent() .addLink(parentSpanContext) .startSpan(); @@ -1900,6 +1902,7 @@ public ResultSet getTables( return resultSet; } catch (Exception e) { span.recordException(e); + span.setStatus(StatusCode.ERROR, e.getMessage()); throw e; } finally { span.end(); @@ -2045,6 +2048,7 @@ public ResultSet getCatalogs() { catalogsSchema, catalogRows.size(), queue, null, new Thread[0]); } catch (Exception e) { span.recordException(e); + span.setStatus(StatusCode.ERROR, e.getMessage()); throw e; } finally { span.end(); @@ -2145,6 +2149,7 @@ public ResultSet getColumns( Span backgroundSpan = tracer .spanBuilder("BigQueryDatabaseMetaData.getColumns.background") + .setNoParent() .addLink(parentSpanContext) .startSpan(); @@ -2258,6 +2263,7 @@ public ResultSet getColumns( return resultSet; } catch (Exception e) { span.recordException(e); + span.setStatus(StatusCode.ERROR, e.getMessage()); throw e; } finally { span.end(); @@ -3707,6 +3713,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) { Span backgroundSpan = tracer .spanBuilder("BigQueryDatabaseMetaData.getSchemas.background") + .setNoParent() .addLink(parentSpanContext) .startSpan(); @@ -3799,6 +3806,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) { return resultSet; } catch (Exception e) { span.recordException(e); + span.setStatus(StatusCode.ERROR, e.getMessage()); throw e; } finally { span.end(); diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index ddc4ebbcc6da..a846540b722b 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -1578,7 +1578,7 @@ private void fetchNextPages( } SpanBuilder spanBuilder = tracer.spanBuilder("BigQueryStatement.pagination"); - if (parentSpanContext != null) { + if (parentSpanContext.isValid()) { spanBuilder.addLink(parentSpanContext); } Span paginationSpan = spanBuilder.startSpan(); From 4e9d6ae8271487b2e1fa6bd87c1a4d0a63a0a6bf Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 24 Apr 2026 16:25:39 +0000 Subject: [PATCH 5/5] refactor: move core login to *impl methods and use `withTrace` generic function --- .../jdbc/BigQueryDatabaseMetaData.java | 897 +++++++++--------- .../jdbc/BigQueryDatabaseMetaDataTest.java | 2 +- 2 files changed, 452 insertions(+), 447 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 753bf4a995c4..87352cf8cbed 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -1704,209 +1704,206 @@ Comparator defineGetProcedureColumnsComparator(FieldList resultS @Override public ResultSet getTables( - String catalog, String schemaPattern, String tableNamePattern, String[] types) { - Tracer tracer = BigQueryJdbcOpenTelemetry.getSafeTracer(this.connection); - Span span = tracer.spanBuilder("BigQueryDatabaseMetaData.getTables").startSpan(); - try (Scope scope = span.makeCurrent()) { - Tuple effectiveIdentifiers = - determineEffectiveCatalogAndSchema(catalog, schemaPattern); - String effectiveCatalog = effectiveIdentifiers.x(); - String effectiveSchemaPattern = effectiveIdentifiers.y(); - - if ((effectiveCatalog == null || effectiveCatalog.isEmpty()) - || (effectiveSchemaPattern != null && effectiveSchemaPattern.isEmpty()) - || (tableNamePattern != null && tableNamePattern.isEmpty())) { - LOG.warning( - "Returning empty ResultSet as one or more patterns are empty or catalog is null."); - return new BigQueryJsonResultSet(); - } - - LOG.info( - "getTables called for catalog: %s, schemaPattern: %s, tableNamePattern: %s, types: %s", - effectiveCatalog, effectiveSchemaPattern, tableNamePattern, Arrays.toString(types)); + String catalog, String schemaPattern, String tableNamePattern, String[] types) + throws SQLException { + return withTracing( + "BigQueryDatabaseMetaData.getTables", + () -> getTablesImpl(catalog, schemaPattern, tableNamePattern, types)); + } - final Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern); - final Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern); - final Set requestedTypes = - (types == null || types.length == 0) ? null : new HashSet<>(Arrays.asList(types)); + private ResultSet getTablesImpl( + String catalog, String schemaPattern, String tableNamePattern, String[] types) + throws SQLException { + Tuple effectiveIdentifiers = + determineEffectiveCatalogAndSchema(catalog, schemaPattern); + String effectiveCatalog = effectiveIdentifiers.x(); + String effectiveSchemaPattern = effectiveIdentifiers.y(); + + if ((effectiveCatalog == null || effectiveCatalog.isEmpty()) + || (effectiveSchemaPattern != null && effectiveSchemaPattern.isEmpty()) + || (tableNamePattern != null && tableNamePattern.isEmpty())) { + LOG.warning( + "Returning empty ResultSet as one or more patterns are empty or catalog is null."); + return new BigQueryJsonResultSet(); + } - final Schema resultSchema = defineGetTablesSchema(); - final FieldList resultSchemaFields = resultSchema.getFields(); + LOG.info( + "getTables called for catalog: %s, schemaPattern: %s, tableNamePattern: %s, types: %s", + effectiveCatalog, effectiveSchemaPattern, tableNamePattern, Arrays.toString(types)); - final BlockingQueue queue = - new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); - final List collectedResults = Collections.synchronizedList(new ArrayList<>()); - final String catalogParam = effectiveCatalog; - final String schemaParam = effectiveSchemaPattern; + final Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern); + final Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern); + final Set requestedTypes = + (types == null || types.length == 0) ? null : new HashSet<>(Arrays.asList(types)); - SpanContext parentSpanContext = span.getSpanContext(); - Runnable tableFetcher = - () -> { - Span backgroundSpan = - tracer - .spanBuilder("BigQueryDatabaseMetaData.getTables.background") - .setNoParent() - .addLink(parentSpanContext) - .startSpan(); - - try (Scope backgroundScope = backgroundSpan.makeCurrent()) { - ExecutorService apiExecutor = null; - ExecutorService tableProcessorExecutor = null; - final FieldList localResultSchemaFields = resultSchemaFields; - final List>> apiFutures = new ArrayList<>(); - final List> processingFutures = new ArrayList<>(); + final Schema resultSchema = defineGetTablesSchema(); + final FieldList resultSchemaFields = resultSchema.getFields(); - try { - List datasetsToScan = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaParam, - schemaRegex, - LOG); + final BlockingQueue queue = + new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); + final List collectedResults = Collections.synchronizedList(new ArrayList<>()); + final String catalogParam = effectiveCatalog; + final String schemaParam = effectiveSchemaPattern; - if (datasetsToScan.isEmpty()) { - LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); - return; - } + Tracer tracer = BigQueryJdbcOpenTelemetry.getSafeTracer(this.connection); + SpanContext parentSpanContext = Span.current().getSpanContext(); + Runnable tableFetcher = + () -> { + Span backgroundSpan = + tracer + .spanBuilder("BigQueryDatabaseMetaData.getTables.background") + .setNoParent() + .addLink(parentSpanContext) + .startSpan(); + + try (Scope backgroundScope = backgroundSpan.makeCurrent()) { + ExecutorService apiExecutor = null; + ExecutorService tableProcessorExecutor = null; + final FieldList localResultSchemaFields = resultSchemaFields; + final List>> apiFutures = new ArrayList<>(); + final List> processingFutures = new ArrayList<>(); - apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE); - tableProcessorExecutor = - Executors.newFixedThreadPool(this.metadataFetchThreadCount); + try { + List datasetsToScan = + findMatchingBigQueryObjects( + "Dataset", + () -> + bigquery.listDatasets( + catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), + (ds) -> ds.getDatasetId().getDataset(), + schemaParam, + schemaRegex, + LOG); + + if (datasetsToScan.isEmpty()) { + LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); + return; + } - LOG.fine("Submitting parallel findMatchingTables tasks..."); - for (Dataset dataset : datasetsToScan) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning("Table fetcher interrupted during dataset iteration."); - break; - } + apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE); + tableProcessorExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); - final DatasetId currentDatasetId = dataset.getDatasetId(); - Callable> apiCallable = - () -> - findMatchingBigQueryObjects( - "Table", - () -> - bigquery.listTables( - currentDatasetId, - TableListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> - bigquery.getTable( - TableId.of( - currentDatasetId.getProject(), - currentDatasetId.getDataset(), - name)), - (tbl) -> tbl.getTableId().getTable(), - tableNamePattern, - tableNameRegex, - LOG); - - Callable> wrappedApiCallable = Context.current().wrap(apiCallable); - Future> apiFuture = apiExecutor.submit(wrappedApiCallable); - apiFutures.add(apiFuture); + LOG.fine("Submitting parallel findMatchingTables tasks..."); + for (Dataset dataset : datasetsToScan) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning("Table fetcher interrupted during dataset iteration."); + break; } - LOG.fine("Finished submitting " + apiFutures.size() + " findMatchingTables tasks."); - apiExecutor.shutdown(); - LOG.fine("Processing results from findMatchingTables tasks..."); - for (Future> apiFuture : apiFutures) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning("Table fetcher interrupted while processing API futures."); - break; - } - try { - List
tablesResult = apiFuture.get(); - if (tablesResult != null) { - for (Table table : tablesResult) { - if (Thread.currentThread().isInterrupted()) break; - - final Table currentTable = table; - Runnable processRunnable = + final DatasetId currentDatasetId = dataset.getDatasetId(); + Callable> apiCallable = + () -> + findMatchingBigQueryObjects( + "Table", () -> - processTableInfo( - currentTable, - requestedTypes, - collectedResults, - localResultSchemaFields); - Runnable wrappedProcessRunnable = Context.current().wrap(processRunnable); - Future processFuture = - tableProcessorExecutor.submit(wrappedProcessRunnable); - processingFutures.add(processFuture); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warning("Fetcher thread interrupted while waiting for API future result."); - break; - } catch (ExecutionException e) { - LOG.warning( - "Error executing findMatchingTables task: " - + e.getMessage() - + ". Cause: " - + e.getCause()); - } catch (CancellationException e) { - LOG.warning("A findMatchingTables task was cancelled."); - } - } - - LOG.fine( - "Finished submitting " + processingFutures.size() + " processTableInfo tasks."); + bigquery.listTables( + currentDatasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> + bigquery.getTable( + TableId.of( + currentDatasetId.getProject(), + currentDatasetId.getDataset(), + name)), + (tbl) -> tbl.getTableId().getTable(), + tableNamePattern, + tableNameRegex, + LOG); + + Callable> wrappedApiCallable = Context.current().wrap(apiCallable); + Future> apiFuture = apiExecutor.submit(wrappedApiCallable); + apiFutures.add(apiFuture); + } + LOG.fine("Finished submitting " + apiFutures.size() + " findMatchingTables tasks."); + apiExecutor.shutdown(); + LOG.fine("Processing results from findMatchingTables tasks..."); + for (Future> apiFuture : apiFutures) { if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Fetcher interrupted before waiting for processing tasks; cancelling remaining."); - processingFutures.forEach(f -> f.cancel(true)); - } else { - LOG.fine("Waiting for processTableInfo tasks to complete..."); - waitForTasksCompletion(processingFutures); - LOG.fine("All processTableInfo tasks completed."); + LOG.warning("Table fetcher interrupted while processing API futures."); + break; } - - if (!Thread.currentThread().isInterrupted()) { - Comparator comparator = - defineGetTablesComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getTables", LOG); + try { + List
tablesResult = apiFuture.get(); + if (tablesResult != null) { + for (Table table : tablesResult) { + if (Thread.currentThread().isInterrupted()) break; + + final Table currentTable = table; + Runnable processRunnable = + () -> + processTableInfo( + currentTable, + requestedTypes, + collectedResults, + localResultSchemaFields); + Runnable wrappedProcessRunnable = Context.current().wrap(processRunnable); + Future processFuture = + tableProcessorExecutor.submit(wrappedProcessRunnable); + processingFutures.add(processFuture); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warning("Fetcher thread interrupted while waiting for API future result."); + break; + } catch (ExecutionException e) { + LOG.warning( + "Error executing findMatchingTables task: " + + e.getMessage() + + ". Cause: " + + e.getCause()); + } catch (CancellationException e) { + LOG.warning("A findMatchingTables task was cancelled."); } + } - if (!Thread.currentThread().isInterrupted()) { - populateQueue(collectedResults, queue, localResultSchemaFields); - } + LOG.fine( + "Finished submitting " + processingFutures.size() + " processTableInfo tasks."); - } catch (Throwable t) { - LOG.severe("Unexpected error in table fetcher runnable: " + t.getMessage()); - apiFutures.forEach(f -> f.cancel(true)); + if (Thread.currentThread().isInterrupted()) { + LOG.warning( + "Fetcher interrupted before waiting for processing tasks; cancelling remaining."); processingFutures.forEach(f -> f.cancel(true)); - } finally { - signalEndOfData(queue, localResultSchemaFields); - shutdownExecutor(apiExecutor); - shutdownExecutor(tableProcessorExecutor); - LOG.info("Table fetcher thread finished."); + } else { + LOG.fine("Waiting for processTableInfo tasks to complete..."); + waitForTasksCompletion(processingFutures); + LOG.fine("All processTableInfo tasks completed."); + } + + if (!Thread.currentThread().isInterrupted()) { + Comparator comparator = + defineGetTablesComparator(localResultSchemaFields); + sortResults(collectedResults, comparator, "getTables", LOG); + } + + if (!Thread.currentThread().isInterrupted()) { + populateQueue(collectedResults, queue, localResultSchemaFields); } + + } catch (Throwable t) { + LOG.severe("Unexpected error in table fetcher runnable: " + t.getMessage()); + apiFutures.forEach(f -> f.cancel(true)); + processingFutures.forEach(f -> f.cancel(true)); } finally { - backgroundSpan.end(); + signalEndOfData(queue, localResultSchemaFields); + shutdownExecutor(apiExecutor); + shutdownExecutor(tableProcessorExecutor); + LOG.info("Table fetcher thread finished."); } - }; + } finally { + backgroundSpan.end(); + } + }; - Runnable wrappedTableFetcher = Context.current().wrap(tableFetcher); - Thread fetcherThread = - new Thread(wrappedTableFetcher, "getTables-fetcher-" + effectiveCatalog); - BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + Runnable wrappedTableFetcher = Context.current().wrap(tableFetcher); + Thread fetcherThread = new Thread(wrappedTableFetcher, "getTables-fetcher-" + effectiveCatalog); + BigQueryJsonResultSet resultSet = + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); - fetcherThread.start(); - LOG.info("Started background thread for getTables"); - return resultSet; - } catch (Exception e) { - span.recordException(e); - span.setStatus(StatusCode.ERROR, e.getMessage()); - throw e; - } finally { - span.end(); - } + fetcherThread.start(); + LOG.info("Started background thread for getTables"); + return resultSet; } Schema defineGetTablesSchema() { @@ -2020,39 +2017,32 @@ Comparator defineGetTablesComparator(FieldList resultSchemaField } @Override - public ResultSet getSchemas() { + public ResultSet getSchemas() throws SQLException { LOG.info("getSchemas() called"); return getSchemas(null, null); } @Override - public ResultSet getCatalogs() { + public ResultSet getCatalogs() throws SQLException { + return withTracing("BigQueryDatabaseMetaData.getCatalogs", () -> getCatalogsImpl()); + } + + private ResultSet getCatalogsImpl() throws SQLException { LOG.info("getCatalogs() called"); - Tracer tracer = BigQueryJdbcOpenTelemetry.getSafeTracer(this.connection); - Span span = tracer.spanBuilder("BigQueryDatabaseMetaData.getCatalogs").startSpan(); - try (Scope scope = span.makeCurrent()) { - final List accessibleCatalogs = getAccessibleCatalogNames(); - final Schema catalogsSchema = defineGetCatalogsSchema(); - final FieldList schemaFields = catalogsSchema.getFields(); - final List catalogRows = - prepareGetCatalogsRows(schemaFields, accessibleCatalogs); + final List accessibleCatalogs = getAccessibleCatalogNames(); + final Schema catalogsSchema = defineGetCatalogsSchema(); + final FieldList schemaFields = catalogsSchema.getFields(); + final List catalogRows = + prepareGetCatalogsRows(schemaFields, accessibleCatalogs); - final BlockingQueue queue = - new LinkedBlockingQueue<>(catalogRows.isEmpty() ? 1 : catalogRows.size() + 1); + final BlockingQueue queue = + new LinkedBlockingQueue<>(catalogRows.isEmpty() ? 1 : catalogRows.size() + 1); - populateQueue(catalogRows, queue, schemaFields); - signalEndOfData(queue, schemaFields); + populateQueue(catalogRows, queue, schemaFields); + signalEndOfData(queue, schemaFields); - return BigQueryJsonResultSet.of( - catalogsSchema, catalogRows.size(), queue, null, new Thread[0]); - } catch (Exception e) { - span.recordException(e); - span.setStatus(StatusCode.ERROR, e.getMessage()); - throw e; - } finally { - span.end(); - } + return BigQueryJsonResultSet.of(catalogsSchema, catalogRows.size(), queue, null, new Thread[0]); } Schema defineGetCatalogsSchema() { @@ -2108,166 +2098,166 @@ static List prepareGetTableTypesRows(Schema schema) { @Override public ResultSet getColumns( - String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) { - Tracer tracer = BigQueryJdbcOpenTelemetry.getSafeTracer(this.connection); - Span span = tracer.spanBuilder("BigQueryDatabaseMetaData.getColumns").startSpan(); - try (Scope scope = span.makeCurrent()) { - Tuple effectiveIdentifiers = - determineEffectiveCatalogAndSchema(catalog, schemaPattern); - String effectiveCatalog = effectiveIdentifiers.x(); - String effectiveSchemaPattern = effectiveIdentifiers.y(); - - if ((effectiveCatalog == null || effectiveCatalog.isEmpty()) - || (effectiveSchemaPattern != null && effectiveSchemaPattern.isEmpty()) - || (tableNamePattern != null && tableNamePattern.isEmpty()) - || (columnNamePattern != null && columnNamePattern.isEmpty())) { - LOG.warning( - "Returning empty ResultSet as one or more patterns are empty or catalog is null."); - return new BigQueryJsonResultSet(); - } + String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) + throws SQLException { + return withTracing( + "BigQueryDatabaseMetaData.getColumns", + () -> getColumnsImpl(catalog, schemaPattern, tableNamePattern, columnNamePattern)); + } - LOG.info( - "getColumns called for catalog: %s, schemaPattern: %s, tableNamePattern: %s," - + " columnNamePattern: %s", - effectiveCatalog, effectiveSchemaPattern, tableNamePattern, columnNamePattern); - - Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern); - Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern); - Pattern columnNameRegex = compileSqlLikePattern(columnNamePattern); - - final Schema resultSchema = defineGetColumnsSchema(); - final FieldList resultSchemaFields = resultSchema.getFields(); - final BlockingQueue queue = - new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); - final List collectedResults = Collections.synchronizedList(new ArrayList<>()); - final String catalogParam = effectiveCatalog; - final String schemaParam = effectiveSchemaPattern; - - SpanContext parentSpanContext = span.getSpanContext(); - Runnable columnFetcher = - () -> { - Span backgroundSpan = - tracer - .spanBuilder("BigQueryDatabaseMetaData.getColumns.background") - .setNoParent() - .addLink(parentSpanContext) - .startSpan(); - - try (Scope backgroundScope = backgroundSpan.makeCurrent()) { - ExecutorService columnExecutor = null; - final List> taskFutures = new ArrayList<>(); - final FieldList localResultSchemaFields = resultSchemaFields; + private ResultSet getColumnsImpl( + String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) + throws SQLException { + Tuple effectiveIdentifiers = + determineEffectiveCatalogAndSchema(catalog, schemaPattern); + String effectiveCatalog = effectiveIdentifiers.x(); + String effectiveSchemaPattern = effectiveIdentifiers.y(); + + if ((effectiveCatalog == null || effectiveCatalog.isEmpty()) + || (effectiveSchemaPattern != null && effectiveSchemaPattern.isEmpty()) + || (tableNamePattern != null && tableNamePattern.isEmpty()) + || (columnNamePattern != null && columnNamePattern.isEmpty())) { + LOG.warning( + "Returning empty ResultSet as one or more patterns are empty or catalog is null."); + return new BigQueryJsonResultSet(); + } - try { - List datasetsToScan = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaParam, - schemaRegex, - LOG); + LOG.info( + "getColumns called for catalog: %s, schemaPattern: %s, tableNamePattern: %s," + + " columnNamePattern: %s", + effectiveCatalog, effectiveSchemaPattern, tableNamePattern, columnNamePattern); - if (datasetsToScan.isEmpty()) { - LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); - return; - } + Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern); + Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern); + Pattern columnNameRegex = compileSqlLikePattern(columnNamePattern); - columnExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); + final Schema resultSchema = defineGetColumnsSchema(); + final FieldList resultSchemaFields = resultSchema.getFields(); + final BlockingQueue queue = + new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); + final List collectedResults = Collections.synchronizedList(new ArrayList<>()); + final String catalogParam = effectiveCatalog; + final String schemaParam = effectiveSchemaPattern; - for (Dataset dataset : datasetsToScan) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning("Fetcher interrupted during dataset iteration."); - break; - } + Tracer tracer = BigQueryJdbcOpenTelemetry.getSafeTracer(this.connection); + SpanContext parentSpanContext = Span.current().getSpanContext(); + Runnable columnFetcher = + () -> { + Span backgroundSpan = + tracer + .spanBuilder("BigQueryDatabaseMetaData.getColumns.background") + .setNoParent() + .addLink(parentSpanContext) + .startSpan(); + + try (Scope scope = backgroundSpan.makeCurrent()) { + ExecutorService columnExecutor = null; + final List> taskFutures = new ArrayList<>(); + final FieldList localResultSchemaFields = resultSchemaFields; - DatasetId datasetId = dataset.getDatasetId(); - LOG.info("Processing dataset: " + datasetId.getDataset()); + try { + List datasetsToScan = + findMatchingBigQueryObjects( + "Dataset", + () -> + bigquery.listDatasets( + catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), + (ds) -> ds.getDatasetId().getDataset(), + schemaParam, + schemaRegex, + LOG); + + if (datasetsToScan.isEmpty()) { + LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); + return; + } - List
tablesToScan = - findMatchingBigQueryObjects( - "Table", - () -> - bigquery.listTables( - datasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> - bigquery.getTable( - TableId.of(datasetId.getProject(), datasetId.getDataset(), name)), - (tbl) -> tbl.getTableId().getTable(), - tableNamePattern, - tableNameRegex, - LOG); + columnExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); - for (Table table : tablesToScan) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Fetcher interrupted during table iteration for dataset " - + datasetId.getDataset()); - break; - } + for (Dataset dataset : datasetsToScan) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning("Fetcher interrupted during dataset iteration."); + break; + } - TableId tableId = table.getTableId(); - LOG.fine("Submitting task for table: " + tableId); - final Table finalTable = table; + DatasetId datasetId = dataset.getDatasetId(); + LOG.info("Processing dataset: " + datasetId.getDataset()); - Runnable columnTask = + List
tablesToScan = + findMatchingBigQueryObjects( + "Table", () -> - processTableColumns( - finalTable, - columnNameRegex, - collectedResults, - localResultSchemaFields); - Runnable wrappedColumnTask = Context.current().wrap(columnTask); - Future future = columnExecutor.submit(wrappedColumnTask); - taskFutures.add(future); + bigquery.listTables( + datasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> + bigquery.getTable( + TableId.of(datasetId.getProject(), datasetId.getDataset(), name)), + (tbl) -> tbl.getTableId().getTable(), + tableNamePattern, + tableNameRegex, + LOG); + + for (Table table : tablesToScan) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning( + "Fetcher interrupted during table iteration for dataset " + + datasetId.getDataset()); + break; } - if (Thread.currentThread().isInterrupted()) break; - } - waitForTasksCompletion(taskFutures); + TableId tableId = table.getTableId(); + LOG.fine("Submitting task for table: " + tableId); + final Table finalTable = table; - if (!Thread.currentThread().isInterrupted()) { - Comparator comparator = - defineGetColumnsComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getColumns", LOG); + Runnable columnTask = + () -> + processTableColumns( + finalTable, + columnNameRegex, + collectedResults, + localResultSchemaFields); + Runnable wrappedColumnTask = Context.current().wrap(columnTask); + Future future = columnExecutor.submit(wrappedColumnTask); + taskFutures.add(future); } + if (Thread.currentThread().isInterrupted()) break; + } - if (!Thread.currentThread().isInterrupted()) { - populateQueue(collectedResults, queue, localResultSchemaFields); - } + waitForTasksCompletion(taskFutures); + + if (!Thread.currentThread().isInterrupted()) { + Comparator comparator = + defineGetColumnsComparator(localResultSchemaFields); + sortResults(collectedResults, comparator, "getColumns", LOG); + } - } catch (Throwable t) { - LOG.severe("Unexpected error in column fetcher runnable: " + t.getMessage()); - taskFutures.forEach(f -> f.cancel(true)); - } finally { - signalEndOfData(queue, localResultSchemaFields); - shutdownExecutor(columnExecutor); - LOG.info("Column fetcher thread finished."); + if (!Thread.currentThread().isInterrupted()) { + populateQueue(collectedResults, queue, localResultSchemaFields); } + + } catch (Throwable t) { + LOG.severe("Unexpected error in column fetcher runnable: " + t.getMessage()); + taskFutures.forEach(f -> f.cancel(true)); } finally { - backgroundSpan.end(); + signalEndOfData(queue, localResultSchemaFields); + shutdownExecutor(columnExecutor); + LOG.info("Column fetcher thread finished."); } - }; + } finally { + backgroundSpan.end(); + } + }; - Runnable wrappedColumnFetcher = Context.current().wrap(columnFetcher); - Thread fetcherThread = - new Thread(wrappedColumnFetcher, "getColumns-fetcher-" + effectiveCatalog); - BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + Runnable wrappedColumnFetcher = Context.current().wrap(columnFetcher); + Thread fetcherThread = + new Thread(wrappedColumnFetcher, "getColumns-fetcher-" + effectiveCatalog); + BigQueryJsonResultSet resultSet = + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); - fetcherThread.start(); - LOG.info("Started background thread for getColumns"); - return resultSet; - } catch (Exception e) { - span.recordException(e); - span.setStatus(StatusCode.ERROR, e.getMessage()); - throw e; - } finally { - span.end(); - } + fetcherThread.start(); + LOG.info("Started background thread for getColumns"); + return resultSet; } private void processTableColumns( @@ -3686,131 +3676,127 @@ public RowIdLifetime getRowIdLifetime() { } @Override - public ResultSet getSchemas(String catalog, String schemaPattern) { - Tracer tracer = BigQueryJdbcOpenTelemetry.getSafeTracer(this.connection); - Span span = tracer.spanBuilder("BigQueryDatabaseMetaData.getSchemas").startSpan(); - try (Scope scope = span.makeCurrent()) { - if ((catalog != null && catalog.isEmpty()) - || (schemaPattern != null && schemaPattern.isEmpty())) { - LOG.warning("Returning empty ResultSet as catalog or schemaPattern is an empty string."); - return new BigQueryJsonResultSet(); - } + public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException { + return withTracing( + "BigQueryDatabaseMetaData.getSchemas", () -> getSchemasImpl(catalog, schemaPattern)); + } - LOG.info("getSchemas called for catalog: %s, schemaPattern: %s", catalog, schemaPattern); + private ResultSet getSchemasImpl(String catalog, String schemaPattern) throws SQLException { + if ((catalog != null && catalog.isEmpty()) + || (schemaPattern != null && schemaPattern.isEmpty())) { + LOG.warning("Returning empty ResultSet as catalog or schemaPattern is an empty string."); + return new BigQueryJsonResultSet(); + } - final Pattern schemaRegex = compileSqlLikePattern(schemaPattern); - final Schema resultSchema = defineGetSchemasSchema(); - final FieldList resultSchemaFields = resultSchema.getFields(); + LOG.info("getSchemas called for catalog: %s, schemaPattern: %s", catalog, schemaPattern); - final BlockingQueue queue = - new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); - final List collectedResults = Collections.synchronizedList(new ArrayList<>()); - final String catalogParam = catalog; + final Pattern schemaRegex = compileSqlLikePattern(schemaPattern); + final Schema resultSchema = defineGetSchemasSchema(); + final FieldList resultSchemaFields = resultSchema.getFields(); - SpanContext parentSpanContext = span.getSpanContext(); - Runnable schemaFetcher = - () -> { - Span backgroundSpan = - tracer - .spanBuilder("BigQueryDatabaseMetaData.getSchemas.background") - .setNoParent() - .addLink(parentSpanContext) - .startSpan(); - - try (Scope backgroundScope = backgroundSpan.makeCurrent()) { - final FieldList localResultSchemaFields = resultSchemaFields; - List projectsToScanList = new ArrayList<>(); - - if (catalogParam != null) { - projectsToScanList.add(catalogParam); - } else { - projectsToScanList.addAll(getAccessibleCatalogNames()); - } + final BlockingQueue queue = + new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); + final List collectedResults = Collections.synchronizedList(new ArrayList<>()); + final String catalogParam = catalog; - if (projectsToScanList.isEmpty()) { - LOG.info( - "No valid projects to scan (primary, specified, or additional). Returning empty" - + " resultset."); - return; - } + Tracer tracer = BigQueryJdbcOpenTelemetry.getSafeTracer(this.connection); + SpanContext parentSpanContext = Span.current().getSpanContext(); + Runnable schemaFetcher = + () -> { + Span backgroundSpan = + tracer + .spanBuilder("BigQueryDatabaseMetaData.getSchemas.background") + .setNoParent() + .addLink(parentSpanContext) + .startSpan(); + + try (Scope backgroundScope = backgroundSpan.makeCurrent()) { + final FieldList localResultSchemaFields = resultSchemaFields; + List projectsToScanList = new ArrayList<>(); + + if (catalogParam != null) { + projectsToScanList.add(catalogParam); + } else { + projectsToScanList.addAll(getAccessibleCatalogNames()); + } - try { - for (String currentProjectToScan : projectsToScanList) { + if (projectsToScanList.isEmpty()) { + LOG.info( + "No valid projects to scan (primary, specified, or additional). Returning empty" + + " resultset."); + return; + } + + try { + for (String currentProjectToScan : projectsToScanList) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning( + "Schema fetcher interrupted during project iteration for project: " + + currentProjectToScan); + break; + } + LOG.info("Fetching schemas for project: " + currentProjectToScan); + List datasetsInProject = + findMatchingBigQueryObjects( + "Dataset", + () -> + bigquery.listDatasets( + currentProjectToScan, + BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)), + (ds) -> ds.getDatasetId().getDataset(), + schemaPattern, + schemaRegex, + LOG); + + if (datasetsInProject.isEmpty() || Thread.currentThread().isInterrupted()) { + LOG.info( + "Fetcher thread found no matching datasets in project: " + + currentProjectToScan); + continue; + } + + LOG.fine("Processing found datasets for project: " + currentProjectToScan); + for (Dataset dataset : datasetsInProject) { if (Thread.currentThread().isInterrupted()) { LOG.warning( - "Schema fetcher interrupted during project iteration for project: " + "Schema fetcher interrupted during dataset iteration for project: " + currentProjectToScan); break; } - LOG.info("Fetching schemas for project: " + currentProjectToScan); - List datasetsInProject = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - currentProjectToScan, - BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaPattern, - schemaRegex, - LOG); - - if (datasetsInProject.isEmpty() || Thread.currentThread().isInterrupted()) { - LOG.info( - "Fetcher thread found no matching datasets in project: " - + currentProjectToScan); - continue; - } - - LOG.fine("Processing found datasets for project: " + currentProjectToScan); - for (Dataset dataset : datasetsInProject) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Schema fetcher interrupted during dataset iteration for project: " - + currentProjectToScan); - break; - } - processSchemaInfo(dataset, collectedResults, localResultSchemaFields); - } - } - - if (!Thread.currentThread().isInterrupted()) { - Comparator comparator = - defineGetSchemasComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getSchemas", LOG); + processSchemaInfo(dataset, collectedResults, localResultSchemaFields); } + } - if (!Thread.currentThread().isInterrupted()) { - populateQueue(collectedResults, queue, localResultSchemaFields); - } + if (!Thread.currentThread().isInterrupted()) { + Comparator comparator = + defineGetSchemasComparator(localResultSchemaFields); + sortResults(collectedResults, comparator, "getSchemas", LOG); + } - } catch (Throwable t) { - LOG.severe("Unexpected error in schema fetcher runnable: " + t.getMessage()); - } finally { - signalEndOfData(queue, localResultSchemaFields); - LOG.info("Schema fetcher thread finished."); + if (!Thread.currentThread().isInterrupted()) { + populateQueue(collectedResults, queue, localResultSchemaFields); } + + } catch (Throwable t) { + LOG.severe("Unexpected error in schema fetcher runnable: " + t.getMessage()); } finally { - backgroundSpan.end(); + signalEndOfData(queue, localResultSchemaFields); + LOG.info("Schema fetcher thread finished."); } - }; + } finally { + backgroundSpan.end(); + } + }; - Runnable wrappedFetcher = Context.current().wrap(schemaFetcher); - Thread fetcherThread = new Thread(wrappedFetcher, "getSchemas-fetcher-" + catalog); - BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + Runnable wrappedFetcher = Context.current().wrap(schemaFetcher); + Thread fetcherThread = new Thread(wrappedFetcher, "getSchemas-fetcher-" + catalog); + BigQueryJsonResultSet resultSet = + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); - fetcherThread.start(); - LOG.info("Started background thread for getSchemas"); - return resultSet; - } catch (Exception e) { - span.recordException(e); - span.setStatus(StatusCode.ERROR, e.getMessage()); - throw e; - } finally { - span.end(); - } + fetcherThread.start(); + LOG.info("Started background thread for getSchemas"); + return resultSet; } Schema defineGetSchemasSchema() { @@ -5384,4 +5370,23 @@ private void loadDriverVersionProperties() { throw new IllegalStateException(errorMessage, e); } } + + private interface TracedMetadataOperation { + T run() throws SQLException; + } + + private T withTracing(String spanName, TracedMetadataOperation operation) + throws SQLException { + Tracer tracer = BigQueryJdbcOpenTelemetry.getSafeTracer(this.connection); + Span span = tracer.spanBuilder(spanName).startSpan(); + try (Scope scope = span.makeCurrent()) { + return operation.run(); + } catch (Exception ex) { + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java index 4d108ee54f8b..81fe6d4d13cb 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java @@ -2917,7 +2917,7 @@ public void testPrepareGetCatalogsRows() { } @Test - public void testGetSchemas_NoArgs_DelegatesCorrectly() { + public void testGetSchemas_NoArgs_DelegatesCorrectly() throws Exception { BigQueryDatabaseMetaData spiedDbMetadata = spy(dbMetadata); ResultSet mockResultSet = mock(ResultSet.class); doReturn(mockResultSet).when(spiedDbMetadata).getSchemas(null, null);