-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
308 lines (241 loc) Β· 10.6 KB
/
utils.py
File metadata and controls
308 lines (241 loc) Β· 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# ==============================
# utils.py
# Single source of truth for:
# - Feature engineering (mirrors 02_feature_engineering_and_mo...py)
# - Risk scoring engine (mirrors Β§10 of training script)
# - Explainability (mirrors expainabiity_and_insights.py)
# - EDA behavioral sigs (mirrors eda.py)
# - SHAP computation (exact LR formula: Οα΅’ = coef[i] Γ feature_value[i])
# Imported by app.py and api.py at runtime.
# ==============================
import re
import numpy as np
from scipy.sparse import hstack, issparse
# ββ CONSTANTS (from training script & eda.py) βββββββββββββββββββββββββββββββββ
URGENCY_WORDS = [
"urgent", "immediate", "limited",
"apply fast", "hurry", "few slots", "act now"
]
FREE_DOMAINS = [
"gmail.com", "yahoo.com", "outlook.com", "hotmail.com"
]
BEHAVIOR_FEATURES = ["desc_length", "urgency_score", "free_email"]
FRAUD_THRESHOLD = 0.35
SCAM_PHRASES = [
"no experience required", "work from home", "earn up to",
"processing fee", "registration fee", "unlimited earnings",
"weekly payout", "data entry", "typing work", "copy paste",
]
# Suspicious salary keywords β mirrors eda.py salary analysis
SUSPICIOUS_SALARY_KEYWORDS = [
"unlimited", "upto", "up to", "per day"
]
# Salary ceiling above which we flag as suspicious (INR)
SUSPICIOUS_SALARY_CEILING = 500_000
# ββ BEHAVIORAL FEATURE FUNCTIONS (from 02_feature_engineering_and_mo...py) ββββ
def urgency_score(text: str) -> int:
"""Count urgency words β exact replica of training script Β§3."""
text = str(text).lower()
return sum(word in text for word in URGENCY_WORDS)
def free_email_flag(text: str) -> int:
"""Return 1 if a free email domain appears β exact replica of training Β§3."""
text = str(text).lower()
return int(any(domain in text for domain in FREE_DOMAINS))
def desc_length(text: str) -> int:
"""Character length of description β exact replica of training Β§3."""
return len(str(text))
# ββ FEATURE VECTOR BUILDER (mirrors training pipeline exactly) ββββββββββββββββ
def build_feature_vector(tfidf_vectorizer, job_title: str, job_description: str,
company_profile: str, salary_range: str):
"""
Replicates Β§4βΒ§5 of training script exactly:
combined_text = title + description + requirements(βcompany_profile)
X_text = tfidf.transform(combined_text) β 5000 dims
X_behavior = [desc_length, urgency_score, free_email] β 3 dims
X_final = hstack([X_text, X_behavior]) β 5003 dims
Returns (X_final [sparse, 1Γ5003], feature_dict).
"""
# Sanitize inputs β guard against None
job_title = str(job_title or "").strip()
job_description = str(job_description or "").strip()
company_profile = str(company_profile or "").strip()
salary_range = str(salary_range or "").strip()
combined_text = f"{job_title} {job_description} {company_profile}"
X_text = tfidf_vectorizer.transform([combined_text])
d_len = desc_length(job_description)
urg = urgency_score(job_description)
free_em = free_email_flag(company_profile)
X_behavior = np.array([[d_len, urg, free_em]], dtype=np.float64)
X_final = hstack([X_text, X_behavior])
fd = {
"desc_length": d_len,
"urgency": urg,
"free_email": bool(free_em),
"salary_missing": int(not salary_range),
}
return X_final, fd
# ββ RISK SCORING ENGINE (mirrors training script Β§10 exactly) βββββββββββββββββ
def compute_risk_score(fraud_prob: float, fd: dict) -> float:
"""
Composite score replicating Β§10:
risk_score = 0.60 Γ adj_prob
+ 0.15 Γ urgency_norm
+ 0.15 Γ salary_missing
+ 0.10 Γ free_email
Clipped to [0, 100].
The adj_prob remapping keeps the 0β50 range for sub-threshold
and 50β100 range for super-threshold predictions.
"""
# Guard: clamp fraud_prob to valid probability range
fraud_prob = float(np.clip(fraud_prob, 0.0, 1.0))
if fraud_prob >= FRAUD_THRESHOLD:
adj = 0.5 + (fraud_prob - FRAUD_THRESHOLD) / (1 - FRAUD_THRESHOLD) * 0.5
else:
adj = fraud_prob / FRAUD_THRESHOLD * 0.5
adj = min(adj, 1.0)
urg_norm = min(fd["urgency"] / max(len(URGENCY_WORDS), 1), 1.0)
score = (
0.60 * adj
+ 0.15 * urg_norm
+ 0.15 * fd["salary_missing"]
+ 0.10 * int(fd["free_email"])
) * 100
# Behavioral floor: if 2+ behavioral signals fire AND model says FRAUD, lift to β₯62
beh = (0.15 * urg_norm + 0.15 * fd["salary_missing"] + 0.10 * int(fd["free_email"])) * 100
if beh >= 20 and fraud_prob >= FRAUD_THRESHOLD:
score = max(score, 62.0)
return round(float(np.clip(score, 0, 100)), 2)
def get_risk_level(score: float):
"""Return (level, pill_class, card_class, accent_color, advice)."""
score = float(score)
if score < 30:
return (
"LOW", "pill-green", "good", "#aaaaaa",
"Job appears relatively safe. Verify company details independently."
)
elif score < 60:
return (
"MEDIUM", "pill-amber", "warn", "#dddddd",
"Proceed with caution. Do not share documents or pay any fee."
)
else:
return (
"HIGH", "pill-red", "danger", "#ffffff",
"High scam risk. Do NOT apply or share personal information."
)
# ββ MODEL CONFIDENCE (from expainabiity_and_insights.py) βββββββββββββββββββββ
def model_confidence(prob: float):
"""Return (label, pill_class) based on distance from decision threshold."""
dist = abs(float(prob) - FRAUD_THRESHOLD)
if dist >= 0.25:
return "High", "pill-green"
elif dist >= 0.10:
return "Moderate", "pill-amber"
return "Low (borderline)", "pill-red"
# ββ FEATURE IMPORTANCE (from expainabiity_and_insights.py) βββββββββββββββββββ
def get_feature_importance(model, feature_names):
"""
Direct coef_-based importance β mirrors expainabiity_and_insights.py.
Returns sorted list of (feature_name, coefficient), highest |coef| first.
"""
coefficients = model.coef_[0]
n = min(len(coefficients), len(feature_names))
paired = sorted(
zip(feature_names[:n], coefficients[:n]),
key=lambda x: abs(x[1]),
reverse=True,
)
return paired
# ββ SHAP FOR LOGISTIC REGRESSION (exact, no shap library needed) ββββββββββββββ
def compute_shap_values(model, X_final, feature_names):
"""
Exact SHAP for Logistic Regression (linear model):
Οα΅’ = coef[i] Γ feature_value[i] (log-odds space)
Ξ£ Οα΅’ + intercept = log-odds = logit(P(fraud))
Mathematically exact for linear models β not an approximation.
No shap library required.
Returns:
shap_vals : np.ndarray shape (n_features,)
intercept : float
log_odds : float (intercept + Ξ£ Οα΅’)
"""
coef = model.coef_[0]
n = min(len(coef), len(feature_names))
coef = coef[:n]
# Extract feature values β handles both sparse and dense
if issparse(X_final):
x_vec = np.asarray(X_final.todense()).flatten()[:n]
else:
x_vec = np.asarray(X_final).flatten()[:n]
shap_vals = coef * x_vec # Οα΅’ = coef[i] Γ x[i]
intercept = float(model.intercept_[0])
log_odds = intercept + float(shap_vals.sum())
return shap_vals, intercept, log_odds
def top_shap_features(shap_vals, feature_names, n: int = 15):
"""
Return top-n features sorted by |SHAP value|.
Returns list of (name, shap_value) tuples.
Safe when shap_vals is all zeros β returns list with val=0.0
without raising ZeroDivisionError downstream.
"""
n_feats = min(len(shap_vals), len(feature_names))
idx_sorted = np.argsort(np.abs(shap_vals[:n_feats]))[::-1][:n]
return [(feature_names[i], float(shap_vals[i])) for i in idx_sorted]
# ββ ADDITIONAL BEHAVIORAL SIGNALS (from eda.py) βββββββββββββββββββββββββββββββ
def caps_ratio(text: str) -> float:
"""
Proportion of ALL-CAPS words β from eda.py exploration.
Returns 0.0 for empty or whitespace-only input.
"""
words = str(text).split()
if not words:
return 0.0
return sum(1 for w in words if w.isupper() and len(w) > 1) / len(words)
def suspicious_salary(salary_text: str) -> int:
"""
Flag vague or unrealistically high salary β mirrors eda.py salary analysis.
Returns 1 if suspicious, 0 otherwise.
Flags:
- Keywords: unlimited / upto / up to / per day
- Any numeric value above SUSPICIOUS_SALARY_CEILING (5,00,000 INR)
"""
salary_text = str(salary_text or "").strip()
if not salary_text:
return 0
sl = salary_text.lower()
# Keyword check
if any(kw in sl for kw in SUSPICIOUS_SALARY_KEYWORDS):
return 1
# Numeric ceiling check
nums = re.findall(r'\d[\d,]*', salary_text)
if nums:
try:
if max(int(n.replace(",", "")) for n in nums) > SUSPICIOUS_SALARY_CEILING:
return 1
except (ValueError, OverflowError):
pass
return 0
def top_driver(adj_score: float, fd: dict):
"""
Identify the single largest contributor to the composite risk score.
Returns (top_driver_name, contributions_dict).
adj_score is the 0β100 adjusted probability component
(before the 0.60 weight is applied).
"""
urg_norm = min(fd["urgency"] / max(len(URGENCY_WORDS), 1), 1.0)
contribs = {
"ML model fraud probability": round(float(adj_score) * 0.60, 2),
"Urgency language": round(urg_norm * 0.15 * 100, 2),
"Missing salary info": round(fd["salary_missing"] * 0.15 * 100, 2),
"Free/unverified email": round(int(fd["free_email"]) * 0.10 * 100, 2),
}
active = {k: v for k, v in contribs.items() if v > 0}
top = max(active, key=active.get) if active else "No dominant driver"
return top, contribs
def matched_scam_phrases(job_title: str, job_description: str) -> list:
"""
Return list of SCAM_PHRASES found in title + description.
Case-insensitive. Safe with empty/None inputs.
"""
combined = (str(job_description or "") + " " + str(job_title or "")).lower()
return [p for p in SCAM_PHRASES if p in combined]