Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/flipper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ def groups_registry=(registry)
require 'flipper/adapters/instrumented'
require 'flipper/adapters/memoizable'
require 'flipper/adapters/memory'
require 'flipper/adapters/poll'
require 'flipper/adapters/strict'
require 'flipper/adapter_builder'
require 'flipper/configuration'
Expand All @@ -207,6 +208,7 @@ def groups_registry=(registry)
require 'flipper/identifier'
require 'flipper/middleware/memoizer'
require 'flipper/middleware/setup_env'
require 'flipper/middleware/sync'
require 'flipper/poller'
require 'flipper/registry'
require 'flipper/expression'
Expand Down
54 changes: 54 additions & 0 deletions lib/flipper/adapters/cloud_poll.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
require 'flipper/adapters/sync/synchronizer'
require 'flipper/poller'

module Flipper
module Adapters
# Internal: Cloud-specific adapter that keeps a local adapter in sync with
# a remote HTTP adapter via a background poller thread. Used by
# Flipper::Cloud::Configuration.
#
# For general-purpose polling, use Flipper::Adapters::Poll instead.
class CloudPoll
extend Forwardable
include ::Flipper::Adapter

attr_reader :adapter, :poller

def_delegators :synced_adapter, :features, :get, :get_multi, :get_all, :add, :remove, :clear, :enable, :disable

def initialize(poller, adapter)
@adapter = adapter
@poller = poller
@last_synced_at = Concurrent::AtomicReference.new(0.0)

# If the adapter is empty, we need to sync before starting the poller.
# Yes, this will block the main thread, but that's better than thinking
# nothing is enabled.
if adapter.features.empty?
begin
@poller.sync
rescue StandardError
# TODO: Warn here that it's possible that no data has been synced
# and flags are being evaluated without flag data being present
# until a sync completes. We rescue to avoid flipper being down
# causing your processes to crash.
end
end

@poller.start
end

private

def synced_adapter
@poller.start
poller_last_synced_at = @poller.last_synced_at.value
if poller_last_synced_at > @last_synced_at.value
Flipper::Adapters::Sync::Synchronizer.new(@adapter, @poller.adapter).call
@last_synced_at.set(poller_last_synced_at)
end
@adapter
end
end
end
end
157 changes: 128 additions & 29 deletions lib/flipper/adapters/poll.rb
Original file line number Diff line number Diff line change
@@ -1,51 +1,150 @@
require 'flipper/adapters/sync/synchronizer'
require 'concurrent/atomic/atomic_reference'
require 'flipper/poller'

module Flipper
module Adapters
# An adapter that keeps a local memory adapter in sync with a source adapter
# via a background poller thread.
#
# Reads go to the local memory adapter (fast, zero-impact).
# Writes go to the source adapter first, then update the local memory adapter.
# A background thread periodically polls the source adapter and updates the
# local memory adapter so other processes' writes are picked up.
class Poll
extend Forwardable
include ::Flipper::Adapter

# Deprecated
Poller = ::Flipper::Poller
SYNC_KEY = :flipper_poll_sync_suppressed

attr_reader :adapter, :poller
# Public: The Poller instance used to sync in the background.
attr_reader :poller

def_delegators :synced_adapter, :features, :get, :get_multi, :get_all, :add, :remove, :clear, :enable, :disable
# Public: The mutex used to synchronize sync operations.
attr_reader :sync_mutex

def initialize(poller, adapter)
@adapter = adapter
@poller = poller
@last_synced_at = 0
# Public: The local memory adapter that serves reads.
attr_reader :local

# If the adapter is empty, we need to sync before starting the poller.
# Yes, this will block the main thread, but that's better than thinking
# nothing is enabled.
if adapter.features.empty?
# Public: The source adapter that receives writes and is polled.
attr_reader :remote

