Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
*.o
*.a
mkmf.log
/vendor/
71 changes: 50 additions & 21 deletions lib/fluent/plugin/in_prometheus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,25 @@ def multi_workers_ready?
def start
super

# Normalize bind address: strip brackets if present (for consistency)
# Brackets are only for URI formatting, not for socket binding
@bind = @bind[1..-2] if @bind.start_with?('[') && @bind.end_with?(']')

scheme = @secure ? 'https' : 'http'
log.debug "listening prometheus http server on #{scheme}:://#{@bind}:#{@port}/#{@metrics_path} for worker#{fluentd_worker_id}"
# Format bind address properly for URLs (add brackets for IPv6)
bind_display = @bind.include?(':') ? "[#{@bind}]" : @bind
log.debug "listening prometheus http server on #{scheme}://#{bind_display}:#{@port}/#{@metrics_path} for worker#{fluentd_worker_id}"

proto = @secure ? :tls : :tcp

if @ssl && @ssl['enable'] && @ssl['extra_conf']
# IPv6 + TLS combination is not currently supported
if @bind.include?(':') && @secure
raise Fluent::ConfigError, 'IPv6 with <transport tls> is not currently supported. Use bind 0.0.0.0 with TLS, or bind ::1 without TLS.'
end

# Use webrick for IPv6 or SSL extra_conf
# The http_server helper has issues with IPv6 URI construction
if (@ssl && @ssl['enable'] && @ssl['extra_conf']) || @bind.include?(':')
start_webrick
return
end
Expand Down Expand Up @@ -110,6 +123,8 @@ def start
ssl_config
end

# Use raw bind address for socket binding (no brackets)
# Brackets are only for URL/URI formatting, not for bind()
http_server_create_http_server(:in_prometheus_server, addr: @bind, port: @port, logger: log, proto: proto, tls_opts: tls_opt) do |server|
server.get(@metrics_path) { |_req| all_metrics }
server.get(@aggregated_metrics_path) { |_req| all_workers_metrics }
Expand All @@ -127,6 +142,7 @@ def shutdown
private

# For compatiblity because http helper can't support extra_conf option
# Also used for IPv6 addresses since http helper has IPv6 URI issues
def start_webrick
require 'webrick/https'
require 'webrick'
Expand All @@ -138,28 +154,32 @@ def start_webrick
Logger: WEBrick::Log.new(STDERR, WEBrick::Log::FATAL),
AccessLog: [],
}
if (@ssl['certificate_path'] && @ssl['private_key_path'].nil?) || (@ssl['certificate_path'].nil? && @ssl['private_key_path'])
raise RuntimeError.new("certificate_path and private_key_path most both be defined")
end

# Configure SSL if enabled
if @ssl && @ssl['enable']
if (@ssl['certificate_path'] && @ssl['private_key_path'].nil?) || (@ssl['certificate_path'].nil? && @ssl['private_key_path'])
raise RuntimeError.new("certificate_path and private_key_path most both be defined")
end

ssl_config = {
SSLEnable: true,
SSLCertName: [['CN', 'nobody'], ['DC', 'example']]
}
ssl_config = {
SSLEnable: true,
SSLCertName: [['CN', 'nobody'], ['DC', 'example']]
}

if @ssl['certificate_path']
cert = OpenSSL::X509::Certificate.new(File.read(@ssl['certificate_path']))
ssl_config[:SSLCertificate] = cert
end
if @ssl['certificate_path']
cert = OpenSSL::X509::Certificate.new(File.read(@ssl['certificate_path']))
ssl_config[:SSLCertificate] = cert
end

if @ssl['private_key_path']
key = OpenSSL::PKey.read(@ssl['private_key_path'])
ssl_config[:SSLPrivateKey] = key
end
if @ssl['private_key_path']
key = OpenSSL::PKey.read(@ssl['private_key_path'])
ssl_config[:SSLPrivateKey] = key
end

ssl_config[:SSLCACertificateFile] = @ssl['ca_path'] if @ssl['ca_path']
ssl_config = ssl_config.merge(@ssl['extra_conf']) if @ssl['extra_conf']
config = ssl_config.merge(config)
ssl_config[:SSLCACertificateFile] = @ssl['ca_path'] if @ssl['ca_path']
ssl_config = ssl_config.merge(@ssl['extra_conf']) if @ssl['extra_conf']
config = ssl_config.merge(config)
end

