forked from Rhizobium-gits/seq2pipe
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcode_agent.py
More file actions
2559 lines (2335 loc) · 113 KB
/
code_agent.py
File metadata and controls
2559 lines (2335 loc) · 113 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
code_agent.py
=============
LLM に Python 解析コードを生成させ、実行・エラー修正・パッケージ
インストール確認を行うモジュール。
"""
import json
import re
import subprocess
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional
import sys
sys.path.insert(0, str(Path(__file__).parent))
import qiime2_agent as _agent
# ─────────────────────────────────────────────────────────────────────────────
# 結果オブジェクト
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class CodeExecutionResult:
"""コード生成・実行の結果"""
success: bool
stdout: str = ""
stderr: str = ""
code: str = ""
figures: list = field(default_factory=list)
retry_count: int = 0
error_message: str = ""
# ─────────────────────────────────────────────────────────────────────────────
# プロンプト構築
# ─────────────────────────────────────────────────────────────────────────────
def _build_prompt(
export_files: dict,
user_prompt: str,
figure_dir: str,
metadata_path: str = "",
plot_config: Optional[dict] = None,
) -> str:
"""LLM へのコード生成プロンプトを組み立てる"""
cfg = plot_config or {}
dpi = cfg.get("dpi", 150)
figsize = cfg.get("figsize", [10, 6])
# 存在するファイルと存在しないカテゴリを明示
existing_cats = [cat for cat, paths in export_files.items() if paths]
missing_cats = [cat for cat, paths in export_files.items() if not paths]
lines = [
"You are a microbiome bioinformatics expert.",
"Write a single, complete, self-contained Python script that analyzes and visualizes",
"the QIIME2-exported data listed below.",
"",
"## AVAILABLE files — use ONLY these exact paths",
]
for category, paths in export_files.items():
for p in paths:
lines.append(f" [{category}] {p}")
if metadata_path:
lines.append(f" [metadata] {metadata_path}")
if missing_cats:
lines += [
"",
"## MISSING categories — NO files exist for these, SKIP COMPLETELY",
]
for cat in missing_cats:
lines.append(f" [{cat}] — NOT AVAILABLE, do not generate any code for this category")
lines += [
"",
"## CRITICAL RULES",
"1. Only load files listed in 'AVAILABLE files'. Do NOT guess or invent file paths.",
"2. If a category is listed as MISSING, skip that entire analysis section.",
"3. Use try/except with 'pass' (NOT 'raise') for all file loading.",
" Each figure section must be independent — failure in one must not stop others.",
"",
f"## Output directory for figures: {figure_dir}",
f"## DPI: {dpi}",
f"## figsize: {figsize}",
"",
"## User request",
user_prompt.strip() or (
"Generate: (1) genus-level stacked bar chart of relative abundance, "
"(2) alpha diversity boxplot (Shannon), (3) beta diversity PCoA (Bray-Curtis)."
),
"",
"## FILE FORMAT — read exactly as described",
"",
"### [feature_table] TSV (exported from QIIME2 via biom convert)",
" - First line : '# Constructed from biom file' ← comment, skip it",
" - Second line : '#OTU ID\\t<sample1>\\t<sample2>...' ← use as header",
" - Remaining : Feature ID (ASV/OTU) | per-sample read counts",
" - Read with :",
" ft = pd.read_csv(path, sep='\\t', skiprows=1, index_col=0)",
" ft.index.name = 'Feature ID'",
"",
"### [taxonomy] taxonomy.tsv",
" - Columns: Feature ID (index) | Taxon | Confidence",
" - Taxon format: 'd__Bacteria; p__Firmicutes; c__Clostridia; o__...; f__...; g__Genus; s__species'",
" - Read with : tax = pd.read_csv(path, sep='\\t', index_col=0)",
" - Get genus : tax['genus'] = tax['Taxon'].str.extract(r'g__([^;]+)').fillna('Unknown').str.strip()",
"",
"### [alpha] alpha-diversity TSV",
" - Columns: sample-id (index) | metric value (shannon / observed_features / faith_pd ...)",
" - Read with : alpha = pd.read_csv(path, sep='\\t', index_col=0)",
"",
"### [beta] distance-matrix TSV",
" - Square symmetric matrix; row names = column names = sample IDs",
" - Read with : dm = pd.read_csv(path, sep='\\t', index_col=0)",
" - PCoA with sklearn :",
" from sklearn.manifold import MDS",
" coords = MDS(n_components=2, dissimilarity='precomputed', random_state=42).fit_transform(dm.values)",
"",
"## Code requirements",
"1. First FOUR lines MUST be (in this exact order, NEVER omit any):",
" import matplotlib",
" matplotlib.use('Agg')",
" import matplotlib.pyplot as plt",
" import pandas as pd",
"2. Define at the top:",
f" FIGURE_DIR = r'{figure_dir}'",
f" DPI = {dpi}",
" import os; os.makedirs(FIGURE_DIR, exist_ok=True)",
"3. Save every figure as PNG — extension MUST be .png (NEVER .pdf, .svg, or .jpg):",
" plt.savefig(os.path.join(FIGURE_DIR, 'name.png'), dpi=DPI, bbox_inches='tight')",
" plt.close()",
"4. All axis labels, titles, legend entries in English.",
"5. Use try/except around each section so one failure does not stop the whole script.",
"6. Output ONLY the Python code, wrapped in ```python ... ```.",
"7. Do NOT use plt.show(). Do NOT use .pdf, .svg, or .jpg extensions.",
"",
"## FIGURE STYLE — modern, publication-quality",
"Use this boilerplate at the TOP of every script (after imports):",
" import seaborn as sns",
" sns.set_theme(style='white', context='paper', font_scale=1.3)",
" PALETTE = sns.color_palette('tab10')",
"",
"Apply to every Axes object (fig, ax = plt.subplots(...)):",
" ax.spines[['top', 'right']].set_visible(False)",
" ax.set_title('...', fontsize=14, fontweight='bold', pad=10)",
" ax.set_xlabel('...', fontsize=12, labelpad=6)",
" ax.set_ylabel('...', fontsize=12, labelpad=6)",
" ax.tick_params(labelsize=10)",
"",
"For BOXPLOTS — use seaborn with jitter overlay:",
" ax = sns.boxplot(data=df, palette='Set2', width=0.5, linewidth=1.5,",
" flierprops=dict(marker='o', markersize=4, alpha=0.5))",
" sns.stripplot(data=df, color='#333333', size=4, alpha=0.5, jitter=True, ax=ax)",
"",
"For STACKED BAR — use a tab20 palette:",
" colors = sns.color_palette('tab20', n_colors=len(df.index))",
" df.T.plot(kind='bar', stacked=True, color=colors, ax=ax, width=0.75, edgecolor='none')",
" ax.legend(bbox_to_anchor=(1.02, 1), loc='upper left', frameon=False, fontsize=8)",
"",
"For SCATTER / PCoA:",
" ax.scatter(x, y, c=colors, s=80, edgecolors='white', linewidths=0.8, zorder=3)",
"",
"## COMMON MISTAKES — avoid these",
"- DO NOT: from scipy.stats import boxplot ← scipy.stats has NO boxplot function",
" CORRECT: plt.boxplot(data) or seaborn.boxplot(data=df, ...)",
"- DO NOT: import biom ← use pd.read_csv() directly on .tsv files",
"- DO NOT hardcode data values — always read from the file paths listed above",
"- TAXONOMY str.extract RETURNS DataFrame, not Series:",
" WRONG: tax['Taxon'].str.extract(r'g__([^;]+)').fillna('Unknown').str.strip()",
" RIGHT: tax['Taxon'].str.extract(r'g__([^;]+)')[0].fillna('Unknown').str.strip()",
"- DO NOT use bare 'except Exception as e: print(...)' — it hides real errors.",
" Instead: let errors propagate (no try/except) or use 'raise' inside except.",
"## QIIME2 DATA STRUCTURE — key facts",
"- feature-table.tsv: rows=ASV IDs, columns=SampleIDs. Read with skiprows=1, index_col=0",
"- taxonomy.tsv: rows=ASV IDs, cols=Taxon,Confidence. Read with index_col=0",
"- alpha-diversity.tsv: rows=SampleIDs, 1 numeric col (name varies). Use alpha.columns[0]",
"- To aggregate feature-table BY GENUS:",
" tax['genus'] = tax['Taxon'].str.extract(r'g__([^;]+)')[0].fillna('Unknown').str.strip()",
" genus_ft = ft.join(tax['genus']).groupby('genus').sum() # rows=genus, cols=samples",
"",
"## ADDITIONAL ANALYSIS METHODS (copy-paste ready)",
"",
"### Rarefaction Curve (subsample simulation)",
" rng = np.random.default_rng(42)",
" for each sample: pool = np.repeat(np.arange(n_asv), counts)",
" subsample at 10 evenly-spaced depths, 10 iterations each",
" plot median observed ASVs vs depth per sample",
"",
"### NMDS (Non-Metric MDS)",
" from sklearn.manifold import MDS",
" mds = MDS(n_components=2, dissimilarity='precomputed', metric=False,",
" random_state=42, max_iter=1000, normalized_stress='auto')",
" coords = mds.fit_transform(dm.values)",
" stress = mds.stress_ # good if < 0.2",
"",
"### Alluvial / River Plot (pure matplotlib, NO external lib)",
" from matplotlib.patches import PathPatch",
" from matplotlib.path import Path as MplPath",
" Group taxonomy: Phylum -> Class -> Order, top 8 taxa",
" Draw vertical bars at each level, connect with cubic Bezier PathPatch",
"",
"### Volcano Plot (Differential Abundance)",
" from scipy.stats import mannwhitneyu",
" Split samples into two groups; Mann-Whitney U test per genus",
" x = log2(mean_group2 / mean_group1 + pseudo), y = -log10(p_value)",
" Color: red if |log2FC|>1 and p<0.05, blue if down, gray otherwise",
"",
"### Co-occurrence Network (genus correlations)",
" from scipy.stats import spearmanr; import networkx as nx",
" Compute pairwise Spearman for top 30 genera",
" Edge if |r| > 0.6 and p < 0.05; node size = mean abundance",
" Green edges = positive, red = negative",
"",
"### Core Microbiome (prevalence vs abundance scatter)",
" prevalence = fraction of samples where genus > 0",
" scatter: x = prevalence, y = mean relative abundance",
" highlight core (prevalence >= 0.8) in red, annotate top genera",
"",
"### Sample Dendrogram (hierarchical clustering)",
" from scipy.cluster.hierarchy import linkage, dendrogram",
" from scipy.spatial.distance import squareform",
" condensed = squareform(dm.values)",
" Z = linkage(condensed, method='average') # UPGMA",
" dendrogram(Z, labels=dm.index.tolist())",
"",
"### Genus Spearman Correlation Clustermap",
" Compute pairwise Spearman r for top 20 genera (samples as observations)",
" Use sns.clustermap(corr_df, cmap='RdBu_r', center=0, annot=True)",
"",
"### Rank-Abundance Curve",
" For each sample: sort ASV abundances descending, convert to relative %",
" Plot rank (x) vs relative abundance (y, log scale) per sample",
"",
"### Family-level Composition (stacked bar)",
" Same approach as genus but extract f__([^;]+) from Taxon",
]
return "\n".join(lines)
# ─────────────────────────────────────────────────────────────────────────────
# マニフェスト用プロンプト構築
# ─────────────────────────────────────────────────────────────────────────────
def _build_manifest_prompt(
manifest_path: str,
user_prompt: str,
output_dir: str,
figure_dir: str,
metadata_path: str = "",
plot_config: Optional[dict] = None,
) -> str:
"""マニフェストファイルからフルパイプラインを実行するプロンプトを構築"""
import csv
# マニフェストを読んでサンプル数・構造を確認
samples = []
try:
with open(manifest_path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f, delimiter="\t")
for row in reader:
sid = row.get("sample-id") or row.get("sampleid") or ""
if sid:
samples.append(sid)
except Exception:
pass
qiime_bin = (
str(Path(_agent.QIIME2_CONDA_BIN) / "qiime")
if _agent.QIIME2_CONDA_BIN and Path(_agent.QIIME2_CONDA_BIN).exists()
else "qiime"
)
biom_bin = (
str(Path(_agent.QIIME2_CONDA_BIN) / "biom")
if _agent.QIIME2_CONDA_BIN and Path(_agent.QIIME2_CONDA_BIN).exists()
else "biom"
)
cfg = plot_config or {}
sample_preview = ", ".join(samples[:5]) + ("..." if len(samples) > 5 else "")
lines = [
"あなたはQIIME2とPythonを使ったマイクロバイオーム解析の専門家です。",
"以下のマニフェストファイルからQIIME2パイプラインを実行し、",
"解析・可視化まで行う完全なPythonスクリプトを1つ書いてください。",
"",
"## QIIME2 実行環境",
f"qiime コマンド: {qiime_bin}",
f"biom コマンド: {biom_bin}",
"",
"## マニフェストファイル",
f"パス: {manifest_path}",
"形式: PairedEndFastqManifestPhred33V2(タブ区切り、ヘッダ: sample-id / forward-absolute-filepath / reverse-absolute-filepath)",
f"サンプル数: {len(samples)}",
f"サンプルID例: {sample_preview}",
"",
]
if metadata_path:
lines += [
"## メタデータファイル",
f"パス: {metadata_path}",
"(sample-id 列とグループ情報を含む TSV)",
"",
]
lines += [
f"## 出力先ディレクトリ: {output_dir}",
f"## 図の保存先ディレクトリ: {figure_dir}",
f"## DPI: {cfg.get('dpi', 150)}",
f"## figsize: {cfg.get('figsize', [10, 6])}",
"",
"## ユーザーの要求",
user_prompt if user_prompt.strip() else (
"属レベル相対存在量の積み上げ棒グラフ、Shannon α多様性、Bray-Curtis PCoA を生成してください。"
),
"",
"## コードの要件",
"- import matplotlib; matplotlib.use('Agg') を最初に書く",
"- QIIME2 コマンドは subprocess.run([qiime_cmd, ...], check=True, capture_output=True, text=True) で実行する",
" 例: result = subprocess.run(['/path/to/qiime', 'tools', 'import', ...], check=True, capture_output=True, text=True)",
"- 各ステップの returncode != 0 のとき stderr を表示して sys.exit(1) で停止する",
"- 図は plt.savefig() で保存し plt.show() は使わない",
"- タイトル・ラベルは英語で書く(日本語フォント依存を避けるため)",
"- コードのみを出力する。説明文は不要",
"- コードは ```python ... ``` で囲む",
"",
"## QIIME2パイプラインの推奨フロー",
"1. qiime tools import でマニフェストからインポート",
" --type 'SampleData[PairedEndSequencesWithQuality]'",
" --input-format PairedEndFastqManifestPhred33V2",
"2. qiime dada2 denoise-paired でデノイジング",
" 推奨: --p-trim-left-f 0 --p-trim-left-r 0 --p-trunc-len-f 250 --p-trunc-len-r 200 --p-n-threads 0",
"3. qiime taxa collapse --p-level 6 で属レベルに集約",
"4. qiime tools export で feature-table.biom をエクスポート",
"5. biom convert -i feature-table.biom -o feature-table.tsv --to-tsv でTSVに変換",
"6. pandasでTSVを読み込んで相対存在量を計算・matplotlib で可視化",
"7. 多様性解析が必要な場合は qiime diversity core-metrics-phylogenetic などを使用",
]
return "\n".join(lines)
# ─────────────────────────────────────────────────────────────────────────────
# コード抽出
# ─────────────────────────────────────────────────────────────────────────────
def _extract_code(content: str) -> str:
"""LLM レスポンスから Python コードブロックを抽出する"""
# ```python ... ``` または ``` ... ```
match = re.search(r'```(?:python)?\s*([\s\S]*?)```', content)
if match:
code = match.group(1).strip()
else:
# フォールバック: import から始まる行以降
for i, line in enumerate(content.splitlines()):
if line.strip().startswith(("import ", "from ")):
code = "\n".join(content.splitlines()[i:]).strip()
break
else:
code = content.strip()
return _ensure_required_imports(code)
def _ensure_required_imports(code: str) -> str:
"""
LLM が生成したコードに必須インポートが欠けている場合に自動補完する。
matplotlib.pyplot as plt と pandas は常に必要。
"""
required = [
("import matplotlib\n", "import matplotlib"),
("matplotlib.use('Agg')\n", "matplotlib.use('Agg')"),
("import matplotlib.pyplot as plt\n", "import matplotlib.pyplot as plt"),
("import pandas as pd\n", "import pandas as pd"),
]
lines = code.splitlines()
prepend = []
for insert_line, check_str in required:
if check_str not in code:
prepend.append(insert_line.rstrip())
if prepend:
# matplotlib.use('Agg') は import matplotlib の直後に挿入する必要があるため、
# ブロックとしてまとめて先頭に付加する
code = "\n".join(prepend) + "\n" + code
return code
# ─────────────────────────────────────────────────────────────────────────────
# PDF/SVG → PNG 自動変換
# ─────────────────────────────────────────────────────────────────────────────
def _pdf_to_png(pdf_path: Path) -> Optional[Path]:
"""
PDF または SVG を PNG に変換して元ファイルを削除する。
macOS 組み込みの sips コマンドを使用(poppler 不要)。
"""
png_path = pdf_path.with_suffix(".png")
try:
result = subprocess.run(
[
"sips", "-s", "format", "png",
str(pdf_path), "--out", str(png_path),
],
capture_output=True,
timeout=30,
)
if result.returncode == 0 and png_path.exists():
pdf_path.unlink(missing_ok=True)
return png_path
except Exception:
pass
return None
def _convert_new_figs(new_figs: list) -> list:
"""new_figs 内の PDF/SVG を PNG に変換して返す。"""
converted = []
for f in new_figs:
p = Path(f)
if p.suffix.lower() in (".pdf", ".svg"):
png = _pdf_to_png(p)
converted.append(str(png) if png else f)
else:
converted.append(f)
return converted
# ─────────────────────────────────────────────────────────────────────────────
# コード実行
# ─────────────────────────────────────────────────────────────────────────────
def _run_code(
code: str,
output_dir: str,
figure_dir: str,
log_callback: Optional[Callable[[str], None]] = None,
) -> tuple:
"""
コードを一時ファイルに書き込んで QIIME2_PYTHON で実行する。
戻り値: (success: bool, stdout: str, stderr: str, new_figures: list[str])
"""
py_exec = _agent.QIIME2_PYTHON
if not py_exec or not Path(py_exec).exists():
py_exec = sys.executable
fig_dir = Path(figure_dir)
fig_dir.mkdir(parents=True, exist_ok=True)
# 実行前の図ファイル一覧
existing = (
set(fig_dir.glob("*.png")) | set(fig_dir.glob("*.jpg")) | set(fig_dir.glob("*.jpeg"))
| set(fig_dir.glob("*.pdf")) | set(fig_dir.glob("*.svg"))
)
with tempfile.NamedTemporaryFile(
mode='w', suffix='.py', delete=False, encoding='utf-8'
) as f:
f.write(code)
tmp_path = f.name
try:
proc = subprocess.run(
[py_exec, tmp_path],
capture_output=True,
text=True,
timeout=300,
cwd=output_dir,
)
if log_callback:
for line in proc.stdout.splitlines():
log_callback(line)
if proc.stderr:
for line in proc.stderr.splitlines()[:20]:
log_callback(f"[stderr] {line}")
new_figs = sorted(
(
set(fig_dir.glob("*.png")) | set(fig_dir.glob("*.jpg")) | set(fig_dir.glob("*.jpeg"))
| set(fig_dir.glob("*.pdf")) | set(fig_dir.glob("*.svg"))
) - existing
)
new_figs_str = _convert_new_figs([str(f) for f in new_figs])
return (
proc.returncode == 0,
proc.stdout,
proc.stderr,
new_figs_str,
)
finally:
try:
Path(tmp_path).unlink()
except Exception:
pass
# ─────────────────────────────────────────────────────────────────────────────
# ModuleNotFoundError 検出
# ─────────────────────────────────────────────────────────────────────────────
_PIP_NAME_MAP = {
"sklearn": "scikit-learn",
"skbio": "scikit-bio",
"Bio": "biopython",
"cv2": "opencv-python",
"PIL": "Pillow",
}
def _detect_missing_module(stderr: str) -> Optional[str]:
"""stderr から ModuleNotFoundError のパッケージ名を抽出する"""
match = re.search(r"No module named '([^']+)'", stderr)
if match:
mod = match.group(1).split(".")[0]
return _PIP_NAME_MAP.get(mod, mod)
return None
# ─────────────────────────────────────────────────────────────────────────────
# pip インストール
# ─────────────────────────────────────────────────────────────────────────────
def pip_install(
package: str,
log_callback: Optional[Callable[[str], None]] = None,
) -> bool:
"""QIIME2 conda 環境の pip でパッケージをインストールする"""
conda_bin = _agent.QIIME2_CONDA_BIN
if conda_bin and Path(conda_bin).exists():
pip_exec = str(Path(conda_bin) / "pip")
else:
pip_exec = str(Path(sys.executable).parent / "pip")
if log_callback:
log_callback(f"[pip] インストール中: {package}")
proc = subprocess.run(
[pip_exec, "install", package],
capture_output=True, text=True, timeout=180,
)
if log_callback:
for line in proc.stdout.splitlines()[-3:]:
log_callback(f"[pip] {line}")
if proc.returncode != 0:
for line in proc.stderr.splitlines()[-5:]:
log_callback(f"[pip error] {line}")
return proc.returncode == 0
# ─────────────────────────────────────────────────────────────────────────────
# メインエントリポイント
# ─────────────────────────────────────────────────────────────────────────────
def run_code_agent(
export_files: dict,
user_prompt: str,
output_dir: str,
figure_dir: str,
metadata_path: str = "",
model: Optional[str] = None,
max_retries: int = 3,
plot_config: Optional[dict] = None,
log_callback: Optional[Callable[[str], None]] = None,
install_callback: Optional[Callable[[str], bool]] = None,
) -> CodeExecutionResult:
"""
LLM で Python 解析コードを生成・実行するエージェント。
Parameters
----------
export_files : dict
pipeline_runner.get_exported_files() の戻り値
user_prompt : str
ユーザーの解析指示(自然言語)
output_dir : str
作業ディレクトリ
figure_dir : str
図の保存先
model : str, optional
Ollama モデル名(None なら DEFAULT_MODEL)
max_retries : int
エラー時の最大リトライ回数(デフォルト 3)
install_callback : (pkg: str) -> bool, optional
パッケージインストール許可を求めるコールバック。
True を返すとインストール実行。
None の場合はインストールしない。
"""
if model is None:
model = _agent.DEFAULT_MODEL
def _log(msg: str):
if log_callback:
log_callback(msg)
_log("LLM にコード生成を依頼中...")
# ── STEP 1: 初回コード生成 ────────────────────────────────────────
system_msg = {
"role": "system",
"content": (
"You are a microbiome analysis expert. "
"Generate only Python code without any explanation. "
"Wrap code in ```python ... ```."
),
}
user_msg = {
"role": "user",
"content": _build_prompt(
export_files, user_prompt, figure_dir, metadata_path, plot_config
),
}
messages = [system_msg, user_msg]
try:
response = _agent.call_ollama(messages, model)
except Exception as e:
return CodeExecutionResult(
success=False,
error_message=f"Ollama 接続エラー: {e}",
)
code = _extract_code(response.get("content", ""))
if not code:
return CodeExecutionResult(
success=False,
error_message="LLM がコードを生成しませんでした",
)
_log(f"コード生成完了 ({len(code.splitlines())} 行)")
# ── STEP 2: 実行 + リトライループ ────────────────────────────────
last_code = code
last_stderr = ""
for attempt in range(max_retries + 1):
_log(f"コード実行中... (試行 {attempt + 1}/{max_retries + 1})")
success, stdout, stderr, new_figs = _run_code(
last_code, output_dir, figure_dir, log_callback
)
if success and new_figs:
_log(f"実行成功。生成された図: {len(new_figs)} 件")
return CodeExecutionResult(
success=True,
stdout=stdout,
stderr=stderr,
code=last_code,
figures=new_figs,
retry_count=attempt,
)
if success and not new_figs:
# exit 0 だが図が生成されていない → try/except による silent failure を疑う
_log("⚠️ exit 0 だが図が未生成。サイレントエラーとして再試行します。")
last_stderr = (
"Script exited with code 0 but NO figures were saved to FIGURE_DIR.\n"
"This usually means an error was silently caught by a try/except block.\n"
f"Script stdout (look for 'Error:' lines):\n{stdout[:800]}\n\n"
"Fix: remove broad except clauses (or re-raise), and ensure "
"plt.savefig() is actually executed with the correct FIGURE_DIR path."
)
else:
last_stderr = stderr
# ModuleNotFoundError の処理
missing_pkg = _detect_missing_module(stderr)
if missing_pkg:
_log(f"未インストールパッケージを検出: {missing_pkg}")
approved = install_callback(missing_pkg) if install_callback else False
if approved:
ok = pip_install(missing_pkg, log_callback)
if ok:
_log(f"{missing_pkg} のインストール完了。再実行します。")
continue # 同じコードで再実行(コード修正不要)
else:
_log(f"{missing_pkg} のインストールをスキップしました。")
if attempt >= max_retries:
break
# LLM にエラーを渡してコード修正を依頼
_log(f"エラーを LLM に渡してコード修正を依頼中...")
# エラー種別に応じた追加ヒント
_hint = ""
_err_ctx = (stderr + stdout)[:2000]
if "cannot import name" in _err_ctx and "scipy.stats" in _err_ctx:
_hint = (
"\n⚠️ IMPORTANT FIX: scipy.stats does NOT have that function.\n"
"For box plots use: plt.boxplot(data) or seaborn.boxplot(data=df)\n"
"Remove the scipy.stats import entirely."
)
elif (
"'DataFrame' object has no attribute 'str'" in _err_ctx
or ("str.extract" in _err_ctx and "DataFrame" in _err_ctx)
):
_hint = (
"\n⚠️ IMPORTANT FIX: str.extract() returns a DataFrame, not a Series.\n"
"WRONG: tax['Taxon'].str.extract(r'g__([^;]+)').fillna('x').str.strip()\n"
"RIGHT: tax['Taxon'].str.extract(r'g__([^;]+)')[0].fillna('x').str.strip()\n"
"The [0] converts the single-column DataFrame to a Series."
)
elif "EXIT CODE: 0" in _err_ctx or "NO figures" in _err_ctx:
_hint = (
"\n⚠️ The script ran but saved NO figures.\n"
"Check for silent failures: remove try/except or add 'raise' inside except.\n"
"Ensure plt.savefig() is called with the correct FIGURE_DIR path."
)
messages.append({
"role": "assistant",
"content": f"```python\n{last_code}\n```",
})
messages.append({
"role": "user",
"content": (
f"The code produced the following error:\n"
f"```\n{last_stderr[:1500]}\n```"
f"{_hint}\n\n"
f"Please fix the code and output the complete corrected version "
f"wrapped in ```python ... ```."
),
})
try:
fix_response = _agent.call_ollama(messages, model)
except Exception as e:
_log(f"Ollama 接続エラー: {e}")
break
fixed = _extract_code(fix_response.get("content", ""))
if fixed:
last_code = fixed
_log(f"修正済みコード受信 ({len(last_code.splitlines())} 行)")
else:
_log("コード修正に失敗しました。")
break
return CodeExecutionResult(
success=False,
stdout="",
stderr=last_stderr,
code=last_code,
figures=[],
retry_count=max_retries,
error_message=last_stderr[:500],
)
# ─────────────────────────────────────────────────────────────────────────────
# 自律エージェント(Auto Agent)
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class AutoAgentResult:
"""自律エージェントの実行結果"""
rounds: list = field(default_factory=list) # list[CodeExecutionResult]
total_figures: list = field(default_factory=list)
completed: bool = False # ANALYSIS_COMPLETE を受信したか
def _build_auto_initial_prompt(
export_files: dict,
figure_dir: str,
metadata_path: str = "",
plot_config: Optional[dict] = None,
) -> str:
"""自律エージェント用の初回プロンプト(ユーザー指示なし・AI が計画立案)"""
cfg = plot_config or {}
dpi = cfg.get("dpi", 150)
figsize = cfg.get("figsize", [10, 6])
lines = [
"You are an autonomous microbiome bioinformatics analysis agent.",
"Analyze the QIIME2-exported data listed below, one analysis per round.",
"",
"## PROTOCOL",
"- Each response: write EXACTLY ONE complete Python script in ```python ... ```.",
"- After the script runs you receive the result and plan the next analysis.",
"- When you have completed a comprehensive suite, respond with: ANALYSIS_COMPLETE",
"",
"## Recommended analysis plan (adapt to available files; skip if file unavailable):",
" Round 1 — Data quality: per-sample read depth bar + ASV frequency histogram",
" Round 2 — Denoising stats: input→filtered→denoised→merged→non-chimeric (if available)",
" Round 3 — Phylum-level stacked bar chart (relative abundance)",
" Round 4 — Genus-level stacked bar chart (top 15 genera + 'Other')",
" Round 5 — Genus heatmap (top 25 genera × samples, z-score, seaborn clustermap)",
" Round 6 — Top 10 genus box plots (abundance distribution across samples)",
" Round 7 — Alpha diversity: Shannon + Observed features box/violin plots",
" Round 8 — Alpha rarefaction curves (subsample feature table at 10 depths)",
" Round 9 — Beta PCoA: ALL available distance matrices (Bray-Curtis, Jaccard, UniFrac)",
" Round 10 — PCA on CLR-transformed feature table (sklearn PCA, biplot if possible)",
" Round 11 — NMDS ordination (Bray-Curtis, metric=False, print stress in title)",
" Round 12 — Sample-to-sample Spearman correlation heatmap (genus-level)",
"",
"## Available files",
]
for category, paths in export_files.items():
for p in paths:
lines.append(f" [{category}] {p}")
if metadata_path:
lines.append(f" [metadata] {metadata_path}")
lines += [
"",
f"## Figure output directory : {figure_dir}",
f"## DPI: {dpi} figsize: {figsize}",
"",
"## FILE FORMATS — read exactly as described",
"",
"### [feature_table] feature-table.tsv (QIIME2 biom export)",
" - Line 1 : '# Constructed from biom file' ← comment, SKIP",
" - Line 2 : '#OTU ID\\t<sample1>\\t<sample2>...' ← use as header",
" - Read: ft = pd.read_csv(path, sep='\\t', skiprows=1, index_col=0)",
" - ft shape: (n_features × n_samples)",
"",
"### [taxonomy] taxonomy.tsv",
" - Columns : Feature ID (index) | Taxon | Confidence",
" - Phylum : tax['phylum'] = tax['Taxon'].str.extract(r'p__([^;]+)')[0].fillna('Unknown').str.strip()",
" - Genus : tax['genus'] = tax['Taxon'].str.extract(r'g__([^;]+)')[0].fillna('Unknown').str.strip()",
"",
"### [alpha] alpha-diversity TSV (one file per metric)",
" - Columns : sample-id (index) | metric value",
" - Read: alpha = pd.read_csv(path, sep='\\t', index_col=0)",
" - Metric name is in the column after the index; get it with: col = alpha.columns[0]",
" - Multiple files may exist: shannon, observed_features, chao1, faith_pd — use all",
"",
"### [beta] distance-matrix TSV (one file per metric)",
" - Square symmetric matrix; row names = column names = sample IDs",
" - Read: dm = pd.read_csv(path, sep='\\t', index_col=0)",
" - Multiple files: bray_curtis, jaccard, unweighted_unifrac, weighted_unifrac — use all",
"",
"### [denoising] denoising-stats.tsv",
" - Columns: sample-id (index) | input | filtered | denoised | merged | non-chimeric",
" - Read: stats = pd.read_csv(path, sep='\\t', index_col=0)",
"",
"## ANALYSIS METHOD REFERENCE",
"",
"### PCA (Principal Component Analysis) on feature table",
" # CLR (center log-ratio) transform to handle compositional data",
" ra = ft.div(ft.sum(axis=0), axis=1) # relative abundance (features × samples)",
" clr = np.log(ra.T + 1e-6) # samples × features, add pseudocount",
" clr = clr - clr.mean(axis=1).values[:,None] # center each sample",
" from sklearn.decomposition import PCA",
" pca = PCA(n_components=2)",
" coords = pca.fit_transform(clr) # shape: (n_samples, 2)",
" # variance explained: pca.explained_variance_ratio_",
"",
"### PCoA (Principle Coordinate Analysis) — metric MDS on distance matrix",
" from sklearn.manifold import MDS",
" pcoa = MDS(n_components=2, dissimilarity='precomputed', metric=True, random_state=42)",
" coords = pcoa.fit_transform(dm.values) # shape: (n_samples, 2)",
"",
"### NMDS (Non-Metric Multidimensional Scaling)",
" nmds = MDS(n_components=2, dissimilarity='precomputed', metric=False,",
" random_state=42, max_iter=500, n_init=4)",
" coords = nmds.fit_transform(dm.values)",
" stress = nmds.stress_ # print in plot title (good if < 0.2)",
"",
"### Rarefaction curve (subsample simulation)",
" import numpy as np",
" min_depth = int(ft.sum(axis=0).min())",
" depths = np.linspace(100, min_depth, 10).astype(int)",
" mean_richness = []",
" for d in depths:",
" sub = ft.apply(lambda c: pd.Series(",
" np.random.multinomial(d, c/c.sum()) if c.sum() >= d else c.values,",
" index=c.index), axis=0)",
" mean_richness.append((sub > 0).sum(axis=0).mean())",
"",
"### Phylum / Genus aggregation from feature_table + taxonomy",
" # merge on Feature ID index, then groupby",
" merged = ft.join(tax[['genus']], how='left')",
" merged['genus'] = merged['genus'].fillna('Unknown')",
" genus_table = merged.groupby('genus').sum() # shape: (n_genera × n_samples)",
" rel = genus_table.div(genus_table.sum(axis=0), axis=1) # relative abundance",
" top15 = rel.sum(axis=1).nlargest(15).index",
" plot_data = rel.loc[top15].T # shape: (n_samples × 15)",
"",
"### Alluvial / River Plot (pure matplotlib)",
" from matplotlib.patches import PathPatch",
" from matplotlib.path import Path as MplPath",
" Group taxonomy: Phylum -> Class -> Order, top 8 taxa",
" Draw vertical bars at each level, connect with cubic Bezier PathPatch",
"",
"### Volcano Plot (Differential Abundance)",
" from scipy.stats import mannwhitneyu",
" Split samples into two groups; Mann-Whitney U test per genus",
" x = log2(mean_grp2 / mean_grp1 + 0.001), y = -log10(p_value)",
" Color: red if |log2FC|>1 and p<0.05",
"",
"### Co-occurrence Network",
" from scipy.stats import spearmanr; import networkx as nx",
" Compute pairwise Spearman for top 30 genera",
" Edge if |r| > 0.6 and p < 0.05; node_size = mean_abd; spring_layout",
"",
"### Core Microbiome",
" prevalence = (genus_rel > 0).sum(axis=1) / n_samples",
" scatter: x = prevalence, y = mean relative abundance",
" highlight core (prevalence >= 0.8)",
"",
"### Sample Dendrogram",
" from scipy.cluster.hierarchy import linkage, dendrogram",
" from scipy.spatial.distance import squareform",
" Z = linkage(squareform(dm.values), method='average')",
"",
"### Rank-Abundance Curve",
" Sort ASV abundances descending per sample, plot rank vs log(rel abundance)",
"",
"### Family-level Composition",
" Same as genus but extract f__([^;]+) from Taxon column",
"",
"### Genus Spearman Correlation Clustermap",
" Top 20 genera pairwise Spearman r; sns.clustermap(cmap='RdBu_r', center=0)",
"",
"## Code requirements",
"1. First FOUR lines MUST be (in this exact order, NEVER omit any):",
" import matplotlib",
" matplotlib.use('Agg')",
" import matplotlib.pyplot as plt",
" import pandas as pd",
"2. Define at the top:",
f" FIGURE_DIR = r'{figure_dir}'",
f" DPI = {dpi}",
" import os; os.makedirs(FIGURE_DIR, exist_ok=True)",
"3. Include the round number in every filename (PNG format): e.g. 'round1_summary.png'",
"4. Save and close every figure as PNG (NOT jpg):",
" plt.savefig(os.path.join(FIGURE_DIR, 'roundN_name.png'), dpi=DPI, bbox_inches='tight')",
" plt.close()",
"5. All labels, titles, legend entries in English.",
"6. try/except around each major section — one failure must not stop other sections.",
"7. No plt.show().",
"",
"## Begin: write code for Round 1 now.",
]
return "\n".join(lines)
def run_auto_agent(
export_files: dict,
output_dir: str,
figure_dir: str,
metadata_path: str = "",
model: Optional[str] = None,
max_rounds: int = 6,
plot_config: Optional[dict] = None,
log_callback: Optional[Callable[[str], None]] = None,
install_callback: Optional[Callable[[str], bool]] = None,
) -> AutoAgentResult:
"""
自律的に解析を進める AI エージェント。
LLM が解析計画を自ら立て、ラウンドごとにコードを生成・実行し、
結果を受け取って次の解析を決める。
「ANALYSIS_COMPLETE」を受信するか max_rounds に達したら終了。
"""
if model is None:
model = _agent.DEFAULT_MODEL
def _log(msg: str):
if log_callback:
log_callback(msg)
results: list = []
all_figures: list = []
messages = [
{
"role": "system",
"content": (
"You are an autonomous microbiome analysis agent. "
"Each response must contain ONE complete Python script in ```python...``` "
"OR the text ANALYSIS_COMPLETE when all analyses are done."
),
},
{
"role": "user",
"content": _build_auto_initial_prompt(
export_files, figure_dir, metadata_path, plot_config
),
},
]
for round_n in range(1, max_rounds + 1):
_log(f"\n{'─' * 44}")
_log(f" 🤖 Round {round_n} / {max_rounds}")
_log(f"{'─' * 44}")
_log("次の解析を計画中...")
try:
response = _agent.call_ollama(messages, model)
except Exception as e:
_log(f"Ollama エラー: {e}")
break
content = response.get("content", "")