# Public: Build a new Poll adapter.
#
# source - The source adapter to poll and write to (e.g., ActiveRecord, Redis).
# options - The Hash of options:
# :key - The key to identify the poller instance (default: object_id).
# :interval - Poll interval in seconds (default: 10).
# :instrumenter - Instrumenter for events (default: Noop).
# :start_automatically - Start the poller thread automatically (default: true).
# :shutdown_automatically - Register at_exit handler (default: true).
def initialize(source, options = {})
key = options.fetch(:key, object_id.to_s)
poller_options = {
remote_adapter: source,
interval: options.fetch(:interval, 10),
instrumenter: options.fetch(:instrumenter, Instrumenters::Noop),
start_automatically: options.fetch(:start_automatically, true),
shutdown_automatically: options.fetch(:shutdown_automatically, true),
}
@poller = Flipper::Poller.get(key, poller_options)
@local = Adapters::Memory.new(nil, threadsafe: true)
@remote = source
@last_synced_at = Concurrent::AtomicReference.new(0.0)
@sync_mutex = Mutex.new

# Block the main thread for the initial sync so we don't serve
# empty/default values before the first poll completes.
begin
@poller.sync
sync
rescue StandardError
# Rescue to avoid source adapter being down causing processes to crash.
end
end

def adapter_stack
"poll(local: #{@local.adapter_stack}, remote: #{@remote.adapter_stack})"
end

# Public: Synchronize the local memory adapter with the poller's latest
# snapshot if the poller has synced since we last checked.
#
# If given a block, syncs once at the start and suppresses further syncs
# for the duration of the block (useful for per-request sync).
def sync
@sync_mutex.synchronize do
poller_last_synced_at = @poller.last_synced_at.value
last = @last_synced_at.value
if poller_last_synced_at > last
@local.import(@poller.adapter)
@last_synced_at.update { poller_last_synced_at }
end
end

if block_given?
begin
@poller.sync
rescue
# TODO: Warn here that it's possible that no data has been synced
# and flags are being evaluated without flag data being present
# until a sync completes. We rescue to avoid flipper being down
# causing your processes to crash.
Thread.current[SYNC_KEY] = true
yield
ensure
Thread.current[SYNC_KEY] = false
end
end
end

# Reads - always from local memory

@poller.start
def features
maybe_sync
@local.features
end

private
def get(feature)
maybe_sync
@local.get(feature)
end

def get_multi(features)
maybe_sync
@local.get_multi(features)
end

def synced_adapter
@poller.start
poller_last_synced_at = @poller.last_synced_at.value
if poller_last_synced_at > @last_synced_at
Flipper::Adapters::Sync::Synchronizer.new(@adapter, @poller.adapter).call
@last_synced_at = poller_last_synced_at
def get_all(**kwargs)
maybe_sync
@local.get_all(**kwargs)
end

# Writes - go to source first, then update local memory.
# Note: There is a small window where another thread's sync could overwrite
# the local adapter with a stale poller snapshot that doesn't include the
# write yet. The write will be picked up on the next poll cycle.

def add(feature)
@remote.add(feature).tap { @local.add(feature) }
end

def remove(feature)
@remote.remove(feature).tap { @local.remove(feature) }
end

def clear(feature)
@remote.clear(feature).tap { @local.clear(feature) }
end

def enable(feature, gate, thing)
@remote.enable(feature, gate, thing).tap do
@local.enable(feature, gate, thing)
end
end

def disable(feature, gate, thing)
@remote.disable(feature, gate, thing).tap do
@local.disable(feature, gate, thing)
end
@adapter
end

private

def maybe_sync
sync unless Thread.current[SYNC_KEY]
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/flipper/adapters/poll/poller.rb
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
warn "DEPRECATION WARNING: Flipper::Adapters::Poll::Poller is deprecated. Use Flipper::Poller instead."
require 'flipper/adapters/poll'
require 'flipper/adapters/cloud_poll'
4 changes: 2 additions & 2 deletions lib/flipper/cloud/configuration.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require "logger"
require "socket"
require "flipper/adapters/http"
require "flipper/adapters/poll"
require "flipper/adapters/cloud_poll"
require "flipper/poller"
require "flipper/adapters/dual_write"
require "flipper/adapters/sync/synchronizer"
Expand Down Expand Up @@ -154,7 +154,7 @@ def poller
end

