Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/paimon/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ class PAIMON_EXPORT Catalog {
/// @return A shared pointer to the file system instance.
virtual std::shared_ptr<FileSystem> GetFileSystem() const = 0;

/// Returns the catalog-level options that were passed during catalog creation.
///
/// @return A const reference to the map of catalog options (key-value pairs).
virtual const std::map<std::string, std::string>& GetOptions() const = 0;

/// Loads the latest schema of a specified table.
///
/// @note System tables will not be supported.
Expand Down
1 change: 1 addition & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ set(PAIMON_CORE_SRCS
core/table/source/data_evolution_batch_scan.cpp
core/table/system/audit_log_system_table.cpp
core/table/system/binlog_system_table.cpp
core/table/system/global_system_tables.cpp
core/table/system/in_memory_system_table.cpp
core/table/system/metadata_system_tables.cpp
core/table/system/system_table.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Result<std::unique_ptr<Catalog>> Catalog::Create(const std::string& root_path,
const std::map<std::string, std::string>& options,
const std::shared_ptr<FileSystem>& file_system) {
PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, CoreOptions::FromMap(options, file_system));
return std::make_unique<FileSystemCatalog>(core_options.GetFileSystem(), root_path);
return std::make_unique<FileSystemCatalog>(core_options.GetFileSystem(), root_path, options);
}

} // namespace paimon
39 changes: 34 additions & 5 deletions src/paimon/core/catalog/file_system_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "paimon/common/utils/string_utils.h"
#include "paimon/core/core_options.h"
#include "paimon/core/snapshot.h"
#include "paimon/core/table/system/global_system_tables.h"
#include "paimon/core/table/system/system_table.h"
#include "paimon/core/table/system/system_table_schema.h"
#include "paimon/core/utils/branch_manager.h"
Expand All @@ -47,8 +48,12 @@ struct ArrowSchema;

namespace paimon {
FileSystemCatalog::FileSystemCatalog(const std::shared_ptr<FileSystem>& fs,
const std::string& warehouse)
: fs_(fs), warehouse_(warehouse), logger_(Logger::GetLogger("FileSystemCatalog")) {}
const std::string& warehouse,
const std::map<std::string, std::string>& catalog_options)
: fs_(fs),
warehouse_(warehouse),
catalog_options_(catalog_options),
logger_(Logger::GetLogger("FileSystemCatalog")) {}

