diff --git a/.env.example b/.env.example index 0192c6534..9a7990836 100644 --- a/.env.example +++ b/.env.example @@ -60,3 +60,11 @@ S3_ACCESS_KEY_ID=your_s3_access_key_id_here S3_SECRET_ACCESS_KEY=your_s3_secret_access_key_here S3_BUCKET=your_s3_bucket_name_here S3_ENDPOINT=https://.r2.cloudflarestorage.com + +# ClickHouse database +CLICKHOUSE_HOST=clickhouse +CLICKHOUSE_PORT=8123 +CLICKHOUSE_SSL=false +CLICKHOUSE_DATABASE=hackatime_development +CLICKHOUSE_USERNAME=default +CLICKHOUSE_PASSWORD= diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7147caf12..1c9e69b8f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -113,6 +113,13 @@ jobs: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres POSTGRES_DB: app_test + clickhouse: + image: clickhouse/clickhouse-server:latest + ports: + - 8123:8123 + env: + CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1 + options: --health-cmd "wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1" --health-interval 10s --health-timeout 5s --health-retries 5 steps: - name: Checkout code @@ -130,6 +137,7 @@ jobs: TEST_DATABASE_URL: postgres://postgres:postgres@localhost:5432/app_test PGHOST: localhost PGUSER: postgres + CLICKHOUSE_HOST: localhost PGPASSWORD: postgres run: | bin/rails db:create RAILS_ENV=test @@ -142,6 +150,7 @@ jobs: TEST_DATABASE_URL: postgres://postgres:postgres@localhost:5432/app_test PGHOST: localhost PGUSER: postgres + CLICKHOUSE_HOST: localhost PGPASSWORD: postgres run: | bin/rails rswag:specs:swaggerize @@ -160,6 +169,13 @@ jobs: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres POSTGRES_DB: app_test + clickhouse: + image: clickhouse/clickhouse-server:latest + ports: + - 8123:8123 + env: + CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1 + options: --health-cmd "wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1" --health-interval 10s --health-timeout 5s --health-retries 5 steps: - name: Checkout code @@ -200,6 +216,7 @@ jobs: TEST_DATABASE_URL: postgres://postgres:postgres@localhost:5432/app_test PGHOST: localhost PGUSER: postgres + CLICKHOUSE_HOST: localhost PGPASSWORD: postgres CHROME_BIN: ${{ steps.setup-chrome.outputs.chrome-path }} CHROMEDRIVER_BIN: ${{ steps.setup-chrome.outputs.chromedriver-path }} diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 18cf369ec..90f56c2e9 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -40,6 +40,8 @@ HCA_CLIENT_ID= HCA_CLIENT_SECRET= ``` +ClickHouse is automatically started by Docker Compose alongside Postgres — no extra setup needed. + Start the containers: ```sh @@ -51,6 +53,7 @@ We'll now setup the database. In your container shell, run the following: ```bash app# bin/rails db:create db:schema:load db:seed +app# bin/rails db:migrate:clickhouse ``` Run the Vite build with SSR (server-side-rendering): diff --git a/Gemfile b/Gemfile index 4a01d0745..cb33e6158 100644 --- a/Gemfile +++ b/Gemfile @@ -9,6 +9,8 @@ gem "propshaft" gem "sqlite3", ">= 2.1" # Use PostgreSQL as the database for Wakatime gem "pg" +# Use ClickHouse for analytics +gem "clickhouse-activerecord", github: "daisychainapp/clickhouse-activerecord" # Use the Puma web server [https://github.com/puma/puma] gem "puma", ">= 5.0" # Use JavaScript with ESM import maps [https://github.com/rails/importmap-rails] diff --git a/Gemfile.lock b/Gemfile.lock index 34249d07a..ebb0dbb7b 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,3 +1,11 @@ +GIT + remote: https://github.com/daisychainapp/clickhouse-activerecord.git + revision: 2eb937bc36d327ff4f2f27f1da50cc151a7f6af5 + specs: + clickhouse-activerecord (1.6.6) + activerecord (>= 7.1, < 9.0) + bundler (>= 1.13.4) + GEM remote: https://rubygems.org/ specs: @@ -652,6 +660,7 @@ DEPENDENCIES brakeman bullet capybara + clickhouse-activerecord! cloudflare-rails countries debug diff --git a/app/controllers/api/admin/v1/admin_controller.rb b/app/controllers/api/admin/v1/admin_controller.rb index a69716147..e39ca48f3 100644 --- a/app/controllers/api/admin/v1/admin_controller.rb +++ b/app/controllers/api/admin/v1/admin_controller.rb @@ -51,71 +51,77 @@ def visualization_quantized quantized_query = <<-SQL WITH base_heartbeats AS ( SELECT - "time", + time, lineno, cursorpos, - date_trunc('day', to_timestamp("time")) as day_start + toDate(toDateTime(toUInt32(time))) as day_start FROM heartbeats WHERE user_id = ? - AND "time" >= ? AND "time" <= ? - AND (lineno IS NOT NULL OR cursorpos IS NOT NULL) + AND time >= ? AND time <= ? + AND (lineno > 0 OR cursorpos > 0) LIMIT 1000000 ), daily_stats AS ( SELECT *, - GREATEST(1, MAX(lineno) OVER (PARTITION BY day_start)) as max_lineno, - GREATEST(1, MAX(cursorpos) OVER (PARTITION BY day_start)) as max_cursorpos + greatest(1, max(lineno) OVER (PARTITION BY day_start)) as max_lineno, + greatest(1, max(cursorpos) OVER (PARTITION BY day_start)) as max_cursorpos FROM base_heartbeats ), quantized_heartbeats AS ( SELECT *, - ROUND(2 + (("time" - extract(epoch from day_start)) / 86400) * (396)) as qx, - ROUND(2 + (1 - CAST(lineno AS decimal) / max_lineno) * (96)) as qy_lineno, - ROUND(2 + (1 - CAST(cursorpos AS decimal) / max_cursorpos) * (96)) as qy_cursorpos + round(2 + ((time - toUInt32(toDateTime(day_start))) / 86400) * (396)) as qx, + round(2 + (1 - CAST(lineno AS Float64) / max_lineno) * (96)) as qy_lineno, + round(2 + (1 - CAST(cursorpos AS Float64) / max_cursorpos) * (96)) as qy_cursorpos FROM daily_stats ) - SELECT "time", lineno, cursorpos + SELECT time, lineno, cursorpos FROM ( - SELECT DISTINCT ON (day_start, qx, qy_lineno) "time", lineno, cursorpos + SELECT + any(time) AS time, + any(lineno) AS lineno, + any(cursorpos) AS cursorpos FROM quantized_heartbeats - WHERE lineno IS NOT NULL - ORDER BY day_start, qx, qy_lineno, "time" ASC + WHERE lineno > 0 + GROUP BY day_start, qx, qy_lineno ) AS lineno_pixels - UNION - SELECT "time", lineno, cursorpos + UNION ALL + SELECT time, lineno, cursorpos FROM ( - SELECT DISTINCT ON (day_start, qx, qy_cursorpos) "time", lineno, cursorpos + SELECT + any(time) AS time, + any(lineno) AS lineno, + any(cursorpos) AS cursorpos FROM quantized_heartbeats - WHERE cursorpos IS NOT NULL - ORDER BY day_start, qx, qy_cursorpos, "time" ASC + WHERE cursorpos > 0 + GROUP BY day_start, qx, qy_cursorpos ) AS cursorpos_pixels - ORDER BY "time" ASC + ORDER BY time ASC SQL daily_totals_query = <<-SQL WITH heartbeats_with_gaps AS ( SELECT - date_trunc('day', to_timestamp("time"))::date as day, - "time" - LAG("time", 1, "time") OVER (PARTITION BY date_trunc('day', to_timestamp("time")) ORDER BY "time") as gap + toDate(toDateTime(toUInt32(time))) as day, + time - lagInFrame(time, 1, time) OVER (PARTITION BY toDate(toDateTime(toUInt32(time))) ORDER BY time) as gap FROM heartbeats WHERE user_id = ? AND time >= ? AND time <= ? ) SELECT day, - SUM(LEAST(gap, 120)) as total_seconds + SUM(least(gap, 120)) as total_seconds FROM heartbeats_with_gaps WHERE gap IS NOT NULL GROUP BY day SQL - quantized_result = ActiveRecord::Base.connection.execute( - ActiveRecord::Base.sanitize_sql([ quantized_query, user.id, start_epoch, end_epoch ]) + quantized_result = Heartbeat.connection.select_all( + Heartbeat.sanitize_sql([ quantized_query, user.id, start_epoch, end_epoch ]) ) - daily_totals_result = ActiveRecord::Base.connection.execute( - ActiveRecord::Base.sanitize_sql([ daily_totals_query, user.id, start_epoch, end_epoch ]) + daily_totals_result = Heartbeat.connection.select_all( + Heartbeat.sanitize_sql([ daily_totals_query, user.id, start_epoch, end_epoch ]) ) daily_totals = daily_totals_result.each_with_object({}) do |row, hash| @@ -197,8 +203,8 @@ def alt_candidates LIMIT 5000 SQL - result = ActiveRecord::Base.connection.exec_query( - ActiveRecord::Base.sanitize_sql([ query, cutoff, cutoff ]) + result = Heartbeat.connection.select_all( + Heartbeat.sanitize_sql([ query, cutoff, cutoff ]) ) render json: { candidates: result.to_a } @@ -210,44 +216,20 @@ def shared_machines query = <<-SQL SELECT - sms.machine, - sms.machine_frequency, - ARRAY_AGG(DISTINCT u.id) AS user_ids - FROM - ( - SELECT - machine, - COUNT(user_id) AS machine_frequency, - ARRAY_AGG(user_id) AS user_ids - FROM - ( - SELECT DISTINCT - machine, - user_id - FROM - heartbeats - WHERE - machine IS NOT NULL - AND time > ? - ) AS UserMachines - GROUP BY - machine - HAVING - COUNT(user_id) > 1 - ) AS sms, - LATERAL UNNEST(sms.user_ids) AS user_id_from_array - JOIN - users AS u ON u.id = user_id_from_array - GROUP BY - sms.machine, - sms.machine_frequency - ORDER BY - sms.machine_frequency DESC + machine, + uniq(user_id) AS machine_frequency, + groupUniqArray(user_id) AS user_ids + FROM heartbeats + WHERE machine != '' AND machine IS NOT NULL + AND time > ? + GROUP BY machine + HAVING uniq(user_id) > 1 + ORDER BY machine_frequency DESC LIMIT 5000 SQL - result = ActiveRecord::Base.connection.exec_query( - ActiveRecord::Base.sanitize_sql([ query, cutoff ]) + result = Heartbeat.connection.select_all( + Heartbeat.sanitize_sql([ query, cutoff ]) ) render json: { machines: result.to_a } diff --git a/app/controllers/api/hackatime/v1/hackatime_controller.rb b/app/controllers/api/hackatime/v1/hackatime_controller.rb index 3b1fed660..808c6875f 100644 --- a/app/controllers/api/hackatime/v1/hackatime_controller.rb +++ b/app/controllers/api/hackatime/v1/hackatime_controller.rb @@ -98,13 +98,11 @@ def stats_last_7_days # Calculate total seconds total_seconds = heartbeats.duration_seconds.to_i - # Get unique days - days = [] - heartbeats.pluck(:time).each do |timestamp| - day = Time.at(timestamp).in_time_zone(@user.timezone).to_date - days << day unless days.include?(day) - end - days_covered = days.length + # Get unique days using ClickHouse aggregation instead of loading all timestamps + tz_quoted = Heartbeat.connection.quote(@user.timezone) + days_covered = Heartbeat.connection.select_value( + "SELECT uniq(toDate(toDateTime(toUInt32(time), #{tz_quoted}))) FROM (#{heartbeats.with_valid_timestamps.to_sql})" + ).to_i # Calculate daily average daily_average = days_covered > 0 ? (total_seconds.to_f / days_covered).round(1) : 0 @@ -232,9 +230,34 @@ def body_to_json LAST_LANGUAGE_SENTINEL = "<>" def handle_heartbeat(heartbeat_array) - results = [] + prepared = prepare_heartbeat_attrs(heartbeat_array) + + now = Time.current + base_us = (now.to_r * 1_000_000).to_i + records = prepared.each_with_index.map do |item, i| + attrs = item[:attrs] + attrs[:id] = ((base_us + i) << Heartbeat::CLICKHOUSE_ID_RANDOM_BITS) | SecureRandom.random_number(Heartbeat::CLICKHOUSE_ID_RANDOM_MAX) + attrs[:created_at] = now + attrs[:updated_at] = now + attrs + end + + begin + Heartbeat.insert_all(records) if records.any? + rescue => e + report_error(e, message: "Error bulk inserting heartbeats") + return records.map { |r| [ { error: e.message, type: e.class.name }, 422 ] } + end + + records.each { |r| queue_project_mapping(r[:project]) } + HeartbeatCacheInvalidator.bump_for(@user.id) if records.any? + PosthogService.capture_once_per_day(@user, "heartbeat_sent", { heartbeat_count: heartbeat_array.size }) + records.map { |r| [ r, 201 ] } + end + + def prepare_heartbeat_attrs(heartbeat_array) last_language = nil - heartbeat_array.each do |heartbeat| + heartbeat_array.filter_map do |heartbeat| heartbeat = heartbeat.to_h.with_indifferent_access source_type = :direct_entry @@ -282,17 +305,8 @@ def handle_heartbeat(heartbeat_array) }).slice(*Heartbeat.column_names.map(&:to_sym)) # ^^ They say safety laws are written in blood. Well, so is this line! # Basically this filters out columns that aren't in our DB (the biggest one being raw_data) - new_heartbeat = Heartbeat.find_or_create_by(attrs) - - queue_project_mapping(heartbeat[:project]) - results << [ new_heartbeat.attributes, 201 ] - rescue => e - report_error(e, message: "Error creating heartbeat") - results << [ { error: e.message, type: e.class.name }, 422 ] + { attrs: attrs, source_type: source_type } end - - PosthogService.capture_once_per_day(@user, "heartbeat_sent", { heartbeat_count: heartbeat_array.size }) - results end def queue_project_mapping(project_name) diff --git a/app/controllers/api/v1/authenticated/hours_controller.rb b/app/controllers/api/v1/authenticated/hours_controller.rb index d9348815f..2f51d9061 100644 --- a/app/controllers/api/v1/authenticated/hours_controller.rb +++ b/app/controllers/api/v1/authenticated/hours_controller.rb @@ -7,7 +7,7 @@ def index end_date = params[:end_date]&.to_date || Date.current total_seconds = current_user.heartbeats - .where(created_at: start_date.beginning_of_day..end_date.end_of_day) + .where(time: start_date.beginning_of_day.to_f..end_date.end_of_day.to_f) .duration_seconds render json: { diff --git a/app/controllers/api/v1/stats_controller.rb b/app/controllers/api/v1/stats_controller.rb index 4989eeaa1..29819fd96 100644 --- a/app/controllers/api/v1/stats_controller.rb +++ b/app/controllers/api/v1/stats_controller.rb @@ -11,10 +11,12 @@ def show return if performed? query = Heartbeat.where(time: start_date..end_date) + cache_user_id = nil if params[:username].present? user = lookup_user(params[:username]) return render json: { error: "User not found" }, status: :not_found unless user + cache_user_id = user.id query = query.where(user_id: user.id) end @@ -23,10 +25,11 @@ def show return render json: { error: "User not found" }, status: :not_found unless user_id.present? + cache_user_id = user_id query = query.where(user_id: user_id) end - render plain: query.duration_seconds.to_s + render plain: Rails.cache.fetch(stats_total_cache_key(query, user_id: cache_user_id), expires_in: 5.minutes) { query.duration_seconds.to_s } end def user_stats @@ -59,7 +62,7 @@ def user_stats service_params = {} service_params[:user] = @user service_params[:specific_filters] = enabled_features - service_params[:allow_cache] = false + service_params[:allow_cache] = true service_params[:limit] = limit service_params[:start_date] = start_date service_params[:end_date] = end_date @@ -78,7 +81,7 @@ def user_stats else if params[:total_seconds] == "true" query = Heartbeat.where(user_id: @user.id) - query = query.where("time >= ? AND time < ?", start_date.to_f, end_date.to_f) + query = query.where(time: start_date.to_f...end_date.to_f) if params[:filter_by_project].present? filter_by_project = params[:filter_by_project].split(",") @@ -92,13 +95,15 @@ def user_stats # do the boundary thingie if requested use_boundary_aware = params[:boundary_aware] == "true" - total_seconds = if use_boundary_aware - Heartbeat.duration_seconds_boundary_aware(query, start_date.to_f, end_date.to_f) || 0 - else - query.duration_seconds || 0 - end - - return render json: { total_seconds: total_seconds } + return render json: { + total_seconds: Rails.cache.fetch(stats_total_cache_key(query, start_date:, end_date:, user_id: @user.id, boundary_aware: use_boundary_aware), expires_in: 5.minutes) do + if use_boundary_aware + Heartbeat.duration_seconds_boundary_aware(query, start_date.to_f, end_date.to_f) || 0 + else + query.duration_seconds || 0 + end + end + } end summary = WakatimeService.new(**service_params).generate_summary @@ -314,6 +319,19 @@ def unique_heartbeat_seconds(heartbeats) total_seconds.to_i end + def stats_total_cache_key(query, start_date: nil, end_date: nil, user_id: nil, boundary_aware: false) + cache_version = user_id.present? ? HeartbeatCacheInvalidator.version_for(user_id) : query.maximum(:time).to_i + + [ + "stats-total", + ActiveSupport::Digest.hexdigest(query.to_sql), + start_date&.to_i || "default-start", + end_date&.to_i || "default-end", + boundary_aware, + cache_version + ].join(":") + end + def parse_date_param(param_name, default:, boundary:) raw_value = params[param_name] return default if raw_value.blank? diff --git a/app/controllers/concerns/api/admin/v1/user_utilities.rb b/app/controllers/concerns/api/admin/v1/user_utilities.rb index 807b42ca0..26803d792 100644 --- a/app/controllers/concerns/api/admin/v1/user_utilities.rb +++ b/app/controllers/concerns/api/admin/v1/user_utilities.rb @@ -179,7 +179,7 @@ def user_info total_coding_time: valid.duration_seconds || 0, languages_used: valid.distinct.pluck(:language).compact.count, projects_worked_on: valid.distinct.pluck(:project).compact.count, - days_active: valid.distinct.count("DATE(to_timestamp(CASE WHEN time > 1000000000000 THEN time / 1000 ELSE time END))") + days_active: Heartbeat.connection.select_value("SELECT uniq(toDate(toDateTime(toUInt32(time)))) FROM (#{valid.to_sql}) AS hb").to_i } } } diff --git a/app/controllers/concerns/dashboard_data.rb b/app/controllers/concerns/dashboard_data.rb index 6fa4cb538..21b942074 100644 --- a/app/controllers/concerns/dashboard_data.rb +++ b/app/controllers/concerns/dashboard_data.rb @@ -6,17 +6,26 @@ module DashboardData def filterable_dashboard_data filters = %i[project language operating_system editor category] interval = params[:interval] - key = [ current_user ] + filters.map { |f| params[f] } + [ interval.to_s, params[:from], params[:to] ] + key = [ + "dashboard-filterable-data", + current_user.id, + dashboard_cache_version, + filters.index_with { |f| params[f].to_s }, + interval.to_s, + params[:from].to_s, + params[:to].to_s + ] hb = current_user.heartbeats h = ApplicationController.helpers Rails.cache.fetch(key, expires_in: 5.minutes) do archived = current_user.project_repo_mappings.archived.pluck(:project_name) + raw_filter_options = cached_dashboard_filter_options result = {} Time.use_zone(current_user.timezone) do filters.each do |f| - options = current_user.heartbeats.distinct.pluck(f).compact_blank + options = raw_filter_options.fetch(f, []) options = options.reject { |n| archived.include?(n) } if f == :project result[f] = options.map { |k| if f == :language then k.categorize_language @@ -31,7 +40,7 @@ def filterable_dashboard_data hb = if %i[operating_system editor].include?(f) hb.where(f => arr.flat_map { |v| [ v.downcase, v.capitalize ] }.uniq) elsif f == :language - raw = current_user.heartbeats.distinct.pluck(f).compact_blank.select { |l| arr.include?(l.categorize_language) } + raw = raw_filter_options.fetch(:language, []).select { |l| arr.include?(l.categorize_language) } raw.any? ? hb.where(f => raw) : hb else hb.where(f => arr) @@ -43,10 +52,13 @@ def filterable_dashboard_data result[:total_time] = hb.duration_seconds result[:total_heartbeats] = hb.count + # Compute per-filter grouped stats in one pass each, then derive top_X and X_stats from the same data + filter_stats = {} filters.each do |f| - stats = hb.group(f).duration_seconds - stats = stats.reject { |n, _| archived.include?(n) } if f == :project - result["top_#{f}"] = stats.max_by { |_, v| v }&.first + raw_stats = hb.group(f).duration_seconds + raw_stats = raw_stats.reject { |n, _| archived.include?(n) } if f == :project + filter_stats[f] = raw_stats + result["top_#{f}"] = raw_stats.max_by { |_, v| v }&.first end result["top_editor"] &&= h.display_editor_name(result["top_editor"]) @@ -54,13 +66,13 @@ def filterable_dashboard_data result["top_language"] &&= h.display_language_name(result["top_language"]) unless result["singular_project"] - result[:project_durations] = hb.group(:project).duration_seconds - .reject { |p, _| archived.include?(p) }.sort_by { |_, d| -d }.first(10).to_h + result[:project_durations] = filter_stats[:project] + .sort_by { |_, d| -d }.first(10).to_h end %i[language editor operating_system category].each do |f| next if result["singular_#{f}"] - stats = hb.group(f).duration_seconds.each_with_object({}) do |(raw, dur), agg| + stats = filter_stats[f].each_with_object({}) do |(raw, dur), agg| k = raw.to_s.presence || "Unknown" k = f == :language ? (k == "Unknown" ? k : k.categorize_language) : (%i[editor operating_system].include?(f) ? k.downcase : k) agg[k] = (agg[k] || 0) + dur @@ -80,10 +92,36 @@ def filterable_dashboard_data result[:language_colors] = LanguageUtils.colors_for(result["language_stats"].keys) end + # Batch all 12 weekly queries into a single ClickHouse query + week_start = 11.weeks.ago.beginning_of_week + week_end = Time.current.end_of_week + weekly_hb = hb.where(time: week_start.to_f..week_end.to_f) + + timeout = Heartbeat.heartbeat_timeout_duration.to_i + tz_quoted = Heartbeat.connection.quote(current_user.timezone) + weekly_sql = weekly_hb + .select("toStartOfWeek(toDateTime(toUInt32(time), #{tz_quoted}), 1) as week_start, `project` as grouped_time, least(greatest(time - lagInFrame(time, 1, time) OVER (PARTITION BY `project`, toStartOfWeek(toDateTime(toUInt32(time), #{tz_quoted}), 1) ORDER BY time ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW), 0), #{timeout}) as diff") + .where.not(time: nil) + .with_valid_timestamps + .to_sql + + weekly_rows = Heartbeat.connection.select_all( + "SELECT toString(week_start) as week_start, grouped_time, toInt64(coalesce(sum(diff), 0)) as duration FROM (#{weekly_sql}) GROUP BY week_start, grouped_time" + ) + + weekly_hash = {} + weekly_rows.each do |row| + week_key = row["week_start"].to_date.iso8601 + project = row["grouped_time"] + next if archived.include?(project) + weekly_hash[week_key] ||= {} + weekly_hash[week_key][project] = row["duration"].to_i + end + + # Ensure all 12 weeks are present (even if empty) result[:weekly_project_stats] = (0..11).to_h do |w| - ws = w.weeks.ago.beginning_of_week - [ ws.to_date.iso8601, hb.where(time: ws.to_f..w.weeks.ago.end_of_week.to_f) - .group(:project).duration_seconds.reject { |p, _| archived.include?(p) } ] + ws = w.weeks.ago.beginning_of_week.to_date.iso8601 + [ ws, weekly_hash[ws] || {} ] end end result[:selected_interval] = interval.to_s @@ -97,7 +135,7 @@ def filterable_dashboard_data def activity_graph_data tz = current_user.timezone - key = "user_#{current_user.id}_daily_durations_#{tz}" + key = [ "user-daily-durations", current_user.id, heartbeat_cache_version, tz ] durations = Rails.cache.fetch(key, expires_in: 1.minute) do Time.use_zone(tz) { current_user.heartbeats.daily_durations(user_timezone: tz).to_h } end @@ -113,36 +151,60 @@ def activity_graph_data end def today_stats_data - h = ApplicationController.helpers - Time.use_zone(current_user.timezone) do - rows = current_user.heartbeats.today - .select(:language, :editor, - "COUNT(*) OVER (PARTITION BY language) as language_count", - "COUNT(*) OVER (PARTITION BY editor) as editor_count") - .distinct.to_a - - lang_counts = rows - .map { |r| [ r.language&.categorize_language, r.language_count ] } - .reject { |l, _| l.blank? } - .group_by(&:first).transform_values { |p| p.sum(&:last) } - .sort_by { |_, c| -c } - - ed_counts = rows - .map { |r| [ r.editor, r.editor_count ] } - .reject { |e, _| e.blank? }.uniq - .sort_by { |_, c| -c } - - todays_languages = lang_counts.map { |l, _| h.display_language_name(l) } - todays_editors = ed_counts.map { |e, _| h.display_editor_name(e) } - todays_duration = current_user.heartbeats.today.duration_seconds - show_logged_time_sentence = todays_duration > 1.minute && (todays_languages.any? || todays_editors.any?) - - { - show_logged_time_sentence: show_logged_time_sentence, - todays_duration_display: h.short_time_detailed(todays_duration.to_i), - todays_languages: todays_languages, - todays_editors: todays_editors - } + Rails.cache.fetch([ "dashboard-today-stats", current_user.id, dashboard_cache_version, current_user.timezone ], expires_in: 5.minutes) do + h = ApplicationController.helpers + Time.use_zone(current_user.timezone) do + rows = current_user.heartbeats.today + .select(:language, :editor, + "COUNT(*) OVER (PARTITION BY language) as language_count", + "COUNT(*) OVER (PARTITION BY editor) as editor_count") + .distinct.to_a + + lang_counts = rows + .map { |r| [ r.language&.categorize_language, r.language_count ] } + .reject { |l, _| l.blank? } + .group_by(&:first).transform_values { |p| p.sum(&:last) } + .sort_by { |_, c| -c } + + ed_counts = rows + .map { |r| [ r.editor, r.editor_count ] } + .reject { |e, _| e.blank? }.uniq + .sort_by { |_, c| -c } + + todays_languages = lang_counts.map { |l, _| h.display_language_name(l) } + todays_editors = ed_counts.map { |e, _| h.display_editor_name(e) } + todays_duration = current_user.heartbeats.today.duration_seconds + show_logged_time_sentence = todays_duration > 1.minute && (todays_languages.any? || todays_editors.any?) + + { + show_logged_time_sentence: show_logged_time_sentence, + todays_duration_display: h.short_time_detailed(todays_duration.to_i), + todays_languages: todays_languages, + todays_editors: todays_editors + } + end end end + + def cached_dashboard_filter_options + Rails.cache.fetch([ "dashboard-filter-options", current_user.id, heartbeat_cache_version ], expires_in: 15.minutes) do + conn = Heartbeat.connection + user_id = conn.quote(current_user.id) + filters = %i[project language operating_system editor category] + sql = filters.map { |f| "groupUniqArray(#{f}) AS #{f}_values" }.join(", ") + row = conn.select_one("SELECT #{sql} FROM heartbeats WHERE user_id = #{user_id}") + filters.index_with { |f| Array(row["#{f}_values"]).reject(&:blank?) } + end + end + + def dashboard_cache_version + @dashboard_cache_version ||= begin + latest_mapping = current_user.project_repo_mappings.maximum(:updated_at)&.to_i || 0 + [ heartbeat_cache_version, latest_mapping ] + end + end + + def heartbeat_cache_version + @heartbeat_cache_version ||= HeartbeatCacheInvalidator.version_for(current_user) + end end diff --git a/app/controllers/my/project_repo_mappings_controller.rb b/app/controllers/my/project_repo_mappings_controller.rb index ac2d09f4d..d24682f1c 100644 --- a/app/controllers/my/project_repo_mappings_controller.rb +++ b/app/controllers/my/project_repo_mappings_controller.rb @@ -92,7 +92,7 @@ def selected_interval end def project_durations_cache_key - key = "user_#{current_user.id}_project_durations_#{selected_interval}_v3" + key = "user_#{current_user.id}_project_durations_#{HeartbeatCacheInvalidator.version_for(current_user)}_#{selected_interval}_v4" if selected_interval == "custom" sanitized_from = sanitized_cache_date(params[:from]) || "none" sanitized_to = sanitized_cache_date(params[:to]) || "none" diff --git a/app/controllers/profiles_controller.rb b/app/controllers/profiles_controller.rb index 6e1ff07fa..9871c41af 100644 --- a/app/controllers/profiles_controller.rb +++ b/app/controllers/profiles_controller.rb @@ -113,7 +113,7 @@ def profile_stats_payload timezone = @user.timezone stats = ProfileStatsService.new(@user).stats - durations = Rails.cache.fetch("user_#{@user.id}_daily_durations_#{timezone}", expires_in: 1.minute) do + durations = Rails.cache.fetch([ "user-daily-durations", @user.id, HeartbeatCacheInvalidator.version_for(@user), timezone ], expires_in: 1.minute) do Time.use_zone(timezone) { @user.heartbeats.daily_durations(user_timezone: timezone).to_h } end diff --git a/app/controllers/static_pages_controller.rb b/app/controllers/static_pages_controller.rb index 925a16625..585e321de 100644 --- a/app/controllers/static_pages_controller.rb +++ b/app/controllers/static_pages_controller.rb @@ -179,7 +179,13 @@ def programming_goals_progress_data goals_hash = ActiveSupport::Digest.hexdigest( goals.pluck(:id, :period, :target_seconds, :languages, :projects).to_json ) - cache_key = "user_#{current_user.id}_programming_goals_progress_#{current_user.timezone}_#{goals_hash}" + cache_key = [ + "programming-goals-progress", + current_user.id, + HeartbeatCacheInvalidator.version_for(current_user), + current_user.timezone, + goals_hash + ] Rails.cache.fetch(cache_key, expires_in: 1.minute) do ProgrammingGoalsProgressService.new(user: current_user, goals: goals).call diff --git a/app/javascript/layouts/AppLayout.svelte b/app/javascript/layouts/AppLayout.svelte index a38722b1d..8f57f744a 100644 --- a/app/javascript/layouts/AppLayout.svelte +++ b/app/javascript/layouts/AppLayout.svelte @@ -42,7 +42,7 @@ type Footer = { git_version: string; - commit_link: string; + commit_link: string | null; server_start_time_ago: string; heartbeat_recent_count: number; heartbeat_recent_imported_count: number; @@ -594,11 +594,16 @@