def poll_adapter
Flipper::Adapters::Poll.new(poller, local_adapter)
Flipper::Adapters::CloudPoll.new(poller, local_adapter)
end

def http_adapter
Expand Down
29 changes: 26 additions & 3 deletions lib/flipper/dsl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,19 @@ class DSL
# Private: What is being used to instrument all the things.
attr_reader :instrumenter

def_delegators :@adapter, :memoize=, :memoizing?, :import, :export, :adapter_stack
def_delegators :@adapter, :import, :export, :adapter_stack

# Public: Set memoization on/off. No-op if adapter doesn't support it
# (e.g., when using memoize: :poll).
def memoize=(value)
@adapter.memoize = value if @adapter.respond_to?(:memoize=)
end

# Public: Are we currently memoizing? Returns false if adapter doesn't
# support memoization (e.g., when using memoize: :poll).
def memoizing?
@adapter.respond_to?(:memoizing?) ? @adapter.memoizing? : false
end

# Public: Returns a new instance of the DSL.
#
Expand All @@ -21,8 +33,19 @@ class DSL
def initialize(adapter, options = {})
@instrumenter = options.fetch(:instrumenter, Instrumenters::Noop)
memoize = options.fetch(:memoize, true)
adapter = Adapters::Memoizable.new(adapter) if memoize
@adapter = adapter

@adapter = if memoize == :poll
Adapters::Poll.new(adapter, {
key: "flipper_poll_#{object_id}",
interval: options.fetch(:poll_interval, 10),
instrumenter: @instrumenter,
})
elsif memoize
Adapters::Memoizable.new(adapter)
else
adapter
end

@memoized_features = {}
end

Expand Down
16 changes: 13 additions & 3 deletions lib/flipper/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def self.default_strict_value
config.before_configuration do
config.flipper = ActiveSupport::OrderedOptions.new.update(
env_key: ENV.fetch('FLIPPER_ENV_KEY', 'flipper'),
memoize: ENV.fetch('FLIPPER_MEMOIZE', 'true').casecmp('true').zero?,
memoize: ENV.fetch('FLIPPER_MEMOIZE', 'true').then { |v| v == 'poll' ? :poll : v.casecmp('true').zero? },
preload: ENV.fetch('FLIPPER_PRELOAD', 'true').casecmp('true').zero?,
instrumenter: ENV.fetch('FLIPPER_INSTRUMENTER', 'ActiveSupport::Notifications').constantize,
log: ENV.fetch('FLIPPER_LOG', 'true').casecmp('true').zero?,
Expand Down Expand Up @@ -52,7 +52,10 @@ def self.default_strict_value
instrumenter: app.config.flipper.instrumenter
)
else
Flipper.new(config.adapter, instrumenter: app.config.flipper.instrumenter)
Flipper.new(config.adapter, {
instrumenter: app.config.flipper.instrumenter,
memoize: app.config.flipper.memoize,
})
end
end
end
Expand All @@ -78,7 +81,14 @@ def self.default_strict_value
initializer "flipper.memoizer", after: :load_config_initializers do |app|
flipper = app.config.flipper

if flipper.memoize
if flipper.memoize == :poll
if flipper.preload
Rails.logger.info "Flipper: preload is unnecessary with memoize: :poll (all features are already in memory)"
end
app.middleware.use Flipper::Middleware::Sync, {
env_key: flipper.env_key,
}
elsif flipper.memoize
app.middleware.use Flipper::Middleware::Memoizer, {
env_key: flipper.env_key,
preload: flipper.preload,
Expand Down
6 changes: 6 additions & 0 deletions lib/flipper/middleware/memoizer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ def memoize?(request)
def memoized_call(request)
flipper = request.env.fetch(@env_key) { Flipper }

# If the adapter doesn't support memoization (e.g., using memoize: :poll),
# skip memoization and just call the app.
unless flipper.adapter.respond_to?(:memoize=)
return @app.call(request.env)
end

# Already memoizing. This instance does not need to do anything.
if flipper.memoizing?
warn "Flipper::Middleware::Memoizer appears to be running twice. Read how to resolve this at https://github.com/flippercloud/flipper/pull/523"
Expand Down
Loading