From 7273b103c9c2cebab32a4f13a511e9b0bdbf7bb9 Mon Sep 17 00:00:00 2001 From: John Nunemaker Date: Sun, 8 Mar 2026 15:55:05 -0400 Subject: [PATCH 1/4] Add poll-based memoization for zero-impact feature flag reads Adds `memoize: :poll` option that keeps an in-memory adapter in sync with the source adapter via a background polling thread. All reads hit memory (zero DB/Redis impact), writes go to source first then update memory. Other processes' changes are picked up on the poll interval. - New `Flipper::Adapters::Poll` adapter (reads from memory, writes to source + memory, background thread syncs) - New `Flipper::Middleware::Sync` (syncs once per request) - Rename old Cloud-specific Poll to `CloudPoll` - Safe `memoize=`/`memoizing?` on DSL (no-op when not Memoizable) - `FLIPPER_MEMOIZE=poll` env var support - Memoizer middleware gracefully skips when adapter isn't Memoizable Usage: Rails: `config.flipper.memoize = :poll` Ruby: `Flipper.new(adapter, memoize: :poll)` Co-Authored-By: Claude Opus 4.6 --- lib/flipper.rb | 2 + lib/flipper/adapters/cloud_poll.rb | 54 ++++++++++ lib/flipper/adapters/poll.rb | 132 ++++++++++++++++++----- lib/flipper/adapters/poll/poller.rb | 2 +- lib/flipper/cloud/configuration.rb | 4 +- lib/flipper/dsl.rb | 29 ++++- lib/flipper/engine.rb | 13 ++- lib/flipper/middleware/memoizer.rb | 6 ++ lib/flipper/middleware/sync.rb | 20 ++++ spec/flipper/adapters/cloud_poll_spec.rb | 41 +++++++ spec/flipper/adapters/poll_spec.rb | 116 +++++++++++++++----- spec/flipper/cloud_spec.rb | 2 +- spec/flipper/dsl_spec.rb | 19 ++++ spec/flipper/engine_spec.rb | 12 +++ spec/flipper/middleware/memoizer_spec.rb | 21 ++++ spec/flipper/middleware/sync_spec.rb | 52 +++++++++ 16 files changed, 459 insertions(+), 66 deletions(-) create mode 100644 lib/flipper/adapters/cloud_poll.rb create mode 100644 lib/flipper/middleware/sync.rb create mode 100644 spec/flipper/adapters/cloud_poll_spec.rb create mode 100644 spec/flipper/middleware/sync_spec.rb diff --git a/lib/flipper.rb b/lib/flipper.rb index 4f05e285e..2ce839d41 100644 --- a/lib/flipper.rb +++ b/lib/flipper.rb @@ -195,6 +195,7 @@ def groups_registry=(registry) require 'flipper/adapters/instrumented' require 'flipper/adapters/memoizable' require 'flipper/adapters/memory' +require 'flipper/adapters/poll' require 'flipper/adapters/strict' require 'flipper/adapter_builder' require 'flipper/configuration' @@ -207,6 +208,7 @@ def groups_registry=(registry) require 'flipper/identifier' require 'flipper/middleware/memoizer' require 'flipper/middleware/setup_env' +require 'flipper/middleware/sync' require 'flipper/poller' require 'flipper/registry' require 'flipper/expression' diff --git a/lib/flipper/adapters/cloud_poll.rb b/lib/flipper/adapters/cloud_poll.rb new file mode 100644 index 000000000..896520c14 --- /dev/null +++ b/lib/flipper/adapters/cloud_poll.rb @@ -0,0 +1,54 @@ +require 'flipper/adapters/sync/synchronizer' +require 'flipper/poller' + +module Flipper + module Adapters + # Internal: Cloud-specific adapter that keeps a local adapter in sync with + # a remote HTTP adapter via a background poller thread. Used by + # Flipper::Cloud::Configuration. + # + # For general-purpose polling, use Flipper::Adapters::Poll instead. + class CloudPoll + extend Forwardable + include ::Flipper::Adapter + + attr_reader :adapter, :poller + + def_delegators :synced_adapter, :features, :get, :get_multi, :get_all, :add, :remove, :clear, :enable, :disable + + def initialize(poller, adapter) + @adapter = adapter + @poller = poller + @last_synced_at = 0 + + # If the adapter is empty, we need to sync before starting the poller. + # Yes, this will block the main thread, but that's better than thinking + # nothing is enabled. + if adapter.features.empty? + begin + @poller.sync + rescue + # TODO: Warn here that it's possible that no data has been synced + # and flags are being evaluated without flag data being present + # until a sync completes. We rescue to avoid flipper being down + # causing your processes to crash. + end + end + + @poller.start + end + + private + + def synced_adapter + @poller.start + poller_last_synced_at = @poller.last_synced_at.value + if poller_last_synced_at > @last_synced_at + Flipper::Adapters::Sync::Synchronizer.new(@adapter, @poller.adapter).call + @last_synced_at = poller_last_synced_at + end + @adapter + end + end + end +end diff --git a/lib/flipper/adapters/poll.rb b/lib/flipper/adapters/poll.rb index 20cd9e920..22a5c8985 100644 --- a/lib/flipper/adapters/poll.rb +++ b/lib/flipper/adapters/poll.rb @@ -1,51 +1,127 @@ -require 'flipper/adapters/sync/synchronizer' require 'flipper/poller' module Flipper module Adapters + # An adapter that keeps a local memory adapter in sync with a source adapter + # via a background poller thread. + # + # Reads go to the local memory adapter (fast, zero-impact). + # Writes go to the source adapter first, then update the local memory adapter. + # A background thread periodically polls the source adapter and updates the + # local memory adapter so other processes' writes are picked up. class Poll - extend Forwardable include ::Flipper::Adapter - # Deprecated - Poller = ::Flipper::Poller + # Public: The Poller instance used to sync in the background. + attr_reader :poller - attr_reader :adapter, :poller + # Public: The local memory adapter that serves reads. + attr_reader :local - def_delegators :synced_adapter, :features, :get, :get_multi, :get_all, :add, :remove, :clear, :enable, :disable + # Public: The source adapter that receives writes and is polled. + attr_reader :remote - def initialize(poller, adapter) - @adapter = adapter - @poller = poller + # Public: Build a new Poll adapter. + # + # source - The source adapter to poll and write to (e.g., ActiveRecord, Redis). + # options - The Hash of options: + # :key - The key to identify the poller instance (default: object_id). + # :interval - Poll interval in seconds (default: 10). + # :instrumenter - Instrumenter for events (default: Noop). + # :start_automatically - Start the poller thread automatically (default: true). + # :shutdown_automatically - Register at_exit handler (default: true). + def initialize(source, options = {}) + key = options.fetch(:key, object_id.to_s) + @poller = Flipper::Poller.get(key, { + remote_adapter: source, + interval: options.fetch(:interval, 10), + instrumenter: options.fetch(:instrumenter, Instrumenters::Noop), + start_automatically: options.fetch(:start_automatically, true), + shutdown_automatically: options.fetch(:shutdown_automatically, true), + }) + @local = Adapters::Memory.new + @remote = source @last_synced_at = 0 + end + + def adapter_stack + "poll(local: #{@local.adapter_stack}, remote: #{@remote.adapter_stack})" + end - # If the adapter is empty, we need to sync before starting the poller. - # Yes, this will block the main thread, but that's better than thinking - # nothing is enabled. - if adapter.features.empty? + # Public: Synchronize the local memory adapter with the poller's latest + # snapshot if the poller has synced since we last checked. + # + # If given a block, syncs once at the start and suppresses further syncs + # for the duration of the block (useful for per-request sync). + def sync + poller_last_synced_at = @poller.last_synced_at.value + if poller_last_synced_at > @last_synced_at + @local.import(@poller.adapter) + @last_synced_at = poller_last_synced_at + end + + if block_given? begin - @poller.sync - rescue - # TODO: Warn here that it's possible that no data has been synced - # and flags are being evaluated without flag data being present - # until a sync completes. We rescue to avoid flipper being down - # causing your processes to crash. + @syncing = false + yield + ensure + @syncing = true end end + end - @poller.start + # Reads - always from local memory + + def features + maybe_sync + @local.features end - private + def get(feature) + maybe_sync + @local.get(feature) + end - def synced_adapter - @poller.start - poller_last_synced_at = @poller.last_synced_at.value - if poller_last_synced_at > @last_synced_at - Flipper::Adapters::Sync::Synchronizer.new(@adapter, @poller.adapter).call - @last_synced_at = poller_last_synced_at + def get_multi(features) + maybe_sync + @local.get_multi(features) + end + + def get_all(**kwargs) + maybe_sync + @local.get_all(**kwargs) + end + + # Writes - go to source first, then update local memory + + def add(feature) + @remote.add(feature).tap { @local.add(feature) } + end + + def remove(feature) + @remote.remove(feature).tap { @local.remove(feature) } + end + + def clear(feature) + @remote.clear(feature).tap { @local.clear(feature) } + end + + def enable(feature, gate, thing) + @remote.enable(feature, gate, thing).tap do + @local.enable(feature, gate, thing) + end + end + + def disable(feature, gate, thing) + @remote.disable(feature, gate, thing).tap do + @local.disable(feature, gate, thing) end - @adapter + end + + private + + def maybe_sync + sync if @syncing != false end end end diff --git a/lib/flipper/adapters/poll/poller.rb b/lib/flipper/adapters/poll/poller.rb index 156859cde..87658ae23 100644 --- a/lib/flipper/adapters/poll/poller.rb +++ b/lib/flipper/adapters/poll/poller.rb @@ -1,2 +1,2 @@ warn "DEPRECATION WARNING: Flipper::Adapters::Poll::Poller is deprecated. Use Flipper::Poller instead." -require 'flipper/adapters/poll' +require 'flipper/adapters/cloud_poll' diff --git a/lib/flipper/cloud/configuration.rb b/lib/flipper/cloud/configuration.rb index 00f013f8c..4596d9202 100644 --- a/lib/flipper/cloud/configuration.rb +++ b/lib/flipper/cloud/configuration.rb @@ -1,7 +1,7 @@ require "logger" require "socket" require "flipper/adapters/http" -require "flipper/adapters/poll" +require "flipper/adapters/cloud_poll" require "flipper/poller" require "flipper/adapters/dual_write" require "flipper/adapters/sync/synchronizer" @@ -154,7 +154,7 @@ def poller end def poll_adapter - Flipper::Adapters::Poll.new(poller, local_adapter) + Flipper::Adapters::CloudPoll.new(poller, local_adapter) end def http_adapter diff --git a/lib/flipper/dsl.rb b/lib/flipper/dsl.rb index 858e5fa63..6e03e709d 100644 --- a/lib/flipper/dsl.rb +++ b/lib/flipper/dsl.rb @@ -10,7 +10,19 @@ class DSL # Private: What is being used to instrument all the things. attr_reader :instrumenter - def_delegators :@adapter, :memoize=, :memoizing?, :import, :export, :adapter_stack + def_delegators :@adapter, :import, :export, :adapter_stack + + # Public: Set memoization on/off. No-op if adapter doesn't support it + # (e.g., when using memoize: :poll). + def memoize=(value) + @adapter.memoize = value if @adapter.respond_to?(:memoize=) + end + + # Public: Are we currently memoizing? Returns false if adapter doesn't + # support memoization (e.g., when using memoize: :poll). + def memoizing? + @adapter.respond_to?(:memoizing?) ? @adapter.memoizing? : false + end # Public: Returns a new instance of the DSL. # @@ -21,8 +33,19 @@ class DSL def initialize(adapter, options = {}) @instrumenter = options.fetch(:instrumenter, Instrumenters::Noop) memoize = options.fetch(:memoize, true) - adapter = Adapters::Memoizable.new(adapter) if memoize - @adapter = adapter + + @adapter = if memoize == :poll + Adapters::Poll.new(adapter, { + key: "flipper_poll_#{object_id}", + interval: options.fetch(:poll_interval, 10), + instrumenter: @instrumenter, + }) + elsif memoize + Adapters::Memoizable.new(adapter) + else + adapter + end + @memoized_features = {} end diff --git a/lib/flipper/engine.rb b/lib/flipper/engine.rb index f1a1091c3..604f3487f 100644 --- a/lib/flipper/engine.rb +++ b/lib/flipper/engine.rb @@ -19,7 +19,7 @@ def self.default_strict_value config.before_configuration do config.flipper = ActiveSupport::OrderedOptions.new.update( env_key: ENV.fetch('FLIPPER_ENV_KEY', 'flipper'), - memoize: ENV.fetch('FLIPPER_MEMOIZE', 'true').casecmp('true').zero?, + memoize: ENV.fetch('FLIPPER_MEMOIZE', 'true').then { |v| v == 'poll' ? :poll : v.casecmp('true').zero? }, preload: ENV.fetch('FLIPPER_PRELOAD', 'true').casecmp('true').zero?, instrumenter: ENV.fetch('FLIPPER_INSTRUMENTER', 'ActiveSupport::Notifications').constantize, log: ENV.fetch('FLIPPER_LOG', 'true').casecmp('true').zero?, @@ -52,7 +52,10 @@ def self.default_strict_value instrumenter: app.config.flipper.instrumenter ) else - Flipper.new(config.adapter, instrumenter: app.config.flipper.instrumenter) + Flipper.new(config.adapter, { + instrumenter: app.config.flipper.instrumenter, + memoize: app.config.flipper.memoize, + }) end end end @@ -78,7 +81,11 @@ def self.default_strict_value initializer "flipper.memoizer", after: :load_config_initializers do |app| flipper = app.config.flipper - if flipper.memoize + if flipper.memoize == :poll + app.middleware.use Flipper::Middleware::Sync, { + env_key: flipper.env_key, + } + elsif flipper.memoize app.middleware.use Flipper::Middleware::Memoizer, { env_key: flipper.env_key, preload: flipper.preload, diff --git a/lib/flipper/middleware/memoizer.rb b/lib/flipper/middleware/memoizer.rb index be3f561a5..a8bd2dbdf 100644 --- a/lib/flipper/middleware/memoizer.rb +++ b/lib/flipper/middleware/memoizer.rb @@ -63,6 +63,12 @@ def memoize?(request) def memoized_call(request) flipper = request.env.fetch(@env_key) { Flipper } + # If the adapter is not Memoizable (e.g., using memoize: :poll), + # skip memoization and just call the app. + unless flipper.adapter.is_a?(Flipper::Adapters::Memoizable) + return @app.call(request.env) + end + # Already memoizing. This instance does not need to do anything. if flipper.memoizing? warn "Flipper::Middleware::Memoizer appears to be running twice. Read how to resolve this at https://github.com/flippercloud/flipper/pull/523" diff --git a/lib/flipper/middleware/sync.rb b/lib/flipper/middleware/sync.rb new file mode 100644 index 000000000..1ce40d9cf --- /dev/null +++ b/lib/flipper/middleware/sync.rb @@ -0,0 +1,20 @@ +module Flipper + module Middleware + class Sync + def initialize(app, options = {}) + @app = app + @env_key = options.fetch(:env_key, 'flipper') + end + + def call(env) + flipper = env.fetch(@env_key) { Flipper } + + if flipper.adapter.respond_to?(:sync) + flipper.adapter.sync { @app.call(env) } + else + @app.call(env) + end + end + end + end +end diff --git a/spec/flipper/adapters/cloud_poll_spec.rb b/spec/flipper/adapters/cloud_poll_spec.rb new file mode 100644 index 000000000..1fda9b733 --- /dev/null +++ b/spec/flipper/adapters/cloud_poll_spec.rb @@ -0,0 +1,41 @@ +require 'flipper/adapters/cloud_poll' + +RSpec.describe Flipper::Adapters::CloudPoll do + let(:remote_adapter) { + adapter = Flipper::Adapters::Memory.new(threadsafe: true) + flipper = Flipper.new(adapter) + flipper.enable(:search) + flipper.enable(:analytics) + adapter + } + let(:local_adapter) { Flipper::Adapters::Memory.new(threadsafe: true) } + let(:poller) { + Flipper::Poller.get("for_cloud_poll_spec", { + start_automatically: false, + remote_adapter: remote_adapter, + }) + } + + it "syncs in main thread if local adapter is empty" do + instance = described_class.new(poller, local_adapter) + instance.features # call something to force sync + expect(local_adapter.features).to eq(remote_adapter.features) + end + + it "does not sync in main thread if local adapter is not empty" do + # make local not empty by importing remote + flipper = Flipper.new(local_adapter) + flipper.import(remote_adapter) + + # make a fake poller to verify calls + poller = double("Poller", last_synced_at: Concurrent::AtomicFixnum.new(0)) + expect(poller).to receive(:start).twice + expect(poller).not_to receive(:sync) + + # create new instance and call something to force sync + instance = described_class.new(poller, local_adapter) + instance.features # call something to force sync + + expect(local_adapter.features).to eq(remote_adapter.features) + end +end diff --git a/spec/flipper/adapters/poll_spec.rb b/spec/flipper/adapters/poll_spec.rb index 2fe08fe75..89a3a57e7 100644 --- a/spec/flipper/adapters/poll_spec.rb +++ b/spec/flipper/adapters/poll_spec.rb @@ -1,41 +1,101 @@ require 'flipper/adapters/poll' +require 'flipper/adapters/operation_logger' RSpec.describe Flipper::Adapters::Poll do - let(:remote_adapter) { - adapter = Flipper::Adapters::Memory.new(threadsafe: true) - flipper = Flipper.new(adapter) - flipper.enable(:search) - flipper.enable(:analytics) - adapter - } - let(:local_adapter) { Flipper::Adapters::Memory.new(threadsafe: true) } - let(:poller) { - Flipper::Poller.get("for_spec", { + let(:source_adapter) { Flipper::Adapters::Memory.new } + let(:source) { Flipper.new(source_adapter, memoize: false) } + + subject do + described_class.new(source_adapter, { + key: "poll_spec_#{SecureRandom.hex(4)}", start_automatically: false, - remote_adapter: remote_adapter, + shutdown_automatically: false, }) - } + end + + let(:flipper) { Flipper.new(subject, memoize: false) } + + it_should_behave_like 'a flipper adapter' + + describe '#sync' do + it 'syncs features from source when poller has synced' do + source.enable(:search) + subject.poller.sync + + expect(flipper[:search].boolean_value).to be(true) + end + + it 'does not sync when poller has not synced since last check' do + subject.poller.sync + subject.sync + + source.enable(:search) + + expect(flipper[:search].boolean_value).to be(false) + end + + it 'suppresses further syncs during block' do + source.enable(:search) + subject.poller.sync + + subject.sync do + expect(flipper[:search].boolean_value).to be(true) - it "syncs in main thread if local adapter is empty" do - instance = described_class.new(poller, local_adapter) - instance.features # call something to force sync - expect(local_adapter.features).to eq(remote_adapter.features) + source.enable(:stats) + subject.poller.sync + + expect(flipper[:stats].boolean_value).to be(false) + end + + expect(flipper[:stats].boolean_value).to be(true) + end end - it "does not sync in main thread if local adapter is not empty" do - # make local not empty by importing remote - flipper = Flipper.new(local_adapter) - flipper.import(remote_adapter) + describe 'writes' do + it 'writes to both source and local memory' do + flipper.enable(:search) + + expect(source[:search].boolean_value).to be(true) + expect(flipper[:search].boolean_value).to be(true) + end + + it 'add writes to both source and local memory' do + flipper.add(:search) + + expect(source_adapter.features).to include('search') + expect(subject.local.features).to include('search') + end - # make a fake poller to verify calls - poller = double("Poller", last_synced_at: Concurrent::AtomicFixnum.new(0)) - expect(poller).to receive(:start).twice - expect(poller).not_to receive(:sync) + it 'remove writes to both source and local memory' do + flipper.enable(:search) + flipper.remove(:search) - # create new instance and call something to force sync - instance = described_class.new(poller, local_adapter) - instance.features # call something to force sync + expect(source_adapter.features).not_to include('search') + expect(subject.local.features).not_to include('search') + end + + it 'clear writes to both source and local memory' do + flipper.enable(:search) + subject.clear(flipper[:search]) + + expect(source[:search].boolean_value).to be(false) + expect(flipper[:search].boolean_value).to be(false) + end + + it 'disable writes to both source and local memory' do + flipper.enable(:search) + flipper.disable(:search) + + expect(source[:search].boolean_value).to be(false) + expect(flipper[:search].boolean_value).to be(false) + end + end - expect(local_adapter.features).to eq(remote_adapter.features) + describe '#adapter_stack' do + it 'includes poll and memory adapter names' do + stack = subject.adapter_stack + expect(stack).to include('poll') + expect(stack).to include('memory') + end end end diff --git a/spec/flipper/cloud_spec.rb b/spec/flipper/cloud_spec.rb index ef436105e..5fa10b3d5 100644 --- a/spec/flipper/cloud_spec.rb +++ b/spec/flipper/cloud_spec.rb @@ -28,7 +28,7 @@ dual_write_adapter = memoized_adapter.adapter expect(dual_write_adapter).to be_instance_of(Flipper::Adapters::DualWrite) poll_adapter = dual_write_adapter.local - expect(poll_adapter).to be_instance_of(Flipper::Adapters::Poll) + expect(poll_adapter).to be_instance_of(Flipper::Adapters::CloudPoll) http_adapter = dual_write_adapter.remote client = http_adapter.client diff --git a/spec/flipper/dsl_spec.rb b/spec/flipper/dsl_spec.rb index c2f18f1d8..90115cd57 100644 --- a/spec/flipper/dsl_spec.rb +++ b/spec/flipper/dsl_spec.rb @@ -14,6 +14,25 @@ end end + context 'when using :poll memoize strategy' do + it 'wraps the given adapter with Flipper::Adapters::Poll' do + dsl = described_class.new(adapter, memoize: :poll) + expect(dsl.adapter).to be_a(Flipper::Adapters::Poll) + expect(dsl.adapter.remote).to be(adapter) + expect(dsl.adapter.local).to be_a(Flipper::Adapters::Memory) + end + + it 'returns false for memoizing?' do + dsl = described_class.new(adapter, memoize: :poll) + expect(dsl.memoizing?).to be(false) + end + + it 'does not raise when setting memoize=' do + dsl = described_class.new(adapter, memoize: :poll) + expect { dsl.memoize = true }.not_to raise_error + end + end + context 'when disabling memoization' do it 'uses the given adapter directly' do dsl = described_class.new(adapter, memoize: false) diff --git a/spec/flipper/engine_spec.rb b/spec/flipper/engine_spec.rb index bbc93b884..63efcbb1a 100644 --- a/spec/flipper/engine_spec.rb +++ b/spec/flipper/engine_spec.rb @@ -156,6 +156,18 @@ expect(subject.middleware).not_to include(Flipper::Middleware::Memoizer) end + it 'uses Sync middleware if config.memoize = :poll' do + initializer { config.memoize = :poll } + expect(subject.middleware).to include(Flipper::Middleware::Sync) + expect(subject.middleware).not_to include(Flipper::Middleware::Memoizer) + end + + it 'sets memoize to :poll from ENV' do + ENV['FLIPPER_MEMOIZE'] = 'poll' + subject + expect(config.memoize).to eq(:poll) + end + it 'passes config to memoizer' do initializer do config.update( diff --git a/spec/flipper/middleware/memoizer_spec.rb b/spec/flipper/middleware/memoizer_spec.rb index 85d3dfade..5628737a4 100644 --- a/spec/flipper/middleware/memoizer_spec.rb +++ b/spec/flipper/middleware/memoizer_spec.rb @@ -3,6 +3,7 @@ require 'flipper/adapters/active_support_cache_store' require 'flipper/adapters/operation_logger' require 'flipper/adapters/actor_limit' +require 'flipper/adapters/poll' require 'flipper/adapters/sync' RSpec.describe Flipper::Middleware::Memoizer do @@ -484,6 +485,26 @@ def get(uri, params = {}, env = {}, &block) end end + context 'when adapter is Poll (memoize: :poll)' do + let(:flipper) { Flipper.new(adapter, memoize: :poll) } + + it 'skips memoization and delegates to app' do + app = lambda { |env| [200, {}, ['OK']] } + middleware = described_class.new(app) + + expect(app).to receive(:call).and_call_original + status, _, _ = middleware.call('flipper' => flipper) + expect(status).to eq(200) + end + + it 'does not raise NoMethodError for missing memoize=' do + app = lambda { |env| [200, {}, ['OK']] } + middleware = described_class.new(app) + + expect { middleware.call('flipper' => flipper) }.not_to raise_error + end + end + context 'with preload:true and Sync adapter wrapped with ActorLimit' do it 'preloads even when remote has more actors than local limit' do local = Flipper::Adapters::Memory.new diff --git a/spec/flipper/middleware/sync_spec.rb b/spec/flipper/middleware/sync_spec.rb new file mode 100644 index 000000000..cfd9af609 --- /dev/null +++ b/spec/flipper/middleware/sync_spec.rb @@ -0,0 +1,52 @@ +require 'rack/test' +require 'flipper/adapters/operation_logger' + +RSpec.describe Flipper::Middleware::Sync do + include Rack::Test::Methods + + let(:memory_adapter) { Flipper::Adapters::Memory.new } + let(:adapter) { Flipper::Adapters::OperationLogger.new(memory_adapter) } + let(:app) { lambda { |_env| [200, {}, ['OK']] } } + + subject { described_class.new(app) } + + context 'when adapter responds to sync' do + let(:flipper) { Flipper.new(adapter, memoize: :poll) } + let(:env) { { 'flipper' => flipper } } + + it 'delegates to the app' do + expect(app).to receive(:call).and_call_original + subject.call(env) + end + + it 'calls sync on the adapter' do + expect(flipper.adapter).to receive(:sync).and_yield + subject.call(env) + end + end + + context 'when adapter does not respond to sync' do + let(:flipper) { Flipper.new(adapter, memoize: false) } + let(:env) { { 'flipper' => flipper } } + + it 'delegates to the app without syncing' do + expect(app).to receive(:call).and_call_original + subject.call(env) + end + end + + context 'defaults to Flipper' do + let(:flipper) { Flipper.new(adapter, memoize: :poll) } + + before do + Flipper.configure do |config| + config.default { flipper } + end + end + + it 'uses the default Flipper instance' do + expect(flipper.adapter).to receive(:sync).and_yield + subject.call({}) + end + end +end From 29a512240ec3df01523f2828833c17728e069e23 Mon Sep 17 00:00:00 2001 From: John Nunemaker Date: Sun, 8 Mar 2026 16:00:07 -0400 Subject: [PATCH 2/4] Fix thread safety, initial sync, and adapter wrapping in Poll - Use Concurrent::AtomicFixnum for @last_synced_at (thread-safe) - Use Thread.current for sync suppression instead of instance variable (prevents one thread's suppression from leaking to others in Puma) - Add blocking initial sync on startup so features aren't empty before the first poll completes - Walk adapter stack in Sync middleware to find Poll through Strict/ActorLimit wrappers - Use respond_to?(:memoize=) in Memoizer instead of is_a? check for consistency with DSL approach Co-Authored-By: Claude Opus 4.6 --- lib/flipper/adapters/poll.rb | 30 ++++++++++++++++++++++-------- lib/flipper/middleware/memoizer.rb | 4 ++-- lib/flipper/middleware/sync.rb | 15 +++++++++++++-- 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/lib/flipper/adapters/poll.rb b/lib/flipper/adapters/poll.rb index 22a5c8985..08a1ca9a6 100644 --- a/lib/flipper/adapters/poll.rb +++ b/lib/flipper/adapters/poll.rb @@ -1,3 +1,4 @@ +require 'concurrent/atomic/atomic_fixnum' require 'flipper/poller' module Flipper @@ -12,6 +13,8 @@ module Adapters class Poll include ::Flipper::Adapter + SYNC_KEY = :flipper_poll_sync_suppressed + # Public: The Poller instance used to sync in the background. attr_reader :poller @@ -32,16 +35,26 @@ class Poll # :shutdown_automatically - Register at_exit handler (default: true). def initialize(source, options = {}) key = options.fetch(:key, object_id.to_s) - @poller = Flipper::Poller.get(key, { + poller_options = { remote_adapter: source, interval: options.fetch(:interval, 10), instrumenter: options.fetch(:instrumenter, Instrumenters::Noop), start_automatically: options.fetch(:start_automatically, true), shutdown_automatically: options.fetch(:shutdown_automatically, true), - }) + } + @poller = Flipper::Poller.get(key, poller_options) @local = Adapters::Memory.new @remote = source - @last_synced_at = 0 + @last_synced_at = Concurrent::AtomicFixnum.new(0) + + # Block the main thread for the initial sync so we don't serve + # empty/default values before the first poll completes. + begin + @poller.sync + sync + rescue + # Rescue to avoid source adapter being down causing processes to crash. + end end def adapter_stack @@ -55,17 +68,18 @@ def adapter_stack # for the duration of the block (useful for per-request sync). def sync poller_last_synced_at = @poller.last_synced_at.value - if poller_last_synced_at > @last_synced_at + last = @last_synced_at.value + if poller_last_synced_at > last @local.import(@poller.adapter) - @last_synced_at = poller_last_synced_at + @last_synced_at.update { poller_last_synced_at } end if block_given? begin - @syncing = false + Thread.current[SYNC_KEY] = true yield ensure - @syncing = true + Thread.current[SYNC_KEY] = false end end end @@ -121,7 +135,7 @@ def disable(feature, gate, thing) private def maybe_sync - sync if @syncing != false + sync unless Thread.current[SYNC_KEY] end end end diff --git a/lib/flipper/middleware/memoizer.rb b/lib/flipper/middleware/memoizer.rb index a8bd2dbdf..271baa585 100644 --- a/lib/flipper/middleware/memoizer.rb +++ b/lib/flipper/middleware/memoizer.rb @@ -63,9 +63,9 @@ def memoize?(request) def memoized_call(request) flipper = request.env.fetch(@env_key) { Flipper } - # If the adapter is not Memoizable (e.g., using memoize: :poll), + # If the adapter doesn't support memoization (e.g., using memoize: :poll), # skip memoization and just call the app. - unless flipper.adapter.is_a?(Flipper::Adapters::Memoizable) + unless flipper.adapter.respond_to?(:memoize=) return @app.call(request.env) end diff --git a/lib/flipper/middleware/sync.rb b/lib/flipper/middleware/sync.rb index 1ce40d9cf..06c0eb0c6 100644 --- a/lib/flipper/middleware/sync.rb +++ b/lib/flipper/middleware/sync.rb @@ -8,13 +8,24 @@ def initialize(app, options = {}) def call(env) flipper = env.fetch(@env_key) { Flipper } + poll_adapter = find_poll_adapter(flipper.adapter) - if flipper.adapter.respond_to?(:sync) - flipper.adapter.sync { @app.call(env) } + if poll_adapter + poll_adapter.sync { @app.call(env) } else @app.call(env) end end + + private + + # Walk the adapter stack to find a Poll adapter, which may be wrapped + # by Strict, ActorLimit, or other Wrapper adapters. + def find_poll_adapter(adapter) + return adapter if adapter.respond_to?(:sync) + return find_poll_adapter(adapter.adapter) if adapter.respond_to?(:adapter) + nil + end end end end From ff6cec05c8274eb013ab35cc5da4c5658e476f5d Mon Sep 17 00:00:00 2001 From: John Nunemaker Date: Sun, 8 Mar 2026 16:19:22 -0400 Subject: [PATCH 3/4] Fix thread safety and robustness in Poll adapter and Sync middleware - Add mutex to Poll#sync to prevent redundant concurrent imports - Use rescue StandardError instead of bare rescue in Poll and CloudPoll - Cache poll adapter lookup by identity in Sync middleware to avoid object_id reuse issues after GC - Document write-through eventual consistency trade-off Co-Authored-By: Claude Opus 4.6 --- lib/flipper/adapters/cloud_poll.rb | 2 +- lib/flipper/adapters/poll.rb | 23 ++++++++++++++++------- lib/flipper/middleware/sync.rb | 13 ++++++++++++- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/lib/flipper/adapters/cloud_poll.rb b/lib/flipper/adapters/cloud_poll.rb index 896520c14..36100ac3d 100644 --- a/lib/flipper/adapters/cloud_poll.rb +++ b/lib/flipper/adapters/cloud_poll.rb @@ -27,7 +27,7 @@ def initialize(poller, adapter) if adapter.features.empty? begin @poller.sync - rescue + rescue StandardError # TODO: Warn here that it's possible that no data has been synced # and flags are being evaluated without flag data being present # until a sync completes. We rescue to avoid flipper being down diff --git a/lib/flipper/adapters/poll.rb b/lib/flipper/adapters/poll.rb index 08a1ca9a6..274727df3 100644 --- a/lib/flipper/adapters/poll.rb +++ b/lib/flipper/adapters/poll.rb @@ -18,6 +18,9 @@ class Poll # Public: The Poller instance used to sync in the background. attr_reader :poller + # Public: The mutex used to synchronize sync operations. + attr_reader :sync_mutex + # Public: The local memory adapter that serves reads. attr_reader :local @@ -46,13 +49,14 @@ def initialize(source, options = {}) @local = Adapters::Memory.new @remote = source @last_synced_at = Concurrent::AtomicFixnum.new(0) + @sync_mutex = Mutex.new # Block the main thread for the initial sync so we don't serve # empty/default values before the first poll completes. begin @poller.sync sync - rescue + rescue StandardError # Rescue to avoid source adapter being down causing processes to crash. end end @@ -67,11 +71,13 @@ def adapter_stack # If given a block, syncs once at the start and suppresses further syncs # for the duration of the block (useful for per-request sync). def sync - poller_last_synced_at = @poller.last_synced_at.value - last = @last_synced_at.value - if poller_last_synced_at > last - @local.import(@poller.adapter) - @last_synced_at.update { poller_last_synced_at } + @sync_mutex.synchronize do + poller_last_synced_at = @poller.last_synced_at.value + last = @last_synced_at.value + if poller_last_synced_at > last + @local.import(@poller.adapter) + @last_synced_at.update { poller_last_synced_at } + end end if block_given? @@ -106,7 +112,10 @@ def get_all(**kwargs) @local.get_all(**kwargs) end - # Writes - go to source first, then update local memory + # Writes - go to source first, then update local memory. + # Note: There is a small window where another thread's sync could overwrite + # the local adapter with a stale poller snapshot that doesn't include the + # write yet. The write will be picked up on the next poll cycle. def add(feature) @remote.add(feature).tap { @local.add(feature) } diff --git a/lib/flipper/middleware/sync.rb b/lib/flipper/middleware/sync.rb index 06c0eb0c6..b8af0dc2a 100644 --- a/lib/flipper/middleware/sync.rb +++ b/lib/flipper/middleware/sync.rb @@ -8,7 +8,7 @@ def initialize(app, options = {}) def call(env) flipper = env.fetch(@env_key) { Flipper } - poll_adapter = find_poll_adapter(flipper.adapter) + poll_adapter = poll_adapter_for(flipper) if poll_adapter poll_adapter.sync { @app.call(env) } @@ -19,6 +19,17 @@ def call(env) private + # Cache the poll adapter lookup since the adapter stack doesn't change + # after initialization. Uses the flipper instance itself as key to avoid + # object_id reuse issues after GC. + def poll_adapter_for(flipper) + @poll_adapters ||= {}.compare_by_identity + unless @poll_adapters.key?(flipper) + @poll_adapters[flipper] = find_poll_adapter(flipper.adapter) + end + @poll_adapters[flipper] + end + # Walk the adapter stack to find a Poll adapter, which may be wrapped # by Strict, ActorLimit, or other Wrapper adapters. def find_poll_adapter(adapter) From cdacd645c55e05a9f731d42894bbdc11fc55e1b1 Mon Sep 17 00:00:00 2001 From: John Nunemaker Date: Sun, 8 Mar 2026 16:47:05 -0400 Subject: [PATCH 4/4] Improve thread safety: use AtomicReference, threadsafe Memory, and preload warning - Use Concurrent::AtomicReference instead of AtomicFixnum for last_synced_at so sub-second monotonic timestamps are not truncated to integers - Use threadsafe Memory adapter for Poll's local store - Make CloudPoll's last_synced_at atomic to match Poll's thread safety - Log info when preload is set with memoize: :poll (preload is unnecessary) Co-Authored-By: Claude Opus 4.6 --- lib/flipper/adapters/cloud_poll.rb | 6 +++--- lib/flipper/adapters/poll.rb | 6 +++--- lib/flipper/engine.rb | 3 +++ lib/flipper/poller.rb | 4 ++-- spec/flipper/adapters/cloud_poll_spec.rb | 2 +- 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/lib/flipper/adapters/cloud_poll.rb b/lib/flipper/adapters/cloud_poll.rb index 36100ac3d..04eef302b 100644 --- a/lib/flipper/adapters/cloud_poll.rb +++ b/lib/flipper/adapters/cloud_poll.rb @@ -19,7 +19,7 @@ class CloudPoll def initialize(poller, adapter) @adapter = adapter @poller = poller - @last_synced_at = 0 + @last_synced_at = Concurrent::AtomicReference.new(0.0) # If the adapter is empty, we need to sync before starting the poller. # Yes, this will block the main thread, but that's better than thinking @@ -43,9 +43,9 @@ def initialize(poller, adapter) def synced_adapter @poller.start poller_last_synced_at = @poller.last_synced_at.value - if poller_last_synced_at > @last_synced_at + if poller_last_synced_at > @last_synced_at.value Flipper::Adapters::Sync::Synchronizer.new(@adapter, @poller.adapter).call - @last_synced_at = poller_last_synced_at + @last_synced_at.set(poller_last_synced_at) end @adapter end diff --git a/lib/flipper/adapters/poll.rb b/lib/flipper/adapters/poll.rb index 274727df3..5a8fd4d08 100644 --- a/lib/flipper/adapters/poll.rb +++ b/lib/flipper/adapters/poll.rb @@ -1,4 +1,4 @@ -require 'concurrent/atomic/atomic_fixnum' +require 'concurrent/atomic/atomic_reference' require 'flipper/poller' module Flipper @@ -46,9 +46,9 @@ def initialize(source, options = {}) shutdown_automatically: options.fetch(:shutdown_automatically, true), } @poller = Flipper::Poller.get(key, poller_options) - @local = Adapters::Memory.new + @local = Adapters::Memory.new(nil, threadsafe: true) @remote = source - @last_synced_at = Concurrent::AtomicFixnum.new(0) + @last_synced_at = Concurrent::AtomicReference.new(0.0) @sync_mutex = Mutex.new # Block the main thread for the initial sync so we don't serve diff --git a/lib/flipper/engine.rb b/lib/flipper/engine.rb index 604f3487f..d8aa65d14 100644 --- a/lib/flipper/engine.rb +++ b/lib/flipper/engine.rb @@ -82,6 +82,9 @@ def self.default_strict_value flipper = app.config.flipper if flipper.memoize == :poll + if flipper.preload + Rails.logger.info "Flipper: preload is unnecessary with memoize: :poll (all features are already in memory)" + end app.middleware.use Flipper::Middleware::Sync, { env_key: flipper.env_key, } diff --git a/lib/flipper/poller.rb b/lib/flipper/poller.rb index b1ef35c02..965bd2e4d 100644 --- a/lib/flipper/poller.rb +++ b/lib/flipper/poller.rb @@ -1,7 +1,7 @@ require 'logger' require 'concurrent/utility/monotonic_time' require 'concurrent/map' -require 'concurrent/atomic/atomic_fixnum' +require 'concurrent/atomic/atomic_reference' require 'concurrent/atomic/atomic_boolean' module Flipper @@ -32,7 +32,7 @@ def initialize(options = {}) @mutex = Mutex.new @instrumenter = options.fetch(:instrumenter, Instrumenters::Noop) @remote_adapter = options.fetch(:remote_adapter) - @last_synced_at = Concurrent::AtomicFixnum.new(0) + @last_synced_at = Concurrent::AtomicReference.new(0.0) @adapter = Adapters::Memory.new(nil, threadsafe: true) @shutdown_requested = Concurrent::AtomicBoolean.new(false) diff --git a/spec/flipper/adapters/cloud_poll_spec.rb b/spec/flipper/adapters/cloud_poll_spec.rb index 1fda9b733..45192a9cb 100644 --- a/spec/flipper/adapters/cloud_poll_spec.rb +++ b/spec/flipper/adapters/cloud_poll_spec.rb @@ -28,7 +28,7 @@ flipper.import(remote_adapter) # make a fake poller to verify calls - poller = double("Poller", last_synced_at: Concurrent::AtomicFixnum.new(0)) + poller = double("Poller", last_synced_at: Concurrent::AtomicReference.new(0.0)) expect(poller).to receive(:start).twice expect(poller).not_to receive(:sync)