- Using Inertia. Build {layout.footer.git_version} + Using Inertia. Build + {#if layout.footer.commit_link} + {layout.footer.git_version} + {:else} + {layout.footer.git_version} + {/if} from {layout.footer.server_start_time_ago} ago. {footerStatsText()}

{#if layout.show_stop_impersonating} diff --git a/app/javascript/pages/Home/SignedIn.svelte b/app/javascript/pages/Home/SignedIn.svelte index 2312b4bdb..cf2c4af43 100644 --- a/app/javascript/pages/Home/SignedIn.svelte +++ b/app/javascript/pages/Home/SignedIn.svelte @@ -95,7 +95,6 @@ preserveState: true, preserveScroll: true, replace: true, - async: true, }); } diff --git a/app/jobs/cache/active_projects_job.rb b/app/jobs/cache/active_projects_job.rb index 59d175068..016e6c372 100644 --- a/app/jobs/cache/active_projects_job.rb +++ b/app/jobs/cache/active_projects_job.rb @@ -8,14 +8,35 @@ def cache_expiration end def calculate - # Get recent heartbeats with matching project_repo_mappings in a single SQL query - ProjectRepoMapping.active - .joins("INNER JOIN heartbeats ON heartbeats.project = project_repo_mappings.project_name AND heartbeats.user_id = project_repo_mappings.user_id") - .joins("INNER JOIN users ON users.id = heartbeats.user_id") - .where("heartbeats.source_type = ?", Heartbeat.source_types[:direct_entry]) - .where("heartbeats.time > ?", 5.minutes.ago.to_f) - .select("DISTINCT ON (heartbeats.user_id) project_repo_mappings.*, heartbeats.user_id") - .order("heartbeats.user_id, heartbeats.time DESC") - .index_by(&:user_id) + latest_by_user = latest_projects_by_user_id + + return {} if latest_by_user.empty? + + mappings = ProjectRepoMapping.active + .where(user_id: latest_by_user.keys) + .to_a + .group_by(&:user_id) + + result = {} + latest_by_user.each do |user_id, project_name| + mapping = (mappings[user_id] || []).find { |candidate| candidate.project_name == project_name } + result[user_id] = mapping if mapping + end + + result + end + + def latest_projects_by_user_id + rows = Heartbeat.connection.select_all(Heartbeat.sanitize_sql([ <<~SQL, Heartbeat.source_types[:direct_entry], 5.minutes.ago.to_f ])) + SELECT user_id, argMax(project, time) AS project + FROM heartbeats + WHERE source_type = ? + AND time > ? + GROUP BY user_id + SQL + + rows.each_with_object({}) do |row, hash| + hash[row["user_id"].to_i] = row["project"] + end end end diff --git a/app/jobs/cache/active_users_graph_data_job.rb b/app/jobs/cache/active_users_graph_data_job.rb index 9cbc380d7..f9a68417b 100644 --- a/app/jobs/cache/active_users_graph_data_job.rb +++ b/app/jobs/cache/active_users_graph_data_job.rb @@ -5,21 +5,24 @@ class Cache::ActiveUsersGraphDataJob < Cache::ActivityJob def calculate # over the last 24 hours, count the number of people who were active each hour - hours = Heartbeat.coding_only - .with_valid_timestamps - .where("time > ?", 24.hours.ago.to_f) - .where("time < ?", Time.current.to_f) - .select("(EXTRACT(EPOCH FROM to_timestamp(time))::bigint / 3600 * 3600) as hour, COUNT(DISTINCT user_id) as count") - .group("hour") - .order("hour DESC") + connection = Heartbeat.connection + hours = connection.select_all(<<~SQL + SELECT + toInt64(toUInt32(time) / 3600) * 3600 AS hour, + uniq(user_id) AS count + FROM (#{Heartbeat.coding_only.with_valid_timestamps.where("time > ?", 24.hours.ago.to_f).where("time < ?", Time.current.to_f).to_sql}) AS hb + GROUP BY hour + ORDER BY hour DESC + SQL + ) - top_hour_count = hours.max_by(&:count)&.count || 1 + top_hour_count = hours.map { |h| h["count"].to_i }.max || 1 hours = hours.map do |h| { - hour: Time.at(h.hour), - users: h.count, - height: (h.count.to_f / top_hour_count * 100).round + hour: Time.at(h["hour"].to_i), + users: h["count"].to_i, + height: (h["count"].to_f / top_hour_count * 100).round } end end diff --git a/app/jobs/cache/currently_hacking_count_job.rb b/app/jobs/cache/currently_hacking_count_job.rb index 6930d62e8..ff0120c60 100644 --- a/app/jobs/cache/currently_hacking_count_job.rb +++ b/app/jobs/cache/currently_hacking_count_job.rb @@ -8,12 +8,11 @@ def cache_expiration end def calculate - count = Heartbeat.joins(:user) - .where(source_type: :direct_entry) + count = Heartbeat.where(source_type: :direct_entry) .coding_only .where("time > ?", 5.minutes.ago.to_f) - .select("DISTINCT user_id") - .count + .distinct + .count(:user_id) { count: count } end diff --git a/app/jobs/cache/currently_hacking_job.rb b/app/jobs/cache/currently_hacking_job.rb index 236e2de3c..5308be5f5 100644 --- a/app/jobs/cache/currently_hacking_job.rb +++ b/app/jobs/cache/currently_hacking_job.rb @@ -8,22 +8,19 @@ def cache_expiration end def calculate - # Get most recent heartbeats and users in a single query - recent_heartbeats = Heartbeat.joins(:user) - .where(source_type: :direct_entry) - .coding_only - .where("time > ?", 5.minutes.ago.to_f) - .select("DISTINCT ON (user_id) user_id, project, time, users.*") - .order("user_id, time DESC") - .includes(user: [ :project_repo_mappings, :email_addresses ]) - .index_by(&:user_id) - - users = recent_heartbeats.values.map(&:user) + recent_heartbeats = latest_projects_by_user_id + + user_ids = recent_heartbeats.keys + users_by_id = User.where(id: user_ids) + .includes(:project_repo_mappings, :email_addresses) + .index_by(&:id) + + users = user_ids.filter_map { |uid| users_by_id[uid] } active_projects = {} users.each do |user| - recent_heartbeat = recent_heartbeats[user.id] - mapping = user.project_repo_mappings.find { |p| p.project_name == recent_heartbeat&.project } + project_name = recent_heartbeats[user.id] + mapping = user.project_repo_mappings.find { |p| p.project_name == project_name } active_projects[user.id] = mapping&.archived? ? nil : mapping end @@ -36,4 +33,19 @@ def calculate { users: users, active_projects: active_projects } end + + def latest_projects_by_user_id + rows = Heartbeat.connection.select_all(Heartbeat.sanitize_sql([ <<~SQL, Heartbeat.source_types[:direct_entry], 5.minutes.ago.to_f ])) + SELECT user_id, argMax(project, time) AS project + FROM heartbeats + WHERE source_type = ? + AND category = 'coding' + AND time > ? + GROUP BY user_id + SQL + + rows.each_with_object({}) do |row, hash| + hash[row["user_id"].to_i] = row["project"] + end + end end diff --git a/app/jobs/cache/home_stats_job.rb b/app/jobs/cache/home_stats_job.rb index 2626dae8c..22179d1bb 100644 --- a/app/jobs/cache/home_stats_job.rb +++ b/app/jobs/cache/home_stats_job.rb @@ -3,11 +3,36 @@ class Cache::HomeStatsJob < Cache::ActivityJob private + def cache_key + "#{super}/#{summary_refresh_version}" + end + def calculate - seconds_by_user = Heartbeat.group(:user_id).duration_seconds + result = HeartbeatUserDailySummary.connection.select_one(<<~SQL) + SELECT + uniq(user_id) AS users_tracked, + toInt64(coalesce(sum(duration_s), 0)) AS seconds_tracked + FROM heartbeat_user_daily_summary FINAL + SQL + { - users_tracked: seconds_by_user.size, - seconds_tracked: seconds_by_user.values.sum + users_tracked: result["users_tracked"].to_i, + seconds_tracked: result["seconds_tracked"].to_i } end + + def summary_refresh_version + connection = HeartbeatUserDailySummary.connection + database = connection.select_value("SELECT currentDatabase()") + + connection.select_value(<<~SQL)&.to_i || 0 + SELECT toUnixTimestamp(last_success_time) + FROM system.view_refreshes + WHERE database = #{connection.quote(database)} + AND view = 'heartbeat_user_daily_summary_mv' + LIMIT 1 + SQL + rescue StandardError + 0 + end end diff --git a/app/jobs/leaderboard_update_job.rb b/app/jobs/leaderboard_update_job.rb index 39b94861a..4059f574a 100644 --- a/app/jobs/leaderboard_update_job.rb +++ b/app/jobs/leaderboard_update_job.rb @@ -31,13 +31,15 @@ def build_leaderboard(date, period, force_update = false) range = LeaderboardDateRange.calculate(date, period) ActiveRecord::Base.transaction do - # Build the base heartbeat query + # Get eligible user IDs from Postgres (can't cross-DB join) + eligible_user_ids = User.where.not(github_uid: nil) + .where.not(trust_level: User.trust_levels[:red]) + .pluck(:id) + heartbeat_query = Heartbeat.where(time: range) .with_valid_timestamps - .joins(:user) .coding_only - .where.not(users: { github_uid: nil }) - .where.not(users: { trust_level: User.trust_levels[:red] }) + .where(user_id: eligible_user_ids) data = heartbeat_query.group(:user_id).duration_seconds .filter { |_, seconds| seconds > 60 } diff --git a/app/jobs/one_time/backfill_heartbeat_editor_job.rb b/app/jobs/one_time/backfill_heartbeat_editor_job.rb index 0cb912c96..b1ce70952 100644 --- a/app/jobs/one_time/backfill_heartbeat_editor_job.rb +++ b/app/jobs/one_time/backfill_heartbeat_editor_job.rb @@ -18,8 +18,6 @@ def perform(dry_run = true) # Store the parsed values for bulk update heartbeat.editor = parsed_ua[:editor] heartbeat.operating_system = parsed_ua[:os] - # Regenerate fields_hash before adding to processed records - heartbeat.fields_hash = Heartbeat.generate_fields_hash(heartbeat.attributes) processed_heartbeats << heartbeat # When we have 1000 records, update them and clear the array @@ -45,7 +43,7 @@ def bulk_update_heartbeats(heartbeats) Heartbeat.import heartbeats, on_duplicate_key_update: { conflict_target: [ :id ], - columns: [ :editor, :operating_system, :updated_at, :fields_hash ] + columns: [ :editor, :operating_system, :updated_at ] } end end diff --git a/app/jobs/one_time/generate_unique_heartbeat_hashes_job.rb b/app/jobs/one_time/generate_unique_heartbeat_hashes_job.rb deleted file mode 100644 index 954153e4c..000000000 --- a/app/jobs/one_time/generate_unique_heartbeat_hashes_job.rb +++ /dev/null @@ -1,53 +0,0 @@ -class OneTime::GenerateUniqueHeartbeatHashesJob < ApplicationJob - queue_as :default - - include GoodJob::ActiveJobExtensions::Concurrency - - # only allow one instance of this job to run at a time - good_job_control_concurrency_with( - key: -> { "generate_unique_heartbeat_hashes_job" }, - total_limit: 1, - ) - - def perform(scope = Heartbeat.where(fields_hash: nil)) - scope_count = scope.count - puts "Starting to generate unique heartbeat hashes for #{scope_count} heartbeats" - index = 0 - scope.in_batches(of: 1000) do |batch| - # Process records in smaller chunks to avoid statement size limits - batch.each_slice(250) do |chunk| - updates = chunk.map do |heartbeat| - index += 1 - puts "Processing heartbeat #{heartbeat.id} (#{index} of #{batch.size})" - field_hash = Heartbeat.generate_fields_hash(heartbeat.attributes) - puts "Field hash: #{field_hash}" - [ heartbeat.id, field_hash ] - end - - # Update creates n queries even when passed an array of records to update, so - # we're using a SQL CASE statement to update the records in a single query. - # Prior work: https://gist.github.com/zoltan-nz/6390986 - case_statement = updates.map { |id, hash| "WHEN id = #{id} THEN '#{hash}'" }.join(" ") - Heartbeat.where(id: updates.map(&:first)) - .update_all("fields_hash = CASE #{case_statement} END") - end - end - - # Delete all heartbeats without a user_id - Heartbeat.where(user_id: nil).delete_all - - distinct_ids = Heartbeat.select("DISTINCT ON (fields_hash) id") - .order("fields_hash, created_at") - .pluck("id") - total_heartbeats = Heartbeat.count - total_distinct_heartbeats = distinct_ids.count - - puts "Found #{total_distinct_heartbeats} distinct heartbeat(s) out of #{total_heartbeats} total" - - deleted_count = Heartbeat.where.not( - id: distinct_ids - ).delete_all - - puts "Deleted #{deleted_count} duplicate heartbeat(s)" - end -end diff --git a/app/jobs/one_time/recalc_heartbeat_field_hash_job.rb b/app/jobs/one_time/recalc_heartbeat_field_hash_job.rb deleted file mode 100644 index 7d4f334cb..000000000 --- a/app/jobs/one_time/recalc_heartbeat_field_hash_job.rb +++ /dev/null @@ -1,15 +0,0 @@ -class OneTime::RecalcHeartbeatFieldHashJob < ApplicationJob - queue_as :default - - def perform - Heartbeat.find_each(batch_size: 2500) do |heartbeat| - begin - heartbeat.send(:set_fields_hash!) - heartbeat.save! - rescue ActiveRecord::RecordNotUnique - # If we have a duplicate fields_hash, soft delete this record - heartbeat.soft_delete - end - end - end -end diff --git a/app/jobs/sync_all_user_repo_events_job.rb b/app/jobs/sync_all_user_repo_events_job.rb index 252611acf..131143fe2 100644 --- a/app/jobs/sync_all_user_repo_events_job.rb +++ b/app/jobs/sync_all_user_repo_events_job.rb @@ -10,11 +10,14 @@ def perform # Identify users: # 1. Authenticated with GitHub (have an access token and username) # 2. Have had heartbeats in the last 6 hours + # Get user IDs with recent heartbeats from ClickHouse (can't cross-DB join) + recent_user_ids = Heartbeat.where("time >= ?", 6.hours.ago.to_f) + .distinct + .pluck(:user_id) + users_to_sync = User.where.not(github_access_token: nil) .where.not(github_username: nil) - .joins(:heartbeats) # Assumes User has_many :heartbeats - .where("heartbeats.created_at >= ?", 6.hours.ago) - .distinct + .where(id: recent_user_ids) if users_to_sync.empty? Rails.logger.info "No users eligible for GitHub event sync at this time." diff --git a/app/jobs/weekly_summary_email_job.rb b/app/jobs/weekly_summary_email_job.rb index d4f7dbdec..acb365553 100644 --- a/app/jobs/weekly_summary_email_job.rb +++ b/app/jobs/weekly_summary_email_job.rb @@ -15,18 +15,13 @@ def perform(reference_time = Time.current) private def eligible_users(cutoff) - users = User.arel_table - heartbeats = Heartbeat.arel_table - - recent_activity_exists = Heartbeat.unscoped - .where(heartbeats[:user_id].eq(users[:id])) - .where(heartbeats[:deleted_at].eq(nil)) - .where(heartbeats[:time].gteq(cutoff.to_f)) - .arel - .exists + # Query ClickHouse for user_ids with recent activity (can't do cross-DB subquery) + active_user_ids = Heartbeat.where("time >= ?", cutoff.to_f) + .distinct + .pluck(:user_id) User.subscribed("weekly_summary").where( - users[:created_at].gteq(cutoff).or(recent_activity_exists) + User.arel_table[:created_at].gteq(cutoff).or(User.arel_table[:id].in(active_user_ids)) ).where.not(id: DeletionRequest.active.select(:user_id)) end end diff --git a/app/mailers/weekly_summary_mailer.rb b/app/mailers/weekly_summary_mailer.rb index 632652aff..d4891ed98 100644 --- a/app/mailers/weekly_summary_mailer.rb +++ b/app/mailers/weekly_summary_mailer.rb @@ -48,10 +48,13 @@ def breakdown(scope, column, limit: 5) def active_days_count(scope) timezone = @timezone_label - timezone_sql = ActiveRecord::Base.connection.quote(timezone) - scope.where.not(time: nil) - .distinct - .count(Arel.sql("DATE(to_timestamp(time) AT TIME ZONE #{timezone_sql})")) + tz_quoted = Heartbeat.connection.quote(timezone) + # ClickHouse-compatible: count distinct days using toDate with timezone + result = Heartbeat.connection.select_value(<<~SQL) + SELECT uniq(toDate(toDateTime(toUInt32(time), #{tz_quoted}))) + FROM (#{scope.where.not(time: nil).to_sql}) AS hb + SQL + result.to_i rescue StandardError scope.where.not(time: nil).pluck(:time).map { |time| Time.at(time).in_time_zone(timezone).to_date }.uniq.count end diff --git a/app/models/clickhouse_record.rb b/app/models/clickhouse_record.rb new file mode 100644 index 000000000..f802a27e4 --- /dev/null +++ b/app/models/clickhouse_record.rb @@ -0,0 +1,4 @@ +class ClickhouseRecord < ActiveRecord::Base + self.abstract_class = true + connects_to database: { writing: :clickhouse, reading: :clickhouse } +end diff --git a/app/models/concerns/heartbeatable.rb b/app/models/concerns/heartbeatable.rb index b32f24af9..17cb20972 100644 --- a/app/models/concerns/heartbeatable.rb +++ b/app/models/concerns/heartbeatable.rb @@ -1,12 +1,11 @@ module Heartbeatable extend ActiveSupport::Concern + MAX_VALID_TIMESTAMP = 253402300799 + VALID_TIMESTAMPS_SQL = "(time >= 0 AND time <= ?)".freeze + included do - # Filter heartbeats to only include those with category equal to "coding" scope :coding_only, -> { where(category: "coding") } - - # This is to prevent PG timestamp overflow errors if someones gives us a - # heartbeat with a time that is enormously far in the future. scope :with_valid_timestamps, -> { where("time >= 0 AND time <= ?", 253402300799) } end @@ -28,7 +27,7 @@ def to_span(timeout_duration: nil) sql = <<~SQL SELECT time, - LEAD(time) OVER (ORDER BY time) as next_time + leadInFrame(time) OVER (ORDER BY time ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) as next_time FROM (#{heartbeats.to_sql}) AS heartbeats SQL @@ -42,10 +41,10 @@ def to_span(timeout_duration: nil) current_time = row["time"] next_time = row["next_time"] - if next_time.nil? || (next_time - current_time) > timeout_duration + if next_time.nil? || next_time == 0 || (next_time - current_time) > timeout_duration base_duration = (current_time - current_span_start).round - if next_time + if next_time && next_time != 0 gap_duration = [ next_time - current_time, timeout_duration ].min total_duration = base_duration + gap_duration end_time = current_time + gap_duration @@ -62,7 +61,7 @@ def to_span(timeout_duration: nil) } end - current_span_start = next_time if next_time + current_span_start = next_time if next_time && next_time != 0 end end @@ -79,9 +78,6 @@ def duration_formatted(scope = all) end def duration_simple(scope = all) - # 3 hours 10 min => "3 hrs" - # 1 hour 10 min => "1 hr" - # 10 min => "10 min" seconds = duration_seconds(scope) hours = seconds / 3600 minutes = (seconds % 3600) / 60 @@ -109,70 +105,75 @@ def daily_streaks_for_users(user_ids, start_date: 31.days.ago) return user_ids.index_with { |id| streak_cache["user_streak_#{id}"] || 0 } end + # Fetch user timezones from Postgres, validate once upfront + raw_timezones = User.where(id: uncached_users).pluck(:id, :timezone).to_h + user_timezones = raw_timezones.transform_values do |tz| + begin + TZInfo::Timezone.get(tz) && tz + rescue TZInfo::InvalidTimezoneIdentifier, ArgumentError + "UTC" + end + end + timeout = heartbeat_timeout_duration.to_i - raw_durations = joins(:user) - .where(user_id: uncached_users) - .coding_only - .with_valid_timestamps - .where(time: start_date..Time.current) - .select( - :user_id, - "users.timezone as user_timezone", - Arel.sql("DATE_TRUNC('day', to_timestamp(time) AT TIME ZONE users.timezone) as day_group"), - Arel.sql("LEAST(time - LAG(time) OVER (PARTITION BY user_id, DATE_TRUNC('day', to_timestamp(time) AT TIME ZONE users.timezone) ORDER BY time), #{timeout}) as diff") - ) - # Then aggregate the results - daily_durations = connection.select_all( - "SELECT user_id, user_timezone, day_group, COALESCE(SUM(diff), 0)::integer as duration - FROM (#{raw_durations.to_sql}) AS diffs - GROUP BY user_id, user_timezone, day_group" - ).group_by { |row| row["user_id"] } - .transform_values do |rows| - timezone = rows.first["user_timezone"] - - if timezone.blank? - Rails.logger.warn "nil tz, going to utc." - timezone = "UTC" - else - begin - TZInfo::Timezone.get(timezone) - rescue TZInfo::InvalidTimezoneIdentifier, ArgumentError - Rails.logger.warn "Invalid timezone for streak calculation: #{timezone}. Defaulting to UTC." - timezone = "UTC" - end - end - - current_date = Time.current.in_time_zone(timezone).to_date - { - current_date: current_date, - days: rows.map do |row| - [ row["day_group"].to_date, row["duration"].to_i ] - end.sort_by { |date, _| date }.reverse - } - end + # Fetch ordered heartbeats once, then bucket by each user's local day in Ruby so + # diffs do not bleed across midnight in that user's timezone. + raw_sql = <<~SQL + SELECT + user_id, + time + FROM heartbeats + WHERE user_id IN (#{uncached_users.join(',')}) + AND category = 'coding' + AND time >= 0 AND time <= 253402300799 + AND time >= #{start_date.to_f} + AND time <= #{Time.current.to_f} + ORDER BY user_id ASC, time ASC + SQL + + rows = connection.select_all(raw_sql) + + daily_durations = rows.group_by { |row| row["user_id"].to_i }.transform_values do |user_rows| + user_id = user_rows.first["user_id"].to_i + timezone = user_timezones[user_id] || "UTC" + + durations_by_day = Hash.new(0) + previous_time = nil + previous_day = nil + + user_rows.each do |row| + current_time = row["time"].to_f + current_day = Time.at(current_time).in_time_zone(timezone).to_date + + if previous_time && previous_day == current_day + gap = current_time - previous_time + durations_by_day[current_day] += [ [ gap, timeout ].min, 0 ].max.to_i + end + + previous_time = current_time + previous_day = current_day + end + + durations_by_day.sort_by { |date, _| date }.reverse + end result = user_ids.index_with { |id| streak_cache["user_streak_#{id}"] || 0 } - # Then calculate streaks for each user - daily_durations.each do |user_id, data| - current_date = data[:current_date] - days = data[:days] + daily_durations.each do |user_id, days| + timezone = user_timezones[user_id] || "UTC" + current_date = Time.current.in_time_zone(timezone).to_date - # Calculate streak streak = 0 days.each do |date, duration| - # Skip if this day is in the future next if date > current_date - # If they didn't code enough today, just skip if date == current_date next unless duration >= 15 * 60 streak += 1 next end - # For previous days, check if it's the next day in the streak if date == current_date - streak.days && duration >= 15 * 60 streak += 1 else @@ -181,8 +182,6 @@ def daily_streaks_for_users(user_ids, start_date: 31.days.ago) end result[user_id] = streak - - # Cache the streak for 1 hour Rails.cache.write("user_streak_#{user_id}", streak, expires_in: 1.hour) end @@ -197,14 +196,31 @@ def daily_durations(user_timezone:, start_date: 365.days.ago, end_date: Time.cur timezone = "UTC" end - # Create the timezone-aware date truncation expression - day_trunc = Arel.sql("DATE_TRUNC('day', to_timestamp(time) AT TIME ZONE '#{timezone}')") + sql = <<~SQL + SELECT + day_group, + toInt64(coalesce(sum(diff), 0)) AS duration + FROM ( + SELECT + toDate(toDateTime(toUInt32(time), '#{timezone}')) AS day_group, + least( + greatest( + time - lagInFrame(time, 1, time) OVER ( + PARTITION BY toDate(toDateTime(toUInt32(time), '#{timezone}')) + ORDER BY time ASC + ROWS BETWEEN 1 PRECEDING AND CURRENT ROW + ), + 0 + ), + #{heartbeat_timeout_duration.to_i} + ) AS diff + FROM (#{with_valid_timestamps.where(time: start_date.to_f..end_date.to_f).to_sql}) AS hb + ) + GROUP BY day_group + ORDER BY day_group + SQL - select(day_trunc.as("day_group")) - .where(time: start_date..end_date) - .group(day_trunc) - .duration_seconds - .map { |date, duration| [ date.to_date, duration ] } + connection.select_all(sql).map { |row| [ row["day_group"].to_date, row["duration"].to_i ] } end def duration_seconds(scope = all) @@ -217,35 +233,24 @@ def duration_seconds(scope = all) end group_column = scope.group_values.first + group_expr = group_column.to_s.include?("(") ? group_column : "`#{group_column}`" - # Don't quote if it's a SQL function (contains parentheses) - group_expr = group_column.to_s.include?("(") ? group_column : connection.quote_column_name(group_column) - - capped_diffs = scope - .select("#{group_expr} as grouped_time, CASE - WHEN LAG(time) OVER (PARTITION BY #{group_expr} ORDER BY time) IS NULL THEN 0 - ELSE LEAST(time - LAG(time) OVER (PARTITION BY #{group_expr} ORDER BY time), #{timeout}) - END as diff") + capped_diffs_sql = scope + .select("#{group_expr} as grouped_time, least(greatest(time - lagInFrame(time, 1, time) OVER (PARTITION BY #{group_expr} ORDER BY time ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW), 0), #{timeout}) as diff") .where.not(time: nil) .unscope(:group) + .to_sql connection.select_all( - "SELECT grouped_time, COALESCE(SUM(diff), 0)::integer as duration - FROM (#{capped_diffs.to_sql}) AS diffs - GROUP BY grouped_time" + "SELECT grouped_time, toInt64(coalesce(sum(diff), 0)) as duration FROM (#{capped_diffs_sql}) GROUP BY grouped_time" ).each_with_object({}) do |row, hash| hash[row["grouped_time"]] = row["duration"].to_i end else - # when not grouped, return a single value - capped_diffs = scope - .select("CASE - WHEN LAG(time) OVER (ORDER BY time) IS NULL THEN 0 - ELSE LEAST(time - LAG(time) OVER (ORDER BY time), #{timeout}) - END as diff") - .where.not(time: nil) + summarized_duration = duration_seconds_from_daily_summary(scope, timeout:) + return summarized_duration unless summarized_duration.nil? - connection.select_value("SELECT COALESCE(SUM(diff), 0)::integer FROM (#{capped_diffs.to_sql}) AS diffs").to_i + raw_duration_seconds(scope, timeout:) end end @@ -256,7 +261,7 @@ def duration_seconds_boundary_aware(scope, start_time, end_time) base_scope = model_class.all.with_valid_timestamps excluded_categories = [ "browsing", "ai coding", "meeting", "communicating" ] - base_scope = base_scope.where.not("LOWER(category) IN (?)", excluded_categories) + base_scope = base_scope.where.not("lower(category) IN (?)", excluded_categories) if scope.where_values_hash["user_id"] base_scope = base_scope.where(user_id: scope.where_values_hash["user_id"]) @@ -270,18 +275,12 @@ def duration_seconds_boundary_aware(scope, start_time, end_time) base_scope = base_scope.where(project: scope.where_values_hash["project"]) end - if scope.where_values_hash["deleted_at"] - base_scope = base_scope.where(deleted_at: scope.where_values_hash["deleted_at"]) - end - - # get the heartbeat before the start_time boundary_heartbeat = base_scope .where("time < ?", start_time) .order(time: :desc) .limit(1) .first - # if it's not NULL, we'll use it if boundary_heartbeat combined_scope = base_scope .where("time >= ? OR time = ?", start_time, boundary_heartbeat.time) @@ -291,20 +290,237 @@ def duration_seconds_boundary_aware(scope, start_time, end_time) .where(time: start_time..end_time) end - # we calc w/ the boundary heartbeat, but we only sum within the orignal constraint timeout = heartbeat_timeout_duration.to_i - capped_diffs = combined_scope - .select("time, CASE - WHEN LAG(time) OVER (ORDER BY time) IS NULL THEN 0 - ELSE LEAST(time - LAG(time) OVER (ORDER BY time), #{timeout}) - END as diff") + capped_diffs_sql = combined_scope + .select("time, least(greatest(time - lagInFrame(time, 1, time) OVER (ORDER BY time ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW), 0), #{timeout}) as diff") .where.not(time: nil) .order(time: :asc) + .to_sql - sql = "SELECT COALESCE(SUM(diff), 0)::integer - FROM (#{capped_diffs.to_sql}) AS diffs - WHERE time >= #{connection.quote(start_time)}" + sql = "SELECT toInt64(coalesce(sum(diff), 0)) FROM (#{capped_diffs_sql}) WHERE time >= #{connection.quote(start_time)}" connection.select_value(sql).to_i end + + private + + def duration_seconds_from_daily_summary(scope, timeout:) + compatibility = summary_compatible_scope(scope) + return if compatibility.nil? + + day_bounds = summarized_day_bounds(scope) + return 0 if day_bounds.empty? + + stale_days = [ Time.current.utc.to_date ] + last_refresh_time = HeartbeatUserDailySummary.last_refresh_time + stale_days.concat(stale_summary_days(scope, last_refresh_time)) if last_refresh_time.present? + + summary_days = day_bounds + .map { |row| row["day"].to_date } + .uniq + .select { |day| full_day_included?(day, compatibility) && !stale_days.include?(day) } + + raw_days = day_bounds + .map { |row| row["day"].to_date } + .uniq - summary_days + + summary_total = HeartbeatUserDailySummary.duration_for_days( + user_id: compatibility[:user_id], + days: summary_days + ) + + raw_total = raw_days.sum do |day| + raw_duration_seconds(scope.where(time: utc_day_range(day)), timeout:) + end + + summary_total + raw_total + cross_day_bonus(day_bounds, timeout:) + end + + def summary_compatible_scope(scope) + return unless scope.model == Heartbeat + return unless scope.group_values.empty? + return unless scope.having_clause.empty? + return unless scope.limit_value.nil? && scope.offset_value.nil? + + compatibility = { + user_id: nil, + start_time: nil, + start_inclusive: true, + end_time: nil, + end_inclusive: true + } + + predicates = scope.where_clause.send(:predicates) + + predicates.each do |predicate| + return unless apply_summary_predicate(predicate, compatibility) + end + + return if compatibility[:user_id].blank? + + compatibility + end + + def apply_summary_predicate(predicate, compatibility) + case predicate + when Arel::Nodes::Equality + apply_summary_equality_predicate(predicate, compatibility) + when Arel::Nodes::Between + apply_summary_between_predicate(predicate, compatibility) + when Arel::Nodes::And + predicate.children.all? { |child| apply_summary_predicate(child, compatibility) } + when Arel::Nodes::GreaterThanOrEqual + apply_summary_lower_bound(predicate, compatibility, inclusive: true) + when Arel::Nodes::GreaterThan + apply_summary_lower_bound(predicate, compatibility, inclusive: false) + when Arel::Nodes::LessThanOrEqual + apply_summary_upper_bound(predicate, compatibility, inclusive: true) + when Arel::Nodes::LessThan + apply_summary_upper_bound(predicate, compatibility, inclusive: false) + when Arel::Nodes::BoundSqlLiteral + predicate.sql_with_placeholders == VALID_TIMESTAMPS_SQL && + predicate.positional_binds.map { |bind| node_value(bind) } == [ MAX_VALID_TIMESTAMP ] + else + false + end + end + + def apply_summary_equality_predicate(predicate, compatibility) + return false unless predicate.left.name.to_s == "user_id" + + user_id = node_value(predicate.right).to_i + return false if compatibility[:user_id].present? && compatibility[:user_id] != user_id + + compatibility[:user_id] = user_id + true + end + + def apply_summary_between_predicate(predicate, compatibility) + return false unless predicate.left.name.to_s == "time" + + lower_bound, upper_bound = predicate.right.children + apply_time_lower_bound(compatibility, node_value(lower_bound).to_f, inclusive: true) + apply_time_upper_bound(compatibility, node_value(upper_bound).to_f, inclusive: true) + true + end + + def apply_summary_lower_bound(predicate, compatibility, inclusive:) + return false unless predicate.left.name.to_s == "time" + + apply_time_lower_bound(compatibility, node_value(predicate.right).to_f, inclusive:) + true + end + + def apply_summary_upper_bound(predicate, compatibility, inclusive:) + return false unless predicate.left.name.to_s == "time" + + apply_time_upper_bound(compatibility, node_value(predicate.right).to_f, inclusive:) + true + end + + def apply_time_lower_bound(compatibility, candidate_time, inclusive:) + current_time = compatibility[:start_time] + current_inclusive = compatibility[:start_inclusive] + + if current_time.nil? || candidate_time > current_time || (candidate_time == current_time && !inclusive && current_inclusive) + compatibility[:start_time] = candidate_time + compatibility[:start_inclusive] = inclusive + end + end + + def apply_time_upper_bound(compatibility, candidate_time, inclusive:) + current_time = compatibility[:end_time] + current_inclusive = compatibility[:end_inclusive] + + if current_time.nil? || candidate_time < current_time || (candidate_time == current_time && !inclusive && current_inclusive) + compatibility[:end_time] = candidate_time + compatibility[:end_inclusive] = inclusive + end + end + + def summarized_day_bounds(scope) + scoped_sql = scope + .where.not(time: nil) + .unscope(:order) + .reselect(:time) + .to_sql + + connection.select_all(<<~SQL) + SELECT + toDate(toDateTime(toUInt32(time))) AS day, + min(time) AS first_time, + max(time) AS last_time + FROM (#{scoped_sql}) AS hb + GROUP BY day + ORDER BY day + SQL + end + + def stale_summary_days(scope, last_refresh_time) + scoped_sql = scope + .where.not(time: nil) + .unscope(:order) + .reselect(:time, :updated_at) + .to_sql + + connection.select_values(<<~SQL).map(&:to_date) + SELECT DISTINCT toDate(toDateTime(toUInt32(time))) AS day + FROM (#{scoped_sql}) AS hb + WHERE updated_at > #{connection.quote(last_refresh_time)} + SQL + end + + def full_day_included?(day, compatibility) + day_start = utc_day_start(day) + day_end = utc_day_end(day) + + lower_bound_matches = compatibility[:start_time].nil? || + compatibility[:start_time] < day_start || + (compatibility[:start_time] == day_start && compatibility[:start_inclusive]) + + upper_bound_matches = compatibility[:end_time].nil? || + compatibility[:end_time] > day_end || + (compatibility[:end_time] == day_end && compatibility[:end_inclusive]) + + lower_bound_matches && upper_bound_matches + end + + def cross_day_bonus(day_bounds, timeout:) + day_bounds.each_cons(2).sum do |previous_day, current_day| + previous_date = previous_day["day"].to_date + current_date = current_day["day"].to_date + next 0 unless current_date == previous_date + 1.day + + gap = current_day["first_time"].to_f - previous_day["last_time"].to_f + [ [ gap, timeout ].min, 0 ].max.to_i + end + end + + def raw_duration_seconds(scope, timeout:) + capped_diffs_sql = scope + .select("least(greatest(time - lagInFrame(time, 1, time) OVER (ORDER BY time ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW), 0), #{timeout}) as diff") + .where.not(time: nil) + .to_sql + + connection.select_value("SELECT toInt64(coalesce(sum(diff), 0)) FROM (#{capped_diffs_sql})").to_i + end + + def utc_day_range(day) + utc_day_start(day)..utc_day_end(day) + end + + def utc_day_start(day) + day.to_time(:utc).beginning_of_day.to_f + end + + def utc_day_end(day) + day.to_time(:utc).end_of_day.to_f + end + + def node_value(node) + return node.value_for_database if node.respond_to?(:value_for_database) + return node.value if node.respond_to?(:value) + + node + end end end diff --git a/app/models/concerns/slack_integration.rb b/app/models/concerns/slack_integration.rb index c6b868a99..6790e7292 100644 --- a/app/models/concerns/slack_integration.rb +++ b/app/models/concerns/slack_integration.rb @@ -45,6 +45,7 @@ def update_from_slack def update_slack_status return unless uses_slack_status? + return if slack_access_token.blank? current_status_response = HTTP.auth("Bearer #{slack_access_token}") .get("https://slack.com/api/users.profile.get") diff --git a/app/models/concerns/time_range_filterable.rb b/app/models/concerns/time_range_filterable.rb index 4737da027..3fdd523b9 100644 --- a/app/models/concerns/time_range_filterable.rb +++ b/app/models/concerns/time_range_filterable.rb @@ -37,55 +37,40 @@ module TimeRangeFilterable flavortown: { human_name: "Flavortown", calculate: -> { - timezone = "America/New_York" - Time.use_zone(timezone) do - from = Time.parse("2025-12-15").beginning_of_day - to = Time.parse("2026-03-31").end_of_day - from.beginning_of_day..to.end_of_day + Time.use_zone("America/New_York") do + Time.parse("2025-12-15").beginning_of_day..Time.parse("2026-03-31").end_of_day end } }, summer_of_making: { human_name: "Summer of Making", calculate: -> { - timezone = "America/New_York" - Time.use_zone(timezone) do - from = Time.parse("2025-06-16").beginning_of_day - to = Time.parse("2025-09-30").end_of_day - from.beginning_of_day..to.end_of_day + Time.use_zone("America/New_York") do + Time.parse("2025-06-16").beginning_of_day..Time.parse("2025-09-30").end_of_day end } }, high_seas: { human_name: "High Seas", calculate: -> { - timezone = "America/New_York" - Time.use_zone(timezone) do - from = Time.parse("2024-10-30").beginning_of_day - to = Time.parse("2025-01-31").end_of_day - from.beginning_of_day..to.end_of_day + Time.use_zone("America/New_York") do + Time.parse("2024-10-30").beginning_of_day..Time.parse("2025-01-31").end_of_day end } }, low_skies: { human_name: "Low Skies", calculate: -> { - timezone = "America/New_York" - Time.use_zone(timezone) do - from = Time.parse("2024-10-3").beginning_of_day - to = Time.parse("2025-01-12").end_of_day - from.beginning_of_day..to.end_of_day + Time.use_zone("America/New_York") do + Time.parse("2024-10-03").beginning_of_day..Time.parse("2025-01-12").end_of_day end } }, scrapyard: { human_name: "Scrapyard Global", calculate: -> { - timezone = "America/New_York" - Time.use_zone(timezone) do - from = Time.parse("2025-03-14").beginning_of_day - to = Time.parse("2025-03-17").end_of_day - from.beginning_of_day..to.end_of_day + Time.use_zone("America/New_York") do + Time.parse("2025-03-14").beginning_of_day..Time.parse("2025-03-17").end_of_day end } } diff --git a/app/models/heartbeat.rb b/app/models/heartbeat.rb index 60b1372e4..63d597991 100644 --- a/app/models/heartbeat.rb +++ b/app/models/heartbeat.rb @@ -1,18 +1,26 @@ -class Heartbeat < ApplicationRecord - before_save :set_fields_hash! +class Heartbeat < ClickhouseRecord + self.table_name = "heartbeats" + self.primary_key = :id include Heartbeatable include TimeRangeFilterable time_range_filterable_field :time - # Default scope to exclude deleted records - default_scope { where(deleted_at: nil) } + before_create :set_clickhouse_id!, if: -> { self[:id].blank? } + after_create :invalidate_user_heartbeat_caches + after_update :invalidate_user_heartbeat_caches - scope :today, -> { where(time: Time.current.beginning_of_day.to_i..Time.current.end_of_day.to_i) } + CLICKHOUSE_ID_RANDOM_BITS = 10 + CLICKHOUSE_ID_RANDOM_MAX = 1 << CLICKHOUSE_ID_RANDOM_BITS + + def set_clickhouse_id! + timestamp_us = (Time.current.to_r * 1_000_000).to_i + self[:id] = (timestamp_us << CLICKHOUSE_ID_RANDOM_BITS) | SecureRandom.random_number(CLICKHOUSE_ID_RANDOM_MAX) + end + + scope :today, -> { where(time: Time.current.beginning_of_day.to_f..Time.current.end_of_day.to_f) } scope :recent, -> { where("time > ?", 24.hours.ago.to_i) } - scope :with_deleted, -> { unscope(where: :deleted_at) } - scope :only_deleted, -> { with_deleted.where.not(deleted_at: nil) } enum :source_type, { direct_entry: 0, @@ -82,15 +90,15 @@ class Heartbeat < ApplicationRecord neighborhood: 58 }, prefix: :claimed_by - # This is to prevent Rails from trying to use STI even though we have a "type" column + # Prevent Rails STI on the "type" column self.inheritance_column = nil + # Note: cross-database joins (Postgres users <-> ClickHouse heartbeats) will not work. + # Use separate queries instead of .joins(:heartbeats) or .includes(:heartbeats). belongs_to :user validates :time, presence: true - # after_create :mirror_to_wakatime - def self.recent_count Cache::HeartbeatCountsJob.perform_now[:recent_count] end @@ -99,34 +107,17 @@ def self.recent_imported_count Cache::HeartbeatCountsJob.perform_now[:recent_imported_count] end - def self.generate_fields_hash(attributes) - string_attributes = attributes.transform_keys(&:to_s) - indexed_attributes = string_attributes.slice(*self.indexed_attributes) - Digest::MD5.hexdigest(indexed_attributes.to_json) - end - - def self.indexed_attributes - %w[user_id branch category dependencies editor entity language machine operating_system project type user_agent line_additions line_deletions lineno lines cursorpos project_root_count time is_write] - end - - def soft_delete - update_column(:deleted_at, Time.current) - end - - def restore - update_column(:deleted_at, nil) - end - private - def set_fields_hash! - # only if the field exists in activerecord - if self.class.column_names.include?("fields_hash") - self.fields_hash = self.class.generate_fields_hash(self.attributes) + def invalidate_user_heartbeat_caches + impacted_user_ids.each do |impacted_user_id| + HeartbeatCacheInvalidator.bump_for(impacted_user_id) end end - # def mirror_to_wakatime - # WakatimeMirror.mirror_heartbeat(self) - # end + def impacted_user_ids + user_ids = [ user_id ] + user_ids.concat(previous_changes.fetch("user_id", [])) + user_ids.compact.uniq + end end diff --git a/app/models/heartbeat_user_daily_summary.rb b/app/models/heartbeat_user_daily_summary.rb new file mode 100644 index 000000000..327dd4f1f --- /dev/null +++ b/app/models/heartbeat_user_daily_summary.rb @@ -0,0 +1,30 @@ +class HeartbeatUserDailySummary < ClickhouseRecord + self.table_name = "heartbeat_user_daily_summary" + + VIEW_NAME = "heartbeat_user_daily_summary_mv".freeze + + class << self + def duration_for_days(user_id:, days:) + return 0 if days.empty? + + from("#{table_name} FINAL") + .where(user_id: user_id, day: days.uniq.sort) + .sum(:duration_s) + .to_i + end + + def last_refresh_time + database = connection.select_value("SELECT currentDatabase()") + + connection.select_value(<<~SQL) + SELECT last_success_time + FROM system.view_refreshes + WHERE database = #{connection.quote(database)} + AND view = #{connection.quote(VIEW_NAME)} + LIMIT 1 + SQL + rescue StandardError + nil + end + end +end diff --git a/app/models/user.rb b/app/models/user.rb index b165c8dfd..d1ee03dfd 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -108,6 +108,7 @@ def set_trust(level, changed_by_user: nil, reason: nil, notes: nil) end # ex: .set_trust(:green) or set_trust(1) setting it to red + # Heartbeats live in ClickHouse — simple scoped queries work but SQL JOINs will not. has_many :heartbeats has_many :goals, dependent: :destroy has_many :email_addresses, dependent: :destroy diff --git a/app/services/anonymize_user_service.rb b/app/services/anonymize_user_service.rb index 3d064821a..a1fac3f97 100644 --- a/app/services/anonymize_user_service.rb +++ b/app/services/anonymize_user_service.rb @@ -75,8 +75,8 @@ def destroy_associated_records user.project_repo_mappings.destroy_all user.goals.destroy_all - # tombstone - Heartbeat.unscoped.where(user_id: user.id, deleted_at: nil).update_all(deleted_at: Time.current) + # Delete heartbeats from ClickHouse + Heartbeat.where(user_id: user.id).delete_all user.access_grants.destroy_all user.access_tokens.destroy_all diff --git a/app/services/concerns/wakatime_shared.rb b/app/services/concerns/wakatime_shared.rb new file mode 100644 index 000000000..333a2e1e5 --- /dev/null +++ b/app/services/concerns/wakatime_shared.rb @@ -0,0 +1,49 @@ +module WakatimeShared + extend ActiveSupport::Concern + + class_methods do + def parse_user_agent(user_agent) + # Based on https://github.com/muety/wakapi/blob/b3668085c01dc0724d8330f4d51efd5b5aecaeb2/utils/http.go#L89 + user_agent_pattern = /wakatime\/[^ ]+ \(([^)]+)\)(?: [^ ]+ ([^\/]+)(?:\/([^\/]+))?)?/ + + if matches = user_agent.match(user_agent_pattern) + os = matches[1].split("-").first + editor = matches[2] || "" + { os: os, editor: editor, err: nil } + elsif browser_ua = user_agent.match(/^([^\/]+)\/([^\/\s]+)/) + if user_agent.include?("wakatime") + full_os = user_agent.split(" ")[1] + if full_os.present? + os = full_os.include?("_") ? full_os.split("_")[0] : full_os + { os: os, editor: browser_ua[1].downcase, err: nil } + else + { os: "", editor: "", err: "failed to parse user agent string" } + end + else + { os: browser_ua[1], editor: browser_ua[2], err: nil } + end + else + { os: "", editor: "", err: "failed to parse user agent string" } + end + rescue => e + { os: "", editor: "", err: "failed to parse user agent string" } + end + end + + private + + def convert_to_unix_timestamp(timestamp) + return nil if timestamp.nil? + + case timestamp + when String + Time.parse(timestamp).to_i + when Time, DateTime, Date + timestamp.to_i + when Numeric + timestamp.to_i + end + rescue ArgumentError + nil + end +end diff --git a/app/services/heartbeat_cache_invalidator.rb b/app/services/heartbeat_cache_invalidator.rb new file mode 100644 index 000000000..92fb700a6 --- /dev/null +++ b/app/services/heartbeat_cache_invalidator.rb @@ -0,0 +1,32 @@ +class HeartbeatCacheInvalidator + CACHE_VERSION_TTL = 30.days + + class << self + def version_for(user_or_id) + user_id = extract_user_id(user_or_id) + return 0 if user_id.blank? + + Rails.cache.fetch(version_key(user_id), expires_in: CACHE_VERSION_TTL) { 0 }.to_i + end + + def bump_for(user_or_id) + user_id = extract_user_id(user_or_id) + return 0 if user_id.blank? + + version = version_for(user_id) + 1 + Rails.cache.write(version_key(user_id), version, expires_in: CACHE_VERSION_TTL) + Rails.cache.delete("user_streak_#{user_id}") + version + end + + private + + def extract_user_id(user_or_id) + user_or_id.respond_to?(:id) ? user_or_id.id : user_or_id + end + + def version_key(user_id) + "heartbeat-cache-version:user:#{user_id}" + end + end +end diff --git a/app/services/heartbeat_import_service.rb b/app/services/heartbeat_import_service.rb index 19a4db95d..61a4f57a4 100644 --- a/app/services/heartbeat_import_service.rb +++ b/app/services/heartbeat_import_service.rb @@ -7,7 +7,7 @@ def self.import_from_file(file_content, user, on_progress: nil, progress_interva imported_count = 0 total_count = 0 errors = [] - seen_hashes = {} + batch = [] handler = HeartbeatSaxHandler.new do |hb| total_count += 1 @@ -40,14 +40,11 @@ def self.import_from_file(file_content, user, on_progress: nil, progress_interva source_type: Heartbeat.source_types.fetch("wakapi_import") } - attrs[:fields_hash] = Heartbeat.generate_fields_hash(attrs) + batch << attrs - existing = seen_hashes[attrs[:fields_hash]] - seen_hashes[attrs[:fields_hash]] = attrs if existing.nil? || attrs[:time] > existing[:time] - - if seen_hashes.size >= BATCH_SIZE - imported_count += flush_batch(seen_hashes) - seen_hashes.clear + if batch.size >= BATCH_SIZE + imported_count += flush_batch(user_id, batch) + batch.clear end rescue => e errors << { heartbeat: hb, error: e.message } @@ -60,7 +57,7 @@ def self.import_from_file(file_content, user, on_progress: nil, progress_interva if total_count.zero? raise StandardError, "Expected a heartbeat export JSON file." end - imported_count += flush_batch(seen_hashes) if seen_hashes.any? + imported_count += flush_batch(user_id, batch) if batch.any? elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time @@ -97,20 +94,26 @@ def self.count_heartbeats(file_content) total_count end - def self.flush_batch(seen_hashes) - return 0 if seen_hashes.empty? + def self.flush_batch(user_id, records) + return 0 if records.empty? - records = seen_hashes.values - records.each do |r| - timestamp = Time.current - r[:created_at] = timestamp - r[:updated_at] = timestamp + now = Time.current + base_us = (now.to_r * 1_000_000).to_i + records.each_with_index do |r, i| + r[:id] = ((base_us + i) << Heartbeat::CLICKHOUSE_ID_RANDOM_BITS) | SecureRandom.random_number(Heartbeat::CLICKHOUSE_ID_RANDOM_MAX) + r[:created_at] = now + r[:updated_at] = now end ActiveRecord::Base.logger.silence do - result = Heartbeat.upsert_all(records, unique_by: [ :fields_hash ]) - result.length + Heartbeat.connection.with_settings(async_insert: 0, wait_for_async_insert: 1) do + Heartbeat.insert_all(records) + end + Heartbeat.connection.clear_query_cache end + + HeartbeatCacheInvalidator.bump_for(user_id) + records.length end class HeartbeatSaxHandler < Oj::Saj diff --git a/app/services/leaderboard_builder.rb b/app/services/leaderboard_builder.rb index 43001b682..a818a706b 100644 --- a/app/services/leaderboard_builder.rb +++ b/app/services/leaderboard_builder.rb @@ -17,11 +17,12 @@ def build_for_users(users, date, scope, period) range = LeaderboardDateRange.calculate(date, period) - beats = Heartbeat.where(user_id: ids, time: range) + # Filter to users with github_uid from Postgres (can't cross-DB join) + eligible_ids = users.where.not(github_uid: nil).pluck(:id) + + beats = Heartbeat.where(user_id: eligible_ids, time: range) .coding_only .with_valid_timestamps - .joins(:user) - .where.not(users: { github_uid: nil }) totals = beats.group(:user_id).duration_seconds totals = totals.filter { |_, seconds| seconds > 60 } diff --git a/app/services/profile_stats_service.rb b/app/services/profile_stats_service.rb index 157a25d8f..07e511e25 100644 --- a/app/services/profile_stats_service.rb +++ b/app/services/profile_stats_service.rb @@ -16,8 +16,7 @@ def stats private def cache_key - latest_heartbeat_time = user.heartbeats.maximum(:time) || 0 - "profile_stats:v2:user:#{user.id}:latest:#{latest_heartbeat_time}" + "profile_stats:v3:user:#{user.id}:version:#{HeartbeatCacheInvalidator.version_for(user)}" end def compute_stats @@ -61,13 +60,15 @@ def compute_totals_and_breakdowns(timeout, today_start, today_end, week_start, w project, language, editor, - CASE - WHEN LAG(time) OVER (ORDER BY time) IS NULL THEN 0 - ELSE LEAST(time - LAG(time) OVER (ORDER BY time), #{timeout_quoted}) - END AS diff + least( + greatest( + time - lagInFrame(time, 1, time) OVER (ORDER BY time ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW), + 0 + ), + #{timeout_quoted} + ) AS diff FROM heartbeats WHERE user_id = #{user_id} - AND deleted_at IS NULL AND time IS NOT NULL AND time >= 0 AND time <= 253402300799 ) @@ -76,9 +77,9 @@ def compute_totals_and_breakdowns(timeout, today_start, today_end, week_start, w totals_sql = <<~SQL #{base_sql} SELECT - COALESCE(SUM(diff) FILTER (WHERE time >= #{today_start_quoted} AND time <= #{today_end_quoted}), 0)::integer AS today_seconds, - COALESCE(SUM(diff) FILTER (WHERE time >= #{week_start_quoted} AND time <= #{week_end_quoted}), 0)::integer AS week_seconds, - COALESCE(SUM(diff), 0)::integer AS all_seconds + toInt64(coalesce(sumIf(diff, time >= #{today_start_quoted} AND time <= #{today_end_quoted}), 0)) AS today_seconds, + toInt64(coalesce(sumIf(diff, time >= #{week_start_quoted} AND time <= #{week_end_quoted}), 0)) AS week_seconds, + toInt64(coalesce(sum(diff), 0)) AS all_seconds FROM heartbeat_diffs SQL @@ -104,7 +105,7 @@ def fetch_top_grouped(conn, base_sql, column, time_filter, limit) time_clause = time_filter ? "AND time >= #{time_filter}" : "" sql = <<~SQL #{base_sql} - SELECT #{column}, COALESCE(SUM(diff), 0)::integer AS duration + SELECT #{column}, toInt64(coalesce(sum(diff), 0)) AS duration FROM heartbeat_diffs WHERE #{column} IS NOT NULL AND #{column} != '' #{time_clause} @@ -121,7 +122,7 @@ def fetch_top_grouped(conn, base_sql, column, time_filter, limit) def fetch_top_grouped_with_repo(conn, base_sql, month_ago, limit) sql = <<~SQL #{base_sql} - SELECT project, COALESCE(SUM(diff), 0)::integer AS duration + SELECT project, toInt64(coalesce(sum(diff), 0)) AS duration FROM heartbeat_diffs WHERE time >= #{month_ago} AND project IS NOT NULL AND project != '' @@ -142,7 +143,7 @@ def fetch_top_grouped_with_repo(conn, base_sql, month_ago, limit) def fetch_top_editors_normalized(conn, base_sql, limit) sql = <<~SQL #{base_sql} - SELECT editor, COALESCE(SUM(diff), 0)::integer AS duration + SELECT editor, toInt64(coalesce(sum(diff), 0)) AS duration FROM heartbeat_diffs WHERE editor IS NOT NULL AND editor != '' GROUP BY editor diff --git a/app/services/timeline_service.rb b/app/services/timeline_service.rb index bb2e4da04..88eba929e 100644 --- a/app/services/timeline_service.rb +++ b/app/services/timeline_service.rb @@ -24,7 +24,7 @@ def timeline_data user_start_of_day = date.in_time_zone(user_tz).beginning_of_day.to_f user_end_of_day = date.in_time_zone(user_tz).end_of_day.to_f - total_coded_time_seconds = Heartbeat.where(user_id: user.id, deleted_at: nil) + total_coded_time_seconds = Heartbeat.where(user_id: user.id) .where("time >= ? AND time <= ?", user_start_of_day, user_end_of_day) .duration_seconds @@ -80,7 +80,7 @@ def heartbeats_by_user_id expanded_end = server_end_of_day + 24.hours.to_i Heartbeat - .where(user_id: valid_user_ids, deleted_at: nil) + .where(user_id: valid_user_ids) .where("time >= ? AND time <= ?", expanded_start, expanded_end) .select(:id, :user_id, :time, :entity, :project, :editor, :language) .order(:user_id, :time) diff --git a/app/views/layouts/application.html.erb b/app/views/layouts/application.html.erb index 08038434c..5ede36622 100644 --- a/app/views/layouts/application.html.erb +++ b/app/views/layouts/application.html.erb @@ -233,7 +233,13 @@