diff --git a/.gitignore b/.gitignore index 8ae0b46..d96124b 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ *.o *.a mkmf.log +/vendor/ diff --git a/lib/fluent/plugin/in_prometheus.rb b/lib/fluent/plugin/in_prometheus.rb index ab201de..862e298 100644 --- a/lib/fluent/plugin/in_prometheus.rb +++ b/lib/fluent/plugin/in_prometheus.rb @@ -67,12 +67,25 @@ def multi_workers_ready? def start super + # Normalize bind address: strip brackets if present (for consistency) + # Brackets are only for URI formatting, not for socket binding + @bind = @bind[1..-2] if @bind.start_with?('[') && @bind.end_with?(']') + scheme = @secure ? 'https' : 'http' - log.debug "listening prometheus http server on #{scheme}:://#{@bind}:#{@port}/#{@metrics_path} for worker#{fluentd_worker_id}" + # Format bind address properly for URLs (add brackets for IPv6) + bind_display = @bind.include?(':') ? "[#{@bind}]" : @bind + log.debug "listening prometheus http server on #{scheme}://#{bind_display}:#{@port}/#{@metrics_path} for worker#{fluentd_worker_id}" proto = @secure ? :tls : :tcp - if @ssl && @ssl['enable'] && @ssl['extra_conf'] + # IPv6 + TLS combination is not currently supported + if @bind.include?(':') && @secure + raise Fluent::ConfigError, 'IPv6 with is not currently supported. Use bind 0.0.0.0 with TLS, or bind ::1 without TLS.' + end + + # Use webrick for IPv6 or SSL extra_conf + # The http_server helper has issues with IPv6 URI construction + if (@ssl && @ssl['enable'] && @ssl['extra_conf']) || @bind.include?(':') start_webrick return end @@ -110,6 +123,8 @@ def start ssl_config end + # Use raw bind address for socket binding (no brackets) + # Brackets are only for URL/URI formatting, not for bind() http_server_create_http_server(:in_prometheus_server, addr: @bind, port: @port, logger: log, proto: proto, tls_opts: tls_opt) do |server| server.get(@metrics_path) { |_req| all_metrics } server.get(@aggregated_metrics_path) { |_req| all_workers_metrics } @@ -127,6 +142,7 @@ def shutdown private # For compatiblity because http helper can't support extra_conf option + # Also used for IPv6 addresses since http helper has IPv6 URI issues def start_webrick require 'webrick/https' require 'webrick' @@ -138,28 +154,32 @@ def start_webrick Logger: WEBrick::Log.new(STDERR, WEBrick::Log::FATAL), AccessLog: [], } - if (@ssl['certificate_path'] && @ssl['private_key_path'].nil?) || (@ssl['certificate_path'].nil? && @ssl['private_key_path']) - raise RuntimeError.new("certificate_path and private_key_path most both be defined") - end + + # Configure SSL if enabled + if @ssl && @ssl['enable'] + if (@ssl['certificate_path'] && @ssl['private_key_path'].nil?) || (@ssl['certificate_path'].nil? && @ssl['private_key_path']) + raise RuntimeError.new("certificate_path and private_key_path most both be defined") + end - ssl_config = { - SSLEnable: true, - SSLCertName: [['CN', 'nobody'], ['DC', 'example']] - } + ssl_config = { + SSLEnable: true, + SSLCertName: [['CN', 'nobody'], ['DC', 'example']] + } - if @ssl['certificate_path'] - cert = OpenSSL::X509::Certificate.new(File.read(@ssl['certificate_path'])) - ssl_config[:SSLCertificate] = cert - end + if @ssl['certificate_path'] + cert = OpenSSL::X509::Certificate.new(File.read(@ssl['certificate_path'])) + ssl_config[:SSLCertificate] = cert + end - if @ssl['private_key_path'] - key = OpenSSL::PKey.read(@ssl['private_key_path']) - ssl_config[:SSLPrivateKey] = key - end + if @ssl['private_key_path'] + key = OpenSSL::PKey.read(@ssl['private_key_path']) + ssl_config[:SSLPrivateKey] = key + end - ssl_config[:SSLCACertificateFile] = @ssl['ca_path'] if @ssl['ca_path'] - ssl_config = ssl_config.merge(@ssl['extra_conf']) if @ssl['extra_conf'] - config = ssl_config.merge(config) + ssl_config[:SSLCACertificateFile] = @ssl['ca_path'] if @ssl['ca_path'] + ssl_config = ssl_config.merge(@ssl['extra_conf']) if @ssl['extra_conf'] + config = ssl_config.merge(config) + end @log.on_debug do @log.debug("WEBrick conf: #{config}") @@ -207,7 +227,16 @@ def all_workers_metrics end def send_request_to_each_worker - bind = (@bind == '0.0.0.0') ? '127.0.0.1' : @bind + # Convert bind address to localhost for inter-worker communication + # 0.0.0.0 and :: are not connectable, use localhost instead + bind = case @bind + when '0.0.0.0' + '127.0.0.1' + when '::' + '::1' # IPv6 localhost + else + @bind + end [*(@base_port...(@base_port + @num_workers))].each do |worker_port| do_request(host: bind, port: worker_port, secure: @secure) do |http| yield(http.get(@metrics_path)) diff --git a/lib/fluent/plugin/in_prometheus/async_wrapper.rb b/lib/fluent/plugin/in_prometheus/async_wrapper.rb index 00860c6..9434b5c 100644 --- a/lib/fluent/plugin/in_prometheus/async_wrapper.rb +++ b/lib/fluent/plugin/in_prometheus/async_wrapper.rb @@ -4,13 +4,20 @@ module Fluent::Plugin class PrometheusInput module AsyncWrapper def do_request(host:, port:, secure:) + # Format host for URI - bracket IPv6 addresses if not already bracketed + uri_host = if host.include?(':') && !host.start_with?('[') + "[#{host}]" + else + host + end + endpoint = if secure context = OpenSSL::SSL::SSLContext.new context.verify_mode = OpenSSL::SSL::VERIFY_NONE - Async::HTTP::Endpoint.parse("https://#{host}:#{port}", ssl_context: context) + Async::HTTP::Endpoint.parse("https://#{uri_host}:#{port}", ssl_context: context) else - Async::HTTP::Endpoint.parse("http://#{host}:#{port}") + Async::HTTP::Endpoint.parse("http://#{uri_host}:#{port}") end Async::HTTP::Client.open(endpoint) do |client| diff --git a/spec/fluent/plugin/in_prometheus_spec.rb b/spec/fluent/plugin/in_prometheus_spec.rb index 08caae0..8bff55f 100644 --- a/spec/fluent/plugin/in_prometheus_spec.rb +++ b/spec/fluent/plugin/in_prometheus_spec.rb @@ -89,6 +89,22 @@ end end + context 'IPv6 with TLS' do + let(:config) do + %[ + @type prometheus + bind ::1 + + insecure true + + ] + end + + it 'raises ConfigError for unsupported combination' do + expect { driver.run(timeout: 1) }.to raise_error(Fluent::ConfigError, /IPv6 with is not currently supported/) + end + end + context 'old parameters are given' do context 'when extra_conf is used' do let(:config) do @@ -278,4 +294,41 @@ end end end + + describe '#run with IPv6' do + shared_examples 'IPv6 server binding' do |bind_addr, connect_addr, description| + let(:config) do + # Quote the bind address if it contains brackets + bind_value = bind_addr.include?('[') ? "\"#{bind_addr}\"" : bind_addr + %[ + @type prometheus + bind #{bind_value} + ] + end + + it description do + skip 'IPv6 not available on this system' unless ipv6_enabled? + + driver.run(timeout: 3) do + Net::HTTP.start(connect_addr, port) do |http| + req = Net::HTTP::Get.new('/metrics') + res = http.request(req) + expect(res.code).to eq('200') + end + end + end + end + + context 'IPv6 loopback address ::1' do + include_examples 'IPv6 server binding', '::1', '::1', 'binds and serves on IPv6 loopback address' + end + + context 'IPv6 any address ::' do + include_examples 'IPv6 server binding', '::', '::1', 'binds on :: and connects via ::1' + end + + context 'pre-bracketed IPv6 address [::1]' do + include_examples 'IPv6 server binding', '[::1]', '::1', 'handles pre-bracketed address correctly' + end + end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 61b4dce..d2a12f7 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -8,3 +8,21 @@ Fluent::Test.setup include Fluent::Test::Helpers + +def ipv6_enabled? + require 'socket' + + begin + # Try to actually bind to an IPv6 address to verify it works + sock = Socket.new(Socket::AF_INET6, Socket::SOCK_STREAM, 0) + sock.bind(Socket.sockaddr_in(0, '::1')) + sock.close + + # Also test that we can resolve IPv6 addresses + # This is needed because some systems can bind but can't connect + Socket.getaddrinfo('::1', nil, Socket::AF_INET6) + true + rescue Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, SocketError + false + end +end