diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java index 7279639ce3de7..6555bd57d19b7 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java @@ -232,13 +232,14 @@ private void createMaterializedTableInContinuousMode( Collections.emptyMap(), Optional.empty()); } catch (Exception e) { - // drop materialized table while submit flink streaming job occur exception. Thus, weak + // drop materialized table if submitting the Flink streaming job encounters an + // exception. Thus, weak // atomicity is guaranteed operationExecutor.callExecutableOperation( handle, new DropMaterializedTableOperation(materializedTableIdentifier, true)); throw new SqlExecutionException( String.format( - "Submit continuous refresh job for materialized table %s occur exception.", + "Failed to submit continuous refresh job for materialized table %s.", materializedTableIdentifier), e); } @@ -288,7 +289,8 @@ private void createMaterializedTableInFullMode( refreshHandler.asSummaryString(), serializedRefreshHandler); } catch (Exception e) { - // drop materialized table while create refresh workflow occur exception. Thus, weak + // drop materialized table if creating the refresh workflow encounters an exception. + // Thus, weak // atomicity is guaranteed operationExecutor.callExecutableOperation( handle, new DropMaterializedTableOperation(materializedTableIdentifier, true)); @@ -651,7 +653,7 @@ public ResultFetcher refreshMaterializedTable( try { LOG.info( - "Begin to refreshing the materialized table {}, statement: {}", + "Starting refresh of the materialized table {}, statement: {}", materializedTableIdentifier, insertStatement); JobExecutionResult result = @@ -682,7 +684,7 @@ public ResultFetcher refreshMaterializedTable( } catch (Exception e) { throw new SqlExecutionException( String.format( - "Refreshing the materialized table %s occur exception.", + "Failed to refresh the materialized table %s.", materializedTableIdentifier), e); } @@ -1128,8 +1130,7 @@ private ContinuousRefreshHandler deserializeContinuousHandler(byte[] serializedR return ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( serializedRefreshHandler, userCodeClassLoader); } catch (IOException | ClassNotFoundException e) { - throw new SqlExecutionException( - "Deserialize ContinuousRefreshHandler occur exception.", e); + throw new SqlExecutionException("Failed to deserialize ContinuousRefreshHandler.", e); } } @@ -1137,8 +1138,7 @@ private byte[] serializeContinuousHandler(ContinuousRefreshHandler refreshHandle try { return ContinuousRefreshHandlerSerializer.INSTANCE.serialize(refreshHandler); } catch (IOException e) { - throw new SqlExecutionException( - "Serialize ContinuousRefreshHandler occur exception.", e); + throw new SqlExecutionException("Failed to serialize ContinuousRefreshHandler.", e); } } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index bf484e57c7dd8..e65d1fd8575e7 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -492,7 +492,7 @@ void testCreateMaterializedTableFailedInInContinuousMode() { .cause() .hasMessageContaining( String.format( - "Submit continuous refresh job for materialized table %s occur exception.", + "Failed to submit continuous refresh job for materialized table %s.", userShopsIdentifier.asSerializableString())); // verify the materialized table is not created diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowScheduler.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowScheduler.java index ce630a3241d5d..01bb19ebcbac7 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowScheduler.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowScheduler.java @@ -36,7 +36,7 @@ public interface WorkflowScheduler { * Open this workflow scheduler instance. Used for any required preparation in initialization * phase. * - * @throws WorkflowException if initializing workflow scheduler occur exception + * @throws WorkflowException if initializing the workflow scheduler fails */ void open() throws WorkflowException; diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java index 5386228630697..787178367f25e 100644 --- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java @@ -129,7 +129,7 @@ public void open() throws CatalogException { } } catch (IOException e) { throw new CatalogException( - String.format("Checking catalog path %s exists occur exception.", catalogPath), + String.format("Error checking whether catalog path %s exists.", catalogPath), e); } } @@ -146,7 +146,7 @@ public List listDatabases() throws CatalogException { .map(fileStatus -> fileStatus.getPath().getName()) .collect(Collectors.toList()); } catch (IOException e) { - throw new CatalogException("Listing database occur exception.", e); + throw new CatalogException("Error listing databases.", e); } } @@ -189,8 +189,7 @@ public void createDatabase(String name, CatalogDatabase database, boolean ignore try { fs.mkdirs(dbPath); } catch (IOException e) { - throw new CatalogException( - String.format("Creating database %s occur exception.", name), e); + throw new CatalogException(String.format("Error creating database %s.", name), e); } } @@ -220,7 +219,7 @@ public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean fs.delete(dbPath, true); } catch (IOException e) { throw new CatalogException( - String.format("Dropping database %s occur exception.", databaseName), e); + String.format("Error dropping database %s.", databaseName), e); } } @@ -246,7 +245,7 @@ public List listTables(String databaseName) .collect(Collectors.toList()); } catch (IOException e) { throw new CatalogException( - String.format("Listing table in database %s occur exception.", dbPath), e); + String.format("Error listing tables in database %s.", dbPath), e); } } @@ -276,8 +275,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) tableInfo.getCatalogTableInfo(), tableDataPath.toString()); } catch (IOException e) { - throw new CatalogException( - String.format("Getting table %s occur exception.", tablePath), e); + throw new CatalogException(String.format("Error getting table %s.", tablePath), e); } } @@ -298,7 +296,7 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException { return fs.exists(path) && fs.exists(tableSchemaFilePath); } catch (IOException e) { throw new CatalogException( - String.format("Checking table %s exists occur exception.", tablePath), e); + String.format("Error checking whether table %s exists.", tablePath), e); } } @@ -317,8 +315,7 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) try { fs.delete(path, true); } catch (IOException e) { - throw new CatalogException( - String.format("Dropping table %s occur exception.", tablePath), e); + throw new CatalogException(String.format("Error dropping table %s.", tablePath), e); } } @@ -374,8 +371,7 @@ public void createTable( } } catch (IOException e) { - throw new CatalogException( - String.format("Create table %s occur exception.", tablePath), e); + throw new CatalogException(String.format("Error creating table %s.", tablePath), e); } } @@ -404,7 +400,7 @@ public void alterTable( if (!fs.exists(tableSchemaPath)) { throw new CatalogException( String.format( - "Table %s schema file %s doesn't exists.", + "Table %s schema file %s doesn't exist.", tablePath, tableSchemaPath)); } // write new table schema @@ -416,8 +412,7 @@ public void alterTable( } } catch (IOException e) { - throw new CatalogException( - String.format("Altering table %s occur exception.", tablePath), e); + throw new CatalogException(String.format("Error altering table %s.", tablePath), e); } }