@log.on_debug do
@log.debug("WEBrick conf: #{config}")
Expand Down Expand Up @@ -207,7 +227,16 @@ def all_workers_metrics
end

def send_request_to_each_worker
bind = (@bind == '0.0.0.0') ? '127.0.0.1' : @bind
# Convert bind address to localhost for inter-worker communication
# 0.0.0.0 and :: are not connectable, use localhost instead
bind = case @bind
when '0.0.0.0'
'127.0.0.1'
when '::'
'::1' # IPv6 localhost
else
@bind
end
[*(@base_port...(@base_port + @num_workers))].each do |worker_port|
do_request(host: bind, port: worker_port, secure: @secure) do |http|
yield(http.get(@metrics_path))
Expand Down
11 changes: 9 additions & 2 deletions lib/fluent/plugin/in_prometheus/async_wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@ module Fluent::Plugin
class PrometheusInput
module AsyncWrapper
def do_request(host:, port:, secure:)
# Format host for URI - bracket IPv6 addresses if not already bracketed
uri_host = if host.include?(':') && !host.start_with?('[')
"[#{host}]"
else
host
end

endpoint =
if secure
context = OpenSSL::SSL::SSLContext.new
context.verify_mode = OpenSSL::SSL::VERIFY_NONE
Async::HTTP::Endpoint.parse("https://#{host}:#{port}", ssl_context: context)
Async::HTTP::Endpoint.parse("https://#{uri_host}:#{port}", ssl_context: context)
else
Async::HTTP::Endpoint.parse("http://#{host}:#{port}")
Async::HTTP::Endpoint.parse("http://#{uri_host}:#{port}")
end

Async::HTTP::Client.open(endpoint) do |client|
Expand Down
53 changes: 53 additions & 0 deletions spec/fluent/plugin/in_prometheus_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,22 @@
end
end

context 'IPv6 with TLS' do
let(:config) do
%[
@type prometheus
bind ::1
<transport tls>
insecure true
</transport>
]
end

it 'raises ConfigError for unsupported combination' do
expect { driver.run(timeout: 1) }.to raise_error(Fluent::ConfigError, /IPv6 with <transport tls> is not currently supported/)
end
end

context 'old parameters are given' do
context 'when extra_conf is used' do
let(:config) do
Expand Down Expand Up @@ -278,4 +294,41 @@
end
end
end

describe '#run with IPv6' do
shared_examples 'IPv6 server binding' do |bind_addr, connect_addr, description|
let(:config) do
# Quote the bind address if it contains brackets
bind_value = bind_addr.include?('[') ? "\"#{bind_addr}\"" : bind_addr
%[
@type prometheus
bind #{bind_value}
]
end

it description do
skip 'IPv6 not available on this system' unless ipv6_enabled?

driver.run(timeout: 3) do
Net::HTTP.start(connect_addr, port) do |http|
req = Net::HTTP::Get.new('/metrics')
res = http.request(req)
expect(res.code).to eq('200')
end
end
end
end

context 'IPv6 loopback address ::1' do
include_examples 'IPv6 server binding', '::1', '::1', 'binds and serves on IPv6 loopback address'
end

context 'IPv6 any address ::' do
include_examples 'IPv6 server binding', '::', '::1', 'binds on :: and connects via ::1'
end

context 'pre-bracketed IPv6 address [::1]' do
include_examples 'IPv6 server binding', '[::1]', '::1', 'handles pre-bracketed address correctly'
end
end
end
18 changes: 18 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,21 @@

Fluent::Test.setup
include Fluent::Test::Helpers

def ipv6_enabled?
require 'socket'

begin
# Try to actually bind to an IPv6 address to verify it works
sock = Socket.new(Socket::AF_INET6, Socket::SOCK_STREAM, 0)
sock.bind(Socket.sockaddr_in(0, '::1'))
sock.close

# Also test that we can resolve IPv6 addresses
# This is needed because some systems can bind but can't connect
Socket.getaddrinfo('::1', nil, Socket::AF_INET6)
true
rescue Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, SocketError
false
end
end