diff --git a/.github/import_generation.txt b/.github/import_generation.txt index e522732c77e..a2720097dcc 100644 --- a/.github/import_generation.txt +++ b/.github/import_generation.txt @@ -1 +1 @@ -38 +39 diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 7b3760d4b24..de305a4bb62 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -688edef85e4e4e55828630ef3943a91ca9306799 +bc2335edd99f161c2f65b36772768babcd9e687c diff --git a/CHANGELOG.md b/CHANGELOG.md index e460091e8c1..02797cbad38 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,14 @@ +## v3.17.0 + +* Added support of describe for scheme objects with type 'secret' via new TSecretClient + +* Added support for METRICS_LEVEL for the CreateTable/AlterTable requests. + +* Added support for the new alter table compact action in TableClient and compaction operation in OperationClient in SDK + ## v3.16.0 -* Added support for the new inverted index type: JSON, intended to speed-up queries on Json or JsonDocument columns. +* Added support for the new inverted index type: JSON, intended to speed up queries on Json or JsonDocument columns. * Fixed partition session id conflict @@ -9,7 +17,7 @@ ## v3.15.0 * EXPERIMENTAL! Added `IProducer` interface to the SDK. This interface is used to write messages to a topic. -Each message can be associated with a partitioning key, which is used to determine the partition to which the message will be written. + Each message can be associated with a partitioning key, which is used to determine the partition to which the message will be written. * Added gRPC load balancing policy option for `TDriver`. Default policy: `round_robin`. diff --git a/include/ydb-cpp-sdk/client/export/export.h b/include/ydb-cpp-sdk/client/export/export.h index a2ada6ed70f..3e7a6859808 100644 --- a/include/ydb-cpp-sdk/client/export/export.h +++ b/include/ydb-cpp-sdk/client/export/export.h @@ -147,6 +147,7 @@ struct TExportToFsSettings : public TOperationRequestSettingsGetTopicPath()) + , TopicOriginPath(!originPath.empty() ? originPath : PartitionSession->GetTopicPath()) { Y_ABORT_UNLESS(ReadSourceDatabase); } diff --git a/include/ydb-cpp-sdk/client/import/import.h b/include/ydb-cpp-sdk/client/import/import.h index 742627595b6..e3ab71d8d38 100644 --- a/include/ydb-cpp-sdk/client/import/import.h +++ b/include/ydb-cpp-sdk/client/import/import.h @@ -161,6 +161,11 @@ struct TImportFromFsSettings : public TOperationRequestSettings + +namespace NYdb::inline V3 { +namespace NSecret { + +class TDescribeSecretResult; + +using TAsyncDescribeSecretResult = NThreading::TFuture; + +class TSecretClient { + class TImpl; + +public: + TSecretClient(const TDriver& driver, const TCommonClientSettings& settings = TCommonClientSettings()); + + TAsyncDescribeSecretResult DescribeSecret(const std::string& path, + const NScheme::TDescribePathSettings& settings = NScheme::TDescribePathSettings()); + +private: + std::shared_ptr Impl_; +}; + +class TDescribeSecretResult : public TStatus { +public: + TDescribeSecretResult(TStatus&& status, NScheme::TSchemeEntry&& entry, uint64_t version); + const NScheme::TSchemeEntry& GetEntry() const; + const std::string& GetName() const; + uint64_t GetVersion() const; + + void Out(IOutputStream& out) const; + + void SerializeTo(::Ydb::Scheme::Entry* proto) const; + +private: + NScheme::TSchemeEntry Entry_; + uint64_t Version_ = 0; +}; + +} // namespace NSecret +} // namespace NYdb diff --git a/include/ydb-cpp-sdk/client/table/fwd.h b/include/ydb-cpp-sdk/client/table/fwd.h index 04c947ef6be..f10a5f29048 100644 --- a/include/ydb-cpp-sdk/client/table/fwd.h +++ b/include/ydb-cpp-sdk/client/table/fwd.h @@ -64,6 +64,7 @@ struct TPartitioningPolicy; struct TReplicationPolicy; class TBuildIndexOperation; +class TCompactionOperation; class TTtlDeleteAction; class TTtlEvictToExternalStorageAction; diff --git a/include/ydb-cpp-sdk/client/table/table.h b/include/ydb-cpp-sdk/client/table/table.h index 26aca1647a8..697d8d708fe 100644 --- a/include/ydb-cpp-sdk/client/table/table.h +++ b/include/ydb-cpp-sdk/client/table/table.h @@ -34,6 +34,7 @@ class KMeansTreeSettings; class FulltextIndexSettings; class PartitioningSettings; class ReadReplicasSettings; +class MetricsSettings; class DateTypeColumnModeSettings; class TtlSettings; class TtlTier; @@ -41,6 +42,7 @@ class TableIndex; class TableIndexDescription; class ValueSinceUnixEpochModeSettings; class EvictionToExternalStorageSettings; +class CompactItem; } // namespace Table } // namespace Ydb @@ -229,6 +231,70 @@ class TReadReplicasSettings { uint64_t ReadReplicasCount_; }; +/** + * The parameters for the detailed metrics for the given table. + */ +class TMetricsSettings { +public: + /** + * The level at which metrics are aggregated and reported. + */ + enum class EMetricsLevel { + /** + * The metrics level is not specified. + */ + Unspecified = 0, + + /** + * All metrics are disabled. + */ + Disabled = 1, + + /** + * Metrics are aggregated and reported for the entire database. + */ + Database = 2, + + /** + * Metrics are aggregated and reported for individual tables. + */ + Table = 3, + + /** + * Metrics are aggregated and reported for individual partitions. + */ + Partition = 4, + }; + + explicit TMetricsSettings(EMetricsLevel metricsLevel); + + /** + * Return the configured metrics level. + * + * @return The metrics level + */ + EMetricsLevel GetMetricsLevel() const; + + /** + * Read the metrics configuration from the corresponding protobuf message. + * + * @param[in] proto The message to read from + * + * @return The corresponding metrics configuration + */ + static std::optional FromProto(const Ydb::Table::MetricsSettings& proto); + + /** + * Read the metrics configuration to the corresponding protobuf message. + * + * @param[in,out] proto The message to write to + */ + void SerializeTo(Ydb::Table::MetricsSettings& proto) const; + +private: + EMetricsLevel MetricsLevel_; +}; + struct TGlobalIndexSettings { static constexpr const int VectorKMeansTreeLevelTablePosition = 0; static constexpr const int VectorKMeansTreePostingTablePosition = 1; @@ -243,6 +309,7 @@ struct TGlobalIndexSettings { TPartitioningSettings PartitioningSettings; TUniformOrExplicitPartitions Partitions; std::optional ReadReplicasSettings; + std::optional MetricsSettings; static TGlobalIndexSettings FromProto(const Ydb::Table::GlobalIndexSettings& proto); @@ -376,6 +443,65 @@ class TIndexDescription { const std::variant& GetIndexSettings() const; uint64_t GetSizeBytes() const; + static TIndexDescription CreateGlobalIndex( + const std::string& name, + const std::vector& indexColumns, + const std::vector& dataColumns = {}, + const TGlobalIndexSettings& indexTableSettings = {} + ); + + static TIndexDescription CreateGlobalAsyncIndex( + const std::string& name, + const std::vector& indexColumns, + const std::vector& dataColumns = {}, + const TGlobalIndexSettings& indexTableSettings = {} + ); + + static TIndexDescription CreateGlobalUniqueIndex( + const std::string& name, + const std::vector& indexColumns, + const std::vector& dataColumns = {}, + const TGlobalIndexSettings& indexTableSettings = {} + ); + + static TIndexDescription CreateVectorIndex( + const std::string& name, + const std::string& vectorColumn, + const TKMeansTreeSettings& specializedIndexSettings, + const std::vector& dataColumns = {}, + const TGlobalIndexSettings& levelTableSettings = {}, + const TGlobalIndexSettings& postingTableSettings = {} + ); + + static TIndexDescription CreatePrefixedVectorIndex( + const std::string& name, + const std::vector& indexColumns, + const TKMeansTreeSettings& specializedIndexSettings, + const std::vector& dataColumns = {}, + const TGlobalIndexSettings& levelTableSettings = {}, + const TGlobalIndexSettings& postingTableSettings = {}, + const TGlobalIndexSettings& prefixTableSettings = {} + ); + + static TIndexDescription CreateFulltextPlainIndex( + const std::string& name, + const std::vector& indexColumns, + const TFulltextIndexSettings& specializedIndexSettings, + const std::vector& dataColumns = {}, + const TGlobalIndexSettings& indexTableSettings = {} + ); + + static TIndexDescription CreateFulltextRelevanceIndex( + const std::string& name, + const std::vector& indexColumns, + const TFulltextIndexSettings& specializedIndexSettings, + const std::vector& dataColumns = {}, + const TGlobalIndexSettings& postingTableSettings = {}, + const TGlobalIndexSettings& dictTableSettings = {}, + const TGlobalIndexSettings& docsTableSettings = {}, + const TGlobalIndexSettings& statsTableSettings = {} + ); + void SerializeTo(Ydb::Table::TableIndex& proto) const; std::string ToString() const; void Out(IOutputStream& o) const; @@ -423,6 +549,37 @@ class TBuildIndexOperation : public TOperation { TMetadata Metadata_; }; +class TCompact { +public: + TCompact(bool cascade, uint32_t maxShardsInFlight); + TCompact(); + + void SerializeTo(Ydb::Table::CompactItem& proto) const; +private: + bool Cascade_; + uint32_t MaxShardsInFlight_; +}; + +class TCompactionOperation : public TOperation { +public: + using TOperation::TOperation; + TCompactionOperation(TStatus&& status, Ydb::Operations::Operation&& operation); + + struct TMetadata { + ECompactState State; + float Progress; + std::string Path; + bool Cascade; + uint32_t MaxInFlight; + uint32_t Total; + uint32_t Done; + }; + + const TMetadata& Metadata() const; +private: + TMetadata Metadata_; +}; + //////////////////////////////////////////////////////////////////////////////// //! Represents changefeed description @@ -646,9 +803,7 @@ class TTtlSettings { class TAlterTtlSettings { using EUnit = TValueSinceUnixEpochModeSettings::EUnit; - TAlterTtlSettings() - : Action_(true) - {} + TAlterTtlSettings() = default; template explicit TAlterTtlSettings(Args&&... args) @@ -675,11 +830,103 @@ class TAlterTtlSettings { private: std::variant< - bool, // EAction::Drop + std::monostate, // EAction::Drop TTtlSettings // EAction::Set > Action_; }; +/** + * The holder for the detailed metrics configuration for the ALTER TABLE request. + */ +class TAlterMetricsSettings { +private: + /** + * The constructor, which configures the ALTER TABLE request to remove + * the current metrics configuration. + */ + TAlterMetricsSettings() = default; + + /** + * The constructor, which configures the ALTER TABLE request to use + * the given metrics configuration. + * + * @tparam Args The types of arguments for the TMetricsSettings constructor + * + * @param[in] args The arguments for the TMetricsSettings constructor + */ + template + explicit TAlterMetricsSettings(Args&&... args) + : Action_(TMetricsSettings(std::forward(args)...)) + {} + +public: + /** + * The type of the action for the metrics configuration in the ALTER TABLE request. + */ + enum class EAction { + /** + * Remove the current metrics configuration. + */ + Drop = 0, + + /** + * Set the metrics configuration to the given values. + */ + Set = 1, + }; + + /** + * Configure the ALTER TABLE request to remove the current metrics configuration. + * + * @return The metrics configuration holder for the EAction::Drop action + */ + static TAlterMetricsSettings Drop() { + return TAlterMetricsSettings(); + } + + /** + * Configure the ALTER TABLE request to update the current metrics configuration. + * + * @tparam Args The types of arguments for the TMetricsSettings constructor + * + * @param[in] args The arguments for the TMetricsSettings constructor + * + * @return The metrics configuration holder for the EAction::Set action + */ + template + static TAlterMetricsSettings Set(Args&&... args) { + return TAlterMetricsSettings(std::forward(args)...); + } + + /** + * Return the action for the metrics configuration (drop or set). + * + * @return The action for the metrics configuration + */ + EAction GetAction() const { + return static_cast(Action_.index()); + } + + /** + * Return the current metrics configuration. + * + * @return The current metrics configuration + */ + const TMetricsSettings& GetMetricsSettings() const { + return std::get(Action_); + } + +private: + /** + * The holder for the current metrics configuration. + * + * @note If the first variant is set, the metrics configuration will be removed. + * If the second variant is set, the metrics configuration will be set + * to the given values. + */ + std::variant Action_; +}; + //! Represents table storage settings class TStorageSettings { public: @@ -786,6 +1033,13 @@ class TTableDescription { // Returns read replicas settings of the table std::optional GetReadReplicasSettings() const; + /** + * Return the metrics configuration for the given table. + * + * @return The metrics configuration + */ + std::optional GetMetricsSettings() const; + // Fills CreateTableRequest proto from this description void SerializeTo(Ydb::Table::CreateTableRequest& request) const; @@ -837,6 +1091,14 @@ class TTableDescription { void SetPartitioningSettings(const TPartitioningSettings& settings); void SetKeyBloomFilter(bool enabled); void SetReadReplicasSettings(TReadReplicasSettings::EMode mode, uint64_t readReplicasCount); + + /** + * Set the metrics configuration for the given table. + * + * @param[in] metricsLevel The metrics level + */ + void SetMetricsSettings(TMetricsSettings::EMetricsLevel metricsLevel); + void SetStoreType(EStoreType type); const Ydb::Table::DescribeTableResult& GetProto() const; @@ -1095,6 +1357,15 @@ class TTableBuilder { TTableBuilder& SetReadReplicasSettings(TReadReplicasSettings::EMode mode, uint64_t readReplicasCount); + /** + * Set the metrics configuration for the given table. + * + * @param[in] metricsLevel The metrics level + * + * @return The instance of the builder itself + */ + TTableBuilder& SetMetricsSettings(TMetricsSettings::EMetricsLevel metricsLevel); + TTableStorageSettingsBuilder BeginStorageSettings() { return TTableStorageSettingsBuilder(*this); } @@ -1634,6 +1905,66 @@ class TAlterTtlSettingsBuilder { std::shared_ptr Impl_; }; +/** + * The builder for the metrics configuration for the ALTER TABLE request. + */ +class TAlterMetricsSettingsBuilder { +public: + /** + * Start the process of building the metrics configuration for the ALTER TABLE request. + * + * @param[in] parent The instance of the ALTER TABLE request builder + */ + TAlterMetricsSettingsBuilder(TAlterTableSettings& parent); + + /** + * Configure the ALTER TABLE request to remove the metrics configuration. + * + * @return The instance of this builder + */ + TAlterMetricsSettingsBuilder& Drop(); + + /** + * Configure the ALTER TABLE request to use the given metrics configuration. + * + * @param[in] settings The metrics configuration to use + * + * @return The instance of this builder + */ + TAlterMetricsSettingsBuilder& Set(TMetricsSettings&& settings); + + /** + * Configure the ALTER TABLE request to use the given metrics configuration. + * + * @param[in] settings The metrics configuration to use + * + * @return The instance of this builder + */ + TAlterMetricsSettingsBuilder& Set(const TMetricsSettings& settings); + + /** + * Configure the ALTER TABLE request to use the given metrics level. + * + * @param[in] metricsLevel The metrics level + * + * @return The instance of this builder + */ + TAlterMetricsSettingsBuilder& Set(TMetricsSettings::EMetricsLevel metricsLevel); + + /** + * Complete the building process and finalize the metrics configuration. + * + * @return The instance of the ALTER TABLE request builder + */ + TAlterTableSettings& EndAlterMetricsSettings(); + +private: + TAlterTableSettings& Parent_; + + class TImpl; + std::shared_ptr Impl_; +}; + class TAlterAttributesBuilder { public: TAlterAttributesBuilder(TAlterTableSettings& parent) @@ -1739,6 +2070,8 @@ struct TAlterTableSettings : public TOperationRequestSettings& GetAlterMetricsSettings() const; + + /** + * Update the metrics configuration for the ALTER TABLE request. + * + * @param[in] settings The metrics configuration to use + * + * @return The instance of this builder + */ + TSelf& SetAlterMetricsSettings(const std::optional& settings); + TAlterAttributesBuilder BeginAlterAttributes() { return TAlterAttributesBuilder(*this); } diff --git a/include/ydb-cpp-sdk/client/table/table_enum.h b/include/ydb-cpp-sdk/client/table/table_enum.h index e9ed2d01123..9f156054cc8 100644 --- a/include/ydb-cpp-sdk/client/table/table_enum.h +++ b/include/ydb-cpp-sdk/client/table/table_enum.h @@ -42,6 +42,13 @@ enum class EIndexType { Unknown = std::numeric_limits::max() }; +enum class ECompactState { + Unspecified = 0, + InProgress = 1, + Done = 2, + Cancelled = 3, +}; + enum class EChangefeedMode { KeysOnly /* "KEYS_ONLY" */, Updates /* "UPDATES" */, diff --git a/include/ydb-cpp-sdk/client/topic/control_plane.h b/include/ydb-cpp-sdk/client/topic/control_plane.h index a74f230f045..e4e42f28064 100644 --- a/include/ydb-cpp-sdk/client/topic/control_plane.h +++ b/include/ydb-cpp-sdk/client/topic/control_plane.h @@ -94,6 +94,8 @@ class TConsumer { const std::map& GetAttributes() const; bool GetKeepMessagesOrder() const; TDuration GetDefaultProcessingTimeout() const; + TDuration GetReceiveMessageWaitTime() const; + TDuration GetReceiveMessageDelay() const; const TDeadLetterPolicy& GetDeadLetterPolicy() const; private: @@ -107,6 +109,8 @@ class TConsumer { bool KeepMessagesOrder_; TDuration DefaultProcessingTimeout_; TDeadLetterPolicy DeadLetterPolicy_; + TDuration ReceiveMessageWaitTime_; + TDuration ReceiveMessageDelay_; }; class TTopicStats { @@ -352,6 +356,8 @@ class TTopicDescription { uint64_t GetPartitionWriteBurstBytes() const; + bool GetContentBasedDeduplication() const; + const std::map& GetAttributes() const; const std::vector& GetConsumers() const; @@ -386,6 +392,7 @@ class TTopicDescription { std::vector Permissions_; std::vector EffectivePermissions_; std::optional MetricsLevel_; + bool ContentBasedDeduplication_; }; class TConsumerDescription { @@ -655,6 +662,8 @@ struct TConsumerSettings { FLUENT_SETTING_OPTIONAL(bool, KeepMessagesOrder); FLUENT_SETTING_OPTIONAL(TDuration, DefaultProcessingTimeout); + FLUENT_SETTING_OPTIONAL(TDuration, ReceiveMessageWaitTime); + FLUENT_SETTING_OPTIONAL(TDuration, ReceiveMessageDelay); FLUENT_SETTING(TDeadLetterPolicySettings, DeadLetterPolicy) FLUENT_SETTING(TAttributes, Attributes); @@ -729,6 +738,8 @@ struct TAlterConsumerSettings { FLUENT_SETTING(TAlterAttributes, AlterAttributes); FLUENT_SETTING_OPTIONAL(TDuration, DefaultProcessingTimeout); + FLUENT_SETTING_OPTIONAL(TDuration, ReceiveMessageWaitTime); + FLUENT_SETTING_OPTIONAL(TDuration, ReceiveMessageDelay); FLUENT_SETTING(TAlterDeadLetterPolicySettings, DeadLetterPolicy); TAlterConsumerAttributesBuilder BeginAlterAttributes() { @@ -778,6 +789,7 @@ struct TCreateTopicSettings : public TOperationRequestSettings; FLUENT_SETTING_OPTIONAL(TDuration, SetRetentionPeriod); + FLUENT_SETTING_OPTIONAL(bool, SetContentBasedDeduplication); FLUENT_SETTING_OPTIONAL_VECTOR(ECodec, SetSupportedCodecs); diff --git a/include/ydb-cpp-sdk/client/types/operation/operation.h b/include/ydb-cpp-sdk/client/types/operation/operation.h index 2909ffbaca9..9d156e63139 100644 --- a/include/ydb-cpp-sdk/client/types/operation/operation.h +++ b/include/ydb-cpp-sdk/client/types/operation/operation.h @@ -6,7 +6,6 @@ #include -#include #include #include diff --git a/src/api/grpc/ydb_secret_v1.proto b/src/api/grpc/ydb_secret_v1.proto new file mode 100644 index 00000000000..29f73681743 --- /dev/null +++ b/src/api/grpc/ydb_secret_v1.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +package Ydb.Secret.V1; +option java_package = "com.yandex.ydb.secret.v1"; + +import "src/api/protos/ydb_secret.proto"; + +service SecretService { + // Describe secret command. + rpc DescribeSecret(Secret.DescribeSecretRequest) returns (Secret.DescribeSecretResponse); +} diff --git a/src/api/protos/ydb_auth.proto b/src/api/protos/ydb_auth.proto index d8dd4e795c6..0494b3c4854 100644 --- a/src/api/protos/ydb_auth.proto +++ b/src/api/protos/ydb_auth.proto @@ -4,12 +4,13 @@ option cc_enable_arenas = true; package Ydb.Auth; option java_package = "com.yandex.ydb.auth"; +import "src/api/protos/annotations/sensitive.proto"; import "src/api/protos/ydb_operation.proto"; message LoginRequest { Ydb.Operations.OperationParams operation_params = 1; string user = 2; - string password = 3; + string password = 3 [(Ydb.sensitive) = true]; } message LoginResponse { @@ -18,5 +19,5 @@ message LoginResponse { } message LoginResult { - string token = 1; + string token = 1 [(Ydb.sensitive) = true]; } diff --git a/src/api/protos/ydb_export.proto b/src/api/protos/ydb_export.proto index 1d6b327b7b9..262e95a36c6 100644 --- a/src/api/protos/ydb_export.proto +++ b/src/api/protos/ydb_export.proto @@ -39,7 +39,7 @@ message ExportToYtSettings { string host = 1 [(required) = true]; uint32 port = 2; - string token = 3 [(required) = true]; + string token = 3 [(required) = true, (Ydb.sensitive) = true]; repeated Item items = 4 [(size).ge = 1]; string description = 5 [(length).le = 128]; uint32 number_of_retries = 6; @@ -110,8 +110,8 @@ message ExportToS3Settings { string endpoint = 1 [(required) = true]; Scheme scheme = 2; // HTTPS if not specified string bucket = 3 [(required) = true]; - string access_key = 4 [(required) = true]; - string secret_key = 5 [(required) = true]; + string access_key = 4 [(required) = true, (Ydb.sensitive) = true]; + string secret_key = 5 [(required) = true, (Ydb.sensitive) = true]; repeated Item items = 6; string description = 7 [(length).le = 128]; uint32 number_of_retries = 8; diff --git a/src/api/protos/ydb_import.proto b/src/api/protos/ydb_import.proto index 9068c29b1d4..e4407799c82 100644 --- a/src/api/protos/ydb_import.proto +++ b/src/api/protos/ydb_import.proto @@ -1,6 +1,7 @@ syntax = "proto3"; option cc_enable_arenas = true; +import "src/api/protos/annotations/sensitive.proto"; import "src/api/protos/annotations/validation.proto"; import "src/api/protos/ydb_export.proto"; import "src/api/protos/ydb_operation.proto"; @@ -76,8 +77,8 @@ message ImportFromS3Settings { string endpoint = 1 [(required) = true]; Scheme scheme = 2; // HTTPS if not specified string bucket = 3 [(required) = true]; - string access_key = 4 [(required) = true]; - string secret_key = 5 [(required) = true]; + string access_key = 4 [(required) = true, (Ydb.sensitive) = true]; + string secret_key = 5 [(required) = true, (Ydb.sensitive) = true]; repeated Item items = 6; // Empty collection means import of all export objects string description = 7 [(length).le = 128]; uint32 number_of_retries = 8; @@ -145,21 +146,25 @@ message ImportFromS3Response { /// File system (FS) message ImportFromFsSettings { message Item { - /* YDB tables in FS are stored in a directory structure (see ydb_export.proto). - The directory contains: - * '/data_PartNumber', where 'PartNumber' represents the index of the part, starting at zero; - * '/scheme.pb' - object with information about scheme, indexes, etc; - * '/permissions.pb' - object with information about ACL and owner; - * '/metadata.json' - object with metadata about the backup. - The FS path can be either provided explicitly (relative to base_path) - Or, if the export contains the database objects list, you may specify the database object name, - and the FS prefix will be looked up in the database objects list by the import procedure - */ - string source_path = 1; + oneof Source { + /* YDB tables in FS are stored in a directory structure (see ydb_export.proto). + The directory contains: + * '/data_PartNumber', where 'PartNumber' represents the index of the part, starting at zero; + * '/scheme.pb' - object with information about scheme, indexes, etc; + * '/permissions.pb' - object with information about ACL and owner; + * '/metadata.json' - object with metadata about the backup. + */ + + // The FS path can be provided explicitly (relative to base_path) + string source_path = 1; + + // Or, if the export contains the database objects list, you may specify the database object name, and the FS path will be looked up in the database objects list by the import procedure + string source_path_db = 3; + } // Database path to a database object to import the item to // Resolved relative to the default destination_path - // May be omitted if the item's source_path is specified, in this case will be taken equal to it + // May be omitted if the item's source_path_db is specified, in this case will be taken equal to it string destination_path = 2; } @@ -234,8 +239,8 @@ message ListObjectsInS3ExportSettings { string endpoint = 1 [(required) = true]; ImportFromS3Settings.Scheme scheme = 2; // HTTPS if not specified string bucket = 3 [(required) = true]; - string access_key = 4 [(required) = true]; - string secret_key = 5 [(required) = true]; + string access_key = 4 [(required) = true, (Ydb.sensitive) = true]; + string secret_key = 5 [(required) = true, (Ydb.sensitive) = true]; repeated Item items = 6; uint32 number_of_retries = 7; diff --git a/src/api/protos/ydb_query.proto b/src/api/protos/ydb_query.proto index c795eeaafa4..db248fb13f1 100644 --- a/src/api/protos/ydb_query.proto +++ b/src/api/protos/ydb_query.proto @@ -45,18 +45,25 @@ message AttachSessionRequest { string session_id = 1 [(Ydb.length).le = 1024]; } +// Sent by server when this session is being gracefully terminated. +// Server will attempt to complete in-flight requests within the soft deadline; +// requests still running at the hard deadline will be cancelled. +// Client should not reuse the session after receiving this hint. message SessionShutdownHint { } +// Sent by server when the node is being gracefully shut down. +// Server will attempt to complete in-flight requests within the soft deadline; +// requests still running at the hard deadline will be cancelled. +// Client should not create new sessions on this node. message NodeShutdownHint { } message SessionState { StatusIds.StatusCode status = 1; repeated Ydb.Issue.IssueMessage issues = 2; - - // The reason the session is ending, for SDK-side handling - oneof session_hint { + + oneof session_hint { SessionShutdownHint session_shutdown = 3; NodeShutdownHint node_shutdown = 4; } diff --git a/src/api/protos/ydb_scheme.proto b/src/api/protos/ydb_scheme.proto index c3a93c33565..e3d6029e00c 100644 --- a/src/api/protos/ydb_scheme.proto +++ b/src/api/protos/ydb_scheme.proto @@ -69,6 +69,7 @@ message Entry { BACKUP_COLLECTION = 22; TRANSFER = 23; SYS_VIEW = 24; + SECRET = 25; STREAMING_QUERY = 26; } diff --git a/src/api/protos/ydb_secret.proto b/src/api/protos/ydb_secret.proto new file mode 100644 index 00000000000..f28897a05b4 --- /dev/null +++ b/src/api/protos/ydb_secret.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package Ydb.Secret; +option java_package = "com.yandex.ydb.secret"; + +import "src/api/protos/annotations/validation.proto"; +import "src/api/protos/ydb_operation.proto"; +import "src/api/protos/ydb_scheme.proto"; + +// Describe secret request sent from client to server. +message DescribeSecretRequest { + Ydb.Operations.OperationParams operation_params = 1; + + // Secret path + string path = 2; +} + +// Describe secret response sent from server to client. +// If the secret does not exist then response status will be "SCHEME_ERROR". +message DescribeSecretResponse { + Ydb.Operations.Operation operation = 1; +} + +// Describe secret result message that will be inside DescribeSecretResponse.operation. +message DescribeSecretResult { + // Description of scheme object. + Ydb.Scheme.Entry self = 1; + + // Internal version of the secret object + int64 version = 2 [(Ydb.value) = ">= 0"]; +} diff --git a/src/api/protos/ydb_table.proto b/src/api/protos/ydb_table.proto index 05102c9e7ba..d9dd173266a 100644 --- a/src/api/protos/ydb_table.proto +++ b/src/api/protos/ydb_table.proto @@ -59,6 +59,11 @@ message GlobalIndexSettings { // Partitioning settings for the table that implements the index. PartitioningSettings partitioning_settings = 3; ReadReplicasSettings read_replicas_settings = 4; + + /** + * The metrics configuration for the given index. + */ + optional MetricsSettings metrics_settings = 5; } message VectorIndexSettings { @@ -326,6 +331,7 @@ message LocalBloomNgramFilterIndex { uint32 filter_size_bytes = 3; uint32 records_count = 4; optional bool case_sensitive = 5; + optional double false_positive_probability = 6; } message GlobalJsonIndex { @@ -680,6 +686,19 @@ message ColumnCompression { optional int32 compression_level = 2; } +message ColumnEncoding { + message Off { + } + + message Dictionary { + } + + oneof encoding_settings { + Off off = 1; + Dictionary dictionary = 2; + } +} + message ColumnMeta { // Name of column string name = 1; @@ -696,6 +715,7 @@ message ColumnMeta { google.protobuf.NullValue empty_default = 7; } optional ColumnCompression compression = 8; + repeated ColumnEncoding encoding = 9; } message EvictionToExternalStorageSettings { @@ -880,6 +900,48 @@ message ReadReplicasSettings { reserved 3; // cluster_replicas_settings (part of oneof settings) } +/** + * The parameters for the detailed metrics for the given table. + */ +message MetricsSettings { + /** + * The level at which metrics are aggregated and reported. + */ + enum MetricsLevel { + /** + * The metrics level is not specified. + */ + METRICS_LEVEL_UNSPECIFIED = 0; + + /** + * All metrics are disabled. + */ + METRICS_LEVEL_DISABLED = 1; + + /** + * Metrics are aggregated and reported for the entire database. + */ + METRICS_LEVEL_DATABASE = 2; + + /** + * Metrics are aggregated and reported for individual tables. + */ + METRICS_LEVEL_TABLE = 3; + + /** + * Metrics are aggregated and reported for individual partitions. + */ + METRICS_LEVEL_PARTITION = 4; + } + + /** + * The level at which metrics are aggregated and reported. + * + * @note If this value is not set, METRICS_LEVEL_DATABASE is assumed. + */ + MetricsLevel metrics_level = 1; +} + enum StoreType { STORE_TYPE_UNSPECIFIED = 0; STORE_TYPE_ROW = 1; @@ -929,6 +991,11 @@ message CreateTableRequest { bool temporary = 19; // Is table column or row oriented StoreType store_type = 20; + + /** + * The metrics configuration for the given table. + */ + optional MetricsSettings metrics_settings = 21; } message CreateTableResponse { @@ -1033,6 +1100,14 @@ message AlterTableRequest { reserved 22, 23; // Start compaction for table CompactItem compact = 24; + + /** + * Set, update or remove the metrics configuration for the given table. + */ + oneof metrics_settings_action { + MetricsSettings set_metrics_settings = 25; + google.protobuf.Empty drop_metrics_settings = 26; + } } message AlterTableResponse { @@ -1159,6 +1234,11 @@ message DescribeTableResult { bool temporary = 17; // Is table column or row oriented StoreType store_type = 18; + + /** + * The metrics configuration for the given table. + */ + optional MetricsSettings metrics_settings = 19; } message Query { diff --git a/src/api/protos/ydb_topic.proto b/src/api/protos/ydb_topic.proto index f3fdf7c7a8d..05c8faf088f 100644 --- a/src/api/protos/ydb_topic.proto +++ b/src/api/protos/ydb_topic.proto @@ -840,12 +840,17 @@ message AlterStreamingConsumerType { message SharedConsumerType { // If this is a shared consumer, then the value true means that the order of messages within the same message group will be preserved. // For streaming consumers, the order of messages is always preserved. - optional bool keep_messages_order = 10; + bool keep_messages_order = 10; // The duration of blocking messages for its processing. If the message is not committed during this time, // it will be returned to the queue and sent for re-processing. - optional google.protobuf.Duration default_processing_timeout = 11; - // The dedad letter policy for this consumer. - optional DeadLetterPolicy dead_letter_policy = 12; + google.protobuf.Duration default_processing_timeout = 11; + // The dead letter policy for this consumer. + DeadLetterPolicy dead_letter_policy = 12; + // Time to wait for message if queue empty and if this parameter is not set in receive request + google.protobuf.Duration receive_message_wait_time = 13; + // Default delay before messages become visible to ReceiveMessage after write (Amazon SQS DelaySeconds). + // Only used for shared (SQS-compatible) consumers. If unset or zero, there is no delivery delay. + google.protobuf.Duration receive_message_delay = 14; } message AlterSharedConsumerType { @@ -855,6 +860,13 @@ message AlterSharedConsumerType { // Change dead letter policy. AlterDeadLetterPolicy alter_dead_letter_policy = 11; + // Time to wait for message if queue empty and if this parameter is not set in receive request + google.protobuf.Duration set_receive_message_wait_time = 12; + + // Default delay before messages become visible after write (Amazon SQS DelaySeconds). Shared consumer only. + // While this duration has not elapsed since write, ReceiveMessage must not return the message. + // Optional: if unset in AlterConsumer.alter_shared_consumer_type, the previous value is unchanged. + google.protobuf.Duration set_receive_message_delay = 13; } // Consumer description. @@ -1084,6 +1096,9 @@ message CreateTopicRequest { // Metrics level. If the level is unset, use database setting. optional uint32 metrics_level = 13; + + // Enable content-based deduplication for the topic. + bool content_based_deduplication = 14; } // Create topic response sent from server to client. @@ -1220,6 +1235,9 @@ message DescribeTopicResult { // Metrics level. optional uint32 metrics_level = 16; + + // Is content-based deduplication enabled for the topic. + bool content_based_deduplication = 17; } // Describe partition request sent from client to server. @@ -1397,6 +1415,9 @@ message AlterTopicRequest { uint32 set_metrics_level = 15; google.protobuf.Empty reset_metrics_level = 16; } + + // Enable content-based deduplication for the topic. + optional bool set_content_based_deduplication = 17; } // Update topic response sent from server to client. diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index e7f448e8675..0ef5f3fcb42 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -22,6 +22,7 @@ add_subdirectory(rate_limiter) add_subdirectory(resources) add_subdirectory(result) add_subdirectory(scheme) +add_subdirectory(secret) add_subdirectory(ss_tasks) add_subdirectory(table) add_subdirectory(topic) diff --git a/src/client/export/export.cpp b/src/client/export/export.cpp index 3e637b8f0ea..6ac28469894 100644 --- a/src/client/export/export.cpp +++ b/src/client/export/export.cpp @@ -126,6 +126,7 @@ TExportToFsResponse::TExportToFsResponse(TStatus&& status, Ydb::Operations::Oper Metadata_.Settings.Description(metadata.settings().description()); Metadata_.Settings.NumberOfRetries(metadata.settings().number_of_retries()); + Metadata_.Settings.IncludeIndexData(metadata.settings().include_index_data()); if (!metadata.settings().compression().empty()) { Metadata_.Settings.Compression(metadata.settings().compression()); @@ -297,6 +298,8 @@ TFuture TExportClient::ExportToFs(const TExportToFsSettings request.mutable_settings()->set_source_path(settings.SourcePath_.value()); } + request.mutable_settings()->set_include_index_data(settings.IncludeIndexData_); + if (settings.EncryptionAlgorithm_.empty() != settings.SymmetricKey_.empty()) { throw TContractViolation("Encryption algorithm and symmetric key must be set together"); } diff --git a/src/client/federated_topic/impl/federated_read_session.h b/src/client/federated_topic/impl/federated_read_session.h index c93b9722bda..661a16f1aa7 100644 --- a/src/client/federated_topic/impl/federated_read_session.h +++ b/src/client/federated_topic/impl/federated_read_session.h @@ -46,7 +46,7 @@ class TEventFederator { using T = std::decay_t; if constexpr (std::is_same_v) { - return Federate(std::move(event), std::move(fps)); + return Federate(std::forward(event), std::move(fps)); } else if constexpr (std::is_same_v) { psPtr = std::visit([](auto&& arg) -> NTopic::TPartitionSession::TPtr { using T = std::decay_t; @@ -58,7 +58,7 @@ class TEventFederator { }, event); if (!psPtr) { // TSessionClosedEvent - return Federate(std::move(event), std::move(fps)); + return Federate(std::forward(event), std::move(fps)); } } else { psPtr = event.GetPartitionSession(); @@ -71,12 +71,12 @@ class TEventFederator { } fps = FederatedPartitionSessions[psPtr.Get()]; - if constexpr (std::is_same_v) { + if constexpr (std::is_same_v) { FederatedPartitionSessions.erase(psPtr.Get()); } } - return Federate(std::move(event), std::move(fps)); + return Federate(std::forward(event), std::move(fps)); } template @@ -95,11 +95,11 @@ class TEventFederator { using T = std::decay_t; std::optional ev; if constexpr (std::is_same_v) { - ev = TReadSessionEvent::TDataReceivedEvent(std::move(arg), std::move(fps)); + ev = TReadSessionEvent::TDataReceivedEvent(std::forward(arg), std::move(fps)); } else if constexpr (std::is_same_v) { - ev = std::move(arg); + ev = std::forward(arg); } else { - ev = TReadSessionEvent::TFederated(std::move(arg), std::move(fps)); + ev = TReadSessionEvent::TFederated(std::forward(arg), std::move(fps)); } return *ev; }, diff --git a/src/client/federated_topic/impl/federated_topic.cpp b/src/client/federated_topic/impl/federated_topic.cpp index 33f39ec23a1..3c9813ca540 100644 --- a/src/client/federated_topic/impl/federated_topic.cpp +++ b/src/client/federated_topic/impl/federated_topic.cpp @@ -8,12 +8,12 @@ namespace NYdb::inline V3::NFederatedTopic { using TReadOriginalSettings = TFederatedReadSessionSettings::TReadOriginalSettings; TReadOriginalSettings& TReadOriginalSettings::AddDatabase(const std::string& database) { - Databases.insert(std::move(database)); + Databases.insert(database); return *this; } TReadOriginalSettings& TReadOriginalSettings::AddDatabases(const std::vector& databases) { - std::move(std::begin(databases), std::end(databases), std::inserter(Databases, Databases.end())); + Databases.insert(databases.begin(), databases.end()); return *this; } @@ -33,7 +33,7 @@ TFederatedReadSessionSettings& TFederatedReadSessionSettings::ReadMirrored(const ythrow TContractViolation("Reading from local database not supported, use specific database"); } DatabasesToReadFrom.clear(); - DatabasesToReadFrom.insert(std::move(database)); + DatabasesToReadFrom.insert(database); ReadMirroredEnabled = true; return *this; } diff --git a/src/client/federated_topic/impl/federated_write_session.cpp b/src/client/federated_topic/impl/federated_write_session.cpp index a6063ea208e..d7cc815dd24 100644 --- a/src/client/federated_topic/impl/federated_write_session.cpp +++ b/src/client/federated_topic/impl/federated_write_session.cpp @@ -135,7 +135,7 @@ std::shared_ptr TFederatedWriteSessionImpl::OpenSubsessio return; } - Y_ABORT_UNLESS(!self->PendingToken.has_value()); + Y_ABORT_UNLESS(!self->PendingToken.has_value(), "Continuation token is required"); self->PendingToken = std::move(ev.ContinuationToken); self->MaybeWriteImpl(); } @@ -160,7 +160,7 @@ std::shared_ptr TFederatedWriteSessionImpl::OpenSubsessio } } - self->ClientEventsQueue->PushEvent(std::move(ev)); + self->ClientEventsQueue->PushEvent(ev); self->IssueTokenIfAllowed(); } }) diff --git a/src/client/iam/iam.cpp b/src/client/iam/iam.cpp index 5be65d306a0..9a152b85561 100644 --- a/src/client/iam/iam.cpp +++ b/src/client/iam/iam.cpp @@ -8,6 +8,8 @@ #include #include +#include + using namespace yandex::cloud::iam::v1; namespace NYdb::inline V3 { @@ -24,10 +26,21 @@ class TIAMCredentialsProvider : public ICredentialsProvider { } std::string GetAuthInfo() const override { - if (TInstant::Now() >= NextTicketUpdate_) { + std::string ticket; + TInstant nextTicketUpdate; + { + std::lock_guard lock(Lock_); + ticket = Ticket_; + nextTicketUpdate = NextTicketUpdate_; + } + if (TInstant::Now() >= nextTicketUpdate) { GetTicket(); + { + std::lock_guard lock(Lock_); + ticket = Ticket_; + } } - return Ticket_; + return ticket; } bool IsValid() const override { @@ -37,6 +50,7 @@ class TIAMCredentialsProvider : public ICredentialsProvider { private: TSimpleHttpClient HttpClient_; std::string Request_; + mutable std::mutex Lock_; mutable std::string Ticket_; mutable TInstant NextTicketUpdate_; TDuration RefreshPeriod_; @@ -52,21 +66,41 @@ class TIAMCredentialsProvider : public ICredentialsProvider { auto respMap = resp.GetMap(); + std::string ticket; if (auto it = respMap.find("access_token"); it == respMap.end()) ythrow yexception() << "Result doesn't contain access_token"; - else if (std::string ticket = it->second.GetStringSafe(); ticket.empty()) + else if (ticket = it->second.GetStringSafe(); ticket.empty()) ythrow yexception() << "Got empty ticket"; - else - Ticket_ = std::move(ticket); - if (auto it = respMap.find("expires_in"); it == respMap.end()) - ythrow yexception() << "Result doesn't contain expires_in"; - else { - const TDuration expiresIn = TDuration::Seconds(it->second.GetUInteger()) / 2; - - const auto interval = std::max(std::min(expiresIn, RefreshPeriod_), TDuration::MilliSeconds(100)); + const auto now = TInstant::Now(); + TInstant nextUpdate; + TDuration expiresIn; + if (auto it = respMap.find("expires_in"); it != respMap.end()) { + auto seconds = it->second.GetUInteger(); + if (seconds > 0) { + expiresIn = TDuration::Seconds(seconds); + } + } else if (auto it = respMap.find("expiry"); it != respMap.end()) { + try { + TInstant expiry; + if (TInstant::TryParseIso8601(it->second.GetStringSafe(), expiry) && expiry > now) { + expiresIn = expiry - now; + } + } catch (...) { + } + } + if (expiresIn > TDuration::Zero()) { + const auto halfLife = expiresIn / 2; + const auto interval = std::max(std::min(halfLife, RefreshPeriod_), TDuration::MilliSeconds(100)); + nextUpdate = now + interval; + } else { + nextUpdate = now + std::min(RefreshPeriod_, TDuration::Minutes(30)); + } - NextTicketUpdate_ = TInstant::Now() + interval; + { + std::lock_guard lock(Lock_); + Ticket_ = std::move(ticket); + NextTicketUpdate_ = nextUpdate; } } catch (...) { } diff --git a/src/client/impl/internal/CMakeLists.txt b/src/client/impl/internal/CMakeLists.txt index 6fd29a5c4a2..de7f2d5a669 100644 --- a/src/client/impl/internal/CMakeLists.txt +++ b/src/client/impl/internal/CMakeLists.txt @@ -5,6 +5,7 @@ add_subdirectory(logger) add_subdirectory(make_request) add_subdirectory(plain_status) add_subdirectory(rpc_request_settings) +add_subdirectory(scheme_helpers) add_subdirectory(retry) add_subdirectory(thread_pool) add_subdirectory(value_helpers) diff --git a/src/client/impl/internal/grpc_connections/grpc_connections.cpp b/src/client/impl/internal/grpc_connections/grpc_connections.cpp index 5ca5d78998f..cd74acc19af 100644 --- a/src/client/impl/internal/grpc_connections/grpc_connections.cpp +++ b/src/client/impl/internal/grpc_connections/grpc_connections.cpp @@ -371,21 +371,22 @@ TAsyncListEndpointsResult TGRpcConnectionsImpl::GetEndpoints(TDbDriverStatePtr d if (strong && result.DiscoveryStatus.IsTransportError()) { strong->StatCollector.IncDiscoveryFailDueTransportError(); } - return NThreading::MakeFuture(MutateDiscovery(std::move(result), *strong)); + return NThreading::MakeFuture(MutateDiscovery(std::move(result), strong.get())); }); } -TListEndpointsResult TGRpcConnectionsImpl::MutateDiscovery(TListEndpointsResult result, const TDbDriverState& dbDriverState) { +TListEndpointsResult TGRpcConnectionsImpl::MutateDiscovery(TListEndpointsResult result, const TDbDriverState* dbDriverState) { std::lock_guard lock(ExtensionsLock_); - if (!DiscoveryMutatorCb) + if (!DiscoveryMutatorCb || !dbDriverState) { return result; + } auto endpoint = result.DiscoveryStatus.Endpoint; auto ydbStatus = NYdb::TStatus(std::move(result.DiscoveryStatus)); auto aux = IDiscoveryMutatorApi::TAuxInfo { - .Database = dbDriverState.Database, - .DiscoveryEndpoint = dbDriverState.DiscoveryEndpoint + .Database = dbDriverState->Database, + .DiscoveryEndpoint = dbDriverState->DiscoveryEndpoint }; ydbStatus = DiscoveryMutatorCb(&result.Result, std::move(ydbStatus), aux); diff --git a/src/client/impl/internal/grpc_connections/grpc_connections.h b/src/client/impl/internal/grpc_connections/grpc_connections.h index 630dc051332..6e73fcba559 100644 --- a/src/client/impl/internal/grpc_connections/grpc_connections.h +++ b/src/client/impl/internal/grpc_connections/grpc_connections.h @@ -565,7 +565,7 @@ class TGRpcConnectionsImpl } TAsyncListEndpointsResult GetEndpoints(TDbDriverStatePtr dbState) override; - TListEndpointsResult MutateDiscovery(TListEndpointsResult result, const TDbDriverState& dbDriverState); + TListEndpointsResult MutateDiscovery(TListEndpointsResult result, const TDbDriverState* dbDriverState); #ifndef YDB_GRPC_BYPASS_CHANNEL_POOL void DeleteChannels(const std::vector& endpoints) override { diff --git a/src/client/impl/internal/scheme_helpers/CMakeLists.txt b/src/client/impl/internal/scheme_helpers/CMakeLists.txt new file mode 100644 index 00000000000..c8863f865fd --- /dev/null +++ b/src/client/impl/internal/scheme_helpers/CMakeLists.txt @@ -0,0 +1,12 @@ +_ydb_sdk_add_library(impl-internal-scheme_helpers) + +target_link_libraries(impl-internal-scheme_helpers PUBLIC + api-protos + client-ydb_driver +) + +target_sources(impl-internal-scheme_helpers PRIVATE + helpers.cpp +) + +_ydb_sdk_install_targets(TARGETS impl-internal-scheme_helpers) diff --git a/src/client/impl/internal/scheme_helpers/helpers.cpp b/src/client/impl/internal/scheme_helpers/helpers.cpp new file mode 100644 index 00000000000..885fd928626 --- /dev/null +++ b/src/client/impl/internal/scheme_helpers/helpers.cpp @@ -0,0 +1,81 @@ +#include "helpers.h" + +#include + +#include + +namespace NYdb::inline V3 { +namespace NScheme { + +namespace { + +::Ydb::Scheme::Entry::Type SchemeEntryTypeToProto(ESchemeEntryType type) { + switch (type) { + case ESchemeEntryType::Directory: + return ::Ydb::Scheme::Entry::DIRECTORY; + case ESchemeEntryType::Table: + return ::Ydb::Scheme::Entry::TABLE; + case ESchemeEntryType::ColumnTable: + return ::Ydb::Scheme::Entry::COLUMN_TABLE; + case ESchemeEntryType::PqGroup: + return ::Ydb::Scheme::Entry::PERS_QUEUE_GROUP; + case ESchemeEntryType::SubDomain: + return ::Ydb::Scheme::Entry::DATABASE; + case ESchemeEntryType::RtmrVolume: + return ::Ydb::Scheme::Entry::RTMR_VOLUME; + case ESchemeEntryType::BlockStoreVolume: + return ::Ydb::Scheme::Entry::BLOCK_STORE_VOLUME; + case ESchemeEntryType::CoordinationNode: + return ::Ydb::Scheme::Entry::COORDINATION_NODE; + case ESchemeEntryType::Sequence: + return ::Ydb::Scheme::Entry::SEQUENCE; + case ESchemeEntryType::Replication: + return ::Ydb::Scheme::Entry::REPLICATION; + case ESchemeEntryType::Topic: + return ::Ydb::Scheme::Entry::TOPIC; + case ESchemeEntryType::ColumnStore: + return ::Ydb::Scheme::Entry::COLUMN_STORE; + case ESchemeEntryType::ExternalTable: + return ::Ydb::Scheme::Entry::EXTERNAL_TABLE; + case ESchemeEntryType::ExternalDataSource: + return ::Ydb::Scheme::Entry::EXTERNAL_DATA_SOURCE; + case ESchemeEntryType::View: + return ::Ydb::Scheme::Entry::VIEW; + case ESchemeEntryType::ResourcePool: + return ::Ydb::Scheme::Entry::RESOURCE_POOL; + case ESchemeEntryType::BackupCollection: + return ::Ydb::Scheme::Entry::BACKUP_COLLECTION; + case ESchemeEntryType::SysView: + return ::Ydb::Scheme::Entry::SYS_VIEW; + case ESchemeEntryType::Transfer: + return ::Ydb::Scheme::Entry::TRANSFER; + case ESchemeEntryType::StreamingQuery: + return ::Ydb::Scheme::Entry::STREAMING_QUERY; + case ESchemeEntryType::Secret: + return ::Ydb::Scheme::Entry::SECRET; + case ESchemeEntryType::Unknown: + default: + return ::Ydb::Scheme::Entry::TYPE_UNSPECIFIED; + } +} + +} // namespace + +void SchemeEntryToProto(const TSchemeEntry& entry, ::Ydb::Scheme::Entry* proto) { + proto->set_name(TStringType{entry.Name}); + proto->set_owner(TStringType{entry.Owner}); + proto->set_type(SchemeEntryTypeToProto(entry.Type)); + proto->set_size_bytes(entry.SizeBytes); + auto& timestamp = *proto->mutable_created_at(); + timestamp.set_plan_step(entry.CreatedAt.PlanStep); + timestamp.set_tx_id(entry.CreatedAt.TxId); + for (const auto& permission : entry.Permissions) { + permission.SerializeTo(*proto->add_permissions()); + } + for (const auto& permission : entry.EffectivePermissions) { + permission.SerializeTo(*proto->add_effective_permissions()); + } +} + +} // namespace NScheme +} // namespace NYdb diff --git a/src/client/impl/internal/scheme_helpers/helpers.h b/src/client/impl/internal/scheme_helpers/helpers.h index 5a4c8528b19..30b3e5e4cbf 100644 --- a/src/client/impl/internal/scheme_helpers/helpers.h +++ b/src/client/impl/internal/scheme_helpers/helpers.h @@ -15,4 +15,10 @@ inline void PermissionToSchemeEntry(const TFrom& from, std::vectorAdd(); - protoItem.set_source_path(item.Src); + if (!item.Src.empty()) { + protoItem.set_source_path(item.Src); + } + if (!item.SrcPath.empty()) { + protoItem.set_source_path_db(item.SrcPath); + } protoItem.set_destination_path(item.Dst); } @@ -354,6 +364,10 @@ TAsyncImportFromFsResponse TImportClient::ImportFromFs(const TImportFromFsSettin settingsProto.set_skip_checksum_validation(settings.SkipChecksumValidation_.value()); } + if (settings.DestinationPath_) { + settingsProto.set_destination_path(TStringType{settings.DestinationPath_.value()}); + } + settingsProto.set_index_population_mode(TProtoAccessor::GetProto(settings.IndexPopulationMode_)); for (const std::string& excludeRegexp : settings.ExcludeRegexp_) { diff --git a/src/client/operation/operation.cpp b/src/client/operation/operation.cpp index 96dd7d7b2a2..aaeb5791fe6 100644 --- a/src/client/operation/operation.cpp +++ b/src/client/operation/operation.cpp @@ -85,4 +85,10 @@ NThreading::TFuture> TOperati return List("scriptexec", pageSize, pageToken); } +template NThreading::TFuture TOperationClient::Get(const TOperation::TOperationId& id); +template <> +NThreading::TFuture> TOperationClient::List(std::uint64_t pageSize, const std::string& pageToken) { + return List("compaction", pageSize, pageToken); +} + } // namespace NYdb::NOperation diff --git a/src/client/persqueue_public/impl/read_session.cpp b/src/client/persqueue_public/impl/read_session.cpp index 2acceb9cd84..bbb8f06e996 100644 --- a/src/client/persqueue_public/impl/read_session.cpp +++ b/src/client/persqueue_public/impl/read_session.cpp @@ -741,7 +741,7 @@ class TGracefulReleasingSimpleDataHandlers : public TThrRefBase { PartitionStreamToUncommittedOffsets.erase(partitionStreamId); event.Confirm(); } else { - UnconfirmedDestroys.emplace(partitionStreamId, std::move(event)); + UnconfirmedDestroys.emplace(partitionStreamId, event); } } diff --git a/src/client/scheme/CMakeLists.txt b/src/client/scheme/CMakeLists.txt index cb84b44bb23..4d22368097e 100644 --- a/src/client/scheme/CMakeLists.txt +++ b/src/client/scheme/CMakeLists.txt @@ -4,6 +4,7 @@ target_link_libraries(client-ydb_scheme PUBLIC yutil enum_serialization_runtime impl-internal-make_request + impl-internal-scheme_helpers client-ydb_common_client-impl client-ydb_driver ) diff --git a/src/client/scheme/scheme.cpp b/src/client/scheme/scheme.cpp index 3377486f962..32d30251571 100644 --- a/src/client/scheme/scheme.cpp +++ b/src/client/scheme/scheme.cpp @@ -117,6 +117,8 @@ static ESchemeEntryType ConvertProtoEntryType(::Ydb::Scheme::Entry::Type entry) return ESchemeEntryType::Transfer; case ::Ydb::Scheme::Entry::STREAMING_QUERY: return ESchemeEntryType::StreamingQuery; + case ::Ydb::Scheme::Entry::SECRET: + return ESchemeEntryType::Secret; default: return ESchemeEntryType::Unknown; } diff --git a/src/client/secret/CMakeLists.txt b/src/client/secret/CMakeLists.txt new file mode 100644 index 00000000000..6a62cb3df61 --- /dev/null +++ b/src/client/secret/CMakeLists.txt @@ -0,0 +1,16 @@ +_ydb_sdk_add_library(client-secret) + +target_link_libraries(client-secret PUBLIC + api-grpc + impl-internal-make_request + client-ydb_common_client-impl + client-ydb_driver + client-ydb_scheme +) + +target_sources(client-secret PRIVATE + secret.cpp + out.cpp +) + +_ydb_sdk_make_client_component(Secret client-secret) diff --git a/src/client/secret/out.cpp b/src/client/secret/out.cpp new file mode 100644 index 00000000000..03b86b51996 --- /dev/null +++ b/src/client/secret/out.cpp @@ -0,0 +1,5 @@ +#include + +Y_DECLARE_OUT_SPEC(, NYdb::NSecret::TDescribeSecretResult, o, x) { + return x.Out(o); +} diff --git a/src/client/secret/secret.cpp b/src/client/secret/secret.cpp new file mode 100644 index 00000000000..0fe2f7a6a68 --- /dev/null +++ b/src/client/secret/secret.cpp @@ -0,0 +1,92 @@ +#include + +#define INCLUDE_YDB_INTERNAL_H +#include +#include +#undef INCLUDE_YDB_INTERNAL_H + +#include +#include +#include + +namespace NYdb::inline V3 { +namespace NSecret { + +class TSecretClient::TImpl : public TClientImplCommon { +public: + TImpl(std::shared_ptr&& connections, const TCommonClientSettings& settings) + : TClientImplCommon(std::move(connections), settings) {} + + TAsyncDescribeSecretResult DescribeSecret(const std::string& path, const NScheme::TDescribePathSettings& settings) { + auto request = MakeOperationRequest<::Ydb::Secret::DescribeSecretRequest>(settings); + request.set_path(TStringType{path}); + + auto promise = NThreading::NewPromise(); + auto extractor = [promise](google::protobuf::Any* any, TPlainStatus status) mutable { + ::Ydb::Secret::DescribeSecretResult result; + if (any) { + any->UnpackTo(&result); + } + NScheme::TSchemeEntry entry(result.self()); + promise.SetValue(TDescribeSecretResult( + TStatus(std::move(status)), + std::move(entry), + static_cast(result.version()))); + }; + + Connections_->RunDeferred<::Ydb::Secret::V1::SecretService, ::Ydb::Secret::DescribeSecretRequest, ::Ydb::Secret::DescribeSecretResponse>( + std::move(request), + extractor, + &::Ydb::Secret::V1::SecretService::Stub::AsyncDescribeSecret, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings)); + + return promise.GetFuture(); + } +}; + +TDescribeSecretResult::TDescribeSecretResult(TStatus&& status, NScheme::TSchemeEntry&& entry, uint64_t version) + : TStatus(std::move(status)) + , Entry_(std::move(entry)) + , Version_(version) +{} + +const NScheme::TSchemeEntry& TDescribeSecretResult::GetEntry() const { + CheckStatusOk("TDescribeSecretResult::GetEntry"); + return Entry_; +} + +const std::string& TDescribeSecretResult::GetName() const { + CheckStatusOk("TDescribeSecretResult::GetName"); + return Entry_.Name; +} + +uint64_t TDescribeSecretResult::GetVersion() const { + CheckStatusOk("TDescribeSecretResult::GetVersion"); + return Version_; +} + +void TDescribeSecretResult::Out(IOutputStream& out) const { + if (IsSuccess()) { + Entry_.Out(out); + } else { + TStatus::Out(out); + } +} + +void TDescribeSecretResult::SerializeTo(::Ydb::Scheme::Entry* proto) const { + CheckStatusOk("TDescribeSecretResult::SerializeTo"); + NScheme::SchemeEntryToProto(Entry_, proto); +} + +TSecretClient::TSecretClient(const TDriver& driver, const TCommonClientSettings& settings) + : Impl_(new TImpl(CreateInternalInterface(driver), settings)) +{} + +TAsyncDescribeSecretResult TSecretClient::DescribeSecret(const std::string& path, const NScheme::TDescribePathSettings& settings) { + return Impl_->DescribeSecret(path, settings); +} + +} // namespace NSecret +} // namespace NYdb diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index b24afacee6a..b0e2c62387b 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -191,6 +191,38 @@ const TBuildIndexOperation::TMetadata& TBuildIndexOperation::Metadata() const { return Metadata_; } +TCompact::TCompact(bool cascade, uint32_t maxShardsInFlight) + : Cascade_(cascade) + , MaxShardsInFlight_(maxShardsInFlight) +{} + +TCompact::TCompact() + : TCompact(false, 1) +{} + +void TCompact::SerializeTo(Ydb::Table::CompactItem& proto) const { + proto.set_cascade(Cascade_); + proto.set_max_shards_in_flight(MaxShardsInFlight_); +} + +TCompactionOperation::TCompactionOperation(TStatus &&status, Ydb::Operations::Operation &&operation) + : TOperation(std::move(status), std::move(operation)) +{ + Ydb::Table::CompactMetadata metadata; + GetProto().metadata().UnpackTo(&metadata); + Metadata_.State = static_cast(metadata.state()); + Metadata_.Progress = metadata.progress(); + Metadata_.Path = metadata.path(); + Metadata_.Cascade = metadata.cascade(); + Metadata_.MaxInFlight = metadata.max_shards_in_flight(); + Metadata_.Total = metadata.shards_total(); + Metadata_.Done = metadata.shards_done(); +} + +const TCompactionOperation::TMetadata& TCompactionOperation::Metadata() const { + return Metadata_; +} + //////////////////////////////////////////////////////////////////////////////// class TPartitioningSettings::TImpl { @@ -379,6 +411,10 @@ class TTableDescription::TImpl { // read replicas settings ReadReplicasSettings_ = TReadReplicasSettings::FromProto(proto.read_replicas_settings()); + + if (proto.has_metrics_settings()) { + MetricsSettings_ = TMetricsSettings::FromProto(proto.metrics_settings()); + } } public: @@ -546,6 +582,15 @@ class TTableDescription::TImpl { ReadReplicasSettings_ = TReadReplicasSettings(mode, readReplicasCount); } + /** + * Set the metrics configuration for the given table. + * + * @param[in] metricsLevel The metrics level + */ + void SetMetricsSettings(TMetricsSettings::EMetricsLevel metricsLevel) { + MetricsSettings_ = TMetricsSettings(metricsLevel); + } + void SetStoreType(EStoreType type) { StoreType_ = type; } @@ -642,6 +687,15 @@ class TTableDescription::TImpl { return ReadReplicasSettings_; } + /** + * Return the metrics configuration for the given table. + * + * @return The metrics configuration + */ + const std::optional& GetMetricsSettings() const { + return MetricsSettings_; + } + private: Ydb::Table::DescribeTableResult Proto_; TStorageSettings StorageSettings_; @@ -664,6 +718,7 @@ class TTableDescription::TImpl { TPartitioningSettings PartitioningSettings_; std::optional KeyBloomFilter_; std::optional ReadReplicasSettings_; + std::optional MetricsSettings_; bool HasStorageSettings_ = false; bool HasPartitioningSettings_ = false; EStoreType StoreType_ = EStoreType::Row; @@ -868,6 +923,10 @@ void TTableDescription::SetReadReplicasSettings(TReadReplicasSettings::EMode mod Impl_->SetReadReplicasSettings(mode, readReplicasCount); } +void TTableDescription::SetMetricsSettings(TMetricsSettings::EMetricsLevel metricsLevel) { + Impl_->SetMetricsSettings(metricsLevel); +} + void TTableDescription::SetStoreType(EStoreType type) { Impl_->SetStoreType(type); } @@ -920,6 +979,10 @@ std::optional TTableDescription::GetReadReplicasSettings( return Impl_->GetReadReplicasSettings(); } +std::optional TTableDescription::GetMetricsSettings() const { + return Impl_->GetMetricsSettings(); +} + const Ydb::Table::DescribeTableResult& TTableDescription::GetProto() const { return Impl_->GetProto(); } @@ -1002,6 +1065,10 @@ void TTableDescription::SerializeTo(Ydb::Table::CreateTableRequest& request) con if (const auto& settings = Impl_->GetReadReplicasSettings()) { settings->SerializeTo(*request.mutable_read_replicas_settings()); } + + if (const auto& settings = Impl_->GetMetricsSettings()) { + settings->SerializeTo(*request.mutable_metrics_settings()); + } } //////////////////////////////////////////////////////////////////////////////// @@ -1345,7 +1412,7 @@ TTableBuilder& TTableBuilder::SetStorageSettings(const TStorageSettings& setting } TTableBuilder& TTableBuilder::AddColumnFamily(const TColumnFamilyDescription& desc) { - TableDescription_.AddColumnFamily(std::move(desc)); + TableDescription_.AddColumnFamily(desc); return *this; } @@ -1394,6 +1461,11 @@ TTableBuilder& TTableBuilder::SetReadReplicasSettings(TReadReplicasSettings::EMo return *this; } +TTableBuilder& TTableBuilder::SetMetricsSettings(TMetricsSettings::EMetricsLevel metricsLevel) { + TableDescription_.SetMetricsSettings(metricsLevel); + return *this; +} + TTableDescription TTableBuilder::Build() { return TableDescription_; } @@ -1810,6 +1882,21 @@ static Ydb::Table::AlterTableRequest MakeAlterTableProtoRequest( replSettings.SerializeTo(*request.mutable_set_read_replicas_settings()); } + if (const auto& metricsSettings = settings.GetAlterMetricsSettings()) { + switch (metricsSettings->GetAction()) { + case TAlterMetricsSettings::EAction::Set: + metricsSettings->GetMetricsSettings().SerializeTo(*request.mutable_set_metrics_settings()); + break; + case TAlterMetricsSettings::EAction::Drop: + request.mutable_drop_metrics_settings(); + break; + } + } + + if (settings.Compact_) { + settings.Compact_->SerializeTo(*request.mutable_compact()); + } + return request; } @@ -2424,6 +2511,91 @@ uint64_t TIndexDescription::GetSizeBytes() const { return SizeBytes_; } +TIndexDescription TIndexDescription::CreateGlobalIndex( + const std::string& name, + const std::vector& indexColumns, + const std::vector& dataColumns, + const TGlobalIndexSettings& indexTableSettings +) { + return TIndexDescription(name, EIndexType::GlobalSync, indexColumns, dataColumns, {indexTableSettings}); +} + +TIndexDescription TIndexDescription::CreateGlobalAsyncIndex( + const std::string& name, + const std::vector& indexColumns, + const std::vector& dataColumns, + const TGlobalIndexSettings& indexTableSettings +) { + return TIndexDescription(name, EIndexType::GlobalAsync, indexColumns, dataColumns, {indexTableSettings}); +} + +TIndexDescription TIndexDescription::CreateGlobalUniqueIndex( + const std::string& name, + const std::vector& indexColumns, + const std::vector& dataColumns, + const TGlobalIndexSettings& indexTableSettings +) { + return TIndexDescription(name, EIndexType::GlobalUnique, indexColumns, dataColumns, {indexTableSettings}); +} + +TIndexDescription TIndexDescription::CreateVectorIndex( + const std::string& name, + const std::string& vectorColumn, + const TKMeansTreeSettings& specializedIndexSettings, + const std::vector& dataColumns, + const TGlobalIndexSettings& levelTableSettings, + const TGlobalIndexSettings& postingTableSettings +) { + return TIndexDescription( + name, EIndexType::GlobalVectorKMeansTree, {vectorColumn}, dataColumns, + {levelTableSettings, postingTableSettings}, specializedIndexSettings + ); +} + +TIndexDescription TIndexDescription::CreatePrefixedVectorIndex( + const std::string& name, + const std::vector& indexColumns, + const TKMeansTreeSettings& specializedIndexSettings, + const std::vector& dataColumns, + const TGlobalIndexSettings& levelTableSettings, + const TGlobalIndexSettings& postingTableSettings, + const TGlobalIndexSettings& prefixTableSettings +) { + return TIndexDescription( + name, EIndexType::GlobalVectorKMeansTree, indexColumns, dataColumns, + {levelTableSettings, postingTableSettings, prefixTableSettings}, specializedIndexSettings + ); +} + +TIndexDescription TIndexDescription::CreateFulltextPlainIndex( + const std::string& name, + const std::vector& indexColumns, + const TFulltextIndexSettings& specializedIndexSettings, + const std::vector& dataColumns, + const TGlobalIndexSettings& indexTableSettings +) { + return TIndexDescription( + name, EIndexType::GlobalFulltextPlain, indexColumns, dataColumns, + {indexTableSettings}, specializedIndexSettings + ); +} + +TIndexDescription TIndexDescription::CreateFulltextRelevanceIndex( + const std::string& name, + const std::vector& indexColumns, + const TFulltextIndexSettings& specializedIndexSettings, + const std::vector& dataColumns, + const TGlobalIndexSettings& postingTableSettings, + const TGlobalIndexSettings& dictTableSettings, + const TGlobalIndexSettings& docsTableSettings, + const TGlobalIndexSettings& statsTableSettings +) { + return TIndexDescription( + name, EIndexType::GlobalFulltextRelevance, indexColumns, dataColumns, + {dictTableSettings, docsTableSettings, statsTableSettings, postingTableSettings}, specializedIndexSettings + ); +} + std::optional TReadReplicasSettings::FromProto(const Ydb::Table::ReadReplicasSettings& proto) { switch (proto.settings_case()) { case Ydb::Table::ReadReplicasSettings::kPerAzReadReplicasCount: @@ -2452,6 +2624,60 @@ void TReadReplicasSettings::SerializeTo(Ydb::Table::ReadReplicasSettings& proto) } } +std::optional TMetricsSettings::FromProto( + const Ydb::Table::MetricsSettings& proto +) { + switch (proto.metrics_level()) { + case Ydb::Table::MetricsSettings::METRICS_LEVEL_UNSPECIFIED: + return TMetricsSettings(TMetricsSettings::EMetricsLevel::Unspecified); + + case Ydb::Table::MetricsSettings::METRICS_LEVEL_DISABLED: + return TMetricsSettings(TMetricsSettings::EMetricsLevel::Disabled); + + case Ydb::Table::MetricsSettings::METRICS_LEVEL_DATABASE: + return TMetricsSettings(TMetricsSettings::EMetricsLevel::Database); + + case Ydb::Table::MetricsSettings::METRICS_LEVEL_TABLE: + return TMetricsSettings(TMetricsSettings::EMetricsLevel::Table); + + case Ydb::Table::MetricsSettings::METRICS_LEVEL_PARTITION: + return TMetricsSettings(TMetricsSettings::EMetricsLevel::Partition); + + default: + return TMetricsSettings(TMetricsSettings::EMetricsLevel::Unspecified); + } +} + +/** + * Read the metrics configuration to the corresponding protobuf message. + * + * @param[in,out] proto The message to write to + */ +void TMetricsSettings::SerializeTo(Ydb::Table::MetricsSettings& proto) const { + switch (GetMetricsLevel()) { + case EMetricsLevel::Unspecified: + proto.set_metrics_level(Ydb::Table::MetricsSettings::METRICS_LEVEL_UNSPECIFIED); + break; + + case EMetricsLevel::Disabled: + proto.set_metrics_level(Ydb::Table::MetricsSettings::METRICS_LEVEL_DISABLED); + break; + + case EMetricsLevel::Database: + proto.set_metrics_level(Ydb::Table::MetricsSettings::METRICS_LEVEL_DATABASE); + break; + + case EMetricsLevel::Table: + proto.set_metrics_level(Ydb::Table::MetricsSettings::METRICS_LEVEL_TABLE); + break; + + case EMetricsLevel::Partition: + proto.set_metrics_level(Ydb::Table::MetricsSettings::METRICS_LEVEL_PARTITION); + break; + } +} + + TGlobalIndexSettings TGlobalIndexSettings::FromProto(const Ydb::Table::GlobalIndexSettings& proto) { auto partitionsFromProto = [](const Ydb::Table::GlobalIndexSettings& proto) -> TUniformOrExplicitPartitions { switch (proto.partitions_case()) { @@ -2464,10 +2690,17 @@ TGlobalIndexSettings TGlobalIndexSettings::FromProto(const Ydb::Table::GlobalInd } }; + std::optional metricsSettings; + + if (proto.has_metrics_settings()) { + metricsSettings = TMetricsSettings::FromProto(proto.metrics_settings()); + } + return { .PartitioningSettings = TPartitioningSettings(proto.partitioning_settings()), .Partitions = partitionsFromProto(proto), - .ReadReplicasSettings = TReadReplicasSettings::FromProto(proto.read_replicas_settings()) + .ReadReplicasSettings = TReadReplicasSettings::FromProto(proto.read_replicas_settings()), + .MetricsSettings = metricsSettings, }; } @@ -2487,6 +2720,10 @@ void TGlobalIndexSettings::SerializeTo(Ydb::Table::GlobalIndexSettings& settings if (ReadReplicasSettings) { ReadReplicasSettings->SerializeTo(*settings.mutable_read_replicas_settings()); } + + if (MetricsSettings) { + MetricsSettings->SerializeTo(*settings.mutable_metrics_settings()); + } } TVectorIndexSettings TVectorIndexSettings::FromProto(const Ydb::Table::VectorIndexSettings& proto) { @@ -3633,6 +3870,78 @@ TAlterTableSettings& TAlterTtlSettingsBuilder::EndAlterTtlSettings() { return Parent_.AlterTtlSettings(Impl_->GetAlterTtlSettings()); } +/** + * The implementation of the builder for the metrics configuration + * for the ALTER TABLE request. + */ +class TAlterMetricsSettingsBuilder::TImpl { +public: + /** + * Configure the ALTER TABLE request to remove the metrics configuration. + */ + void Drop() { + AlterMetricsSettings_ = TAlterMetricsSettings::Drop(); + } + + /** + * Configure the ALTER TABLE request to use the given metrics configuration. + * + * @param[in] settings The metrics configuration to use + */ + void Set(TMetricsSettings&& settings) { + AlterMetricsSettings_ = TAlterMetricsSettings::Set(std::move(settings)); + } + + /** + * Configure the ALTER TABLE request to use the given metrics configuration. + * + * @param[in] settings The metrics configuration to use + */ + void Set(const TMetricsSettings& settings) { + AlterMetricsSettings_ = TAlterMetricsSettings::Set(settings); + } + + /** + * Return the current metrics configuration. + * + * @return The current metrics configuration + */ + const std::optional& GetAlterMetricsSettings() const { + return AlterMetricsSettings_; + } + +private: + std::optional AlterMetricsSettings_; +}; + +TAlterMetricsSettingsBuilder::TAlterMetricsSettingsBuilder(TAlterTableSettings& parent) + : Parent_(parent) + , Impl_(std::make_shared()) +{ } + +TAlterMetricsSettingsBuilder& TAlterMetricsSettingsBuilder::Drop() { + Impl_->Drop(); + return *this; +} + +TAlterMetricsSettingsBuilder& TAlterMetricsSettingsBuilder::Set(TMetricsSettings&& settings) { + Impl_->Set(std::move(settings)); + return *this; +} + +TAlterMetricsSettingsBuilder& TAlterMetricsSettingsBuilder::Set(const TMetricsSettings& settings) { + Impl_->Set(settings); + return *this; +} + +TAlterMetricsSettingsBuilder& TAlterMetricsSettingsBuilder::Set(TMetricsSettings::EMetricsLevel metricsLevel) { + return Set(TMetricsSettings(metricsLevel)); +} + +TAlterTableSettings& TAlterMetricsSettingsBuilder::EndAlterMetricsSettings() { + return Parent_.SetAlterMetricsSettings(Impl_->GetAlterMetricsSettings()); +} + class TAlterTableSettings::TImpl { public: TImpl() { } @@ -3645,8 +3954,27 @@ class TAlterTableSettings::TImpl { return AlterTtlSettings_; } + /** + * Update the metrics configuration for the ALTER TABLE request. + * + * @param[in] settings The metrics configuration to use + */ + void SetAlterMetricsSettings(const std::optional& settings) { + AlterMetricsSettings_ = settings; + } + + /** + * Return the current metrics configuration for the ALTER TABLE request. + * + * @return The current metrics configuration + */ + const std::optional& GetAlterMetricsSettings() const { + return AlterMetricsSettings_; + } + private: std::optional AlterTtlSettings_; + std::optional AlterMetricsSettings_; }; TAlterTableSettings::TAlterTableSettings() @@ -3662,6 +3990,17 @@ const std::optional& TAlterTableSettings::GetAlterTtlSettings return Impl_->GetAlterTtlSettings(); } +const std::optional& TAlterTableSettings::GetAlterMetricsSettings() const { + return Impl_->GetAlterMetricsSettings(); +} + +TAlterTableSettings& TAlterTableSettings::SetAlterMetricsSettings( + const std::optional& settings +) { + Impl_->SetAlterMetricsSettings(settings); + return *this; +} + //////////////////////////////////////////////////////////////////////////////// TReadReplicasSettings::TReadReplicasSettings(EMode mode, uint64_t readReplicasCount) @@ -3677,6 +4016,14 @@ uint64_t TReadReplicasSettings::GetReadReplicasCount() const { return ReadReplicasCount_; } +TMetricsSettings::TMetricsSettings(EMetricsLevel metricsLevel) + : MetricsLevel_(metricsLevel) { +} + +TMetricsSettings::EMetricsLevel TMetricsSettings::GetMetricsLevel() const { + return MetricsLevel_; +} + //////////////////////////////////////////////////////////////////////////////// TBulkUpsertResult::TBulkUpsertResult(TStatus&& status) diff --git a/src/client/topic/impl/event_handlers.cpp b/src/client/topic/impl/event_handlers.cpp index e91024f6775..8cc523c842c 100644 --- a/src/client/topic/impl/event_handlers.cpp +++ b/src/client/topic/impl/event_handlers.cpp @@ -75,7 +75,7 @@ class TGracefulReleasingSimpleDataHandlers : public TThrRefBase { PartitionStreamToUncommittedOffsets.erase(key); event.Confirm(); } else { - UnconfirmedDestroys.emplace(key, std::move(event)); + UnconfirmedDestroys.emplace(key, event); } } diff --git a/src/client/topic/impl/producer.cpp b/src/client/topic/impl/producer.cpp index 18b8e642213..7ea98738874 100644 --- a/src/client/topic/impl/producer.cpp +++ b/src/client/topic/impl/producer.cpp @@ -10,6 +10,12 @@ namespace NYdb::inline V3::NTopic { +namespace { + +static constexpr auto PARTITION_KEY_META_KEY = "__partition_key"; + +} // namespace + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TProducer @@ -45,7 +51,7 @@ bool TProducer::TPartitionInfo::IsSplitted() const { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TProducer::TMessageInfo -TProducer::TMessageInfo::TMessageInfo(const std::string& key, TWriteMessage&& message, std::uint32_t partition) +TProducer::TMessageInfo::TMessageInfo(const std::string& key, const std::string& choosePartitionKey, TWriteMessage&& message, std::uint32_t partition) : Key(key) , Data(message.Data) , Codec(message.Codec) @@ -58,6 +64,10 @@ TProducer::TMessageInfo::TMessageInfo(const std::string& key, TWriteMessage&& me for (const auto& [key, value] : message.MessageMeta_) { MessageMeta.Fields.emplace_back(key, value); } + + if (!choosePartitionKey.empty()) { + MessageMeta.Fields.emplace_back(PARTITION_KEY_META_KEY, choosePartitionKey); + } } TWriteMessage TProducer::TMessageInfo::BuildMessage() const { @@ -913,7 +923,7 @@ void TProducer::TMessagesWorker::RechoosePartitionIfNeeded(MessageIter message) } // this case means that partition was split, so we need to rechoose the partition for the message - auto newPartition = Producer->PartitionChooser->ChoosePartition(message->Key); + auto [newPartition, _] = Producer->PartitionChooser->ChoosePartition(message->Key); message->Partition = newPartition; } @@ -1060,8 +1070,8 @@ void TProducer::TMessagesWorker::DoWork() { iterateMessagesIndex( MessagesToResendIndex, - [](MessageIter) { - return false; + [this](MessageIter head) { + return Producer->Partitions[head->Partition].Locked_; } ); @@ -1144,9 +1154,13 @@ bool TProducer::TMessagesWorker::IsMemoryUsageOK() const { return MemoryUsage <= Producer->Settings.MaxMemoryUsage_ / 2; } -void TProducer::TMessagesWorker::AddMessage(const std::string& key, TWriteMessage&& message, std::uint32_t partition) { +void TProducer::TMessagesWorker::AddMessage( + const std::string& key, + const std::string& choosePartitionKey, + TWriteMessage&& message, + std::uint32_t partition) { MemoryUsage += message.Data.size(); - PushInFlightMessage(partition, TMessageInfo(key, std::move(message), partition)); + PushInFlightMessage(partition, TMessageInfo(key, choosePartitionKey, std::move(message), partition)); } std::optional TProducer::TMessagesWorker::GetContinuationToken(std::uint32_t partition) { @@ -1249,10 +1263,10 @@ void TProducer::TMessagesWorker::ScheduleResendMessages(std::uint32_t partition, auto currentSeqNo = resendIt != list.end() ? (*resendIt)->SeqNo.value_or(0) : 0; for (auto iter = resendIt; iter != list.end(); ++iter) { if (iter != resendIt && currentSeqNo != 0) { - Y_ABORT_UNLESS((*iter)->SeqNo.value_or(0) > currentSeqNo, "SeqNo is not increasing for partition %d", partition); + Y_ABORT_UNLESS((*iter)->SeqNo.value_or(0) > currentSeqNo, "SeqNo is not increasing for partition %d, currentSeqNo: %llu, iterSeqNo: %llu", partition, currentSeqNo, (*iter)->SeqNo.value_or(0)); } - auto newPartition = Producer->PartitionChooser->ChoosePartition((*iter)->Key); + auto [newPartition, _] = Producer->PartitionChooser->ChoosePartition((*iter)->Key); (*iter)->Partition = newPartition; messagesFromOldPartition.emplace_back(newPartition, *iter); @@ -1260,17 +1274,20 @@ void TProducer::TMessagesWorker::ScheduleResendMessages(std::uint32_t partition, } list.erase(resendIt, list.end()); - for (const auto& [newPartition, msgIt] : messagesFromOldPartition) { + for (auto it = messagesFromOldPartition.rbegin(); it != messagesFromOldPartition.rend(); ++it) { + auto [newPartition, msgIt] = *it; auto [inFlightMessagesIndexChainIt, _] = InFlightMessagesIndex.try_emplace(newPartition); - inFlightMessagesIndexChainIt->second.push_back(msgIt); + inFlightMessagesIndexChainIt->second.push_front(msgIt); if (msgIt->Sent) { auto [messagesToResendChainIt, __] = MessagesToResendIndex.try_emplace(newPartition); - messagesToResendChainIt->second.push_back(msgIt); + messagesToResendChainIt->second.push_front(msgIt); } } InFlightMessagesIndex.erase(partition); + PendingMessagesIndex.erase(partition); + MessagesToResendIndex.erase(partition); Producer->SessionsWorker->AddIdleSession(partition); } @@ -1278,7 +1295,7 @@ void TProducer::TMessagesWorker::RebuildPendingMessagesIndex(std::uint32_t parti auto [oldPendingMessagesIndexChainIt, __] = PendingMessagesIndex.try_emplace(partition); std::unordered_map> pendingMessagesForNewPartitions; for (auto it = oldPendingMessagesIndexChainIt->second.begin(); it != oldPendingMessagesIndexChainIt->second.end(); ++it) { - auto newPartition = Producer->PartitionChooser->ChoosePartition((*it)->Key); + auto [newPartition, _] = Producer->PartitionChooser->ChoosePartition((*it)->Key); auto [pendingMessagesForNewPartitionsIt, __] = pendingMessagesForNewPartitions.try_emplace(newPartition); pendingMessagesForNewPartitionsIt->second.push_back(*it); } @@ -1940,6 +1957,7 @@ TWriteResult TProducer::WriteInternal(TContinuationToken&&, TWriteMessage&& mess std::uint32_t chosenPartition; std::string key; + std::string choosePartitionKey; if (message.GetPartition().has_value()) { if (!Partitions[message.GetPartition().value()].Children_.empty()) { return TWriteResult{ @@ -1951,13 +1969,17 @@ TWriteResult TProducer::WriteInternal(TContinuationToken&&, TWriteMessage&& mess chosenPartition = message.GetPartition().value(); } else if (!message.GetKey().has_value()) { key = Settings.ProducerIdPrefix_; - chosenPartition = PartitionChooser->ChoosePartition(Settings.ProducerIdPrefix_); + const auto partitionChoice = PartitionChooser->ChoosePartition(Settings.ProducerIdPrefix_); + chosenPartition = partitionChoice.first; + choosePartitionKey = partitionChoice.second; } else { - chosenPartition = PartitionChooser->ChoosePartition(*message.GetKey()); + const auto partitionChoice = PartitionChooser->ChoosePartition(*message.GetKey()); + chosenPartition = partitionChoice.first; + choosePartitionKey = partitionChoice.second; key = *message.GetKey(); } - MessagesWorker->AddMessage(key, std::move(message), chosenPartition); + MessagesWorker->AddMessage(key, choosePartitionKey, std::move(message), chosenPartition); eventsPromise = EventsWorker->HandleNewMessage(); RunUserEventLoop(); } @@ -2059,25 +2081,25 @@ TProducer::TBoundPartitionChooser::TBoundPartitionChooser(TProducer* producer) : Producer(producer) {} -std::uint32_t TProducer::TBoundPartitionChooser::ChoosePartition(const std::string_view key) { +std::pair TProducer::TBoundPartitionChooser::ChoosePartition(const std::string_view key) { auto hashedKey = Producer->PartitioningKeyHasher(key); auto lowerBound = Producer->PartitionsIndex.lower_bound(hashedKey); if (lowerBound != Producer->PartitionsIndex.end() && lowerBound->first == hashedKey) { - return lowerBound->second; + return { lowerBound->second, hashedKey }; } Y_ABORT_IF(lowerBound == Producer->PartitionsIndex.begin(), "Lower bound is the first element"); - return std::prev(lowerBound)->second; + return { std::prev(lowerBound)->second, hashedKey }; } TProducer::THashPartitionChooser::THashPartitionChooser(std::vector&& partitions) : Partitions(std::move(partitions)) {} -std::uint32_t TProducer::THashPartitionChooser::ChoosePartition(const std::string_view key) { +std::pair TProducer::THashPartitionChooser::ChoosePartition(const std::string_view key) { auto hash = MurmurHash(key.data(), key.size()); - return Partitions[hash % Partitions.size()]; + return {Partitions[hash % Partitions.size()], ""}; } } // namespace NYdb::inline V3::NTopic diff --git a/src/client/topic/impl/producer.h b/src/client/topic/impl/producer.h index dca301c72a0..3d339b50875 100644 --- a/src/client/topic/impl/producer.h +++ b/src/client/topic/impl/producer.h @@ -42,7 +42,7 @@ class TProducer : public IProducer, }; struct TMessageInfo { - TMessageInfo(const std::string& key, TWriteMessage&& message, std::uint32_t partition); + TMessageInfo(const std::string& key, const std::string& choosePartitionKey, TWriteMessage&& message, std::uint32_t partition); std::string Key; std::string Data; @@ -155,7 +155,7 @@ class TProducer : public IProducer, void DoWork(); - void AddMessage(const std::string& key, TWriteMessage&& message, std::uint32_t partition); + void AddMessage(const std::string& key, const std::string& choosePartitionKey, TWriteMessage&& message, std::uint32_t partition); void ScheduleResendMessages(std::uint32_t partition, std::uint64_t afterSeqNo); void RebuildPendingMessagesIndex(std::uint32_t partition); void HandleAck(); @@ -299,20 +299,22 @@ class TProducer : public IProducer, // Partition chooser struct IPartitionChooser { - virtual std::uint32_t ChoosePartition(const std::string_view key) = 0; + // this method returns the partition ID and the choose partition key for the given key + // the choose partition key is used to identify the partition + virtual std::pair ChoosePartition(const std::string_view key) = 0; virtual ~IPartitionChooser() = default; }; struct TBoundPartitionChooser : IPartitionChooser { TBoundPartitionChooser(TProducer* producer); - std::uint32_t ChoosePartition(const std::string_view key) override; + std::pair ChoosePartition(const std::string_view key) override; private: TProducer* Producer; }; struct THashPartitionChooser : IPartitionChooser { THashPartitionChooser(std::vector&& partitions); - std::uint32_t ChoosePartition(const std::string_view key) override; + std::pair ChoosePartition(const std::string_view key) override; private: std::vector Partitions; }; diff --git a/src/client/topic/impl/read_session_impl.h b/src/client/topic/impl/read_session_impl.h index 99e96bf241f..5fd4eb3e590 100644 --- a/src/client/topic/impl/read_session_impl.h +++ b/src/client/topic/impl/read_session_impl.h @@ -948,7 +948,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue& deferred, TCallbackContextPtr& cbContext); bool HasDataEventCallback() const; - void ApplyCallbackToEventImpl(TADataReceivedEvent& event, + void ApplyCallbackToEventImpl(TADataReceivedEvent&& event, TUserRetrievedEventsInfoAccumulator&& eventsInfo, TDeferredActions& deferred); @@ -1181,6 +1181,7 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext; friend class TDirectReadSessionControlCallbacks; + friend class TDataDecompressionInfo; TSingleClusterReadSessionImpl( const TAReadSessionSettings& settings, diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index 770dee20e1e..2b119bd99d7 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -191,7 +191,7 @@ void TRawPartitionStreamEventQueue::SignalReadyEvents(TInt std::move(compressedMessages), stream); - queue.ApplyCallbackToEventImpl(data, std::move(accumulator), deferred); + queue.ApplyCallbackToEventImpl(std::move(data), std::move(accumulator), deferred); } else { moveToReadyQueue(std::move(front)); } @@ -2760,7 +2760,7 @@ bool TReadSessionEventsQueue::HasDataEventCallback() const } template -void TReadSessionEventsQueue::ApplyCallbackToEventImpl(TADataReceivedEvent& data, +void TReadSessionEventsQueue::ApplyCallbackToEventImpl(TADataReceivedEvent&& data, TUserRetrievedEventsInfoAccumulator&& eventsInfo, TDeferredActions& deferred) { @@ -2856,11 +2856,14 @@ void TDataDecompressionInfo::Cleanup() { ui64 sourceSize = 0; ui64 messagesCount = 0; - while (!Tasks.empty()) { - const auto& task = Tasks.front(); - sourceSize += task.AddedDataSize(); - messagesCount += task.AddedMessagesCount(); - Tasks.pop_front(); + { + std::lock_guard lock(session->Lock); + while (!Tasks.empty()) { + const auto& task = Tasks.front(); + sourceSize += task.AddedDataSize(); + messagesCount += task.AddedMessagesCount(); + Tasks.pop_front(); + } } OnTaskCanceled(sourceSize, messagesCount); diff --git a/src/client/topic/impl/topic.cpp b/src/client/topic/impl/topic.cpp index 429f61a5d49..dafbccdf9e0 100644 --- a/src/client/topic/impl/topic.cpp +++ b/src/client/topic/impl/topic.cpp @@ -55,6 +55,7 @@ TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result) , MeteringMode_(TProtoAccessor::FromProto(Proto_.metering_mode())) , TopicStats_(Proto_.topic_stats()) , MetricsLevel_(Proto_.has_metrics_level() ? std::optional(static_cast(Proto_.metrics_level())) : std::optional()) + , ContentBasedDeduplication_(Proto_.content_based_deduplication()) { Owner_ = Proto_.self().owner(); CreationTimestamp_ = NScheme::TVirtualTimestamp(Proto_.self().created_at()); @@ -108,6 +109,8 @@ TConsumer::TConsumer(const Ydb::Topic::Consumer& consumer) ConsumerType_ = EConsumerType::Shared; KeepMessagesOrder_ = consumer.shared_consumer_type().keep_messages_order(); DefaultProcessingTimeout_ = TDuration::Seconds(consumer.shared_consumer_type().default_processing_timeout().seconds()); + ReceiveMessageWaitTime_ = ConvertPositiveDuration(consumer.shared_consumer_type().receive_message_wait_time()); + ReceiveMessageDelay_ = ConvertPositiveDuration(consumer.shared_consumer_type().receive_message_delay()); } else { ConsumerType_ = EConsumerType::Streaming; } @@ -121,6 +124,14 @@ EConsumerType TConsumer::GetConsumerType() const { return ConsumerType_; } +TDuration TConsumer::GetReceiveMessageWaitTime() const { + return ReceiveMessageWaitTime_; +} + +TDuration TConsumer::GetReceiveMessageDelay() const { + return ReceiveMessageDelay_; +} + bool TConsumer::GetImportant() const { return Important_; } @@ -160,6 +171,10 @@ uint32_t TTopicDescription::GetTotalPartitionsCount() const { return Partitions_.size(); } +bool TTopicDescription::GetContentBasedDeduplication() const { + return ContentBasedDeduplication_; +} + const std::vector& TTopicDescription::GetPartitions() const { return Partitions_; } @@ -757,6 +772,14 @@ void TConsumerSettings::SerializeTo(Ydb::Topic::Consumer& proto) cons if (DefaultProcessingTimeout_) { shared->mutable_default_processing_timeout()->set_seconds(DefaultProcessingTimeout_.value().Seconds()); } + if (ReceiveMessageDelay_) { + shared->mutable_receive_message_delay()->set_seconds(ReceiveMessageDelay_->Seconds()); + shared->mutable_receive_message_delay()->set_nanos(ReceiveMessageDelay_->NanoSecondsOfSecond()); + } + if (ReceiveMessageWaitTime_) { + shared->mutable_receive_message_wait_time()->set_seconds(ReceiveMessageWaitTime_->Seconds()); + shared->mutable_receive_message_wait_time()->set_nanos(ReceiveMessageWaitTime_->NanoSecondsOfSecond()); + } if (DeadLetterPolicy_.Enabled_) { shared->mutable_dead_letter_policy()->set_enabled(DeadLetterPolicy_.Enabled_.value()); } @@ -826,6 +849,16 @@ void TAlterConsumerSettings::SerializeTo(Ydb::Topic::AlterConsumer& proto) const ->set_set_max_processing_attempts(DeadLetterPolicy_.Condition_.MaxProcessingAttempts_.value()); } + if (ReceiveMessageDelay_) { + proto.mutable_alter_shared_consumer_type()->mutable_set_receive_message_delay()->set_seconds(ReceiveMessageDelay_->Seconds()); + proto.mutable_alter_shared_consumer_type()->mutable_set_receive_message_delay()->set_nanos(ReceiveMessageDelay_->NanoSecondsOfSecond()); + } + + if (ReceiveMessageWaitTime_) { + proto.mutable_alter_shared_consumer_type()->mutable_set_receive_message_wait_time()->set_seconds(ReceiveMessageWaitTime_->Seconds()); + proto.mutable_alter_shared_consumer_type()->mutable_set_receive_message_wait_time()->set_nanos(ReceiveMessageWaitTime_->NanoSecondsOfSecond()); + } + if (DeadLetterPolicy_.Action_) { switch (DeadLetterPolicy_.Action_.value()) { case EDeadLetterAction::Move: @@ -878,6 +911,7 @@ void TCreateTopicSettings::SerializeTo(Ydb::Topic::CreateTopicRequest& request) if (MetricsLevel_) { request.set_metrics_level(*MetricsLevel_); } + request.set_content_based_deduplication(ContentBasedDeduplication_); } } // namespace NYdb::NTopic diff --git a/src/client/topic/impl/topic_impl.h b/src/client/topic/impl/topic_impl.h index 7a3a926892d..cc4c5ecc6da 100644 --- a/src/client/topic/impl/topic_impl.h +++ b/src/client/topic/impl/topic_impl.h @@ -89,6 +89,9 @@ class TTopicClient::TImpl : public TClientImplCommon { if (settings.SetRetentionPeriod_) { request.mutable_set_retention_period()->set_seconds(settings.SetRetentionPeriod_->Seconds()); } + if (settings.SetContentBasedDeduplication_) { + request.set_set_content_based_deduplication(*settings.SetContentBasedDeduplication_); + } if (settings.SetSupportedCodecs_) { for (const auto& codec : *settings.SetSupportedCodecs_) { request.mutable_set_supported_codecs()->add_codecs((static_cast(codec))); diff --git a/src/client/topic/impl/write_session_impl.cpp b/src/client/topic/impl/write_session_impl.cpp index 7c2b0bca1fb..b8ebde1964a 100644 --- a/src/client/topic/impl/write_session_impl.cpp +++ b/src/client/topic/impl/write_session_impl.cpp @@ -658,7 +658,7 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess ); FlushWriteIfRequiredImpl(); - readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk; + readyToAccept = OnMemoryUsageChangedImpl(static_cast(bufferSize)).NowOk; } if (readyToAccept) { EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); @@ -1183,7 +1183,7 @@ bool TWriteSessionImpl::CleanupOnAcknowledgedImpl(uint64_t id) { uint64_t size = 0; uint64_t compressedSize = 0; if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == id) { - auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size()); + auto memoryUsage = OnMemoryUsageChangedImpl(-static_cast(SentPackedMessage.front().Data.size())); result = memoryUsage.NowOk && !memoryUsage.WasOk; const auto& front = SentPackedMessage.front(); if (front.Compressed) { @@ -1228,7 +1228,14 @@ TMemoryUsageChange TWriteSessionImpl::OnMemoryUsageChangedImpl(i64 diff) { //if (diff < 0) { // Y_ABORT_UNLESS(MemoryUsage >= static_cast(std::abs(diff))); //} - MemoryUsage += diff; + if (diff >= 0) { + MemoryUsage += static_cast(diff); + } else { + const size_t dec = static_cast(-diff); + Y_ABORT_UNLESS(MemoryUsage >= dec); + MemoryUsage -= dec; + } + bool nowOk = MemoryUsage <= Settings.MaxMemoryUsage_; if (wasOk != nowOk) { if (wasOk) { @@ -1311,7 +1318,7 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) { UpdateTimedCountersImpl(); Y_ABORT_UNLESS(block.Valid); - auto memoryUsage = OnMemoryUsageChangedImpl(static_cast(block.Data.size()) - block.OriginalMemoryUsage); + auto memoryUsage = OnMemoryUsageChangedImpl(static_cast(block.Data.size()) - static_cast(block.OriginalMemoryUsage)); (*Counters->BytesInflightUncompressed) -= block.OriginalSize; (*Counters->BytesInflightCompressed) += block.Data.size(); @@ -1320,7 +1327,7 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) { if (!SendImplScheduled.exchange(true)) { CompressionExecutor->Post([cbContext = SelfContext]() { if (auto self = cbContext->LockShared()) { - self->SendImplScheduled = false; + self->SendImplScheduled.store(false); with_lock (self->Lock) { self->SendImpl(); } diff --git a/src/client/topic/ut/topic_tx_skip_conflict_ut.cpp b/src/client/topic/ut/topic_tx_skip_conflict_ut.cpp new file mode 100644 index 00000000000..ac0f44123cd --- /dev/null +++ b/src/client/topic/ut/topic_tx_skip_conflict_ut.cpp @@ -0,0 +1,317 @@ +#include + +#include + +#include + +#include + +#include +#include +#include + +namespace NYdb::inline V3::NTopic::NTests::NTxUsage { + +namespace { + +enum class ETrackProducerIdInTxMeta { + Absent, + True, + False, +}; + +class TFixtureTopicTxMatrixBase : public TFixtureTable { +protected: + size_t CountTableRowsWithKey(const std::string& tablePath, const std::string& key) { + auto session = CreateSession(); + auto tx = session->BeginTx(); + const auto query = Sprintf( + R"(DECLARE $k AS Utf8; SELECT COUNT(*) AS cnt FROM `%s` WHERE key = $k;)", + tablePath.c_str()); + auto params = TParamsBuilder().AddParam("$k").Utf8(key).Build().Build(); + auto result = session->Execute(query, tx.get(), true, params); + NYdb::TResultSetParser parser(result.at(0)); + UNIT_ASSERT(parser.TryNextRow()); + return static_cast(parser.ColumnParser(0).GetUint64()); + } + + void AssertTableKeyValue(const std::string& tablePath, const std::string& key, const std::string& expectedValue) { + auto session = CreateSession(); + auto tx = session->BeginTx(); + const auto query = Sprintf( + R"(DECLARE $k AS Utf8; SELECT value FROM `%s` WHERE key = $k;)", + tablePath.c_str()); + auto params = TParamsBuilder().AddParam("$k").Utf8(key).Build().Build(); + auto result = session->Execute(query, tx.get(), true, params); + NYdb::TResultSetParser parser(result.at(0)); + UNIT_ASSERT_C(parser.TryNextRow(), key.c_str()); + UNIT_ASSERT_VALUES_EQUAL(parser.ColumnParser(0).GetUtf8(), expectedValue); + } + + /// Two table sessions, same producer id, two writes per tx, both write sessions closed before any commit. + /// First commit is always SUCCESS; \p expectedTx2CommitStatus is asserted for the second (SeqNo / conflict policy). + void RunSeqNoConflictTwoWriteSessionsSameProducer(EStatus expectedTx2CommitStatus) { + CreateTopic("topic_A", TEST_CONSUMER, 1); + + const std::string producer = TEST_MESSAGE_GROUP_ID; + + auto makeSettings = [&]() { + NTopic::TWriteSessionSettings options; + options.Path(GetTopicUtPath("topic_A")); + options.ProducerId(producer); + options.MessageGroupId(producer); + options.Codec(ECodec::RAW); + AugmentWriteSessionSettings(options); + return options; + }; + + NTopic::TTopicClient client(GetDriver()); + + auto session1 = CreateSession(); + auto tx1 = session1->BeginTx(); + { + auto ws1 = client.CreateSimpleBlockingWriteSession(makeSettings()); + UNIT_ASSERT_C(ws1->Write(NTopic::TWriteMessage("m1"), tx1.get()), "write session 1, message 1"); + UNIT_ASSERT_C(ws1->Write(NTopic::TWriteMessage("m2"), tx1.get()), "write session 1, message 2"); + UNIT_ASSERT_C(ws1->Close(), "close write session 1"); + } + + auto session2 = CreateSession(); + auto tx2 = session2->BeginTx(); + { + auto ws2 = client.CreateSimpleBlockingWriteSession(makeSettings()); + UNIT_ASSERT_C(ws2->Write(NTopic::TWriteMessage("m3"), tx2.get()), "write session 2, message 1"); + UNIT_ASSERT_C(ws2->Write(NTopic::TWriteMessage("m4"), tx2.get()), "write session 2, message 2"); + UNIT_ASSERT_C(ws2->Close(), "close write session 2"); + } + + session1->CommitTx(*tx1, EStatus::SUCCESS); + session2->CommitTx(*tx2, expectedTx2CommitStatus); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + switch (expectedTx2CommitStatus) { + case EStatus::ABORTED: + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2u); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "m1"); + UNIT_ASSERT_VALUES_EQUAL(messages[1], "m2"); + break; + case EStatus::SUCCESS: + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 4u); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "m1"); + UNIT_ASSERT_VALUES_EQUAL(messages[1], "m2"); + UNIT_ASSERT_VALUES_EQUAL(messages[2], "m3"); + UNIT_ASSERT_VALUES_EQUAL(messages[3], "m4"); + break; + default: + UNIT_FAIL("unexpected expectedTx2CommitStatus for topic read assertions"); + break; + } + } + + /// Same as \ref RunSeqNoConflictTwoWriteSessionsSameProducer, but each transaction also upserts disjoint rows + /// into a row table so KQP uses distributed commit (topic tablet + data shard), not immediate topic-only commit. + void RunSeqNoConflictTwoWriteSessionsSameProducerDistributed(EStatus expectedTx2CommitStatus) { + static constexpr const char* kTable = "table_A"; + const std::vector tx1Rows = { + {"dist_seq_tx1_a", "v1"}, + {"dist_seq_tx1_b", "v2"}, + }; + const std::vector tx2Rows = { + {"dist_seq_tx2_a", "v3"}, + {"dist_seq_tx2_b", "v4"}, + }; + + CreateTopic("topic_A", TEST_CONSUMER, 1); + CreateTable("/Root/table_A"); + + const std::string producer = TEST_MESSAGE_GROUP_ID; + + auto makeSettings = [&]() { + NTopic::TWriteSessionSettings options; + options.Path(GetTopicUtPath("topic_A")); + options.ProducerId(producer); + options.MessageGroupId(producer); + options.Codec(ECodec::RAW); + AugmentWriteSessionSettings(options); + return options; + }; + + NTopic::TTopicClient client(GetDriver()); + + auto session1 = CreateSession(); + auto tx1 = session1->BeginTx(); + { + auto ws1 = client.CreateSimpleBlockingWriteSession(makeSettings()); + UNIT_ASSERT_C(ws1->Write(NTopic::TWriteMessage("m1"), tx1.get()), "write session 1, message 1"); + UNIT_ASSERT_C(ws1->Write(NTopic::TWriteMessage("m2"), tx1.get()), "write session 1, message 2"); + UNIT_ASSERT_C(ws1->Close(), "close write session 1"); + } + UpsertToTable(kTable, tx1Rows, *session1, tx1.get()); + + auto session2 = CreateSession(); + auto tx2 = session2->BeginTx(); + { + auto ws2 = client.CreateSimpleBlockingWriteSession(makeSettings()); + UNIT_ASSERT_C(ws2->Write(NTopic::TWriteMessage("m3"), tx2.get()), "write session 2, message 1"); + UNIT_ASSERT_C(ws2->Write(NTopic::TWriteMessage("m4"), tx2.get()), "write session 2, message 2"); + UNIT_ASSERT_C(ws2->Close(), "close write session 2"); + } + UpsertToTable(kTable, tx2Rows, *session2, tx2.get()); + + session1->CommitTx(*tx1, EStatus::SUCCESS); + session2->CommitTx(*tx2, expectedTx2CommitStatus); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + + switch (expectedTx2CommitStatus) { + case EStatus::ABORTED: + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2u); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "m1"); + UNIT_ASSERT_VALUES_EQUAL(messages[1], "m2"); + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount(kTable), 2u); + for (const auto& row : tx1Rows) { + UNIT_ASSERT_VALUES_EQUAL(CountTableRowsWithKey(kTable, row.Key), 1u); + AssertTableKeyValue(kTable, row.Key, row.Value); + } + for (const auto& row : tx2Rows) { + UNIT_ASSERT_VALUES_EQUAL(CountTableRowsWithKey(kTable, row.Key), 0u); + } + break; + case EStatus::SUCCESS: + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 4u); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "m1"); + UNIT_ASSERT_VALUES_EQUAL(messages[1], "m2"); + UNIT_ASSERT_VALUES_EQUAL(messages[2], "m3"); + UNIT_ASSERT_VALUES_EQUAL(messages[3], "m4"); + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount(kTable), 4u); + for (const auto& row : tx1Rows) { + UNIT_ASSERT_VALUES_EQUAL(CountTableRowsWithKey(kTable, row.Key), 1u); + AssertTableKeyValue(kTable, row.Key, row.Value); + } + for (const auto& row : tx2Rows) { + UNIT_ASSERT_VALUES_EQUAL(CountTableRowsWithKey(kTable, row.Key), 1u); + AssertTableKeyValue(kTable, row.Key, row.Value); + } + break; + default: + UNIT_FAIL("unexpected expectedTx2CommitStatus for distributed topic/table assertions"); + break; + } + } +}; + +template +class TFixtureTopicTxMatrix : public TFixtureTopicTxMatrixBase { +protected: + void AugmentServerSettings(NKikimr::Tests::TServerSettings& settings) override { + settings.SetEnableSkipConflictCheckForTopicsInTransaction(EnableSkipConflictCheckForTopicsInTransaction); + } + + void AugmentWriteSessionSettings(NTopic::TWriteSessionSettings& options) override { + if constexpr (TrackProducerIdInTxMeta == ETrackProducerIdInTxMeta::True) { + options.AppendSessionMeta(std::string{NKikimr::NPQ::WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX}, "true"); + } else if constexpr (TrackProducerIdInTxMeta == ETrackProducerIdInTxMeta::False) { + options.AppendSessionMeta(std::string{NKikimr::NPQ::WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX}, "false"); + } + } +}; + +using TFixture_SkipConflictOff_MetaAbsent = TFixtureTopicTxMatrix; +using TFixture_SkipConflictOff_MetaTrue = TFixtureTopicTxMatrix; +using TFixture_SkipConflictOff_MetaFalse = TFixtureTopicTxMatrix; +using TFixture_SkipConflictOn_MetaAbsent = TFixtureTopicTxMatrix; +using TFixture_SkipConflictOn_MetaTrue = TFixtureTopicTxMatrix; +using TFixture_SkipConflictOn_MetaFalse = TFixtureTopicTxMatrix; + +} // namespace + +Y_UNIT_TEST_SUITE(TopicTxSkipConflictAndProducerMeta) { + +Y_UNIT_TEST_F(SeqNoConflict_TwoWriteSessions_SkipConflictOff_MetaAbsent, TFixture_SkipConflictOff_MetaAbsent) { + RunSeqNoConflictTwoWriteSessionsSameProducer(EStatus::ABORTED); +} + +Y_UNIT_TEST_F(SeqNoConflict_TwoWriteSessions_SkipConflictOff_MetaTrue, TFixture_SkipConflictOff_MetaTrue) { + RunSeqNoConflictTwoWriteSessionsSameProducer(EStatus::ABORTED); +} + +Y_UNIT_TEST_F(SeqNoConflict_TwoWriteSessions_SkipConflictOff_MetaFalse, TFixture_SkipConflictOff_MetaFalse) { + RunSeqNoConflictTwoWriteSessionsSameProducer(EStatus::ABORTED); +} + +Y_UNIT_TEST_F(SeqNoConflict_TwoWriteSessions_SkipConflictOn_MetaAbsent, TFixture_SkipConflictOn_MetaAbsent) { + RunSeqNoConflictTwoWriteSessionsSameProducer(EStatus::ABORTED); +} + +Y_UNIT_TEST_F(SeqNoConflict_TwoWriteSessions_SkipConflictOn_MetaTrue, TFixture_SkipConflictOn_MetaTrue) { + RunSeqNoConflictTwoWriteSessionsSameProducer(EStatus::ABORTED); +} + +Y_UNIT_TEST_F(SeqNoConflict_TwoWriteSessions_SkipConflictOn_MetaFalse, TFixture_SkipConflictOn_MetaFalse) { + RunSeqNoConflictTwoWriteSessionsSameProducer(EStatus::SUCCESS); +} + +Y_UNIT_TEST_F(SeqNoConflict_Distributed_TwoWriteSessions_SkipConflictOff_MetaAbsent, TFixture_SkipConflictOff_MetaAbsent) { + RunSeqNoConflictTwoWriteSessionsSameProducerDistributed(EStatus::ABORTED); +} + +Y_UNIT_TEST_F(SeqNoConflict_Distributed_TwoWriteSessions_SkipConflictOff_MetaTrue, TFixture_SkipConflictOff_MetaTrue) { + RunSeqNoConflictTwoWriteSessionsSameProducerDistributed(EStatus::ABORTED); +} + +Y_UNIT_TEST_F(SeqNoConflict_Distributed_TwoWriteSessions_SkipConflictOff_MetaFalse, TFixture_SkipConflictOff_MetaFalse) { + RunSeqNoConflictTwoWriteSessionsSameProducerDistributed(EStatus::ABORTED); +} + +Y_UNIT_TEST_F(SeqNoConflict_Distributed_TwoWriteSessions_SkipConflictOn_MetaAbsent, TFixture_SkipConflictOn_MetaAbsent) { + RunSeqNoConflictTwoWriteSessionsSameProducerDistributed(EStatus::ABORTED); +} + +Y_UNIT_TEST_F(SeqNoConflict_Distributed_TwoWriteSessions_SkipConflictOn_MetaTrue, TFixture_SkipConflictOn_MetaTrue) { + RunSeqNoConflictTwoWriteSessionsSameProducerDistributed(EStatus::ABORTED); +} + +Y_UNIT_TEST_F(SeqNoConflict_Distributed_TwoWriteSessions_SkipConflictOn_MetaFalse, TFixture_SkipConflictOn_MetaFalse) { + RunSeqNoConflictTwoWriteSessionsSameProducerDistributed(EStatus::SUCCESS); +} + +Y_UNIT_TEST_F(InvalidWriteSessionAttributeTrackProducerIdInTx_RejectsInit, TFixtureTable) { + CreateTopic("topic_A", TEST_CONSUMER, 1); + + NTopic::TTopicClient client(GetDriver()); + NTopic::TWriteSessionSettings options; + options.Path(GetTopicUtPath("topic_A")); + options.ProducerId(TEST_MESSAGE_GROUP_ID); + options.MessageGroupId(TEST_MESSAGE_GROUP_ID); + options.Codec(ECodec::RAW); + options.AppendSessionMeta(std::string{NKikimr::NPQ::WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX}, "not-a-bool"); + + auto ws = client.CreateWriteSession(options); + + std::optional closed; + const TInstant deadline = TInstant::Now() + TDuration::Seconds(30); + while (!closed.has_value()) { + UNIT_ASSERT_C( + TInstant::Now() < deadline, + "timed out waiting for write session close after invalid WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX"); + + if (auto ev = ws->GetEvent(false)) { + if (auto* c = std::get_if(&*ev)) { + closed.emplace(*c); + } + } else { + Sleep(TDuration::MilliSeconds(50)); + } + } + UNIT_ASSERT_C(closed.has_value(), "session must close after invalid WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX"); + UNIT_ASSERT_VALUES_EQUAL(closed->GetStatus(), EStatus::BAD_REQUEST); + const TString issues = closed->GetIssues().ToOneLineString(); + UNIT_ASSERT_STRING_CONTAINS(issues, NKikimr::NPQ::WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX); + UNIT_ASSERT_STRING_CONTAINS(issues, "not-a-bool"); + + ws->Close(TDuration::Seconds(5)); +} + +} // Y_UNIT_TEST_SUITE(TopicTxSkipConflictAndProducerMeta) + +} // namespace NYdb::inline V3::NTopic::NTests::NTxUsage diff --git a/src/client/topic/ut/ut_utils/txusage_fixture.cpp b/src/client/topic/ut/ut_utils/txusage_fixture.cpp index 3a601c0dcbb..24d290e8c95 100644 --- a/src/client/topic/ut/ut_utils/txusage_fixture.cpp +++ b/src/client/topic/ut/ut_utils/txusage_fixture.cpp @@ -38,6 +38,7 @@ void TFixture::SetUp(NUnitTest::TTestContext&) NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings(); settings.SetEnableHtapTx(GetEnableHtapTx()); settings.SetAllowOlapDataQuery(GetAllowOlapDataQuery()); + AugmentServerSettings(settings); Setup = std::make_unique(TEST_CASE_NAME, settings); @@ -54,6 +55,14 @@ void TFixture::SetUp(NUnitTest::TTestContext&) QueryClient = std::make_unique(*Driver, querySettings); } +void TFixture::AugmentServerSettings(NKikimr::Tests::TServerSettings&) +{ +} + +void TFixture::AugmentWriteSessionSettings(NTopic::TWriteSessionSettings&) +{ +} + void TFixture::NotifySchemeShard(const TFeatureFlags& flags) { Y_UNUSED(flags); @@ -391,6 +400,7 @@ auto TFixture::CreateTopicWriteSession(const std::string& topicPath, options.MessageGroupId(messageGroupId); options.PartitionId(partitionId); options.Codec(ECodec::RAW); + AugmentWriteSessionSettings(options); return client.CreateWriteSession(options); } diff --git a/src/client/topic/ut/ut_utils/txusage_fixture.h b/src/client/topic/ut/ut_utils/txusage_fixture.h index 634492186b7..52e00d77daf 100644 --- a/src/client/topic/ut/ut_utils/txusage_fixture.h +++ b/src/client/topic/ut/ut_utils/txusage_fixture.h @@ -189,6 +189,11 @@ class TFixture : public NUnitTest::TBaseFixture { void WriteMessagesInTx(std::size_t big, size_t small); const TDriver& GetDriver() const; + /// Topic path for SDK: `TTopicSdkTestSetup::GetTopicPath(name)` (typically `TopicPrefix_ + name`; prefix is empty by default), + /// i.e. relative to the driver database, not a cluster-absolute path. For use outside TFixture members without `Setup` access. + std::string GetTopicUtPath(const std::string& name) const { + return Setup->GetTopicPath(name); + } NTable::TTableClient& GetTableClient(); void CheckTabletKeys(const std::string& topicName); @@ -271,6 +276,11 @@ class TFixture : public NUnitTest::TBaseFixture { virtual bool GetEnableHtapTx() const; virtual bool GetAllowOlapDataQuery() const; + /// Called from SetUp after TTopicSdkTestSetup::MakeServerSettings() and HTAP-related setters. + virtual void AugmentServerSettings(NKikimr::Tests::TServerSettings& settings); + /// Called from CreateTopicWriteSession before CreateWriteSession (Topic API write_session_meta). + virtual void AugmentWriteSessionSettings(NTopic::TWriteSessionSettings& settings); + size_t GetPQCacheRenameKeysCount(); enum class EClientType { diff --git a/src/version.h b/src/version.h index a1b6dea7b18..adde4905ae1 100644 --- a/src/version.h +++ b/src/version.h @@ -2,7 +2,7 @@ namespace NYdb { -inline const char* YDB_SDK_VERSION = "3.16.0"; +inline const char* YDB_SDK_VERSION = "3.17.0"; inline const char* YDB_CERTIFICATE_FILE_KEY = "ydb_root_ca_v3.pem"; } // namespace NYdb diff --git a/tests/unit/client/CMakeLists.txt b/tests/unit/client/CMakeLists.txt index 03b0a17c386..2c6356aa6ba 100644 --- a/tests/unit/client/CMakeLists.txt +++ b/tests/unit/client/CMakeLists.txt @@ -9,11 +9,10 @@ add_ydb_test(NAME client-connection_string_ut GTEST unit ) -add_ydb_test(NAME client-ydb_coordination_ut +add_ydb_test(NAME client-coordination_ut SOURCES coordination/coordination_ut.cpp LINK_LIBRARIES - yutil YDB-CPP-SDK::Coordination api-grpc LABELS @@ -24,14 +23,13 @@ add_ydb_test(NAME client-extensions-discovery_mutator_ut SOURCES discovery_mutator/discovery_mutator_ut.cpp LINK_LIBRARIES - yutil YDB-CPP-SDK::DiscoveryMutator YDB-CPP-SDK::Table LABELS unit ) -add_ydb_test(NAME client-ydb_driver_ut +add_ydb_test(NAME client-driver_ut SOURCES driver/driver_ut.cpp LINK_LIBRARIES @@ -42,7 +40,7 @@ add_ydb_test(NAME client-ydb_driver_ut unit ) -add_ydb_test(NAME client-impl-ydb_endpoints_ut +add_ydb_test(NAME client-endpoints_ut INCLUDE_DIRS ${YDB_SDK_SOURCE_DIR}/src/client/impl/ydb_endpoints SOURCES @@ -54,12 +52,24 @@ add_ydb_test(NAME client-impl-ydb_endpoints_ut unit ) +add_ydb_test(NAME client-iam_ut GTEST + SOURCES + iam/iam_ut.cpp + LINK_LIBRARIES + api-grpc + cpp-testing-common + http-server + json + YDB-CPP-SDK::Iam + LABELS + unit +) + add_ydb_test(NAME client-oauth2_ut SOURCES oauth2_token_exchange/credentials_ut.cpp oauth2_token_exchange/jwt_token_source_ut.cpp LINK_LIBRARIES - yutil http-server json string_utils-base64 @@ -69,32 +79,40 @@ add_ydb_test(NAME client-oauth2_ut unit ) -add_ydb_test(NAME client-ydb_params_ut GTEST +add_ydb_test(NAME client-params_ut GTEST SOURCES params/params_ut.cpp LINK_LIBRARIES - yutil YDB-CPP-SDK::Params LABELS unit ) -add_ydb_test(NAME client-ydb_result_ut +add_ydb_test(NAME client-table_ut GTEST + SOURCES + table/table_ut.cpp + LINK_LIBRARIES + api-grpc + cpp-testing-common + YDB-CPP-SDK::Table + LABELS + unit +) + +add_ydb_test(NAME client-result_ut SOURCES result/result_ut.cpp LINK_LIBRARIES - yutil YDB-CPP-SDK::Result YDB-CPP-SDK::Params LABELS unit ) -add_ydb_test(NAME client-ydb_value_ut GTEST +add_ydb_test(NAME client-value_ut GTEST SOURCES value/value_ut.cpp LINK_LIBRARIES - yutil YDB-CPP-SDK::Value YDB-CPP-SDK::Params LABELS diff --git a/tests/unit/client/iam/iam_ut.cpp b/tests/unit/client/iam/iam_ut.cpp new file mode 100644 index 00000000000..630d87618fd --- /dev/null +++ b/tests/unit/client/iam/iam_ut.cpp @@ -0,0 +1,237 @@ +#include + +#include +#include +#include + +#include + +#include + +#include +#include +#include +#include +#include + +using namespace NYdb; + +class TMetadataServer : public THttpServer::ICallBack { +public: + class TRequest : public TRequestReplier { + public: + explicit TRequest(TMetadataServer* server) + : Server(server) + {} + + bool DoReply(const TReplyParams& params) override { + { + std::lock_guard lock(Server->Lock); + ++Server->RequestCount; + } + + THttpResponse resp(Server->StatusCode); + resp.SetContent(TString{Server->Response}); + resp.OutTo(params.Output); + return true; + } + + TMetadataServer* Server = nullptr; + }; + + TMetadataServer() + : PortHolder(NTesting::GetFreePort()) + , Port(static_cast(PortHolder)) + , HttpOptions(Port) + , HttpServer(this, HttpOptions) + { + HttpServer.Start(); + } + + ~TMetadataServer() { + HttpServer.Stop(); + } + + TClientRequest* CreateClient() override { + return new TRequest(this); + } + + void SetResponse(HttpCodes code, const std::string& response) { + StatusCode = code; + Response = response; + } + + int GetRequestCount() const { + std::lock_guard lock(Lock); + return RequestCount; + } + + void ResetRequestCount() { + std::lock_guard lock(Lock); + RequestCount = 0; + } + + NTesting::TPortHolder PortHolder; + uint16_t Port; + THttpServer::TOptions HttpOptions; + THttpServer HttpServer; + HttpCodes StatusCode = HTTP_OK; + std::string Response; + mutable std::mutex Lock; + int RequestCount = 0; +}; + +static std::string MakeTokenResponse(const std::string& token, int expiresIn) { + TStringStream ss; + NJson::TJsonWriter w(&ss, false); + w.OpenMap(); + w.WriteKey("access_token"); + w.Write(token); + w.WriteKey("token_type"); + w.Write("Bearer"); + w.WriteKey("expires_in"); + w.Write(expiresIn); + w.CloseMap(); + w.Flush(); + return ss.Str(); +} + +static std::string MakeTokenResponseWithExpiry(const std::string& token, const TInstant& expiry) { + TStringStream ss; + NJson::TJsonWriter w(&ss, false); + w.OpenMap(); + w.WriteKey("access_token"); + w.Write(token); + w.WriteKey("token_type"); + w.Write("Bearer"); + w.WriteKey("expiry"); + w.Write(expiry.FormatGmTime("%Y-%m-%dT%H:%M:%S.000000000Z")); + w.CloseMap(); + w.Flush(); + return ss.Str(); +} + +static std::string MakeTokenResponseNoExpiry(const std::string& token) { + TStringStream ss; + NJson::TJsonWriter w(&ss, false); + w.OpenMap(); + w.WriteKey("access_token"); + w.Write(token); + w.WriteKey("token_type"); + w.Write("Bearer"); + w.CloseMap(); + w.Flush(); + return ss.Str(); +} + +TEST(IamCredentialsProvider, BasicTokenFetch) { + TMetadataServer server; + server.SetResponse(HTTP_OK, MakeTokenResponse("test-token-123", 3600)); + + TIamHost params; + params.Host = "localhost"; + params.Port = server.Port; + params.RefreshPeriod = TDuration::Hours(1); + + auto factory = CreateIamCredentialsProviderFactory(params); + auto provider = factory->CreateProvider(); + + EXPECT_EQ(provider->GetAuthInfo(), "test-token-123"); +} + +TEST(IamCredentialsProvider, ExpiryFieldSupport) { + TMetadataServer server; + auto expiry = TInstant::Now() + TDuration::Hours(12); + server.SetResponse(HTTP_OK, MakeTokenResponseWithExpiry("expiry-token", expiry)); + + TIamHost params; + params.Host = "localhost"; + params.Port = server.Port; + params.RefreshPeriod = TDuration::Hours(1); + + auto factory = CreateIamCredentialsProviderFactory(params); + auto provider = factory->CreateProvider(); + + EXPECT_EQ(provider->GetAuthInfo(), "expiry-token"); + + // Second call should not trigger refresh (token is still valid) + int countBefore = server.GetRequestCount(); + provider->GetAuthInfo(); + EXPECT_EQ(server.GetRequestCount(), countBefore); +} + +TEST(IamCredentialsProvider, NoExpiryFieldFallback) { + TMetadataServer server; + server.SetResponse(HTTP_OK, MakeTokenResponseNoExpiry("no-expiry-token")); + + TIamHost params; + params.Host = "localhost"; + params.Port = server.Port; + params.RefreshPeriod = TDuration::Hours(1); + + auto factory = CreateIamCredentialsProviderFactory(params); + auto provider = factory->CreateProvider(); + + // Token should be saved even without expires_in/expiry + EXPECT_EQ(provider->GetAuthInfo(), "no-expiry-token"); + + // Should not immediately refresh (fallback interval should be > 0) + int countBefore = server.GetRequestCount(); + provider->GetAuthInfo(); + EXPECT_EQ(server.GetRequestCount(), countBefore); +} + +TEST(IamCredentialsProvider, ServerError) { + TMetadataServer server; + server.SetResponse(HTTP_INTERNAL_SERVER_ERROR, ""); + + TIamHost params; + params.Host = "localhost"; + params.Port = server.Port; + + auto factory = CreateIamCredentialsProviderFactory(params); + auto provider = factory->CreateProvider(); + + // Constructor GetTicket() fails, token should be empty + EXPECT_EQ(provider->GetAuthInfo(), ""); +} + +TEST(IamCredentialsProvider, ConcurrentAccess) { + TMetadataServer server; + server.SetResponse(HTTP_OK, MakeTokenResponse("concurrent-token", 3600)); + + TIamHost params; + params.Host = "localhost"; + params.Port = server.Port; + params.RefreshPeriod = TDuration::MilliSeconds(1); // Force frequent refreshes + + auto factory = CreateIamCredentialsProviderFactory(params); + auto provider = factory->CreateProvider(); + + constexpr int NUM_THREADS = 8; + constexpr int ITERATIONS = 100; + + std::vector> threads; + std::atomic errors{0}; + + for (int i = 0; i < NUM_THREADS; ++i) { + threads.push_back(std::make_unique([&]() { + for (int j = 0; j < ITERATIONS; ++j) { + try { + auto token = provider->GetAuthInfo(); + if (token != "concurrent-token") { + errors.fetch_add(1); + } + } catch (...) { + errors.fetch_add(1); + } + } + })); + } + + for (auto& t : threads) { + t->join(); + } + + EXPECT_EQ(errors.load(), 0); +} diff --git a/tests/unit/client/table/table_ut.cpp b/tests/unit/client/table/table_ut.cpp new file mode 100644 index 00000000000..1f2bd3198b1 --- /dev/null +++ b/tests/unit/client/table/table_ut.cpp @@ -0,0 +1,459 @@ +#include +#include + +#include + +#include + +#include + +#include +#include +#include + +#include + +using namespace NYdb; + +namespace { + /** + * The mock for the table service in the YDB public API. + */ + class TMockTableService : public Ydb::Table::V1::TableService::Service { + public: + virtual grpc::Status CreateSession( + grpc::ServerContext* /* context */, + const Ydb::Table::CreateSessionRequest* request, + Ydb::Table::CreateSessionResponse* response + ) override { + std::cerr << "CreateSession():" << std::endl + << request->DebugString() + << std::endl; + + // Complete the request successfully with a fake session ID + // + // NOTE: This method needs to be mocked to allow the test code to create + // a new API session. The test code must call CreateSession() + // before calling any other methods, like CreateTable() or AlterTable(). + // And CreateSession() must see a successful response from the server + // in order to create a valid session. + Ydb::Table::CreateSessionResult result; + result.set_session_id("fake-session-id"); + + auto op = response->mutable_operation(); + op->set_ready(true); + op->set_status(Ydb::StatusIds::SUCCESS); + op->mutable_result()->PackFrom(result); + + return grpc::Status::OK; + } + + virtual grpc::Status CreateTable( + grpc::ServerContext* /* context */, + const Ydb::Table::CreateTableRequest* request, + Ydb::Table::CreateTableResponse* response + ) override { + std::cerr << "CreateTable():" << std::endl + << request->DebugString() + << std::endl; + + // + + auto op = response->mutable_operation(); + + op->set_ready(true); + op->set_status(Ydb::StatusIds::SUCCESS); + + // Save the CreateTable request to allow the test to verify it + LastCreateTableRequest = Ydb::Table::CreateTableRequest(*request); + return grpc::Status::OK; + } + + virtual grpc::Status AlterTable( + grpc::ServerContext* /* context */, + const Ydb::Table::AlterTableRequest* request, + Ydb::Table::AlterTableResponse* response + ) override { + std::cerr << "AlterTable():" << std::endl + << request->DebugString() + << std::endl; + + // + + auto op = response->mutable_operation(); + + op->set_ready(true); + op->set_status(Ydb::StatusIds::SUCCESS); + + // Save the AlterTable request to allow the test to verify it + LastAlterTableRequest = Ydb::Table::AlterTableRequest(*request); + return grpc::Status::OK; + } + + std::optional LastCreateTableRequest; + std::optional LastAlterTableRequest; + }; + + /** + * Start the local GRPC server for the given API service. + * + * @tparam TService The type of the API service + * + * @param[in] address The address/port to listen to + * @param[in] service The API service to start + * + * @return The corresponding GRPC server + */ + template + std::unique_ptr StartGrpcServer(const std::string& address, TService& service) { + return grpc::ServerBuilder() + .AddListeningPort(TString{address}, grpc::InsecureServerCredentials()) + .RegisterService(&service) + .BuildAndStart(); + } + + /** + * Configure and start a local GRPC server with the mocked table API service. + * + * @param[in] tableService The table service to start + * @param[out] grpcServer Receives the corresponding GRPC server + * @param[out] driver Receives the connection pool to the server + * @param[out] tableClient Receives the API client for the table API service + * @param[out] tableSession Receives the client session for the table API service + */ + void StartServerWithTableService( + TMockTableService& tableService, + std::unique_ptr& grpcServer, + std::unique_ptr& driver, + std::unique_ptr& tableClient, + std::unique_ptr& tableSession + ) { + // Start the local GRPC service for the given table API service + NTesting::InitPortManagerFromEnv(); + const auto tablePortHolder = NTesting::GetFreePort(); + const ui16 tablePort = static_cast(tablePortHolder); + + grpcServer = StartGrpcServer( + TStringBuilder() << "127.0.0.1:" << tablePort, + tableService + ); + + // Start the connection pool and create the API client for the table API service + driver = std::make_unique( + TDriverConfig() + .SetEndpoint(TStringBuilder() << "localhost:" << tablePort) + .SetDiscoveryMode(EDiscoveryMode::Off) + .SetDatabase("/Root/My/DB") + ); + + // Create a new session + tableClient = std::make_unique(*driver); + + auto sessionFuture = tableClient->CreateSession(); + ASSERT_TRUE(sessionFuture.Wait(TDuration::Seconds(10))); + + auto sessionResult = sessionFuture.ExtractValueSync(); + ASSERT_TRUE(sessionResult.IsSuccess()); + + tableSession = std::make_unique(sessionResult.GetSession()); + } + +} // namespace + +/** + * Verify that the SDK creates the CREATE TABLE request correctly, + * when no metrics configuration is provided. + */ +TEST(TableTest, CreateTableNoMetricsSettings) { + // Start the mocked table API service + TMockTableService tableService; + std::unique_ptr grpcServer; + std::unique_ptr driver; + std::unique_ptr tableClient; + std::unique_ptr tableSession; + + StartServerWithTableService( + tableService, + grpcServer, + driver, + tableClient, + tableSession + ); + + // Call the CreateTable() API without any metrics configuration + auto requestFuture = tableSession->CreateTable( + "/Root/My/DB/test_table", + NTable::TTableBuilder() + .Build() + ); + + ASSERT_TRUE(requestFuture.Wait(TDuration::Seconds(10))); + + auto result = requestFuture.ExtractValueSync(); + ASSERT_TRUE(result.IsSuccess()); + + // Make sure the metrics configuration was not set in the CreateTable request + ASSERT_TRUE(tableService.LastCreateTableRequest.has_value()); + ASSERT_TRUE(!tableService.LastCreateTableRequest->has_metrics_settings()); +} + +/** + * Verify that the SDK creates the CREATE TABLE request correctly, + * when the metrics configuration is provided. + */ +TEST(TableTest, CreateTableWithMetricsSettings) { + // Start the mocked table API service + TMockTableService tableService; + std::unique_ptr grpcServer; + std::unique_ptr driver; + std::unique_ptr tableClient; + std::unique_ptr tableSession; + + StartServerWithTableService( + tableService, + grpcServer, + driver, + tableClient, + tableSession + ); + + // Call the CreateTable() API with the metrics configuration configured + // to every allowed metrics level + const auto verifyMetricsLevelFunc = [&]( + const TString& metricsLevelName, + NTable::TMetricsSettings::EMetricsLevel metricsLevel, + Ydb::Table::MetricsSettings::MetricsLevel protoMetricsLevel + ) { + SCOPED_TRACE(testing::Message() << "Metrics level: " << metricsLevelName); + + auto requestFuture = tableSession->CreateTable( + "/Root/My/DB/test_table", + NTable::TTableBuilder() + .SetMetricsSettings(metricsLevel) + .Build() + ); + + ASSERT_TRUE(requestFuture.Wait(TDuration::Seconds(10))); + + auto result = requestFuture.ExtractValueSync(); + ASSERT_TRUE(result.IsSuccess()); + + // Make sure the metrics configuration is set in the CreateTable request + ASSERT_TRUE(tableService.LastCreateTableRequest.has_value()); + ASSERT_TRUE(tableService.LastCreateTableRequest->has_metrics_settings()); + + ASSERT_EQ( + tableService.LastCreateTableRequest->metrics_settings().metrics_level(), + protoMetricsLevel + ); + }; + + verifyMetricsLevelFunc( + "UNSPECIFIED", + NTable::TMetricsSettings::EMetricsLevel::Unspecified, + Ydb::Table::MetricsSettings::METRICS_LEVEL_UNSPECIFIED + ); + + verifyMetricsLevelFunc( + "DISABLED", + NTable::TMetricsSettings::EMetricsLevel::Disabled, + Ydb::Table::MetricsSettings::METRICS_LEVEL_DISABLED + ); + + verifyMetricsLevelFunc( + "DATABASE", + NTable::TMetricsSettings::EMetricsLevel::Database, + Ydb::Table::MetricsSettings::METRICS_LEVEL_DATABASE + ); + + verifyMetricsLevelFunc( + "TABLE", + NTable::TMetricsSettings::EMetricsLevel::Table, + Ydb::Table::MetricsSettings::METRICS_LEVEL_TABLE + ); + + verifyMetricsLevelFunc( + "PARTITION", + NTable::TMetricsSettings::EMetricsLevel::Partition, + Ydb::Table::MetricsSettings::METRICS_LEVEL_PARTITION + ); +} + +/** + * Verify that the SDK creates the ALTER TABLE request correctly, + * when no metrics configuration is provided. + */ +TEST(TableTest, AlterTableNoMetricsSettings) { + // Start the mocked table API service + TMockTableService tableService; + std::unique_ptr grpcServer; + std::unique_ptr driver; + std::unique_ptr tableClient; + std::unique_ptr tableSession; + + StartServerWithTableService( + tableService, + grpcServer, + driver, + tableClient, + tableSession + ); + + // Call the AlterTable() API without any metrics configuration + auto requestFuture = tableSession->AlterTable( + "/Root/My/DB/test_table", + NTable::TAlterTableSettings() + ); + + ASSERT_TRUE(requestFuture.Wait(TDuration::Seconds(10))); + + auto result = requestFuture.ExtractValueSync(); + ASSERT_TRUE(result.IsSuccess()); + + // Make sure the metrics configuration was not set in the AlterTable request + ASSERT_TRUE(tableService.LastAlterTableRequest.has_value()); + + ASSERT_EQ( + tableService.LastAlterTableRequest->metrics_settings_action_case(), + Ydb::Table::AlterTableRequest::METRICS_SETTINGS_ACTION_NOT_SET + ); + + ASSERT_TRUE(!tableService.LastAlterTableRequest->has_set_metrics_settings()); + ASSERT_TRUE(!tableService.LastAlterTableRequest->has_drop_metrics_settings()); +} + +/** + * Verify that the SDK creates the ALTER TABLE request correctly, + * when the metrics configuration is explicitly dropped. + */ +TEST(TableTest, AlterTableDroppedMetricsSettings) { + // Start the mocked table API service + TMockTableService tableService; + std::unique_ptr grpcServer; + std::unique_ptr driver; + std::unique_ptr tableClient; + std::unique_ptr tableSession; + + StartServerWithTableService( + tableService, + grpcServer, + driver, + tableClient, + tableSession + ); + + // Call the AlterTable() API with the metrics configuration dropped + auto requestFuture = tableSession->AlterTable( + "/Root/My/DB/test_table", + NTable::TAlterTableSettings() + .BeginAlterMetricsSettings() + .Drop() + .EndAlterMetricsSettings() + ); + + ASSERT_TRUE(requestFuture.Wait(TDuration::Seconds(10))); + + auto result = requestFuture.ExtractValueSync(); + ASSERT_TRUE(result.IsSuccess()); + + // Make sure the metrics configuration was not set in the AlterTable request + ASSERT_TRUE(tableService.LastAlterTableRequest.has_value()); + + ASSERT_EQ( + tableService.LastAlterTableRequest->metrics_settings_action_case(), + Ydb::Table::AlterTableRequest::kDropMetricsSettings + ); + + ASSERT_TRUE(!tableService.LastAlterTableRequest->has_set_metrics_settings()); + ASSERT_TRUE(tableService.LastAlterTableRequest->has_drop_metrics_settings()); +} + +/** + * Verify that the SDK creates the ALTER TABLE request correctly, + * when the metrics configuration is explicitly set. + */ +TEST(TableTest, AlterTableSetMetricsSettings) { + // Start the mocked table API service + TMockTableService tableService; + std::unique_ptr grpcServer; + std::unique_ptr driver; + std::unique_ptr tableClient; + std::unique_ptr tableSession; + + StartServerWithTableService( + tableService, + grpcServer, + driver, + tableClient, + tableSession + ); + + // Call the AlterTable() API with the metrics configuration set explicitly + // to every allowed metrics level + const auto verifyMetricsLevelFunc = [&]( + const TString& metricsLevelName, + NTable::TMetricsSettings::EMetricsLevel metricsLevel, + Ydb::Table::MetricsSettings::MetricsLevel protoMetricsLevel + ) { + SCOPED_TRACE(testing::Message() << "Metrics level: " << metricsLevelName); + + auto requestFuture = tableSession->AlterTable( + "/Root/My/DB/test_table", + NTable::TAlterTableSettings() + .BeginAlterMetricsSettings() + .Set(metricsLevel) + .EndAlterMetricsSettings() + ); + + ASSERT_TRUE(requestFuture.Wait(TDuration::Seconds(10))); + + auto result = requestFuture.ExtractValueSync(); + ASSERT_TRUE(result.IsSuccess()); + + // Make sure the metrics configuration was set in the AlterTable request + ASSERT_TRUE(tableService.LastAlterTableRequest.has_value()); + + ASSERT_EQ( + tableService.LastAlterTableRequest->metrics_settings_action_case(), + Ydb::Table::AlterTableRequest::kSetMetricsSettings + ); + + ASSERT_EQ( + tableService.LastAlterTableRequest->set_metrics_settings().metrics_level(), + protoMetricsLevel + ); + + ASSERT_TRUE(tableService.LastAlterTableRequest->has_set_metrics_settings()); + ASSERT_TRUE(!tableService.LastAlterTableRequest->has_drop_metrics_settings()); + }; + + verifyMetricsLevelFunc( + "UNSPECIFIED", + NTable::TMetricsSettings::EMetricsLevel::Unspecified, + Ydb::Table::MetricsSettings::METRICS_LEVEL_UNSPECIFIED + ); + + verifyMetricsLevelFunc( + "DISABLED", + NTable::TMetricsSettings::EMetricsLevel::Disabled, + Ydb::Table::MetricsSettings::METRICS_LEVEL_DISABLED + ); + + verifyMetricsLevelFunc( + "DATABASE", + NTable::TMetricsSettings::EMetricsLevel::Database, + Ydb::Table::MetricsSettings::METRICS_LEVEL_DATABASE + ); + + verifyMetricsLevelFunc( + "TABLE", + NTable::TMetricsSettings::EMetricsLevel::Table, + Ydb::Table::MetricsSettings::METRICS_LEVEL_TABLE + ); + + verifyMetricsLevelFunc( + "PARTITION", + NTable::TMetricsSettings::EMetricsLevel::Partition, + Ydb::Table::MetricsSettings::METRICS_LEVEL_PARTITION + ); +}