Status FileSystemCatalog::CreateDatabase(const std::string& db_name,
const std::map<std::string, std::string>& options,
Expand Down Expand Up @@ -88,13 +93,16 @@ Status FileSystemCatalog::CreateDatabaseImpl(const std::string& db_name,

Result<bool> FileSystemCatalog::DatabaseExists(const std::string& db_name) const {
if (IsSystemDatabase(db_name)) {
return Status::NotImplemented(
"do not support checking DatabaseExists for system database.");
return true;
}
return fs_->Exists(NewDatabasePath(warehouse_, db_name));
}

Result<bool> FileSystemCatalog::TableExists(const Identifier& identifier) const {
// Handle sys database global tables
if (IsSystemDatabase(identifier.GetDatabaseName())) {
return GlobalSystemTableLoader::IsSupported(identifier.GetTableName());
}
PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable());
if (is_system_table) {
PAIMON_ASSIGN_OR_RAISE(std::optional<std::string> system_table_name,
Expand Down Expand Up @@ -184,6 +192,10 @@ std::shared_ptr<FileSystem> FileSystemCatalog::GetFileSystem() const {
return fs_;
}

const std::map<std::string, std::string>& FileSystemCatalog::GetOptions() const {
return catalog_options_;
}

bool FileSystemCatalog::IsSystemDatabase(const std::string& db_name) {
return db_name == SYSTEM_DATABASE_NAME;
}
Expand Down Expand Up @@ -228,7 +240,7 @@ Result<std::vector<std::string>> FileSystemCatalog::ListDatabases() const {

Result<std::vector<std::string>> FileSystemCatalog::ListTables(const std::string& db_name) const {
if (IsSystemDatabase(db_name)) {
return Status::NotImplemented("do not support listing tables for system database.");
return GlobalSystemTableLoader::GetSupportedTableNames();
}
std::string database_path = NewDatabasePath(warehouse_, db_name);
std::vector<std::unique_ptr<BasicFileStatus>> file_status_list;
Expand Down Expand Up @@ -261,6 +273,23 @@ Result<bool> FileSystemCatalog::TableExistsInFileSystem(const std::string& table

Result<std::shared_ptr<Schema>> FileSystemCatalog::LoadTableSchema(
const Identifier& identifier) const {
// Handle sys database global tables
if (IsSystemDatabase(identifier.GetDatabaseName())) {
if (!GlobalSystemTableLoader::IsSupported(identifier.GetTableName())) {
return Status::NotExist(fmt::format("{} not exist", identifier.ToString()));
}
GlobalSystemTableContext context;
context.catalog = const_cast<FileSystemCatalog*>(this);
context.fs = fs_;
context.warehouse = warehouse_;
context.catalog_options = catalog_options_;
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<SystemTable> system_table,
GlobalSystemTableLoader::Load(identifier.GetTableName(), context));
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> arrow_schema,
system_table->ArrowSchema());
return std::make_shared<SystemTableSchema>(std::move(arrow_schema));
}
PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable());
if (is_system_table) {
PAIMON_ASSIGN_OR_RAISE(std::optional<std::string> system_table_name,
Expand Down
5 changes: 4 additions & 1 deletion src/paimon/core/catalog/file_system_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class Logger;

class FileSystemCatalog : public Catalog {
public:
FileSystemCatalog(const std::shared_ptr<FileSystem>& fs, const std::string& warehouse);
FileSystemCatalog(const std::shared_ptr<FileSystem>& fs, const std::string& warehouse,
const std::map<std::string, std::string>& catalog_options = {});

Status CreateDatabase(const std::string& db_name,
const std::map<std::string, std::string>& options,
Expand All @@ -61,6 +62,7 @@ class FileSystemCatalog : public Catalog {
Result<std::shared_ptr<Schema>> LoadTableSchema(const Identifier& identifier) const override;
std::string GetRootPath() const override;
std::shared_ptr<FileSystem> GetFileSystem() const override;
const std::map<std::string, std::string>& GetOptions() const override;
Result<std::shared_ptr<Table>> GetTable(const Identifier& identifier) const override;
Result<std::vector<SnapshotInfo>> ListSnapshots(const Identifier& identifier,
const std::string& branch) const override;
Expand Down Expand Up @@ -92,6 +94,7 @@ class FileSystemCatalog : public Catalog {

std::shared_ptr<FileSystem> fs_;
std::string warehouse_;
std::map<std::string, std::string> catalog_options_;

std::shared_ptr<Logger> logger_;
};
Expand Down
14 changes: 12 additions & 2 deletions src/paimon/core/catalog/file_system_catalog_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "paimon/core/catalog/file_system_catalog.h"

#include <algorithm>

#include "arrow/api.h"
#include "arrow/c/abi.h"
#include "arrow/c/bridge.h"
Expand Down Expand Up @@ -557,8 +559,16 @@ TEST(FileSystemCatalogTest, TestInvalidList) {
auto dir = UniqueTestDirectory::Create();
ASSERT_TRUE(dir);
FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str());
ASSERT_NOK_WITH_MSG(catalog.ListTables("sys"),
"do not support listing tables for system database.");
ASSERT_OK_AND_ASSIGN(auto sys_tables, catalog.ListTables("sys"));
ASSERT_FALSE(sys_tables.empty());
// Verify expected global system table names are present
ASSERT_TRUE(std::find(sys_tables.begin(), sys_tables.end(), "catalog_options") !=
sys_tables.end());
ASSERT_TRUE(std::find(sys_tables.begin(), sys_tables.end(), "all_table_options") !=
sys_tables.end());
ASSERT_TRUE(std::find(sys_tables.begin(), sys_tables.end(), "tables") != sys_tables.end());
ASSERT_TRUE(std::find(sys_tables.begin(), sys_tables.end(), "partitions") !=
sys_tables.end());
}

TEST(FileSystemCatalogTest, TestValidateTableSchema) {
Expand Down
Loading
Loading