Add view and job table enhancements for improved observability #406
Open
maxschridde1494 wants to merge 1 commit into que-rb:master from talysto:master
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| DROP VIEW IF EXISTS public.que_jobs_ext; | ||
|
|
||
| ALTER TABLE que_jobs | ||
| DROP COLUMN first_run_at; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| -- Add a 'first_run_at' column to 'que_jobs'. It stores, as a timestamp with time zone (timestamptz), | ||
| -- the time at which the job was first scheduled to run. The column default is now(), but in practice the value is | ||
| -- expected to match the job's initial run_at. Keeping it helps retrace a job's execution when reviewing failures. | ||
|
|
||
| ALTER TABLE que_jobs | ||
| ADD COLUMN first_run_at timestamptz NOT NULL DEFAULT now(); | ||
|
|
||
|
|
||
| -- This view presents an enriched overview of que jobs by joining the 'que_jobs' table with advisory locks from 'pg_locks'. | ||
| -- Each row carries the job's own columns plus a derived status, the job class taken from the job arguments, and the | ||
| -- details of the lock held on the job, if any, so that job progress and locking can be monitored from a single query. | ||
|
|
||
| -- Columns: | ||
| -- - lock_id: Unique identifier for the lock associated with the job. | ||
| -- - que_locker_pid: Process ID (PID) of the que job locker. | ||
| -- - sub_class: The job class extracted from the job arguments. | ||
| -- - updated_at: The most recent timestamp among 'run_at', 'expired_at', and 'finished_at'. | ||
| -- - status: The status of the job, which can be 'running', 'completed', 'failed', 'errored', 'queued', or 'scheduled'. | ||
|
|
||
| CREATE OR REPLACE VIEW public.que_jobs_ext | ||
| AS | ||
| SELECT | ||
| locks.id AS lock_id, | ||
| locks.pid AS que_locker_pid, | ||
| (que_jobs.args -> 0) ->> 'job_class'::text AS sub_class, | ||
| GREATEST(run_at, expired_at, finished_at) AS updated_at, | ||
|
|
||
| CASE | ||
| WHEN locks.id IS NOT NULL THEN 'running' | ||
| WHEN finished_at IS NOT NULL THEN 'completed' | ||
| WHEN expired_at IS NOT NULL THEN 'failed' | ||
| WHEN error_count > 0 THEN 'errored' | ||
| WHEN run_at < now() THEN 'queued' | ||
| ELSE 'scheduled' | ||
| END AS status, | ||
|
|
||
| -- que_jobs.*: | ||
| que_jobs.id, | ||
| que_jobs.priority, | ||
| que_jobs.run_at, | ||
| que_jobs.first_run_at, | ||
| que_jobs.job_class, | ||
| que_jobs.error_count, | ||
| que_jobs.last_error_message, | ||
| que_jobs.queue, | ||
| que_jobs.last_error_backtrace, | ||
| que_jobs.finished_at, | ||
| que_jobs.expired_at, | ||
| que_jobs.args, | ||
| que_jobs.data, | ||
| que_jobs.kwargs, | ||
| que_jobs.job_schema_version | ||
|
|
||
| FROM que_jobs | ||
| LEFT JOIN ( | ||
| SELECT | ||
| (classid::bigint << 32) + objid::bigint AS id | ||
| , pid | ||
| FROM pg_locks | ||
| WHERE pg_locks.locktype = 'advisory'::text) locks USING (id); |
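The subquery above rebuilds a job id from the two 32-bit halves that `pg_locks` reports for a 64-bit advisory lock key (`classid` holds the high-order half, `objid` the low-order half). The following is a minimal Ruby sketch of that arithmetic, assuming non-negative job ids; the helper names `split_lock_key` and `lock_id` are hypothetical and are not part of Que.

```ruby
# Hypothetical helpers for illustration only; they are not part of Que.
MASK_32 = 0xFFFF_FFFF

# Split a non-negative 64-bit advisory lock key into the two 32-bit halves
# that pg_locks exposes as classid (high-order) and objid (low-order).
def split_lock_key(job_id)
  [(job_id >> 32) & MASK_32, job_id & MASK_32]
end

# Mirror of the view's expression: (classid::bigint << 32) + objid::bigint
def lock_id(classid, objid)
  (classid << 32) + objid
end

classid, objid = split_lock_key(5_000_000_000)
puts lock_id(classid, objid) == 5_000_000_000
```

Because the join is a LEFT JOIN on this reconstructed id, a job with no matching advisory lock row simply gets a NULL `lock_id` and `que_locker_pid`.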
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,199 @@ | ||
| # frozen_string_literal: true | ||
|
|
||
| require 'spec_helper' | ||
|
|
||
| describe "Que Jobs Ext View", skip: true do | ||
|
|
||
| class TestJob < Que::Job | ||
| include Que::JobMethods | ||
|
|
||
| def default_resolve_action | ||
| # prevents default deletion of complete jobs for testing purposes | ||
| finish | ||
| end | ||
| end | ||
|
|
||
| class TestFailedJob < TestJob | ||
| def run | ||
| raise Que::Error, 'Test Error' | ||
| end | ||
| end | ||
|
|
||
| describe 'job.enqueue' do | ||
| it "should mirror enqueued job" do | ||
| assert_equal 0, jobs_dataset.count | ||
| assert_equal 0, jobs_ext_dataset.count | ||
|
|
||
| TestJob.enqueue( | ||
| 1, | ||
| 'two', | ||
| string: "string", | ||
| integer: 5, | ||
| array: [1, "two", {three: 3}], | ||
| hash: {one: 1, two: 'two', three: [3]}, | ||
| job_options: { | ||
| priority: 4, | ||
| queue: 'special_queue_name', | ||
| run_at: Time.now | ||
| } | ||
| ) | ||
|
|
||
| assert_equal 1, jobs_dataset.count | ||
| assert_equal 1, jobs_ext_dataset.count | ||
|
|
||
| job = jobs_dataset.first | ||
| ext_job = jobs_ext_dataset.first | ||
| assert_equal job[:queue], ext_job[:queue] | ||
| assert_equal job[:priority], ext_job[:priority] | ||
| assert_equal job[:run_at], ext_job[:run_at] | ||
| assert_equal job[:first_run_at], ext_job[:first_run_at] | ||
| assert_equal job[:job_class], ext_job[:job_class] | ||
| assert_equal job[:args], ext_job[:args] | ||
| assert_equal job[:job_schema_version], ext_job[:job_schema_version] | ||
|
|
||
| jobs_dataset.delete | ||
|
|
||
| assert_equal 0, jobs_dataset.count | ||
| assert_equal 0, jobs_ext_dataset.count | ||
| end | ||
|
|
||
| it "should include additional lock data" do | ||
| locker_settings.clear | ||
| locker_settings[:listen] = false | ||
| locker_settings[:poll_interval] = 0.02 | ||
| locker | ||
|
|
||
| TestJob.enqueue | ||
|
|
||
| sleep_until { locked_ids.count.positive? && locked_ids.first == jobs_ext_dataset.first[:lock_id] } | ||
|
|
||
| locker.stop! | ||
|
|
||
| jobs_dataset.delete | ||
| end | ||
|
|
||
| it "should add additional updated_at" do | ||
| TestJob.enqueue | ||
|
|
||
| ext_job = jobs_ext_dataset.first | ||
|
|
||
| assert_equal ext_job[:run_at], ext_job[:updated_at] | ||
|
|
||
| locker | ||
|
|
||
| sleep_until_equal(1) { finished_jobs_dataset.count } | ||
|
|
||
| locker.stop! | ||
|
|
||
| ext_job = jobs_ext_dataset.first | ||
|
|
||
| assert_equal ext_job[:finished_at], ext_job[:updated_at] | ||
|
|
||
| jobs_dataset.delete | ||
| end | ||
|
|
||
| describe "should include additional status" do | ||
|
|
||
| let(:notified_errors) { [] } | ||
|
|
||
| it "should set status to scheduled when run_at is in the future" do | ||
| TestJob.enqueue(job_options: { run_at: Time.now + 1 }) | ||
|
|
||
| assert_equal 'scheduled', jobs_ext_dataset.first[:status] | ||
|
|
||
| jobs_dataset.delete | ||
| end | ||
|
|
||
| it "should set status to queued when run_at is in the past and the job is not currently running, completed, failed or errored" do | ||
| TestJob.enqueue(job_options: { run_at: Time.now - 1 }) | ||
|
|
||
| assert_equal 'queued', jobs_ext_dataset.first[:status] | ||
|
|
||
| jobs_dataset.delete | ||
| end | ||
|
|
||
| it "should set status to running when the job has a lock associated with it" do | ||
| locker_settings.clear | ||
| locker_settings[:listen] = false | ||
| locker_settings[:poll_interval] = 0.02 | ||
| locker | ||
|
|
||
| TestJob.enqueue | ||
|
|
||
| sleep_until { locked_ids.count.positive? && locked_ids.first == jobs_ext_dataset.first[:lock_id] && jobs_ext_dataset.first[:status] == 'running' } | ||
|
|
||
| locker.stop! | ||
|
|
||
| jobs_dataset.delete | ||
| end | ||
|
|
||
| it "should set status to completed when finished_at is present" do | ||
| TestJob.enqueue | ||
|
|
||
| locker | ||
|
|
||
| sleep_until_equal(1) { DB[:que_lockers].count } | ||
|
|
||
| sleep_until { finished_jobs_dataset.count.positive? } | ||
|
|
||
| locker.stop! | ||
|
|
||
| assert_equal 'completed', jobs_ext_dataset.first[:status] | ||
|
|
||
| jobs_dataset.delete | ||
| end | ||
|
|
||
| it "should set status to errored when error_count is positive and expired_at is not present" do | ||
| Que.error_notifier = proc { |e| notified_errors << e } | ||
|
|
||
| TestFailedJob.class_eval do | ||
| self.maximum_retry_count = 100 # prevent from entering failed state on first error | ||
| end | ||
|
|
||
| locker | ||
|
|
||
| sleep_until_equal(1) { DB[:que_lockers].count } | ||
|
|
||
| TestFailedJob.enqueue | ||
|
|
||
| sleep_until { errored_jobs_dataset.where(expired_at: nil).count.positive? } | ||
|
|
||
| locker.stop! | ||
|
|
||
| ext_job = jobs_ext_dataset.first | ||
|
|
||
| assert_equal 'errored', ext_job[:status] | ||
| assert_equal 1, notified_errors.count | ||
| assert_equal 'Test Error', notified_errors.first.message | ||
|
|
||
|
|
||
| jobs_dataset.delete | ||
| end | ||
|
|
||
| it "should set status to failed when expired_at is present" do | ||
| TestFailedJob.class_eval do | ||
| self.maximum_retry_count = 0 | ||
| end | ||
|
|
||
| Que.error_notifier = proc { |e| notified_errors << e } | ||
|
|
||
| locker | ||
|
|
||
| sleep_until_equal(1) { DB[:que_lockers].count } | ||
|
|
||
| TestFailedJob.enqueue | ||
|
|
||
| sleep_until { expired_jobs_dataset.count.positive? } | ||
|
|
||
| locker.stop! | ||
|
|
||
| assert_equal 'failed', jobs_ext_dataset.first[:status] | ||
| assert_equal 1, notified_errors.count | ||
| assert_equal 'Test Error', notified_errors.first.message | ||
|
|
||
|
|
||
| jobs_dataset.delete | ||
| end | ||
| end | ||
| end | ||
| end | ||
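The status and updated_at cases exercised by the spec follow the precedence of the view's CASE expression and its GREATEST call. As a rough aid for reasoning about that precedence outside the database, here is a plain-Ruby sketch. The method names `que_job_status` and `que_job_updated_at` are hypothetical, `job` is assumed to be a Hash keyed by the view's column names, and `locked` stands in for the presence of a matching advisory lock row.

```ruby
# Hypothetical mirror of the view's CASE expression; not part of Que.
# `job` is a plain Hash keyed by the view's column names, and `locked`
# stands in for "locks.id IS NOT NULL".
def que_job_status(job, locked:, now: Time.now)
  return 'running'   if locked
  return 'completed' if job[:finished_at]
  return 'failed'    if job[:expired_at]
  return 'errored'   if job[:error_count].to_i.positive?
  return 'queued'    if job[:run_at] < now
  'scheduled'
end

# PostgreSQL's GREATEST ignores NULL arguments, so nils are dropped
# before taking the maximum.
def que_job_updated_at(job)
  [job[:run_at], job[:expired_at], job[:finished_at]].compact.max
end

now = Time.now
retrying = { run_at: now + 60, error_count: 2 }
# A job waiting for a retry reports 'errored' even though run_at is in the future.
puts que_job_status(retrying, locked: false, now: now)
```

The ordering matters: a locked job reports 'running' regardless of its other columns, and a job with a positive error_count never reports 'queued' or 'scheduled', which is why the spec raises `maximum_retry_count` to keep a job in the 'errored' state.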