No support for multiple fluentd workers  #68

@DanielShor

Hi,

I need to run fluentd with multiple workers, but when I try, it looks like the mdsd plugin doesn't support them.

fluentd conf:

    <system>
      workers 4
    </system>
    
    <source>
      @type forward
      port 24224
      bind 0.0.0.0
    </source>
    <filter kubernetes.**>
      @type prometheus
      <metric>
        name fluentd_input_status_num_records_total
        type counter
        desc The total number of incoming records
        <labels>
          tag ${tag}
          hostname ${hostname}
        </labels>
      </metric>
    </filter>
    
    <filter kubernetes.**>
      @type record_modifier
      <record>
          # NOTE: This Ruby code written in ${} has to be written in one line!   
        _temp_ ${ if record.has_key?("log"); begin; parsed = JSON.parse(record["log"]); parsed.each do |key, value|; record[key] = value; end; rescue; record["message"] = record["log"]; end; end; nil }
        _ine_ ${ if record.has_key?("LOG"); begin; record["LOG"].each do |key, value|; record[key] = value; end; rescue; record["message"] = record["LOG"]; end; end; nil }
        _kube_ ${ if record.has_key?("kubernetes"); begin; kube = record["kubernetes"]; kube.each do |key, value|; record[key] = value; end; rescue; end; end; nil}
        _build_id_ ${ begin; record["build_id"] = record["container_image"].split(':')[1].split('-')[0] if record.has_key?("container_image"); rescue; end; nil} 
        _service_ ${ if record.has_key?("labels"); record["service_name"] = record["labels"]["app_kubernetes_io/name"] if record["labels"].has_key?("app_kubernetes_io/name") end; nil;}
        _node_ ${ record["node"] = record["host"] if record.has_key?('host'); nil}
      </record>
      remove_keys _temp_, _kube_, _service_, _build_id_, _node_, host, log, namespace_id, pod_id, container_image, container_image_id, namespace_labels, master_url, docker, kubernetes, _ine_
    </filter>
    
    <match kubernetes.**>
      @type rewrite_tag_filter
      <rule>
        key     SOURCE
        pattern ^(OmsTelemetryLog)$
        tag     custom.$1
      </rule>
      <rule>
        key     SOURCE
        pattern ^(OmsTelemetryLog)$
        tag     custom.${tag}
        invert  true
      </rule>
    </match>
    
    <filter custom.**>
      @type record_modifier
      remove_keys  DATA, TAG, SOURCE, stream, kubernetes, source1, LOG
    </filter>
    
    <match custom.**>
      @type copy
      <store>
        @type mdsd
        @log_level info
        # Full path to mdsd dynamic json socket file
        djsonsocket /var/run/mdsd/default_djson.socket
        # Max time in milliseconds to wait for mdsd acknowledge response. If 0, no wait.
        acktimeoutms 5000
        # An array of regex patterns for mdsd source name unification purpose.
        # The event tag is matched against each regex; if one matches, the matched substring is used as the resulting mdsd source name.
        mdsd_tag_regex_patterns ["kubernetes", "OmsTelemetryLog"]
        # Convert hash value to valid json.
        convert_hash_to_json true
        <buffer>
          @type file
          path /var/log/td-agent/buffer/kubernetes_out_mdsd.buffer
          chunk_limit_size 8m
          queued_chunks_limit_size 2048
          flush_interval 15s
          flush_thread_count 1
          flush_at_shutdown true
          retry_max_times 10
          retry_timeout 10m
        </buffer>
      </store>
      <store>
        @type prometheus
        <metric>
          name fluentd_output_status_num_records_total
          type counter
          desc The total number of outgoing records
          <labels>
            tag ${tag}
            hostname ${hostname}
          </labels>
        </metric>
      </store>
    </match>
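
For readability, here is roughly what the `_temp_` one-liner in the `record_modifier` filter does when unrolled into plain Ruby. This is only an illustrative sketch: the method name `merge_log_field` is made up for this example and is not part of the config or of any plugin.

```ruby
require "json"

# Unrolled equivalent of the `_temp_` expression (hypothetical helper name).
# If the record has a "log" field, try to parse it as JSON and copy every
# parsed key into the record; if anything goes wrong, keep the raw text
# under "message" instead.
def merge_log_field(record)
  return record unless record.has_key?("log")

  begin
    parsed = JSON.parse(record["log"])
    parsed.each { |key, value| record[key] = value }
  rescue StandardError
    # A bare `rescue` in the one-liner also catches StandardError.
    record["message"] = record["log"]
  end
  record
end

p merge_log_field({ "log" => '{"level":"info","msg":"started"}' })
p merge_log_field({ "log" => "not json at all" })
```

The original `log` key is still present after this step; in the config it is dropped afterwards by `remove_keys`.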

When starting fluentd, I get the following output before it crashes:

2020-07-10 07:45:18 +0000 [info]: starting fluentd-1.9.2 pid=7 ruby="2.4.9"
2020-07-10 07:45:18 +0000 [info]: spawn command to main:  cmdline=["/opt/td-agent/embedded/bin/ruby", "-Eascii-8bit:ascii-8bit", "/opt/td-agent/embedded/bin/fluentd", "-c", "/etc/fluentd/fluentd.conf", "--log", "/dev/stdout", "--under-supervisor"]
2020-07-10 07:45:18 +0000 [info]: adding filter pattern="kubernetes.**" type="prometheus"
2020-07-10 07:45:19 +0000 [info]: adding filter pattern="kubernetes.**" type="record_modifier"
2020-07-10 07:45:19 +0000 [info]: #1 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00000000027aa9e8 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "", "custom.$1"]
2020-07-10 07:45:19 +0000 [info]: #1 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00000000027b5e60 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "!", "custom.${tag}"]
2020-07-10 07:45:19 +0000 [info]: adding match pattern="kubernetes.**" type="rewrite_tag_filter"
2020-07-10 07:45:19 +0000 [info]: #0 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x0000000001f98908 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "", "custom.$1"]
2020-07-10 07:45:19 +0000 [info]: #0 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x0000000001f9e650 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "!", "custom.${tag}"]
2020-07-10 07:45:19 +0000 [info]: adding filter pattern="custom.**" type="record_modifier"
2020-07-10 07:45:19 +0000 [error]: #1 config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
2020-07-10 07:45:19 +0000 [error]: fluent/log.rb:362:error: config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
2020-07-10 07:45:19 +0000 [info]: adding match pattern="custom.**" type="copy"
2020-07-10 07:45:19 +0000 [info]: Worker 1 finished unexpectedly with status 2
2020-07-10 07:45:19 +0000 [info]: Received graceful stop
2020-07-10 07:45:19 +0000 [info]: #3 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x0000000000fcd528 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "", "custom.$1"]
2020-07-10 07:45:19 +0000 [info]: #3 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x0000000000fce5e0 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "!", "custom.${tag}"]
2020-07-10 07:45:19 +0000 [error]: #0 config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
2020-07-10 07:45:19 +0000 [error]: fluent/log.rb:362:error: config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
2020-07-10 07:45:19 +0000 [info]: Worker 0 finished with status 2
2020-07-10 07:45:19 +0000 [info]: Received graceful stop
2020-07-10 07:45:19 +0000 [info]: Received graceful stop
2020-07-10 07:45:19 +0000 [info]: #2 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00000000011548b0 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "", "custom.$1"]
2020-07-10 07:45:19 +0000 [info]: #2 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x0000000001163108 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "!", "custom.${tag}"]
2020-07-10 07:45:19 +0000 [error]: #3 config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
2020-07-10 07:45:19 +0000 [error]: fluent/log.rb:362:error: config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
2020-07-10 07:45:19 +0000 [info]: Received graceful stop
2020-07-10 07:45:19 +0000 [info]: Received graceful stop
2020-07-10 07:45:19 +0000 [info]: Worker 3 finished with status 2
2020-07-10 07:45:19 +0000 [info]: Received graceful stop
2020-07-10 07:45:19 +0000 [error]: #2 config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
2020-07-10 07:45:19 +0000 [error]: fluent/log.rb:362:error: config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
2020-07-10 07:45:19 +0000 [info]: Received graceful stop
2020-07-10 07:45:19 +0000 [info]: Received graceful stop
2020-07-10 07:45:19 +0000 [info]: Worker 2 finished with status 2
2020-07-10 07:45:19 +0000 [info]: Received graceful stop
2020-07-10 07:45:19 +0000 [info]: Received graceful stop

Is it possible to run multiple workers with the mdsd plugin? Have I missed something in my configuration? If it isn't possible, why not? Could this capability be added? I would love to help, given some direction :)
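
As far as I understand it, Fluentd decides this by asking each plugin through a `multi_workers_ready?` method, and output plugins report `false` unless they override it. The sketch below is a toy model of that check, not Fluentd's or the mdsd plugin's actual code: `ToyOutput`, `ToyMultiWorkerOutput`, `ToyConfigError` and `check_workers!` are all hypothetical names, and only the error message text is taken from the log above.

```ruby
# Toy model only: none of these classes are Fluentd's real ones.
class ToyConfigError < StandardError; end

class ToyOutput
  def plugin_type
    "toy"
  end

  # Default: the plugin has not declared itself safe for several workers.
  def multi_workers_ready?
    false
  end
end

# A plugin opts in by overriding the method to return true.
class ToyMultiWorkerOutput < ToyOutput
  def multi_workers_ready?
    true
  end
end

# Reject a plugin that is not multi-worker ready when workers > 1.
def check_workers!(plugin, workers)
  return if workers <= 1 || plugin.multi_workers_ready?

  raise ToyConfigError,
        "Plugin '#{plugin.plugin_type}' does not support multi workers " \
        "configuration (#{plugin.class})"
end

check_workers!(ToyMultiWorkerOutput.new, 4) # passes silently
begin
  check_workers!(ToyOutput.new, 4)
rescue ToyConfigError => e
  puts e.message
end
```

If that is how it works, supporting multiple workers would mean overriding this method in the plugin. Whether the plugin's socket and buffer handling is actually safe with several workers is a separate question that this sketch does not answer.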

Thanks in advance
